Merge branch 'main' into gl/fix/cache-handler-types-and-imports

This commit is contained in:
Greyson LaLonde
2025-09-04 16:01:26 -04:00
committed by GitHub

View File

@@ -1,4 +1,11 @@
from typing import Any, Callable, Dict, List, Optional, Union
"""Agent executor for crew AI agents.
Handles agent execution flow including LLM interactions, tool execution,
and memory management.
"""
from collections.abc import Callable
from typing import Any
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin
@@ -8,8 +15,12 @@ from crewai.agents.parser import (
OutputParserException,
)
from crewai.agents.tools_handler import ToolsHandler
from crewai.llm import BaseLLM
from crewai.tools.base_tool import BaseTool
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.logging_events import (
AgentLogsExecutionEvent,
AgentLogsStartedEvent,
)
from crewai.llms.base_llm import BaseLLM
from crewai.tools.structured_tool import CrewStructuredTool
from crewai.tools.tool_types import ToolResult
from crewai.utilities import I18N, Printer
@@ -26,19 +37,17 @@ from crewai.utilities.agent_utils import (
is_context_length_exceeded,
process_llm_response,
)
from crewai.utilities.constants import MAX_LLM_RETRY, TRAINING_DATA_FILE
from crewai.utilities.logger import Logger
from crewai.utilities.constants import TRAINING_DATA_FILE
from crewai.utilities.tool_utils import execute_tool_and_check_finality
from crewai.utilities.training_handler import CrewTrainingHandler
from crewai.events.types.logging_events import (
AgentLogsStartedEvent,
AgentLogsExecutionEvent,
)
from crewai.events.event_bus import crewai_event_bus
class CrewAgentExecutor(CrewAgentExecutorMixin):
_logger: Logger = Logger()
"""Executor for crew agents.
Manages the execution lifecycle of an agent including prompt formatting,
LLM interactions, tool execution, and feedback handling.
"""
def __init__(
self,
@@ -48,18 +57,39 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
agent: BaseAgent,
prompt: dict[str, str],
max_iter: int,
tools: List[CrewStructuredTool],
tools: list[CrewStructuredTool],
tools_names: str,
stop_words: List[str],
stop_words: list[str],
tools_description: str,
tools_handler: ToolsHandler,
step_callback: Any = None,
original_tools: List[Any] | None = None,
original_tools: list[Any] | None = None,
function_calling_llm: Any = None,
respect_context_window: bool = False,
request_within_rpm_limit: Optional[Callable[[], bool]] = None,
callbacks: List[Any] | None = None,
):
request_within_rpm_limit: Callable[[], bool] | None = None,
callbacks: list[Any] | None = None,
) -> None:
"""Initialize executor.
Args:
llm: Language model instance.
task: Task to execute.
crew: Crew instance.
agent: Agent to execute.
prompt: Prompt templates.
max_iter: Maximum iterations.
tools: Available tools.
tools_names: Tool names string.
stop_words: Stop word list.
tools_description: Tool descriptions.
tools_handler: Tool handler instance.
step_callback: Optional step callback.
original_tools: Original tool list.
function_calling_llm: Optional function calling LLM.
respect_context_window: Respect context limits.
request_within_rpm_limit: RPM limit check function.
callbacks: Optional callbacks list.
"""
self._i18n: I18N = I18N()
self.llm: BaseLLM = llm
self.task = task
@@ -81,12 +111,9 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self.respect_context_window = respect_context_window
self.request_within_rpm_limit = request_within_rpm_limit
self.ask_for_human_input = False
self.messages: List[Dict[str, str]] = []
self.messages: list[dict[str, str]] = []
self.iterations = 0
self.log_error_after = 3
self.tool_name_to_tool_map: Dict[str, Union[CrewStructuredTool, BaseTool]] = {
tool.name: tool for tool in self.tools
}
existing_stop = self.llm.stop or []
self.llm.stop = list(
set(
@@ -96,7 +123,15 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
)
)
def invoke(self, inputs: Dict[str, str]) -> Dict[str, Any]:
def invoke(self, inputs: dict[str, str]) -> dict[str, Any]:
"""Execute the agent with given inputs.
Args:
inputs: Input dictionary containing prompt variables.
Returns:
Dictionary with agent output.
"""
if "system" in self.prompt:
system_prompt = self._format_prompt(self.prompt.get("system", ""), inputs)
user_prompt = self._format_prompt(self.prompt.get("user", ""), inputs)
@@ -131,9 +166,10 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
return {"output": formatted_answer.output}
def _invoke_loop(self) -> AgentFinish:
"""
Main loop to invoke the agent's thought process until it reaches a conclusion
or the maximum number of iterations is reached.
"""Execute agent loop until completion.
Returns:
Final answer from the agent.
"""
formatted_answer = None
while not isinstance(formatted_answer, AgentFinish):
@@ -190,7 +226,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
)
self._invoke_step_callback(formatted_answer)
self._append_message(formatted_answer.text, role="assistant")
self._append_message(formatted_answer.text)
except OutputParserException as e:
formatted_answer = handle_output_parser_exception(
@@ -231,8 +267,16 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
def _handle_agent_action(
self, formatted_answer: AgentAction, tool_result: ToolResult
) -> Union[AgentAction, AgentFinish]:
"""Handle the AgentAction, execute tools, and process the results."""
) -> AgentAction | AgentFinish:
"""Process agent action and tool execution.
Args:
formatted_answer: Agent's action to execute.
tool_result: Result from tool execution.
Returns:
Updated action or final answer.
"""
# Special case for add_image_tool
add_image_tool = self._i18n.tools("add_image")
if (
@@ -251,17 +295,28 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
show_logs=self._show_logs,
)
def _invoke_step_callback(self, formatted_answer) -> None:
"""Invoke the step callback if it exists."""
def _invoke_step_callback(
self, formatted_answer: AgentAction | AgentFinish
) -> None:
"""Invoke step callback.
Args:
formatted_answer: Current agent response.
"""
if self.step_callback:
self.step_callback(formatted_answer)
def _append_message(self, text: str, role: str = "assistant") -> None:
"""Append a message to the message list with the given role."""
"""Add message to conversation history.
Args:
text: Message content.
role: Message role (default: assistant).
"""
self.messages.append(format_message_for_llm(text, role=role))
def _show_start_logs(self):
"""Show logs for the start of agent execution."""
def _show_start_logs(self) -> None:
"""Emit agent start event."""
if self.agent is None:
raise ValueError("Agent cannot be None")
@@ -277,8 +332,12 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
),
)
def _show_logs(self, formatted_answer: Union[AgentAction, AgentFinish]):
"""Show logs for the agent's execution."""
def _show_logs(self, formatted_answer: AgentAction | AgentFinish) -> None:
"""Emit agent execution event.
Args:
formatted_answer: Agent's response to log.
"""
if self.agent is None:
raise ValueError("Agent cannot be None")
@@ -292,44 +351,16 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
),
)
def _summarize_messages(self) -> None:
messages_groups = []
for message in self.messages:
content = message["content"]
cut_size = self.llm.get_context_window_size()
for i in range(0, len(content), cut_size):
messages_groups.append({"content": content[i : i + cut_size]})
summarized_contents = []
for group in messages_groups:
summary = self.llm.call(
[
format_message_for_llm(
self._i18n.slice("summarizer_system_message"), role="system"
),
format_message_for_llm(
self._i18n.slice("summarize_instruction").format(
group=group["content"]
),
),
],
callbacks=self.callbacks,
)
summarized_contents.append({"content": str(summary)})
merged_summary = " ".join(content["content"] for content in summarized_contents)
self.messages = [
format_message_for_llm(
self._i18n.slice("summary").format(merged_summary=merged_summary)
)
]
def _handle_crew_training_output(
self, result: AgentFinish, human_feedback: Optional[str] = None
self, result: AgentFinish, human_feedback: str | None = None
) -> None:
"""Handle the process of saving training data."""
agent_id = str(self.agent.id) # type: ignore
"""Save training data.
Args:
result: Agent's final output.
human_feedback: Optional feedback from human.
"""
agent_id = str(self.agent.id)
train_iteration = (
getattr(self.crew, "_train_iteration", None) if self.crew else None
)
@@ -371,20 +402,30 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
training_data[agent_id] = agent_training_data
training_handler.save(training_data)
def _format_prompt(self, prompt: str, inputs: Dict[str, str]) -> str:
@staticmethod
def _format_prompt(prompt: str, inputs: dict[str, str]) -> str:
"""Format prompt with input values.
Args:
prompt: Template string.
inputs: Values to substitute.
Returns:
Formatted prompt.
"""
prompt = prompt.replace("{input}", inputs["input"])
prompt = prompt.replace("{tool_names}", inputs["tool_names"])
prompt = prompt.replace("{tools}", inputs["tools"])
return prompt
def _handle_human_feedback(self, formatted_answer: AgentFinish) -> AgentFinish:
"""Handle human feedback with different flows for training vs regular use.
"""Process human feedback.
Args:
formatted_answer: The initial AgentFinish result to get feedback on
formatted_answer: Initial agent result.
Returns:
AgentFinish: The final answer after processing feedback
Final answer after feedback.
"""
human_feedback = self._ask_human_input(formatted_answer.output)
@@ -394,13 +435,25 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
return self._handle_regular_feedback(formatted_answer, human_feedback)
def _is_training_mode(self) -> bool:
"""Check if crew is in training mode."""
"""Check if training mode is active.
Returns:
True if in training mode.
"""
return bool(self.crew and self.crew._train)
def _handle_training_feedback(
self, initial_answer: AgentFinish, feedback: str
) -> AgentFinish:
"""Process feedback for training scenarios with single iteration."""
"""Process training feedback.
Args:
initial_answer: Initial agent output.
feedback: Training feedback.
Returns:
Improved answer.
"""
self._handle_crew_training_output(initial_answer, feedback)
self.messages.append(
format_message_for_llm(
@@ -415,7 +468,15 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
def _handle_regular_feedback(
self, current_answer: AgentFinish, initial_feedback: str
) -> AgentFinish:
"""Process feedback for regular use with potential multiple iterations."""
"""Process regular feedback iteratively.
Args:
current_answer: Current agent output.
initial_feedback: Initial user feedback.
Returns:
Final answer after iterations.
"""
feedback = initial_feedback
answer = current_answer
@@ -430,30 +491,17 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
return answer
def _process_feedback_iteration(self, feedback: str) -> AgentFinish:
"""Process a single feedback iteration."""
"""Process single feedback iteration.
Args:
feedback: User feedback.
Returns:
Updated agent response.
"""
self.messages.append(
format_message_for_llm(
self._i18n.slice("feedback_instructions").format(feedback=feedback)
)
)
return self._invoke_loop()
def _log_feedback_error(self, retry_count: int, error: Exception) -> None:
"""Log feedback processing errors."""
self._printer.print(
content=(
f"Error processing feedback: {error}. "
f"Retrying... ({retry_count + 1}/{MAX_LLM_RETRY})"
),
color="red",
)
def _log_max_retries_exceeded(self) -> None:
"""Log when max retries for feedback processing are exceeded."""
self._printer.print(
content=(
f"Failed to process feedback after {MAX_LLM_RETRY} attempts. "
"Ending feedback loop."
),
color="red",
)