From 3dfe63f5563ab8d7bb7f87fd7ab2dd57b10733d2 Mon Sep 17 00:00:00 2001 From: lorenzejay Date: Sat, 27 Dec 2025 12:04:59 -0800 Subject: [PATCH] Enhance concurrency control in CrewAgentExecutorFlow by introducing a threading lock to prevent concurrent executions. This change ensures that the executor instance cannot be invoked while already running, improving stability and reliability during flow execution. --- .../experimental/crew_agent_executor_flow.py | 38 ++++++++----------- 1 file changed, 15 insertions(+), 23 deletions(-) diff --git a/lib/crewai/src/crewai/experimental/crew_agent_executor_flow.py b/lib/crewai/src/crewai/experimental/crew_agent_executor_flow.py index 973174a63..70679e815 100644 --- a/lib/crewai/src/crewai/experimental/crew_agent_executor_flow.py +++ b/lib/crewai/src/crewai/experimental/crew_agent_executor_flow.py @@ -1,6 +1,7 @@ from __future__ import annotations from collections.abc import Callable +import threading from typing import TYPE_CHECKING, Any, Literal, cast from uuid import uuid4 @@ -156,6 +157,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): self._last_context_error: Exception | None = None # Execution guard to prevent concurrent/duplicate executions + self._execution_lock = threading.Lock() self._is_executing: bool = False self._has_been_invoked: bool = False self._flow_initialized: bool = False @@ -216,7 +218,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): @property def messages(self) -> list[LLMMessage]: """Compatibility property for mixin - returns state messages.""" - return list(self._state.messages) + return self._state.messages @property def iterations(self) -> int: @@ -381,13 +383,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): def increment_and_continue(self) -> str: """Increment iteration counter and loop back for next iteration.""" self.state.iterations += 1 - inc_text = Text() - inc_text.append("+ increment_and_continue: ", style="magenta bold") - inc_text.append( - f"Incremented to iteration {self.state.iterations}, looping back", - style="magenta", - ) - self._console.print(inc_text) return "initialized" @listen(or_("agent_finished", "tool_result_is_final")) @@ -463,14 +458,14 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): """ self._ensure_flow_initialized() - if self._is_executing: - raise RuntimeError( - "Executor is already running. " - "Cannot invoke the same executor instance concurrently." - ) - - self._is_executing = True - self._has_been_invoked = True + with self._execution_lock: + if self._is_executing: + raise RuntimeError( + "Executor is already running. " + "Cannot invoke the same executor instance concurrently." + ) + self._is_executing = True + self._has_been_invoked = True try: # Reset state for fresh execution @@ -480,18 +475,15 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): self.state.is_finished = False if "system" in self.prompt: - system_prompt = self._format_prompt( - cast(str, self.prompt.get("system", "")), inputs - ) - user_prompt = self._format_prompt( - cast(str, self.prompt.get("user", "")), inputs - ) + prompt = cast("SystemPromptResult", self.prompt) + system_prompt = self._format_prompt(prompt["system"], inputs) + user_prompt = self._format_prompt(prompt["user"], inputs) self.state.messages.append( format_message_for_llm(system_prompt, role="system") ) self.state.messages.append(format_message_for_llm(user_prompt)) else: - user_prompt = self._format_prompt(self.prompt.get("prompt", ""), inputs) + user_prompt = self._format_prompt(self.prompt["prompt"], inputs) self.state.messages.append(format_message_for_llm(user_prompt)) self.state.ask_for_human_input = bool(