From ab82da02f93cdbc76a0813585b57388baabae7c2 Mon Sep 17 00:00:00 2001 From: Greyson LaLonde Date: Thu, 4 Sep 2025 15:32:47 -0400 Subject: [PATCH] refactor: cleanup crew agent executor (#3440) refactor: cleanup crew agent executor & add docs - Remove dead code, unused imports, and obsolete methods - Modernize with updated type hints and static _format_prompt - Add docstrings for clarity --- src/crewai/agents/crew_agent_executor.py | 246 ++++++++++++++--------- 1 file changed, 147 insertions(+), 99 deletions(-) diff --git a/src/crewai/agents/crew_agent_executor.py b/src/crewai/agents/crew_agent_executor.py index 5ab4a09ea..f4a5cebe3 100644 --- a/src/crewai/agents/crew_agent_executor.py +++ b/src/crewai/agents/crew_agent_executor.py @@ -1,4 +1,11 @@ -from typing import Any, Callable, Dict, List, Optional, Union +"""Agent executor for crew AI agents. + +Handles agent execution flow including LLM interactions, tool execution, +and memory management. +""" + +from collections.abc import Callable +from typing import Any from crewai.agents.agent_builder.base_agent import BaseAgent from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin @@ -8,8 +15,12 @@ from crewai.agents.parser import ( OutputParserException, ) from crewai.agents.tools_handler import ToolsHandler -from crewai.llm import BaseLLM -from crewai.tools.base_tool import BaseTool +from crewai.events.event_bus import crewai_event_bus +from crewai.events.types.logging_events import ( + AgentLogsExecutionEvent, + AgentLogsStartedEvent, +) +from crewai.llms.base_llm import BaseLLM from crewai.tools.structured_tool import CrewStructuredTool from crewai.tools.tool_types import ToolResult from crewai.utilities import I18N, Printer @@ -26,19 +37,17 @@ from crewai.utilities.agent_utils import ( is_context_length_exceeded, process_llm_response, ) -from crewai.utilities.constants import MAX_LLM_RETRY, TRAINING_DATA_FILE -from crewai.utilities.logger import Logger +from crewai.utilities.constants import TRAINING_DATA_FILE from crewai.utilities.tool_utils import execute_tool_and_check_finality from crewai.utilities.training_handler import CrewTrainingHandler -from crewai.events.types.logging_events import ( - AgentLogsStartedEvent, - AgentLogsExecutionEvent, -) -from crewai.events.event_bus import crewai_event_bus class CrewAgentExecutor(CrewAgentExecutorMixin): - _logger: Logger = Logger() + """Executor for crew agents. + + Manages the execution lifecycle of an agent including prompt formatting, + LLM interactions, tool execution, and feedback handling. + """ def __init__( self, @@ -48,18 +57,39 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): agent: BaseAgent, prompt: dict[str, str], max_iter: int, - tools: List[CrewStructuredTool], + tools: list[CrewStructuredTool], tools_names: str, - stop_words: List[str], + stop_words: list[str], tools_description: str, tools_handler: ToolsHandler, step_callback: Any = None, - original_tools: List[Any] | None = None, + original_tools: list[Any] | None = None, function_calling_llm: Any = None, respect_context_window: bool = False, - request_within_rpm_limit: Optional[Callable[[], bool]] = None, - callbacks: List[Any] | None = None, - ): + request_within_rpm_limit: Callable[[], bool] | None = None, + callbacks: list[Any] | None = None, + ) -> None: + """Initialize executor. + + Args: + llm: Language model instance. + task: Task to execute. + crew: Crew instance. + agent: Agent to execute. + prompt: Prompt templates. + max_iter: Maximum iterations. + tools: Available tools. + tools_names: Tool names string. + stop_words: Stop word list. + tools_description: Tool descriptions. + tools_handler: Tool handler instance. + step_callback: Optional step callback. + original_tools: Original tool list. + function_calling_llm: Optional function calling LLM. + respect_context_window: Respect context limits. + request_within_rpm_limit: RPM limit check function. + callbacks: Optional callbacks list. + """ self._i18n: I18N = I18N() self.llm: BaseLLM = llm self.task = task @@ -81,12 +111,9 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): self.respect_context_window = respect_context_window self.request_within_rpm_limit = request_within_rpm_limit self.ask_for_human_input = False - self.messages: List[Dict[str, str]] = [] + self.messages: list[dict[str, str]] = [] self.iterations = 0 self.log_error_after = 3 - self.tool_name_to_tool_map: Dict[str, Union[CrewStructuredTool, BaseTool]] = { - tool.name: tool for tool in self.tools - } existing_stop = self.llm.stop or [] self.llm.stop = list( set( @@ -96,7 +123,15 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): ) ) - def invoke(self, inputs: Dict[str, str]) -> Dict[str, Any]: + def invoke(self, inputs: dict[str, str]) -> dict[str, Any]: + """Execute the agent with given inputs. + + Args: + inputs: Input dictionary containing prompt variables. + + Returns: + Dictionary with agent output. + """ if "system" in self.prompt: system_prompt = self._format_prompt(self.prompt.get("system", ""), inputs) user_prompt = self._format_prompt(self.prompt.get("user", ""), inputs) @@ -131,9 +166,10 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): return {"output": formatted_answer.output} def _invoke_loop(self) -> AgentFinish: - """ - Main loop to invoke the agent's thought process until it reaches a conclusion - or the maximum number of iterations is reached. + """Execute agent loop until completion. + + Returns: + Final answer from the agent. """ formatted_answer = None while not isinstance(formatted_answer, AgentFinish): @@ -190,7 +226,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): ) self._invoke_step_callback(formatted_answer) - self._append_message(formatted_answer.text, role="assistant") + self._append_message(formatted_answer.text) except OutputParserException as e: formatted_answer = handle_output_parser_exception( @@ -231,8 +267,16 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): def _handle_agent_action( self, formatted_answer: AgentAction, tool_result: ToolResult - ) -> Union[AgentAction, AgentFinish]: - """Handle the AgentAction, execute tools, and process the results.""" + ) -> AgentAction | AgentFinish: + """Process agent action and tool execution. + + Args: + formatted_answer: Agent's action to execute. + tool_result: Result from tool execution. + + Returns: + Updated action or final answer. + """ # Special case for add_image_tool add_image_tool = self._i18n.tools("add_image") if ( @@ -251,17 +295,28 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): show_logs=self._show_logs, ) - def _invoke_step_callback(self, formatted_answer) -> None: - """Invoke the step callback if it exists.""" + def _invoke_step_callback( + self, formatted_answer: AgentAction | AgentFinish + ) -> None: + """Invoke step callback. + + Args: + formatted_answer: Current agent response. + """ if self.step_callback: self.step_callback(formatted_answer) def _append_message(self, text: str, role: str = "assistant") -> None: - """Append a message to the message list with the given role.""" + """Add message to conversation history. + + Args: + text: Message content. + role: Message role (default: assistant). + """ self.messages.append(format_message_for_llm(text, role=role)) - def _show_start_logs(self): - """Show logs for the start of agent execution.""" + def _show_start_logs(self) -> None: + """Emit agent start event.""" if self.agent is None: raise ValueError("Agent cannot be None") @@ -277,8 +332,12 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): ), ) - def _show_logs(self, formatted_answer: Union[AgentAction, AgentFinish]): - """Show logs for the agent's execution.""" + def _show_logs(self, formatted_answer: AgentAction | AgentFinish) -> None: + """Emit agent execution event. + + Args: + formatted_answer: Agent's response to log. + """ if self.agent is None: raise ValueError("Agent cannot be None") @@ -292,44 +351,16 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): ), ) - def _summarize_messages(self) -> None: - messages_groups = [] - for message in self.messages: - content = message["content"] - cut_size = self.llm.get_context_window_size() - for i in range(0, len(content), cut_size): - messages_groups.append({"content": content[i : i + cut_size]}) - - summarized_contents = [] - for group in messages_groups: - summary = self.llm.call( - [ - format_message_for_llm( - self._i18n.slice("summarizer_system_message"), role="system" - ), - format_message_for_llm( - self._i18n.slice("summarize_instruction").format( - group=group["content"] - ), - ), - ], - callbacks=self.callbacks, - ) - summarized_contents.append({"content": str(summary)}) - - merged_summary = " ".join(content["content"] for content in summarized_contents) - - self.messages = [ - format_message_for_llm( - self._i18n.slice("summary").format(merged_summary=merged_summary) - ) - ] - def _handle_crew_training_output( - self, result: AgentFinish, human_feedback: Optional[str] = None + self, result: AgentFinish, human_feedback: str | None = None ) -> None: - """Handle the process of saving training data.""" - agent_id = str(self.agent.id) # type: ignore + """Save training data. + + Args: + result: Agent's final output. + human_feedback: Optional feedback from human. + """ + agent_id = str(self.agent.id) train_iteration = ( getattr(self.crew, "_train_iteration", None) if self.crew else None ) @@ -371,20 +402,30 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): training_data[agent_id] = agent_training_data training_handler.save(training_data) - def _format_prompt(self, prompt: str, inputs: Dict[str, str]) -> str: + @staticmethod + def _format_prompt(prompt: str, inputs: dict[str, str]) -> str: + """Format prompt with input values. + + Args: + prompt: Template string. + inputs: Values to substitute. + + Returns: + Formatted prompt. + """ prompt = prompt.replace("{input}", inputs["input"]) prompt = prompt.replace("{tool_names}", inputs["tool_names"]) prompt = prompt.replace("{tools}", inputs["tools"]) return prompt def _handle_human_feedback(self, formatted_answer: AgentFinish) -> AgentFinish: - """Handle human feedback with different flows for training vs regular use. + """Process human feedback. Args: - formatted_answer: The initial AgentFinish result to get feedback on + formatted_answer: Initial agent result. Returns: - AgentFinish: The final answer after processing feedback + Final answer after feedback. """ human_feedback = self._ask_human_input(formatted_answer.output) @@ -394,13 +435,25 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): return self._handle_regular_feedback(formatted_answer, human_feedback) def _is_training_mode(self) -> bool: - """Check if crew is in training mode.""" + """Check if training mode is active. + + Returns: + True if in training mode. + """ return bool(self.crew and self.crew._train) def _handle_training_feedback( self, initial_answer: AgentFinish, feedback: str ) -> AgentFinish: - """Process feedback for training scenarios with single iteration.""" + """Process training feedback. + + Args: + initial_answer: Initial agent output. + feedback: Training feedback. + + Returns: + Improved answer. + """ self._handle_crew_training_output(initial_answer, feedback) self.messages.append( format_message_for_llm( @@ -415,7 +468,15 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): def _handle_regular_feedback( self, current_answer: AgentFinish, initial_feedback: str ) -> AgentFinish: - """Process feedback for regular use with potential multiple iterations.""" + """Process regular feedback iteratively. + + Args: + current_answer: Current agent output. + initial_feedback: Initial user feedback. + + Returns: + Final answer after iterations. + """ feedback = initial_feedback answer = current_answer @@ -430,30 +491,17 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): return answer def _process_feedback_iteration(self, feedback: str) -> AgentFinish: - """Process a single feedback iteration.""" + """Process single feedback iteration. + + Args: + feedback: User feedback. + + Returns: + Updated agent response. + """ self.messages.append( format_message_for_llm( self._i18n.slice("feedback_instructions").format(feedback=feedback) ) ) return self._invoke_loop() - - def _log_feedback_error(self, retry_count: int, error: Exception) -> None: - """Log feedback processing errors.""" - self._printer.print( - content=( - f"Error processing feedback: {error}. " - f"Retrying... ({retry_count + 1}/{MAX_LLM_RETRY})" - ), - color="red", - ) - - def _log_max_retries_exceeded(self) -> None: - """Log when max retries for feedback processing are exceeded.""" - self._printer.print( - content=( - f"Failed to process feedback after {MAX_LLM_RETRY} attempts. " - "Ending feedback loop." - ), - color="red", - )