From 5a589c8e4e75e637846709ce73aad74dd7fa1cac Mon Sep 17 00:00:00 2001 From: lorenzejay Date: Tue, 25 Nov 2025 12:43:08 -0800 Subject: [PATCH] refactor: clean up comments and improve code clarity in agent executor flow - Removed outdated comments and unnecessary explanations in and classes to enhance code readability. - Simplified parameter updates in the agent executor to avoid confusion regarding executor recreation. - Improved clarity in the method to ensure proper handling of non-final answers without raising errors. --- lib/crewai/src/crewai/agent/core.py | 9 +--- .../crewai/agents/agent_builder/base_agent.py | 4 +- .../crewai/agents/crew_agent_executor_flow.py | 39 +------------- .../agents/test_crew_agent_executor_flow.py | 52 +++++++++++++------ 4 files changed, 42 insertions(+), 62 deletions(-) diff --git a/lib/crewai/src/crewai/agent/core.py b/lib/crewai/src/crewai/agent/core.py index 3299620cd..f8bc85d46 100644 --- a/lib/crewai/src/crewai/agent/core.py +++ b/lib/crewai/src/crewai/agent/core.py @@ -636,9 +636,7 @@ class Agent(BaseAgent): self._rpm_controller.check_or_wait if self._rpm_controller else None ) - # Update existing executor or create new one (avoid recreation overhead) if self.agent_executor is not None: - # Update task-varying parameters on existing executor self._update_executor_parameters( task=task, tools=parsed_tools, @@ -650,7 +648,7 @@ class Agent(BaseAgent): else: self.agent_executor = CrewAgentExecutorFlow( llm=self.llm, - task=task, # type: ignore[arg-type] + task=task, agent=self, crew=self.crew, tools=parsed_tools, @@ -688,8 +686,7 @@ class Agent(BaseAgent): stop_words: Stop words list. rpm_limit_fn: RPM limit callback function. """ - # Update task-specific parameters - self.agent_executor.task = task # type: ignore[arg-type] + self.agent_executor.task = task self.agent_executor.tools = tools self.agent_executor.original_tools = raw_tools self.agent_executor.prompt = prompt @@ -698,11 +695,9 @@ class Agent(BaseAgent): self.agent_executor.tools_description = render_text_description_and_args(tools) self.agent_executor.response_model = task.response_model if task else None - # Update potentially-changed dependencies self.agent_executor.tools_handler = self.tools_handler self.agent_executor.request_within_rpm_limit = rpm_limit_fn - # Update LLM stop words if self.agent_executor.llm: existing_stop = getattr(self.agent_executor.llm, "stop", []) self.agent_executor.llm.stop = list( diff --git a/lib/crewai/src/crewai/agents/agent_builder/base_agent.py b/lib/crewai/src/crewai/agents/agent_builder/base_agent.py index f11b02c3f..dac82012b 100644 --- a/lib/crewai/src/crewai/agents/agent_builder/base_agent.py +++ b/lib/crewai/src/crewai/agents/agent_builder/base_agent.py @@ -448,7 +448,7 @@ class BaseAgent(BaseModel, ABC, metaclass=AgentMeta): if self.cache: self.cache_handler = cache_handler self.tools_handler.cache = cache_handler - # self.create_agent_executor() + # TODO: we should do if agent_executor, then we update as we were re-creating the agent_executor which is not ideal def set_rpm_controller(self, rpm_controller: RPMController) -> None: """Set the rpm controller for the agent. @@ -458,7 +458,7 @@ class BaseAgent(BaseModel, ABC, metaclass=AgentMeta): """ if not self._rpm_controller: self._rpm_controller = rpm_controller - # self.create_agent_executor() + # TODO: we should do if agent_executor, then we update as we were re-creating the agent_executor which is not ideal def set_knowledge(self, crew_embedder: EmbedderConfig | None = None) -> None: pass diff --git a/lib/crewai/src/crewai/agents/crew_agent_executor_flow.py b/lib/crewai/src/crewai/agents/crew_agent_executor_flow.py index 587a4b2b1..f721e6fb1 100644 --- a/lib/crewai/src/crewai/agents/crew_agent_executor_flow.py +++ b/lib/crewai/src/crewai/agents/crew_agent_executor_flow.py @@ -1,9 +1,3 @@ -"""Flow-based agent executor for crew AI agents. - -Implements the ReAct pattern using Flow's event-driven architecture -as an alternative to the imperative while-loop pattern. -""" - from __future__ import annotations from collections.abc import Callable @@ -133,9 +127,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): callbacks: Optional callbacks list. response_model: Optional Pydantic model for structured outputs. """ - # Store all parameters as instance variables BEFORE calling super().__init__() - # This is required because Flow.__init__ calls getattr() which may trigger - # @property decorators that reference these attributes self._i18n: I18N = get_i18n() self.llm = llm self.task = task @@ -165,18 +156,15 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): # Execution guard to prevent concurrent/duplicate executions self._is_executing: bool = False self._has_been_invoked: bool = False - self._flow_initialized: bool = False # Track if Flow.__init__ was called + self._flow_initialized: bool = False - # Debug: Track instance creation self._instance_id = str(uuid4())[:8] - # Initialize hooks self.before_llm_call_hooks: list[Callable] = [] self.after_llm_call_hooks: list[Callable] = [] self.before_llm_call_hooks.extend(get_before_llm_call_hooks()) self.after_llm_call_hooks.extend(get_after_llm_call_hooks()) - # Configure LLM stop words if self.llm: existing_stop = getattr(self.llm, "stop", []) self.llm.stop = list( @@ -187,7 +175,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): ) ) - # Create a temporary minimal state for property access before Flow init self._state = AgentReActState() def _ensure_flow_initialized(self) -> None: @@ -480,8 +467,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): Flow Event: -> "completed" (END) """ - # Guard: Only finalize if we actually have a valid final answer - # This prevents finalization during initialization or intermediate states if self.state.current_answer is None: self._printer.print( content="⚠️ Finalize called but no answer in state - skipping", @@ -489,10 +474,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): ) return "skipped" - # Validate we have an AgentFinish (lines 307-311) if not isinstance(self.state.current_answer, AgentFinish): - # This can happen if Flow is triggered during initialization - # Don't raise error, just log and skip self._printer.print( content=f"⚠️ Finalize called with {type(self.state.current_answer).__name__} instead of AgentFinish - skipping", color="yellow", @@ -501,7 +483,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): self.state.is_finished = True - # Show logs (line 312) self._show_logs(self.state.current_answer) return "completed" @@ -523,14 +504,11 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): printer=self._printer, ) - # If error handler returns an answer, use it if formatted_answer: self.state.current_answer = formatted_answer - # Increment iterations (finally block) self.state.iterations += 1 - # Loop back via initialized event return "initialized" @listen("context_error") @@ -551,10 +529,8 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): i18n=self._i18n, ) - # Increment iterations (finally block) self.state.iterations += 1 - # Loop back via initialized event return "initialized" def invoke(self, inputs: dict[str, Any]) -> dict[str, Any]: @@ -572,8 +548,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): Returns: Dictionary with agent output. """ - # Guard: Prevent concurrent executions - # Ensure Flow is initialized before execution self._ensure_flow_initialized() if self._is_executing: @@ -585,7 +559,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): self._is_executing = True self._has_been_invoked = True - # Debug: Track invoke calls self._printer.print( content=f"🚀 FlowExecutor.invoke() called on instance: {self._instance_id}", color="green", @@ -600,7 +573,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): self.state.current_answer = None self.state.is_finished = False - # Format and initialize messages (lines 170-181) if "system" in self.prompt: system_prompt = self._format_prompt( cast(str, self.prompt.get("system", "")), inputs @@ -616,15 +588,12 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): user_prompt = self._format_prompt(self.prompt.get("prompt", ""), inputs) self.state.messages.append(format_message_for_llm(user_prompt)) - # Set human input flag (line 185) self.state.ask_for_human_input = bool( inputs.get("ask_for_human_input", False) ) - # Run the flow (replaces _invoke_loop call at line 188) self.kickoff() - # Extract final answer from state formatted_answer = self.state.current_answer if not isinstance(formatted_answer, AgentFinish): @@ -632,11 +601,9 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): "Agent execution ended without reaching a final answer." ) - # Handle human feedback if needed (lines 199-200) if self.state.ask_for_human_input: formatted_answer = self._handle_human_feedback(formatted_answer) - # Create memories (lines 202-204) self._create_short_term_memory(formatted_answer) self._create_long_term_memory(formatted_answer) self._create_external_memory(formatted_answer) @@ -653,7 +620,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): handle_unknown_error(self._printer, e) raise finally: - # Always reset execution flag self._is_executing = False def _handle_agent_action( @@ -670,7 +636,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): Returns: Updated action or final answer. """ - # Special case for add_image_tool add_image_tool = self._i18n.tools("add_image") if ( isinstance(add_image_tool, dict) @@ -685,7 +650,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): return handle_agent_action_core( formatted_answer=formatted_answer, tool_result=tool_result, - messages=list(self.state.messages), # Pass copy + messages=list(self.state.messages), step_callback=self.step_callback, show_logs=self._show_logs, ) diff --git a/lib/crewai/tests/agents/test_crew_agent_executor_flow.py b/lib/crewai/tests/agents/test_crew_agent_executor_flow.py index a975212ac..e7bb9770d 100644 --- a/lib/crewai/tests/agents/test_crew_agent_executor_flow.py +++ b/lib/crewai/tests/agents/test_crew_agent_executor_flow.py @@ -14,7 +14,6 @@ from crewai.agents.crew_agent_executor_flow import ( ) from crewai.agents.parser import AgentAction, AgentFinish - class TestAgentReActState: """Test AgentReActState Pydantic model.""" @@ -168,14 +167,17 @@ class TestCrewAgentExecutorFlow: mock_show_logs.assert_called_once() def test_finalize_failure(self, mock_dependencies): - """Test finalize raises error without AgentFinish.""" + """Test finalize skips when given AgentAction instead of AgentFinish.""" executor = CrewAgentExecutorFlow(**mock_dependencies) executor.state.current_answer = AgentAction( thought="thinking", tool="search", tool_input="query", text="action text" ) - with pytest.raises(RuntimeError, match="without reaching a final answer"): - executor.finalize() + result = executor.finalize() + + # Should return "skipped" and not set is_finished + assert result == "skipped" + assert executor.state.is_finished is False def test_format_prompt(self, mock_dependencies): """Test prompt formatting.""" @@ -409,9 +411,14 @@ class TestFlowInvoke: ): """Test successful invoke without human feedback.""" executor = CrewAgentExecutorFlow(**mock_dependencies) - executor.state.current_answer = AgentFinish( - thought="final thinking", output="Final result", text="complete" - ) + + # Mock kickoff to set the final answer in state + def mock_kickoff_side_effect(): + executor.state.current_answer = AgentFinish( + thought="final thinking", output="Final result", text="complete" + ) + + mock_kickoff.side_effect = mock_kickoff_side_effect inputs = {"input": "test", "tool_names": "", "tools": ""} result = executor.invoke(inputs) @@ -436,24 +443,37 @@ class TestFlowInvoke: executor.invoke(inputs) @patch.object(CrewAgentExecutorFlow, "kickoff") - def test_invoke_with_system_prompt(self, mock_kickoff, mock_dependencies): + @patch.object(CrewAgentExecutorFlow, "_create_short_term_memory") + @patch.object(CrewAgentExecutorFlow, "_create_long_term_memory") + @patch.object(CrewAgentExecutorFlow, "_create_external_memory") + def test_invoke_with_system_prompt( + self, + mock_external_memory, + mock_long_term_memory, + mock_short_term_memory, + mock_kickoff, + mock_dependencies, + ): """Test invoke with system prompt configuration.""" mock_dependencies["prompt"] = { "system": "System: {input}", "user": "User: {input} {tool_names} {tools}", } executor = CrewAgentExecutorFlow(**mock_dependencies) - executor.state.current_answer = AgentFinish( - thought="final thoughts", output="Done", text="complete" - ) + + def mock_kickoff_side_effect(): + executor.state.current_answer = AgentFinish( + thought="final thoughts", output="Done", text="complete" + ) + + mock_kickoff.side_effect = mock_kickoff_side_effect inputs = {"input": "test", "tool_names": "", "tools": ""} result = executor.invoke(inputs) + mock_short_term_memory.assert_called_once() + mock_long_term_memory.assert_called_once() + mock_external_memory.assert_called_once() + mock_kickoff.assert_called_once() assert result == {"output": "Done"} - # Should have system and user messages assert len(executor.state.messages) >= 2 - - -if __name__ == "__main__": - pytest.main([__file__, "-v"])