Enhance concurrency control in CrewAgentExecutorFlow by introducing a threading lock to prevent concurrent executions. This change ensures that the executor instance cannot be invoked while already running, improving stability and reliability during flow execution.

This commit is contained in:
lorenzejay
2025-12-27 12:04:59 -08:00
parent 2e938233be
commit 3dfe63f556

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
from collections.abc import Callable
import threading
from typing import TYPE_CHECKING, Any, Literal, cast
from uuid import uuid4
@@ -156,6 +157,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
self._last_context_error: Exception | None = None
# Execution guard to prevent concurrent/duplicate executions
self._execution_lock = threading.Lock()
self._is_executing: bool = False
self._has_been_invoked: bool = False
self._flow_initialized: bool = False
@@ -216,7 +218,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
@property
def messages(self) -> list[LLMMessage]:
"""Compatibility property for mixin - returns state messages."""
return list(self._state.messages)
return self._state.messages
@property
def iterations(self) -> int:
@@ -381,13 +383,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
def increment_and_continue(self) -> str:
"""Increment iteration counter and loop back for next iteration."""
self.state.iterations += 1
inc_text = Text()
inc_text.append("+ increment_and_continue: ", style="magenta bold")
inc_text.append(
f"Incremented to iteration {self.state.iterations}, looping back",
style="magenta",
)
self._console.print(inc_text)
return "initialized"
@listen(or_("agent_finished", "tool_result_is_final"))
@@ -463,14 +458,14 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
"""
self._ensure_flow_initialized()
if self._is_executing:
raise RuntimeError(
"Executor is already running. "
"Cannot invoke the same executor instance concurrently."
)
self._is_executing = True
self._has_been_invoked = True
with self._execution_lock:
if self._is_executing:
raise RuntimeError(
"Executor is already running. "
"Cannot invoke the same executor instance concurrently."
)
self._is_executing = True
self._has_been_invoked = True
try:
# Reset state for fresh execution
@@ -480,18 +475,15 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
self.state.is_finished = False
if "system" in self.prompt:
system_prompt = self._format_prompt(
cast(str, self.prompt.get("system", "")), inputs
)
user_prompt = self._format_prompt(
cast(str, self.prompt.get("user", "")), inputs
)
prompt = cast("SystemPromptResult", self.prompt)
system_prompt = self._format_prompt(prompt["system"], inputs)
user_prompt = self._format_prompt(prompt["user"], inputs)
self.state.messages.append(
format_message_for_llm(system_prompt, role="system")
)
self.state.messages.append(format_message_for_llm(user_prompt))
else:
user_prompt = self._format_prompt(self.prompt.get("prompt", ""), inputs)
user_prompt = self._format_prompt(self.prompt["prompt"], inputs)
self.state.messages.append(format_message_for_llm(user_prompt))
self.state.ask_for_human_input = bool(