refactor: clean up comments and improve code clarity in agent executor flow

- Removed outdated comments and unnecessary explanations in  and  classes to enhance code readability.
- Simplified parameter updates in the agent executor to avoid confusion regarding executor recreation.
- Improved clarity in the  method to ensure proper handling of non-final answers without raising errors.
This commit is contained in:
lorenzejay
2025-11-25 12:43:08 -08:00
parent 5379ae624c
commit 5a589c8e4e
4 changed files with 42 additions and 62 deletions

View File

@@ -636,9 +636,7 @@ class Agent(BaseAgent):
self._rpm_controller.check_or_wait if self._rpm_controller else None
)
# Update existing executor or create new one (avoid recreation overhead)
if self.agent_executor is not None:
# Update task-varying parameters on existing executor
self._update_executor_parameters(
task=task,
tools=parsed_tools,
@@ -650,7 +648,7 @@ class Agent(BaseAgent):
else:
self.agent_executor = CrewAgentExecutorFlow(
llm=self.llm,
task=task, # type: ignore[arg-type]
task=task,
agent=self,
crew=self.crew,
tools=parsed_tools,
@@ -688,8 +686,7 @@ class Agent(BaseAgent):
stop_words: Stop words list.
rpm_limit_fn: RPM limit callback function.
"""
# Update task-specific parameters
self.agent_executor.task = task # type: ignore[arg-type]
self.agent_executor.task = task
self.agent_executor.tools = tools
self.agent_executor.original_tools = raw_tools
self.agent_executor.prompt = prompt
@@ -698,11 +695,9 @@ class Agent(BaseAgent):
self.agent_executor.tools_description = render_text_description_and_args(tools)
self.agent_executor.response_model = task.response_model if task else None
# Update potentially-changed dependencies
self.agent_executor.tools_handler = self.tools_handler
self.agent_executor.request_within_rpm_limit = rpm_limit_fn
# Update LLM stop words
if self.agent_executor.llm:
existing_stop = getattr(self.agent_executor.llm, "stop", [])
self.agent_executor.llm.stop = list(

View File

@@ -448,7 +448,7 @@ class BaseAgent(BaseModel, ABC, metaclass=AgentMeta):
if self.cache:
self.cache_handler = cache_handler
self.tools_handler.cache = cache_handler
# self.create_agent_executor()
# TODO: we should do if agent_executor, then we update as we were re-creating the agent_executor which is not ideal
def set_rpm_controller(self, rpm_controller: RPMController) -> None:
"""Set the rpm controller for the agent.
@@ -458,7 +458,7 @@ class BaseAgent(BaseModel, ABC, metaclass=AgentMeta):
"""
if not self._rpm_controller:
self._rpm_controller = rpm_controller
# self.create_agent_executor()
# TODO: we should do if agent_executor, then we update as we were re-creating the agent_executor which is not ideal
def set_knowledge(self, crew_embedder: EmbedderConfig | None = None) -> None:
pass

View File

@@ -1,9 +1,3 @@
"""Flow-based agent executor for crew AI agents.
Implements the ReAct pattern using Flow's event-driven architecture
as an alternative to the imperative while-loop pattern.
"""
from __future__ import annotations
from collections.abc import Callable
@@ -133,9 +127,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
callbacks: Optional callbacks list.
response_model: Optional Pydantic model for structured outputs.
"""
# Store all parameters as instance variables BEFORE calling super().__init__()
# This is required because Flow.__init__ calls getattr() which may trigger
# @property decorators that reference these attributes
self._i18n: I18N = get_i18n()
self.llm = llm
self.task = task
@@ -165,18 +156,15 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
# Execution guard to prevent concurrent/duplicate executions
self._is_executing: bool = False
self._has_been_invoked: bool = False
self._flow_initialized: bool = False # Track if Flow.__init__ was called
self._flow_initialized: bool = False
# Debug: Track instance creation
self._instance_id = str(uuid4())[:8]
# Initialize hooks
self.before_llm_call_hooks: list[Callable] = []
self.after_llm_call_hooks: list[Callable] = []
self.before_llm_call_hooks.extend(get_before_llm_call_hooks())
self.after_llm_call_hooks.extend(get_after_llm_call_hooks())
# Configure LLM stop words
if self.llm:
existing_stop = getattr(self.llm, "stop", [])
self.llm.stop = list(
@@ -187,7 +175,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
)
)
# Create a temporary minimal state for property access before Flow init
self._state = AgentReActState()
def _ensure_flow_initialized(self) -> None:
@@ -480,8 +467,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
Flow Event: -> "completed" (END)
"""
# Guard: Only finalize if we actually have a valid final answer
# This prevents finalization during initialization or intermediate states
if self.state.current_answer is None:
self._printer.print(
content="⚠️ Finalize called but no answer in state - skipping",
@@ -489,10 +474,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
)
return "skipped"
# Validate we have an AgentFinish (lines 307-311)
if not isinstance(self.state.current_answer, AgentFinish):
# This can happen if Flow is triggered during initialization
# Don't raise error, just log and skip
self._printer.print(
content=f"⚠️ Finalize called with {type(self.state.current_answer).__name__} instead of AgentFinish - skipping",
color="yellow",
@@ -501,7 +483,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
self.state.is_finished = True
# Show logs (line 312)
self._show_logs(self.state.current_answer)
return "completed"
@@ -523,14 +504,11 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
printer=self._printer,
)
# If error handler returns an answer, use it
if formatted_answer:
self.state.current_answer = formatted_answer
# Increment iterations (finally block)
self.state.iterations += 1
# Loop back via initialized event
return "initialized"
@listen("context_error")
@@ -551,10 +529,8 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
i18n=self._i18n,
)
# Increment iterations (finally block)
self.state.iterations += 1
# Loop back via initialized event
return "initialized"
def invoke(self, inputs: dict[str, Any]) -> dict[str, Any]:
@@ -572,8 +548,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
Returns:
Dictionary with agent output.
"""
# Guard: Prevent concurrent executions
# Ensure Flow is initialized before execution
self._ensure_flow_initialized()
if self._is_executing:
@@ -585,7 +559,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
self._is_executing = True
self._has_been_invoked = True
# Debug: Track invoke calls
self._printer.print(
content=f"🚀 FlowExecutor.invoke() called on instance: {self._instance_id}",
color="green",
@@ -600,7 +573,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
self.state.current_answer = None
self.state.is_finished = False
# Format and initialize messages (lines 170-181)
if "system" in self.prompt:
system_prompt = self._format_prompt(
cast(str, self.prompt.get("system", "")), inputs
@@ -616,15 +588,12 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
user_prompt = self._format_prompt(self.prompt.get("prompt", ""), inputs)
self.state.messages.append(format_message_for_llm(user_prompt))
# Set human input flag (line 185)
self.state.ask_for_human_input = bool(
inputs.get("ask_for_human_input", False)
)
# Run the flow (replaces _invoke_loop call at line 188)
self.kickoff()
# Extract final answer from state
formatted_answer = self.state.current_answer
if not isinstance(formatted_answer, AgentFinish):
@@ -632,11 +601,9 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
"Agent execution ended without reaching a final answer."
)
# Handle human feedback if needed (lines 199-200)
if self.state.ask_for_human_input:
formatted_answer = self._handle_human_feedback(formatted_answer)
# Create memories (lines 202-204)
self._create_short_term_memory(formatted_answer)
self._create_long_term_memory(formatted_answer)
self._create_external_memory(formatted_answer)
@@ -653,7 +620,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
handle_unknown_error(self._printer, e)
raise
finally:
# Always reset execution flag
self._is_executing = False
def _handle_agent_action(
@@ -670,7 +636,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
Returns:
Updated action or final answer.
"""
# Special case for add_image_tool
add_image_tool = self._i18n.tools("add_image")
if (
isinstance(add_image_tool, dict)
@@ -685,7 +650,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
return handle_agent_action_core(
formatted_answer=formatted_answer,
tool_result=tool_result,
messages=list(self.state.messages), # Pass copy
messages=list(self.state.messages),
step_callback=self.step_callback,
show_logs=self._show_logs,
)

View File

@@ -14,7 +14,6 @@ from crewai.agents.crew_agent_executor_flow import (
)
from crewai.agents.parser import AgentAction, AgentFinish
class TestAgentReActState:
"""Test AgentReActState Pydantic model."""
@@ -168,14 +167,17 @@ class TestCrewAgentExecutorFlow:
mock_show_logs.assert_called_once()
def test_finalize_failure(self, mock_dependencies):
"""Test finalize raises error without AgentFinish."""
"""Test finalize skips when given AgentAction instead of AgentFinish."""
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor.state.current_answer = AgentAction(
thought="thinking", tool="search", tool_input="query", text="action text"
)
with pytest.raises(RuntimeError, match="without reaching a final answer"):
executor.finalize()
result = executor.finalize()
# Should return "skipped" and not set is_finished
assert result == "skipped"
assert executor.state.is_finished is False
def test_format_prompt(self, mock_dependencies):
"""Test prompt formatting."""
@@ -409,9 +411,14 @@ class TestFlowInvoke:
):
"""Test successful invoke without human feedback."""
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor.state.current_answer = AgentFinish(
thought="final thinking", output="Final result", text="complete"
)
# Mock kickoff to set the final answer in state
def mock_kickoff_side_effect():
executor.state.current_answer = AgentFinish(
thought="final thinking", output="Final result", text="complete"
)
mock_kickoff.side_effect = mock_kickoff_side_effect
inputs = {"input": "test", "tool_names": "", "tools": ""}
result = executor.invoke(inputs)
@@ -436,24 +443,37 @@ class TestFlowInvoke:
executor.invoke(inputs)
@patch.object(CrewAgentExecutorFlow, "kickoff")
def test_invoke_with_system_prompt(self, mock_kickoff, mock_dependencies):
@patch.object(CrewAgentExecutorFlow, "_create_short_term_memory")
@patch.object(CrewAgentExecutorFlow, "_create_long_term_memory")
@patch.object(CrewAgentExecutorFlow, "_create_external_memory")
def test_invoke_with_system_prompt(
self,
mock_external_memory,
mock_long_term_memory,
mock_short_term_memory,
mock_kickoff,
mock_dependencies,
):
"""Test invoke with system prompt configuration."""
mock_dependencies["prompt"] = {
"system": "System: {input}",
"user": "User: {input} {tool_names} {tools}",
}
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor.state.current_answer = AgentFinish(
thought="final thoughts", output="Done", text="complete"
)
def mock_kickoff_side_effect():
executor.state.current_answer = AgentFinish(
thought="final thoughts", output="Done", text="complete"
)
mock_kickoff.side_effect = mock_kickoff_side_effect
inputs = {"input": "test", "tool_names": "", "tools": ""}
result = executor.invoke(inputs)
mock_short_term_memory.assert_called_once()
mock_long_term_memory.assert_called_once()
mock_external_memory.assert_called_once()
mock_kickoff.assert_called_once()
assert result == {"output": "Done"}
# Should have system and user messages
assert len(executor.state.messages) >= 2
if __name__ == "__main__":
pytest.main([__file__, "-v"])