refactor: cleanup crew agent executor (#3440)

refactor: cleanup crew agent executor & add docs

- Remove dead code, unused imports, and obsolete methods
- Modernize with updated type hints and static _format_prompt
- Add docstrings for clarity
Greyson LaLonde authored 2025-09-04 15:32:47 -04:00, committed by GitHub
parent f0def350a4
commit ab82da02f9
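
Below is a minimal, self-contained sketch (not part of the diff) of the typing style the refactor adopts: builtin generics, PEP 604 unions, and collections.abc.Callable in place of typing.List, Dict, Optional, Union, and Callable. The function name and values are illustrative only.

from collections.abc import Callable
from typing import Any


# Old style (removed by this refactor):
#   from typing import Any, Callable, Dict, List, Optional, Union
#   messages: List[Dict[str, str]]
#   request_within_rpm_limit: Optional[Callable[[], bool]]
def run_step(
    messages: list[dict[str, str]],
    request_within_rpm_limit: Callable[[], bool] | None = None,
) -> dict[str, Any] | None:
    """Illustrative signature only; mirrors the updated hints in __init__ and invoke."""
    if request_within_rpm_limit is not None and not request_within_rpm_limit():
        return None  # skip the call when the RPM budget is exhausted
    return {"output": f"processed {len(messages)} messages"}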


@@ -1,4 +1,11 @@
-from typing import Any, Callable, Dict, List, Optional, Union
+"""Agent executor for crew AI agents.
+
+Handles agent execution flow including LLM interactions, tool execution,
+and memory management.
+"""
+
+from collections.abc import Callable
+from typing import Any
+
 from crewai.agents.agent_builder.base_agent import BaseAgent
 from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin
@@ -8,8 +15,12 @@ from crewai.agents.parser import (
     OutputParserException,
 )
 from crewai.agents.tools_handler import ToolsHandler
-from crewai.llm import BaseLLM
-from crewai.tools.base_tool import BaseTool
+from crewai.events.event_bus import crewai_event_bus
+from crewai.events.types.logging_events import (
+    AgentLogsExecutionEvent,
+    AgentLogsStartedEvent,
+)
+from crewai.llms.base_llm import BaseLLM
 from crewai.tools.structured_tool import CrewStructuredTool
 from crewai.tools.tool_types import ToolResult
 from crewai.utilities import I18N, Printer
@@ -26,19 +37,17 @@ from crewai.utilities.agent_utils import (
     is_context_length_exceeded,
     process_llm_response,
 )
-from crewai.utilities.constants import MAX_LLM_RETRY, TRAINING_DATA_FILE
-from crewai.utilities.logger import Logger
+from crewai.utilities.constants import TRAINING_DATA_FILE
 from crewai.utilities.tool_utils import execute_tool_and_check_finality
 from crewai.utilities.training_handler import CrewTrainingHandler
-from crewai.events.types.logging_events import (
-    AgentLogsStartedEvent,
-    AgentLogsExecutionEvent,
-)
-from crewai.events.event_bus import crewai_event_bus


 class CrewAgentExecutor(CrewAgentExecutorMixin):
-    _logger: Logger = Logger()
+    """Executor for crew agents.
+
+    Manages the execution lifecycle of an agent including prompt formatting,
+    LLM interactions, tool execution, and feedback handling.
+    """

     def __init__(
         self,
@@ -48,18 +57,39 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
         agent: BaseAgent,
         prompt: dict[str, str],
         max_iter: int,
-        tools: List[CrewStructuredTool],
+        tools: list[CrewStructuredTool],
         tools_names: str,
-        stop_words: List[str],
+        stop_words: list[str],
         tools_description: str,
         tools_handler: ToolsHandler,
         step_callback: Any = None,
-        original_tools: List[Any] | None = None,
+        original_tools: list[Any] | None = None,
         function_calling_llm: Any = None,
         respect_context_window: bool = False,
-        request_within_rpm_limit: Optional[Callable[[], bool]] = None,
-        callbacks: List[Any] | None = None,
-    ):
+        request_within_rpm_limit: Callable[[], bool] | None = None,
+        callbacks: list[Any] | None = None,
+    ) -> None:
+        """Initialize executor.
+
+        Args:
+            llm: Language model instance.
+            task: Task to execute.
+            crew: Crew instance.
+            agent: Agent to execute.
+            prompt: Prompt templates.
+            max_iter: Maximum iterations.
+            tools: Available tools.
+            tools_names: Tool names string.
+            stop_words: Stop word list.
+            tools_description: Tool descriptions.
+            tools_handler: Tool handler instance.
+            step_callback: Optional step callback.
+            original_tools: Original tool list.
+            function_calling_llm: Optional function calling LLM.
+            respect_context_window: Respect context limits.
+            request_within_rpm_limit: RPM limit check function.
+            callbacks: Optional callbacks list.
+        """
         self._i18n: I18N = I18N()
         self.llm: BaseLLM = llm
         self.task = task
@@ -81,12 +111,9 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
         self.respect_context_window = respect_context_window
         self.request_within_rpm_limit = request_within_rpm_limit
         self.ask_for_human_input = False
-        self.messages: List[Dict[str, str]] = []
+        self.messages: list[dict[str, str]] = []
         self.iterations = 0
         self.log_error_after = 3
-        self.tool_name_to_tool_map: Dict[str, Union[CrewStructuredTool, BaseTool]] = {
-            tool.name: tool for tool in self.tools
-        }
         existing_stop = self.llm.stop or []
         self.llm.stop = list(
             set(
@@ -96,7 +123,15 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
             )
         )

-    def invoke(self, inputs: Dict[str, str]) -> Dict[str, Any]:
+    def invoke(self, inputs: dict[str, str]) -> dict[str, Any]:
+        """Execute the agent with given inputs.
+
+        Args:
+            inputs: Input dictionary containing prompt variables.
+
+        Returns:
+            Dictionary with agent output.
+        """
         if "system" in self.prompt:
             system_prompt = self._format_prompt(self.prompt.get("system", ""), inputs)
             user_prompt = self._format_prompt(self.prompt.get("user", ""), inputs)
@@ -131,9 +166,10 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
         return {"output": formatted_answer.output}

     def _invoke_loop(self) -> AgentFinish:
-        """
-        Main loop to invoke the agent's thought process until it reaches a conclusion
-        or the maximum number of iterations is reached.
+        """Execute agent loop until completion.
+
+        Returns:
+            Final answer from the agent.
         """
         formatted_answer = None
         while not isinstance(formatted_answer, AgentFinish):
@@ -190,7 +226,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
                 )

                 self._invoke_step_callback(formatted_answer)
-                self._append_message(formatted_answer.text, role="assistant")
+                self._append_message(formatted_answer.text)
             except OutputParserException as e:
                 formatted_answer = handle_output_parser_exception(
@@ -231,8 +267,16 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
     def _handle_agent_action(
         self, formatted_answer: AgentAction, tool_result: ToolResult
-    ) -> Union[AgentAction, AgentFinish]:
-        """Handle the AgentAction, execute tools, and process the results."""
+    ) -> AgentAction | AgentFinish:
+        """Process agent action and tool execution.
+
+        Args:
+            formatted_answer: Agent's action to execute.
+            tool_result: Result from tool execution.
+
+        Returns:
+            Updated action or final answer.
+        """
         # Special case for add_image_tool
         add_image_tool = self._i18n.tools("add_image")
         if (
@@ -251,17 +295,28 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
                 show_logs=self._show_logs,
             )

-    def _invoke_step_callback(self, formatted_answer) -> None:
-        """Invoke the step callback if it exists."""
+    def _invoke_step_callback(
+        self, formatted_answer: AgentAction | AgentFinish
+    ) -> None:
+        """Invoke step callback.
+
+        Args:
+            formatted_answer: Current agent response.
+        """
         if self.step_callback:
             self.step_callback(formatted_answer)

     def _append_message(self, text: str, role: str = "assistant") -> None:
-        """Append a message to the message list with the given role."""
+        """Add message to conversation history.
+
+        Args:
+            text: Message content.
+            role: Message role (default: assistant).
+        """
         self.messages.append(format_message_for_llm(text, role=role))

-    def _show_start_logs(self):
-        """Show logs for the start of agent execution."""
+    def _show_start_logs(self) -> None:
+        """Emit agent start event."""
         if self.agent is None:
             raise ValueError("Agent cannot be None")
@@ -277,8 +332,12 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
             ),
         )

-    def _show_logs(self, formatted_answer: Union[AgentAction, AgentFinish]):
-        """Show logs for the agent's execution."""
+    def _show_logs(self, formatted_answer: AgentAction | AgentFinish) -> None:
+        """Emit agent execution event.
+
+        Args:
+            formatted_answer: Agent's response to log.
+        """
         if self.agent is None:
             raise ValueError("Agent cannot be None")
@@ -292,44 +351,16 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
             ),
         )

-    def _summarize_messages(self) -> None:
-        messages_groups = []
-        for message in self.messages:
-            content = message["content"]
-            cut_size = self.llm.get_context_window_size()
-            for i in range(0, len(content), cut_size):
-                messages_groups.append({"content": content[i : i + cut_size]})
-
-        summarized_contents = []
-        for group in messages_groups:
-            summary = self.llm.call(
-                [
-                    format_message_for_llm(
-                        self._i18n.slice("summarizer_system_message"), role="system"
-                    ),
-                    format_message_for_llm(
-                        self._i18n.slice("summarize_instruction").format(
-                            group=group["content"]
-                        ),
-                    ),
-                ],
-                callbacks=self.callbacks,
-            )
-            summarized_contents.append({"content": str(summary)})
-
-        merged_summary = " ".join(content["content"] for content in summarized_contents)
-
-        self.messages = [
-            format_message_for_llm(
-                self._i18n.slice("summary").format(merged_summary=merged_summary)
-            )
-        ]
-
     def _handle_crew_training_output(
-        self, result: AgentFinish, human_feedback: Optional[str] = None
+        self, result: AgentFinish, human_feedback: str | None = None
     ) -> None:
-        """Handle the process of saving training data."""
-        agent_id = str(self.agent.id)  # type: ignore
+        """Save training data.
+
+        Args:
+            result: Agent's final output.
+            human_feedback: Optional feedback from human.
+        """
+        agent_id = str(self.agent.id)
         train_iteration = (
             getattr(self.crew, "_train_iteration", None) if self.crew else None
         )
@@ -371,20 +402,30 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
             training_data[agent_id] = agent_training_data
             training_handler.save(training_data)

-    def _format_prompt(self, prompt: str, inputs: Dict[str, str]) -> str:
+    @staticmethod
+    def _format_prompt(prompt: str, inputs: dict[str, str]) -> str:
+        """Format prompt with input values.
+
+        Args:
+            prompt: Template string.
+            inputs: Values to substitute.
+
+        Returns:
+            Formatted prompt.
+        """
         prompt = prompt.replace("{input}", inputs["input"])
         prompt = prompt.replace("{tool_names}", inputs["tool_names"])
         prompt = prompt.replace("{tools}", inputs["tools"])
         return prompt

     def _handle_human_feedback(self, formatted_answer: AgentFinish) -> AgentFinish:
-        """Handle human feedback with different flows for training vs regular use.
+        """Process human feedback.

         Args:
-            formatted_answer: The initial AgentFinish result to get feedback on
+            formatted_answer: Initial agent result.

         Returns:
-            AgentFinish: The final answer after processing feedback
+            Final answer after feedback.
         """
         human_feedback = self._ask_human_input(formatted_answer.output)
@@ -394,13 +435,25 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
         return self._handle_regular_feedback(formatted_answer, human_feedback)

     def _is_training_mode(self) -> bool:
-        """Check if crew is in training mode."""
+        """Check if training mode is active.
+
+        Returns:
+            True if in training mode.
+        """
         return bool(self.crew and self.crew._train)

     def _handle_training_feedback(
         self, initial_answer: AgentFinish, feedback: str
     ) -> AgentFinish:
-        """Process feedback for training scenarios with single iteration."""
+        """Process training feedback.
+
+        Args:
+            initial_answer: Initial agent output.
+            feedback: Training feedback.
+
+        Returns:
+            Improved answer.
+        """
         self._handle_crew_training_output(initial_answer, feedback)
         self.messages.append(
             format_message_for_llm(
@@ -415,7 +468,15 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
     def _handle_regular_feedback(
         self, current_answer: AgentFinish, initial_feedback: str
     ) -> AgentFinish:
-        """Process feedback for regular use with potential multiple iterations."""
+        """Process regular feedback iteratively.
+
+        Args:
+            current_answer: Current agent output.
+            initial_feedback: Initial user feedback.
+
+        Returns:
+            Final answer after iterations.
+        """
         feedback = initial_feedback
         answer = current_answer
@@ -430,30 +491,17 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
         return answer

     def _process_feedback_iteration(self, feedback: str) -> AgentFinish:
-        """Process a single feedback iteration."""
+        """Process single feedback iteration.
+
+        Args:
+            feedback: User feedback.
+
+        Returns:
+            Updated agent response.
+        """
         self.messages.append(
             format_message_for_llm(
                 self._i18n.slice("feedback_instructions").format(feedback=feedback)
             )
         )
         return self._invoke_loop()
-
-    def _log_feedback_error(self, retry_count: int, error: Exception) -> None:
-        """Log feedback processing errors."""
-        self._printer.print(
-            content=(
-                f"Error processing feedback: {error}. "
-                f"Retrying... ({retry_count + 1}/{MAX_LLM_RETRY})"
-            ),
-            color="red",
-        )
-
-    def _log_max_retries_exceeded(self) -> None:
-        """Log when max retries for feedback processing are exceeded."""
-        self._printer.print(
-            content=(
-                f"Failed to process feedback after {MAX_LLM_RETRY} attempts. "
-                "Ending feedback loop."
-            ),
-            color="red",
-        )
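
Since _format_prompt is now a @staticmethod with no instance state, its substitution logic can be exercised in isolation. Below is a standalone sketch (not part of the diff) that mirrors the body shown above; the template and input values are made up for illustration.

def format_prompt(prompt: str, inputs: dict[str, str]) -> str:
    # Mirrors the static _format_prompt in the diff: plain string substitution,
    # no instance state required.
    prompt = prompt.replace("{input}", inputs["input"])
    prompt = prompt.replace("{tool_names}", inputs["tool_names"])
    prompt = prompt.replace("{tools}", inputs["tools"])
    return prompt


if __name__ == "__main__":
    # Illustrative values only; real templates come from the crew's prompt config.
    template = "Tools:\n{tools}\n\nTool names: {tool_names}\n\nTask: {input}"
    print(format_prompt(template, {
        "input": "Summarize the sprint report",
        "tool_names": "search, calculator",
        "tools": "search: web search\ncalculator: basic arithmetic",
    }))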