diff --git a/src/crewai/agents/crew_agent_executor.py b/src/crewai/agents/crew_agent_executor.py index 97f948425..b93d3806c 100644 --- a/src/crewai/agents/crew_agent_executor.py +++ b/src/crewai/agents/crew_agent_executor.py @@ -1,6 +1,5 @@ import json import re -from dataclasses import dataclass from typing import Any, Callable, Dict, List, Optional, Union from crewai.agents.agent_builder.base_agent import BaseAgent @@ -14,36 +13,28 @@ from crewai.agents.tools_handler import ToolsHandler from crewai.llm import LLM from crewai.tools.base_tool import BaseTool from crewai.tools.structured_tool import CrewStructuredTool -from crewai.tools.tool_usage import ToolUsage, ToolUsageErrorException +from crewai.tools.tool_types import ToolResult from crewai.utilities import I18N, Printer from crewai.utilities.agent_utils import ( enforce_rpm_limit, format_message_for_llm, get_llm_response, + handle_agent_action_core, + handle_context_length, handle_max_iterations_exceeded, + handle_output_parser_exception, + handle_unknown_error, has_reached_max_iterations, + is_context_length_exceeded, process_llm_response, + show_agent_logs, ) from crewai.utilities.constants import MAX_LLM_RETRY, TRAINING_DATA_FILE -from crewai.utilities.events import ( - ToolUsageErrorEvent, - ToolUsageStartedEvent, - crewai_event_bus, -) -from crewai.utilities.events.tool_usage_events import ToolUsageStartedEvent -from crewai.utilities.exceptions.context_window_exceeding_exception import ( - LLMContextLengthExceededException, -) from crewai.utilities.logger import Logger +from crewai.utilities.tool_utils import execute_tool_and_check_finality from crewai.utilities.training_handler import CrewTrainingHandler -@dataclass -class ToolResult: - result: Any - result_as_answer: bool - - class CrewAgentExecutor(CrewAgentExecutorMixin): _logger: Logger = Logger() @@ -120,7 +111,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): ) raise except Exception as e: - self._handle_unknown_error(e) + handle_unknown_error(self._printer, e) if e.__class__.__module__.startswith("litellm"): # Do not retry on litellm errors raise e @@ -163,8 +154,16 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): formatted_answer = process_llm_response(answer, self.use_stop_words) if isinstance(formatted_answer, AgentAction): - tool_result = self._execute_tool_and_check_finality( - formatted_answer + tool_result = execute_tool_and_check_finality( + agent_action=formatted_answer, + tools=self.tools, + i18n=self._i18n, + agent_key=self.agent.key if self.agent else None, + agent_role=self.agent.role if self.agent else None, + tools_handler=self.tools_handler, + task=self.task, + agent=self.agent, + function_calling_llm=self.function_calling_llm, ) formatted_answer = self._handle_agent_action( formatted_answer, tool_result @@ -174,17 +173,30 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): self._append_message(formatted_answer.text, role="assistant") except OutputParserException as e: - formatted_answer = self._handle_output_parser_exception(e) + formatted_answer = handle_output_parser_exception( + e=e, + messages=self.messages, + iterations=self.iterations, + log_error_after=self.log_error_after, + printer=self._printer, + ) except Exception as e: if e.__class__.__module__.startswith("litellm"): # Do not retry on litellm errors raise e - if self._is_context_length_exceeded(e): - self._handle_context_length() + if is_context_length_exceeded(e): + handle_context_length( + respect_context_window=self.respect_context_window, + printer=self._printer, + messages=self.messages, + llm=self.llm, + callbacks=self.callbacks, + i18n=self._i18n, + ) continue else: - self._handle_unknown_error(e) + handle_unknown_error(self._printer, e) raise e finally: self.iterations += 1 @@ -197,45 +209,27 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): self._show_logs(formatted_answer) return formatted_answer - def _handle_unknown_error(self, exception: Exception) -> None: - """Handle unknown errors by informing the user.""" - self._printer.print( - content="An unknown error occurred. Please check the details below.", - color="red", - ) - self._printer.print( - content=f"Error details: {exception}", - color="red", - ) - def _handle_agent_action( self, formatted_answer: AgentAction, tool_result: ToolResult ) -> Union[AgentAction, AgentFinish]: """Handle the AgentAction, execute tools, and process the results.""" + # Special case for add_image_tool add_image_tool = self._i18n.tools("add_image") if ( isinstance(add_image_tool, dict) and formatted_answer.tool.casefold().strip() == add_image_tool.get("name", "").casefold().strip() ): - self.messages.append(tool_result.result) - return formatted_answer # Continue the loop + self.messages.append({"role": "assistant", "content": tool_result.result}) + return formatted_answer - if self.step_callback: - self.step_callback(tool_result) - - formatted_answer.text += f"\nObservation: {tool_result.result}" - formatted_answer.result = tool_result.result - - if tool_result.result_as_answer: - return AgentFinish( - thought="", - output=tool_result.result, - text=formatted_answer.text, - ) - - self._show_logs(formatted_answer) - return formatted_answer + return handle_agent_action_core( + formatted_answer=formatted_answer, + tool_result=tool_result, + messages=self.messages, + step_callback=self.step_callback, + show_logs=self._show_logs, + ) def _invoke_step_callback(self, formatted_answer) -> None: """Invoke the step callback if it exists.""" @@ -246,145 +240,31 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): """Append a message to the message list with the given role.""" self.messages.append(format_message_for_llm(text, role=role)) - def _handle_output_parser_exception(self, e: OutputParserException) -> AgentAction: - """Handle OutputParserException by updating messages and formatted_answer.""" - self.messages.append({"role": "user", "content": e.error}) - - formatted_answer = AgentAction( - text=e.error, - tool="", - tool_input="", - thought="", + def _show_start_logs(self): + """Show logs for the start of agent execution.""" + if self.agent is None: + raise ValueError("Agent cannot be None") + show_agent_logs( + printer=self._printer, + agent_role=self.agent.role, + task_description=( + getattr(self.task, "description") if self.task else "Not Found" + ), + verbose=self.agent.verbose + or (hasattr(self, "crew") and getattr(self.crew, "verbose", False)), ) - if self.iterations > self.log_error_after: - self._printer.print( - content=f"Error parsing LLM output, agent will retry: {e.error}", - color="red", - ) - - return formatted_answer - - def _is_context_length_exceeded(self, exception: Exception) -> bool: - """Check if the exception is due to context length exceeding.""" - return LLMContextLengthExceededException( - str(exception) - )._is_context_limit_error(str(exception)) - - def _show_start_logs(self): - if self.agent is None: - raise ValueError("Agent cannot be None") - if self.agent.verbose or ( - hasattr(self, "crew") and getattr(self.crew, "verbose", False) - ): - agent_role = self.agent.role.split("\n")[0] - self._printer.print( - content=f"\033[1m\033[95m# Agent:\033[00m \033[1m\033[92m{agent_role}\033[00m" - ) - description = ( - getattr(self.task, "description") if self.task else "Not Found" - ) - self._printer.print( - content=f"\033[95m## Task:\033[00m \033[92m{description}\033[00m" - ) - def _show_logs(self, formatted_answer: Union[AgentAction, AgentFinish]): + """Show logs for the agent's execution.""" if self.agent is None: raise ValueError("Agent cannot be None") - if self.agent.verbose or ( - hasattr(self, "crew") and getattr(self.crew, "verbose", False) - ): - agent_role = self.agent.role.split("\n")[0] - if isinstance(formatted_answer, AgentAction): - thought = re.sub(r"\n+", "\n", formatted_answer.thought) - formatted_json = json.dumps( - formatted_answer.tool_input, - indent=2, - ensure_ascii=False, - ) - self._printer.print( - content=f"\n\n\033[1m\033[95m# Agent:\033[00m \033[1m\033[92m{agent_role}\033[00m" - ) - if thought and thought != "": - self._printer.print( - content=f"\033[95m## Thought:\033[00m \033[92m{thought}\033[00m" - ) - self._printer.print( - content=f"\033[95m## Using tool:\033[00m \033[92m{formatted_answer.tool}\033[00m" - ) - self._printer.print( - content=f"\033[95m## Tool Input:\033[00m \033[92m\n{formatted_json}\033[00m" - ) - self._printer.print( - content=f"\033[95m## Tool Output:\033[00m \033[92m\n{formatted_answer.result}\033[00m" - ) - elif isinstance(formatted_answer, AgentFinish): - self._printer.print( - content=f"\n\n\033[1m\033[95m# Agent:\033[00m \033[1m\033[92m{agent_role}\033[00m" - ) - self._printer.print( - content=f"\033[95m## Final Answer:\033[00m \033[92m\n{formatted_answer.output}\033[00m\n\n" - ) - - def _execute_tool_and_check_finality(self, agent_action: AgentAction) -> ToolResult: - try: - if self.agent: - crewai_event_bus.emit( - self, - event=ToolUsageStartedEvent( - agent_key=self.agent.key, - agent_role=self.agent.role, - tool_name=agent_action.tool, - tool_args=agent_action.tool_input, - tool_class=agent_action.tool, - ), - ) - tool_usage = ToolUsage( - tools_handler=self.tools_handler, - tools=self.tools, - function_calling_llm=self.function_calling_llm, - task=self.task, - agent=self.agent, - action=agent_action, - ) - tool_calling = tool_usage.parse_tool_calling(agent_action.text) - - if isinstance(tool_calling, ToolUsageErrorException): - tool_result = tool_calling.message - return ToolResult(result=tool_result, result_as_answer=False) - else: - if tool_calling.tool_name.casefold().strip() in [ - name.casefold().strip() for name in self.tool_name_to_tool_map - ] or tool_calling.tool_name.casefold().replace("_", " ") in [ - name.casefold().strip() for name in self.tool_name_to_tool_map - ]: - tool_result = tool_usage.use(tool_calling, agent_action.text) - tool = self.tool_name_to_tool_map.get(tool_calling.tool_name) - if tool: - return ToolResult( - result=tool_result, result_as_answer=tool.result_as_answer - ) - else: - tool_result = self._i18n.errors("wrong_tool_name").format( - tool=tool_calling.tool_name, - tools=", ".join([tool.name.casefold() for tool in self.tools]), - ) - return ToolResult(result=tool_result, result_as_answer=False) - - except Exception as e: - if self.agent: - crewai_event_bus.emit( - self, - event=ToolUsageErrorEvent( # validation error - agent_key=self.agent.key, - agent_role=self.agent.role, - tool_name=agent_action.tool, - tool_args=agent_action.tool_input, - tool_class=agent_action.tool, - error=str(e), - ), - ) - raise e + show_agent_logs( + printer=self._printer, + agent_role=self.agent.role, + formatted_answer=formatted_answer, + verbose=self.agent.verbose + or (hasattr(self, "crew") and getattr(self.crew, "verbose", False)), + ) def _summarize_messages(self) -> None: messages_groups = [] @@ -392,7 +272,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): content = message["content"] cut_size = self.llm.get_context_window_size() for i in range(0, len(content), cut_size): - messages_groups.append(content[i : i + cut_size]) + messages_groups.append({"content": content[i : i + cut_size]}) summarized_contents = [] for group in messages_groups: @@ -402,14 +282,16 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): self._i18n.slice("summarizer_system_message"), role="system" ), format_message_for_llm( - self._i18n.slice("summarize_instruction").format(group=group), + self._i18n.slice("summarize_instruction").format( + group=group["content"] + ), ), ], callbacks=self.callbacks, ) - summarized_contents.append(summary) + summarized_contents.append({"content": str(summary)}) - merged_summary = " ".join(str(content) for content in summarized_contents) + merged_summary = " ".join(content["content"] for content in summarized_contents) self.messages = [ format_message_for_llm( @@ -417,22 +299,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): ) ] - def _handle_context_length(self) -> None: - if self.respect_context_window: - self._printer.print( - content="Context length exceeded. Summarizing content to fit the model context window.", - color="yellow", - ) - self._summarize_messages() - else: - self._printer.print( - content="Context length exceeded. Consider using smaller text or RAG tools from crewai_tools.", - color="red", - ) - raise SystemExit( - "Context length exceeded and user opted not to summarize. Consider using smaller text or RAG tools from crewai_tools." - ) - def _handle_crew_training_output( self, result: AgentFinish, human_feedback: Optional[str] = None ) -> None: diff --git a/src/crewai/lite_agent.py b/src/crewai/lite_agent.py index 683a3c887..10b0ac919 100644 --- a/src/crewai/lite_agent.py +++ b/src/crewai/lite_agent.py @@ -17,18 +17,23 @@ from crewai.agents.parser import ( from crewai.llm import LLM from crewai.tools.base_tool import BaseTool from crewai.tools.structured_tool import CrewStructuredTool -from crewai.tools.tool_usage import ToolUsage, ToolUsageErrorException from crewai.utilities import I18N from crewai.utilities.agent_utils import ( enforce_rpm_limit, format_message_for_llm, get_llm_response, get_tool_names, + handle_agent_action_core, + handle_context_length, handle_max_iterations_exceeded, + handle_output_parser_exception, + handle_unknown_error, has_reached_max_iterations, + is_context_length_exceeded, parse_tools, process_llm_response, render_text_description_and_args, + show_agent_logs, ) from crewai.utilities.converter import convert_to_model, generate_model_description from crewai.utilities.events.agent_events import ( @@ -45,14 +50,7 @@ from crewai.utilities.exceptions.context_window_exceeding_exception import ( from crewai.utilities.llm_utils import create_llm from crewai.utilities.printer import Printer from crewai.utilities.token_counter_callback import TokenCalcHandler - - -class ToolResult: - """Result of tool execution.""" - - def __init__(self, result: str, result_as_answer: bool = False): - self.result = result - self.result_as_answer = result_as_answer +from crewai.utilities.tool_utils import execute_tool_and_check_finality class LiteAgentOutput(BaseModel): @@ -104,6 +102,7 @@ class LiteAgent(BaseModel): model_config = {"arbitrary_types_allowed": True} + # Core Agent Properties role: str = Field(description="Role of the agent") goal: str = Field(description="Goal of the agent") backstory: str = Field(description="Backstory of the agent") @@ -113,58 +112,52 @@ class LiteAgent(BaseModel): tools: List[BaseTool] = Field( default_factory=list, description="Tools at agent's disposal" ) - verbose: bool = Field( - default=False, description="Whether to print execution details" - ) + + # Execution Control Properties max_iterations: int = Field( default=15, description="Maximum number of iterations for tool usage" ) max_execution_time: Optional[int] = Field( default=None, description="Maximum execution time in seconds" ) - response_format: Optional[Type[BaseModel]] = Field( - default=None, description="Pydantic model for structured output" - ) - tools_results: List[Dict[str, Any]] = Field( - default=[], description="Results of the tools used by the agent." - ) respect_context_window: bool = Field( default=True, description="Whether to respect the context window of the LLM", ) - callbacks: List[Callable] = Field( - default=[], description="Callbacks to be used for the agent" - ) - _parsed_tools: List[CrewStructuredTool] = PrivateAttr(default_factory=list) - _token_process: TokenProcess = PrivateAttr(default_factory=TokenProcess) - _cache_handler: CacheHandler = PrivateAttr(default_factory=CacheHandler) - _times_executed: int = PrivateAttr(default=0) - _max_retry_limit: int = PrivateAttr(default=2) - _key: str = PrivateAttr(default_factory=lambda: str(uuid.uuid4())) - # Store messages for conversation - _messages: List[Dict[str, str]] = PrivateAttr(default_factory=list) - # Iteration counter - _iterations: int = PrivateAttr(default=0) - # Tracking metrics - _formatting_errors: int = PrivateAttr(default=0) - _tools_errors: int = PrivateAttr(default=0) - _delegations: Dict[str, int] = PrivateAttr(default_factory=dict) - # Internationalization - _printer: Printer = PrivateAttr(default_factory=Printer) - - i18n: I18N = Field(default=I18N(), description="Internationalization settings.") - request_within_rpm_limit: Optional[Callable[[], bool]] = Field( - default=None, - description="Callback to check if the request is within the RPM limit", - ) use_stop_words: bool = Field( default=True, description="Whether to use stop words to prevent the LLM from using tools", ) - tool_name_to_tool_map: Dict[str, Union[CrewStructuredTool, BaseTool]] = Field( - default_factory=dict, - description="Mapping of tool names to tool instances", + request_within_rpm_limit: Optional[Callable[[], bool]] = Field( + default=None, + description="Callback to check if the request is within the RPM limit", ) + i18n: I18N = Field(default=I18N(), description="Internationalization settings.") + + # Output and Formatting Properties + response_format: Optional[Type[BaseModel]] = Field( + default=None, description="Pydantic model for structured output" + ) + verbose: bool = Field( + default=False, description="Whether to print execution details" + ) + callbacks: List[Callable] = Field( + default=[], description="Callbacks to be used for the agent" + ) + + # State and Results + tools_results: List[Dict[str, Any]] = Field( + default=[], description="Results of the tools used by the agent." + ) + + # Private Attributes + _parsed_tools: List[CrewStructuredTool] = PrivateAttr(default_factory=list) + _token_process: TokenProcess = PrivateAttr(default_factory=TokenProcess) + _cache_handler: CacheHandler = PrivateAttr(default_factory=CacheHandler) + _key: str = PrivateAttr(default_factory=lambda: str(uuid.uuid4())) + _messages: List[Dict[str, str]] = PrivateAttr(default_factory=list) + _iterations: int = PrivateAttr(default=0) + _printer: Printer = PrivateAttr(default_factory=Printer) @model_validator(mode="after") def setup_llm(self): @@ -184,9 +177,6 @@ class LiteAgent(BaseModel): """Parse the tools and convert them to CrewStructuredTool instances.""" self._parsed_tools = parse_tools(self.tools) - # Initialize tool name to tool mapping - self.tool_name_to_tool_map = {tool.name: tool for tool in self._parsed_tools} - return self @property @@ -306,7 +296,15 @@ class LiteAgent(BaseModel): ) # Execute the agent using invoke loop - agent_finish = await self._invoke_loop() + try: + agent_finish = await self._invoke_loop() + except Exception as e: + self._printer.print( + content="Agent failed to reach a final answer. This is likely a bug - please report it.", + color="red", + ) + handle_unknown_error(self._printer, e) + raise e formatted_result: Optional[BaseModel] = None if self.response_format: @@ -333,17 +331,8 @@ class LiteAgent(BaseModel): usage_metrics=usage_metrics.model_dump() if usage_metrics else None, ) - except AssertionError: - self._printer.print( - content="Agent failed to reach a final answer. This is likely a bug - please report it.", - color="red", - ) - raise except Exception as e: - self._handle_unknown_error(e) - if e.__class__.__module__.startswith("litellm"): - # Do not retry on litellm errors - raise e + handle_unknown_error(self._printer, e) raise e async def _invoke_loop(self) -> AgentFinish: @@ -379,26 +368,45 @@ class LiteAgent(BaseModel): formatted_answer = process_llm_response(answer, self.use_stop_words) if isinstance(formatted_answer, AgentAction): - tool_result = self._execute_tool_and_check_finality( - formatted_answer + tool_result = execute_tool_and_check_finality( + agent_action=formatted_answer, + tools=self._parsed_tools, + i18n=self.i18n, + agent_key=self.key, + agent_role=self.role, ) - formatted_answer = self._handle_agent_action( - formatted_answer, tool_result + formatted_answer = handle_agent_action_core( + formatted_answer=formatted_answer, + tool_result=tool_result, + show_logs=self._show_logs, ) self._append_message(formatted_answer.text, role="assistant") except OutputParserException as e: - formatted_answer = self._handle_output_parser_exception(e) + formatted_answer = handle_output_parser_exception( + e=e, + messages=self._messages, + iterations=self._iterations, + log_error_after=3, + printer=self._printer, + ) except Exception as e: if e.__class__.__module__.startswith("litellm"): # Do not retry on litellm errors raise e - if self._is_context_length_exceeded(e): - self._handle_context_length() + if is_context_length_exceeded(e): + handle_context_length( + respect_context_window=self.respect_context_window, + printer=self._printer, + messages=self._messages, + llm=cast(LLM, self.llm), + callbacks=self._callbacks, + i18n=self.i18n, + ) continue else: - self._handle_unknown_error(e) + handle_unknown_error(self._printer, e) raise e finally: @@ -412,202 +420,15 @@ class LiteAgent(BaseModel): self._show_logs(formatted_answer) return formatted_answer - def _execute_tool_and_check_finality(self, agent_action: AgentAction) -> ToolResult: - try: - crewai_event_bus.emit( - self, - event=ToolUsageStartedEvent( - agent_key=self.key, - agent_role=self.role, - tool_name=agent_action.tool, - tool_args=agent_action.tool_input, - tool_class=agent_action.tool, - ), - ) - tool_usage = ToolUsage( - agent=self, - tools=self._parsed_tools, - action=agent_action, - tools_handler=None, - task=None, - function_calling_llm=None, - ) - tool_calling = tool_usage.parse_tool_calling(agent_action.text) - - if isinstance(tool_calling, ToolUsageErrorException): - tool_result = tool_calling.message - return ToolResult(result=tool_result, result_as_answer=False) - else: - if tool_calling.tool_name.casefold().strip() in [ - tool.name.casefold().strip() for tool in self._parsed_tools - ] or tool_calling.tool_name.casefold().replace("_", " ") in [ - tool.name.casefold().strip() for tool in self._parsed_tools - ]: - tool_result = tool_usage.use(tool_calling, agent_action.text) - tool = self.tool_name_to_tool_map.get(tool_calling.tool_name) - if tool: - return ToolResult( - result=tool_result, result_as_answer=tool.result_as_answer - ) - else: - tool_result = self.i18n.errors("wrong_tool_name").format( - tool=tool_calling.tool_name, - tools=", ".join( - [tool.name.casefold() for tool in self._parsed_tools] - ), - ) - return ToolResult(result=tool_result, result_as_answer=False) - - except Exception as e: - crewai_event_bus.emit( - self, - event=ToolUsageErrorEvent( - agent_key=self.key, - agent_role=self.role, - tool_name=agent_action.tool, - tool_args=agent_action.tool_input, - tool_class=agent_action.tool, - error=str(e), - ), - ) - raise e - - def _handle_agent_action( - self, formatted_answer: AgentAction, tool_result: ToolResult - ) -> Union[AgentAction, AgentFinish]: - """Handle the AgentAction, execute tools, and process the results.""" - - formatted_answer.text += f"\nObservation: {tool_result.result}" - formatted_answer.result = tool_result.result - - if tool_result.result_as_answer: - return AgentFinish( - thought="", - output=tool_result.result, - text=formatted_answer.text, - ) - - self._show_logs(formatted_answer) - return formatted_answer - def _show_logs(self, formatted_answer: Union[AgentAction, AgentFinish]): - if self.verbose: - agent_role = self.role.split("\n")[0] - if isinstance(formatted_answer, AgentAction): - thought = re.sub(r"\n+", "\n", formatted_answer.thought) - formatted_json = json.dumps( - formatted_answer.tool_input, - indent=2, - ensure_ascii=False, - ) - self._printer.print( - content=f"\n\n\033[1m\033[95m# Agent:\033[00m \033[1m\033[92m{agent_role}\033[00m" - ) - if thought and thought != "": - self._printer.print( - content=f"\033[95m## Thought:\033[00m \033[92m{thought}\033[00m" - ) - self._printer.print( - content=f"\033[95m## Using tool:\033[00m \033[92m{formatted_answer.tool}\033[00m" - ) - self._printer.print( - content=f"\033[95m## Tool Input:\033[00m \033[92m\n{formatted_json}\033[00m" - ) - self._printer.print( - content=f"\033[95m## Tool Output:\033[00m \033[92m\n{formatted_answer.result}\033[00m" - ) - elif isinstance(formatted_answer, AgentFinish): - self._printer.print( - content=f"\n\n\033[1m\033[95m# Agent:\033[00m \033[1m\033[92m{agent_role}\033[00m" - ) - self._printer.print( - content=f"\033[95m## Final Answer:\033[00m \033[92m\n{formatted_answer.output}\033[00m\n\n" - ) + """Show logs for the agent's execution.""" + show_agent_logs( + printer=self._printer, + agent_role=self.role, + formatted_answer=formatted_answer, + verbose=self.verbose, + ) def _append_message(self, text: str, role: str = "assistant") -> None: """Append a message to the message list with the given role.""" self._messages.append(format_message_for_llm(text, role=role)) - - def _handle_output_parser_exception(self, e: OutputParserException) -> AgentAction: - """Handle OutputParserException by updating messages and formatted_answer.""" - self._messages.append({"role": "user", "content": e.error}) - - formatted_answer = AgentAction( - text=e.error, - tool="", - tool_input="", - thought="", - ) - - MAX_ITERATIONS = 3 - if self._iterations > MAX_ITERATIONS: - self._printer.print( - content=f"Error parsing LLM output, agent will retry: {e.error}", - color="red", - ) - - return formatted_answer - - def _is_context_length_exceeded(self, exception: Exception) -> bool: - """Check if the exception is due to context length exceeding.""" - return LLMContextLengthExceededException( - str(exception) - )._is_context_limit_error(str(exception)) - - def _handle_context_length(self) -> None: - if self.respect_context_window: - self._printer.print( - content="Context length exceeded. Summarizing content to fit the model context window.", - color="yellow", - ) - self._summarize_messages() - else: - self._printer.print( - content="Context length exceeded. Consider using smaller text or RAG tools from crewai_tools.", - color="red", - ) - raise SystemExit( - "Context length exceeded and user opted not to summarize. Consider using smaller text or RAG tools from crewai_tools." - ) - - def _summarize_messages(self) -> None: - messages_groups = [] - for message in self.messages: - content = message["content"] - cut_size = cast(LLM, self.llm).get_context_window_size() - for i in range(0, len(content), cut_size): - messages_groups.append(content[i : i + cut_size]) - - summarized_contents = [] - for group in messages_groups: - summary = cast(LLM, self.llm).call( - [ - format_message_for_llm( - self.i18n.slice("summarizer_system_message"), role="system" - ), - format_message_for_llm( - self.i18n.slice("summarize_instruction").format(group=group), - ), - ], - callbacks=self.callbacks, - ) - summarized_contents.append(summary) - - merged_summary = " ".join(str(content) for content in summarized_contents) - - self.messages = [ - format_message_for_llm( - self.i18n.slice("summary").format(merged_summary=merged_summary) - ) - ] - - def _handle_unknown_error(self, exception: Exception) -> None: - """Handle unknown errors by informing the user.""" - self._printer.print( - content="An unknown error occurred. Please check the details below.", - color="red", - ) - self._printer.print( - content=f"Error details: {exception}", - color="red", - ) diff --git a/src/crewai/tools/tool_types.py b/src/crewai/tools/tool_types.py new file mode 100644 index 000000000..3e37fed2f --- /dev/null +++ b/src/crewai/tools/tool_types.py @@ -0,0 +1,9 @@ +from dataclasses import dataclass + + +@dataclass +class ToolResult: + """Result of tool execution.""" + + result: str + result_as_answer: bool = False diff --git a/src/crewai/tools/tool_usage.py b/src/crewai/tools/tool_usage.py index 29d502876..c881364da 100644 --- a/src/crewai/tools/tool_usage.py +++ b/src/crewai/tools/tool_usage.py @@ -2,6 +2,7 @@ import ast import datetime import json import time +from dataclasses import dataclass from difflib import SequenceMatcher from json import JSONDecodeError from textwrap import dedent @@ -13,10 +14,9 @@ from json_repair import repair_json from crewai.agents.tools_handler import ToolsHandler from crewai.task import Task from crewai.telemetry import Telemetry -from crewai.tools import BaseTool from crewai.tools.structured_tool import CrewStructuredTool from crewai.tools.tool_calling import InstructorToolCalling, ToolCalling -from crewai.utilities import I18N, Converter, ConverterError, Printer +from crewai.utilities import I18N, Converter, Printer from crewai.utilities.agent_utils import ( get_tool_names, render_text_description_and_args, @@ -52,6 +52,14 @@ class ToolUsageErrorException(Exception): super().__init__(self.message) +@dataclass +class ToolResult: + """Result of tool execution.""" + + result: str + result_as_answer: bool = False + + class ToolUsage: """ Class that represents the usage of a tool by an agent. @@ -72,10 +80,10 @@ class ToolUsage: tools: List[CrewStructuredTool], task: Optional[Task], function_calling_llm: Any, - agent: Union["BaseAgent", "LiteAgent"], - action: Any, + agent: Optional[Union["BaseAgent", "LiteAgent"]] = None, + action: Any = None, ) -> None: - self._i18n: I18N = agent.i18n + self._i18n: I18N = agent.i18n if agent else I18N() self._printer: Printer = Printer() self._telemetry: Telemetry = Telemetry() self._run_attempts: int = 1 @@ -107,7 +115,7 @@ class ToolUsage: ) -> str: if isinstance(calling, ToolUsageErrorException): error = calling.message - if self.agent.verbose: + if self.agent and self.agent.verbose: self._printer.print(content=f"\n\n{error}\n", color="red") if self.task: self.task.increment_tools_errors() @@ -119,7 +127,7 @@ class ToolUsage: error = getattr(e, "message", str(e)) if self.task: self.task.increment_tools_errors() - if self.agent.verbose: + if self.agent and self.agent.verbose: self._printer.print(content=f"\n\n{error}\n", color="red") return error @@ -132,7 +140,7 @@ class ToolUsage: error = getattr(e, "message", str(e)) if self.task: self.task.increment_tools_errors() - if self.agent.verbose: + if self.agent and self.agent.verbose: self._printer.print(content=f"\n\n{error}\n", color="red") return error @@ -220,7 +228,7 @@ class ToolUsage: ).message if self.task: self.task.increment_tools_errors() - if self.agent.verbose: + if self.agent and self.agent.verbose: self._printer.print( content=f"\n\n{error_message}\n", color="red" ) @@ -269,7 +277,7 @@ class ToolUsage: result_as_answer = available_tool.result_as_answer # type: ignore # Item "None" of "Any | None" has no attribute "result_as_answer" data["result_as_answer"] = result_as_answer - if hasattr(self.agent, "tools_results"): + if self.agent and hasattr(self.agent, "tools_results"): self.agent.tools_results.append(data) return result # type: ignore # No return value expected @@ -323,8 +331,8 @@ class ToolUsage: if self.task: self.task.increment_tools_errors() tool_selection_data = { - "agent_key": self.agent.key, - "agent_role": self.agent.role, + "agent_key": getattr(self.agent, "key", None) if self.agent else None, + "agent_role": getattr(self.agent, "role", None) if self.agent else None, "tool_name": tool_name, "tool_args": {}, "tool_class": self.tools_description, @@ -357,7 +365,9 @@ class ToolUsage: descriptions.append(tool.description) return "\n--\n".join(descriptions) - def _function_calling(self, tool_string: str): + def _function_calling( + self, tool_string: str + ) -> Union[ToolCalling, InstructorToolCalling]: model = ( InstructorToolCalling if self.function_calling_llm.supports_function_calling() @@ -379,16 +389,10 @@ class ToolUsage: max_attempts=1, ) tool_object = converter.to_pydantic() - calling = ToolCalling( - tool_name=tool_object["tool_name"], - arguments=tool_object["arguments"], - log=tool_string, # type: ignore - ) + if not isinstance(tool_object, (ToolCalling, InstructorToolCalling)): + raise ToolUsageErrorException("Failed to parse tool calling") - if isinstance(calling, ConverterError): - raise calling - - return calling + return tool_object def _original_tool_calling( self, tool_string: str, raise_error: bool = False @@ -436,7 +440,7 @@ class ToolUsage: self._telemetry.tool_usage_error(llm=self.function_calling_llm) if self.task: self.task.increment_tools_errors() - if self.agent.verbose: + if self.agent and self.agent.verbose: self._printer.print(content=f"\n\n{e}\n", color="red") return ToolUsageErrorException( # type: ignore # Incompatible return value type (got "ToolUsageErrorException", expected "ToolCalling | InstructorToolCalling") f'{self._i18n.errors("tool_usage_error").format(error=e)}\nMoving on then. {self._i18n.slice("format").format(tool_names=self.tools_names)}' @@ -499,8 +503,8 @@ class ToolUsage: def _emit_validate_input_error(self, final_error: str): tool_selection_data = { - "agent_key": self.agent.key, - "agent_role": self.agent.role, + "agent_key": getattr(self.agent, "key", None) if self.agent else None, + "agent_role": getattr(self.agent, "role", None) if self.agent else None, "tool_name": self.action.tool, "tool_args": str(self.action.tool_input), "tool_class": self.__class__.__name__, @@ -541,12 +545,21 @@ class ToolUsage: def _prepare_event_data( self, tool: Any, tool_calling: Union[ToolCalling, InstructorToolCalling] ) -> dict: - return { - "agent_key": self.agent.key, - "agent_role": (self.agent._original_role or self.agent.role), + event_data = { "run_attempts": self._run_attempts, "delegations": self.task.delegations if self.task else 0, "tool_name": tool.name, "tool_args": tool_calling.arguments, "tool_class": tool.__class__.__name__, + "agent_key": ( + getattr(self.agent, "key", "unknown") if self.agent else "unknown" + ), + "agent_role": ( + getattr(self.agent, "_original_role", None) + or getattr(self.agent, "role", "unknown") + if self.agent + else "unknown" + ), } + + return event_data diff --git a/src/crewai/utilities/agent_utils.py b/src/crewai/utilities/agent_utils.py index 4fdbb807c..a3b028ade 100644 --- a/src/crewai/utilities/agent_utils.py +++ b/src/crewai/utilities/agent_utils.py @@ -1,3 +1,5 @@ +import json +import re from typing import Any, Callable, Dict, List, Optional, Sequence, Union from crewai.agents.parser import ( @@ -11,8 +13,17 @@ from crewai.llm import LLM from crewai.tools import BaseTool as CrewAITool from crewai.tools.base_tool import BaseTool from crewai.tools.structured_tool import CrewStructuredTool -from crewai.utilities.i18n import I18N -from crewai.utilities.printer import Printer +from crewai.tools.tool_types import ToolResult +from crewai.utilities import I18N, Printer +from crewai.utilities.events import ( + ToolUsageErrorEvent, + ToolUsageStartedEvent, + crewai_event_bus, +) +from crewai.utilities.events.tool_usage_events import ToolUsageStartedEvent +from crewai.utilities.exceptions.context_window_exceeding_exception import ( + LLMContextLengthExceededException, +) def parse_tools(tools: List[BaseTool]) -> List[CrewStructuredTool]: @@ -169,3 +180,254 @@ def process_llm_response( answer = answer.split("Observation:")[0].strip() return format_answer(answer) + + +def handle_agent_action_core( + formatted_answer: AgentAction, + tool_result: ToolResult, + messages: Optional[List[Dict[str, str]]] = None, + step_callback: Optional[Callable] = None, + show_logs: Optional[Callable] = None, +) -> Union[AgentAction, AgentFinish]: + """Core logic for handling agent actions and tool results. + + Args: + formatted_answer: The agent's action + tool_result: The result of executing the tool + messages: Optional list of messages to append results to + step_callback: Optional callback to execute after processing + show_logs: Optional function to show logs + + Returns: + Either an AgentAction or AgentFinish + """ + if step_callback: + step_callback(tool_result) + + formatted_answer.text += f"\nObservation: {tool_result.result}" + formatted_answer.result = tool_result.result + + if tool_result.result_as_answer: + return AgentFinish( + thought="", + output=tool_result.result, + text=formatted_answer.text, + ) + + if show_logs: + show_logs(formatted_answer) + + if messages is not None: + messages.append({"role": "assistant", "content": tool_result.result}) + + return formatted_answer + + +def handle_unknown_error(printer: Any, exception: Exception) -> None: + """Handle unknown errors by informing the user. + + Args: + printer: Printer instance for output + exception: The exception that occurred + """ + printer.print( + content="An unknown error occurred. Please check the details below.", + color="red", + ) + printer.print( + content=f"Error details: {exception}", + color="red", + ) + + +def handle_output_parser_exception( + e: OutputParserException, + messages: List[Dict[str, str]], + iterations: int, + log_error_after: int = 3, + printer: Optional[Any] = None, +) -> AgentAction: + """Handle OutputParserException by updating messages and formatted_answer. + + Args: + e: The OutputParserException that occurred + messages: List of messages to append to + iterations: Current iteration count + log_error_after: Number of iterations after which to log errors + printer: Optional printer instance for logging + + Returns: + AgentAction: A formatted answer with the error + """ + messages.append({"role": "user", "content": e.error}) + + formatted_answer = AgentAction( + text=e.error, + tool="", + tool_input="", + thought="", + ) + + if iterations > log_error_after and printer: + printer.print( + content=f"Error parsing LLM output, agent will retry: {e.error}", + color="red", + ) + + return formatted_answer + + +def is_context_length_exceeded(exception: Exception) -> bool: + """Check if the exception is due to context length exceeding. + + Args: + exception: The exception to check + + Returns: + bool: True if the exception is due to context length exceeding + """ + return LLMContextLengthExceededException(str(exception))._is_context_limit_error( + str(exception) + ) + + +def handle_context_length( + respect_context_window: bool, + printer: Any, + messages: List[Dict[str, str]], + llm: Any, + callbacks: List[Any], + i18n: Any, +) -> None: + """Handle context length exceeded by either summarizing or raising an error. + + Args: + respect_context_window: Whether to respect context window + printer: Printer instance for output + messages: List of messages to summarize + llm: LLM instance for summarization + callbacks: List of callbacks for LLM + i18n: I18N instance for messages + """ + if respect_context_window: + printer.print( + content="Context length exceeded. Summarizing content to fit the model context window.", + color="yellow", + ) + summarize_messages(messages, llm, callbacks, i18n) + else: + printer.print( + content="Context length exceeded. Consider using smaller text or RAG tools from crewai_tools.", + color="red", + ) + raise SystemExit( + "Context length exceeded and user opted not to summarize. Consider using smaller text or RAG tools from crewai_tools." + ) + + +def summarize_messages( + messages: List[Dict[str, str]], + llm: Any, + callbacks: List[Any], + i18n: Any, +) -> None: + """Summarize messages to fit within context window. + + Args: + messages: List of messages to summarize + llm: LLM instance for summarization + callbacks: List of callbacks for LLM + i18n: I18N instance for messages + """ + messages_groups = [] + for message in messages: + content = message["content"] + cut_size = llm.get_context_window_size() + for i in range(0, len(content), cut_size): + messages_groups.append({"content": content[i : i + cut_size]}) + + summarized_contents = [] + for group in messages_groups: + summary = llm.call( + [ + format_message_for_llm( + i18n.slice("summarizer_system_message"), role="system" + ), + format_message_for_llm( + i18n.slice("summarize_instruction").format(group=group["content"]), + ), + ], + callbacks=callbacks, + ) + summarized_contents.append({"content": str(summary)}) + + merged_summary = " ".join(content["content"] for content in summarized_contents) + + messages.clear() + messages.append( + format_message_for_llm( + i18n.slice("summary").format(merged_summary=merged_summary) + ) + ) + + +def show_agent_logs( + printer: Printer, + agent_role: str, + formatted_answer: Optional[Union[AgentAction, AgentFinish]] = None, + task_description: Optional[str] = None, + verbose: bool = False, +) -> None: + """Show agent logs for both start and execution states. + + Args: + printer: Printer instance for output + agent_role: Role of the agent + formatted_answer: Optional AgentAction or AgentFinish for execution logs + task_description: Optional task description for start logs + verbose: Whether to show verbose output + """ + if not verbose: + return + + agent_role = agent_role.split("\n")[0] + + if formatted_answer is None: + # Start logs + printer.print( + content=f"\033[1m\033[95m# Agent:\033[00m \033[1m\033[92m{agent_role}\033[00m" + ) + if task_description: + printer.print( + content=f"\033[95m## Task:\033[00m \033[92m{task_description}\033[00m" + ) + else: + # Execution logs + printer.print( + content=f"\n\n\033[1m\033[95m# Agent:\033[00m \033[1m\033[92m{agent_role}\033[00m" + ) + + if isinstance(formatted_answer, AgentAction): + thought = re.sub(r"\n+", "\n", formatted_answer.thought) + formatted_json = json.dumps( + formatted_answer.tool_input, + indent=2, + ensure_ascii=False, + ) + if thought and thought != "": + printer.print( + content=f"\033[95m## Thought:\033[00m \033[92m{thought}\033[00m" + ) + printer.print( + content=f"\033[95m## Using tool:\033[00m \033[92m{formatted_answer.tool}\033[00m" + ) + printer.print( + content=f"\033[95m## Tool Input:\033[00m \033[92m\n{formatted_json}\033[00m" + ) + printer.print( + content=f"\033[95m## Tool Output:\033[00m \033[92m\n{formatted_answer.result}\033[00m" + ) + elif isinstance(formatted_answer, AgentFinish): + printer.print( + content=f"\033[95m## Final Answer:\033[00m \033[92m\n{formatted_answer.output}\033[00m\n\n" + ) diff --git a/src/crewai/utilities/tool_utils.py b/src/crewai/utilities/tool_utils.py new file mode 100644 index 000000000..812ed3bf2 --- /dev/null +++ b/src/crewai/utilities/tool_utils.py @@ -0,0 +1,107 @@ +from typing import Any, List, Optional + +from crewai.agents.parser import AgentAction +from crewai.tools.structured_tool import CrewStructuredTool +from crewai.tools.tool_types import ToolResult +from crewai.tools.tool_usage import ToolUsage, ToolUsageErrorException +from crewai.utilities.events import crewai_event_bus +from crewai.utilities.events.tool_usage_events import ( + ToolUsageErrorEvent, + ToolUsageStartedEvent, +) +from crewai.utilities.i18n import I18N + + +def execute_tool_and_check_finality( + agent_action: AgentAction, + tools: List[CrewStructuredTool], + i18n: I18N, + agent_key: Optional[str] = None, + agent_role: Optional[str] = None, + tools_handler: Optional[Any] = None, + task: Optional[Any] = None, + agent: Optional[Any] = None, + function_calling_llm: Optional[Any] = None, +) -> ToolResult: + """Execute a tool and check if the result should be treated as a final answer. + + Args: + agent_action: The action containing the tool to execute + tools: List of available tools + i18n: Internationalization settings + agent_key: Optional key for event emission + agent_role: Optional role for event emission + tools_handler: Optional tools handler for tool execution + task: Optional task for tool execution + agent: Optional agent instance for tool execution + function_calling_llm: Optional LLM for function calling + + Returns: + ToolResult containing the execution result and whether it should be treated as a final answer + """ + try: + # Create tool name to tool map + tool_name_to_tool_map = {tool.name: tool for tool in tools} + + # Emit tool usage event if agent info is available + if agent_key and agent_role and agent: + crewai_event_bus.emit( + agent, + event=ToolUsageStartedEvent( + agent_key=agent_key, + agent_role=agent_role, + tool_name=agent_action.tool, + tool_args=agent_action.tool_input, + tool_class=agent_action.tool, + ), + ) + + # Create tool usage instance + tool_usage = ToolUsage( + tools_handler=tools_handler, + tools=tools, + function_calling_llm=function_calling_llm, + task=task, + agent=agent, + action=agent_action, + ) + + # Parse tool calling + tool_calling = tool_usage.parse_tool_calling(agent_action.text) + + if isinstance(tool_calling, ToolUsageErrorException): + return ToolResult(tool_calling.message, False) + + # Check if tool name matches + if tool_calling.tool_name.casefold().strip() in [ + name.casefold().strip() for name in tool_name_to_tool_map + ] or tool_calling.tool_name.casefold().replace("_", " ") in [ + name.casefold().strip() for name in tool_name_to_tool_map + ]: + tool_result = tool_usage.use(tool_calling, agent_action.text) + tool = tool_name_to_tool_map.get(tool_calling.tool_name) + if tool: + return ToolResult(tool_result, tool.result_as_answer) + + # Handle invalid tool name + tool_result = i18n.errors("wrong_tool_name").format( + tool=tool_calling.tool_name, + tools=", ".join([tool.name.casefold() for tool in tools]), + ) + return ToolResult(tool_result, False) + + except Exception as e: + # Emit error event if agent info is available + if agent_key and agent_role and agent: + crewai_event_bus.emit( + agent, + event=ToolUsageErrorEvent( + agent_key=agent_key, + agent_role=agent_role, + tool_name=agent_action.tool, + tool_args=agent_action.tool_input, + tool_class=agent_action.tool, + error=str(e), + ), + ) + raise e