diff --git a/src/crewai/agent.py b/src/crewai/agent.py index a25fd0ef5..4f954d29b 100644 --- a/src/crewai/agent.py +++ b/src/crewai/agent.py @@ -2,7 +2,7 @@ import concurrent.futures import re import shutil import subprocess -from typing import Any, Dict, List, Literal, Optional, Sequence, Union +from typing import Any, Dict, List, Literal, NoReturn, Optional, Sequence, Union from pydantic import Field, InstanceOf, PrivateAttr, model_validator @@ -18,7 +18,7 @@ from crewai.security import Fingerprint from crewai.task import Task from crewai.tools import BaseTool from crewai.tools.agent_tools.agent_tools import AgentTools -from crewai.utilities import Converter, Prompts +from crewai.utilities import AgentExecutionTimeoutError, Converter, Prompts from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE from crewai.utilities.converter import generate_model_description from crewai.utilities.events.agent_events import ( @@ -254,99 +254,20 @@ class Agent(BaseAgent): ), ) - # If max_execution_time is set, use ThreadPoolExecutor with timeout - if self.max_execution_time is not None: - try: - with concurrent.futures.ThreadPoolExecutor() as executor: - future = executor.submit(self.agent_executor.invoke, invoke_params) - try: - result = future.result(timeout=self.max_execution_time)["output"] - except concurrent.futures.TimeoutError: - # Cancel the future to stop the execution - future.cancel() - # Define the timeout error message - error_message = f"Agent execution exceeded maximum time of {self.max_execution_time} seconds" - # Emit the timeout error event - crewai_event_bus.emit( - self, - event=AgentExecutionErrorEvent( - agent=self, - task=task, - error=error_message, - ), - ) - # Raise a standard Exception with the timeout message - # This avoids circular import issues while still providing the expected error message - raise Exception(f"Timeout Error: {error_message}") - except Exception as e: - # Re-raise any exceptions - if "Timeout Error:" in str(e): - raise - # For other exceptions, follow the normal retry logic - self._times_executed += 1 - if self._times_executed > self.max_retry_limit: - crewai_event_bus.emit( - self, - event=AgentExecutionErrorEvent( - agent=self, - task=task, - error=str(e), - ), - ) - raise e - return self.execute_task(task, context, tools) - except Exception as e: - if e.__class__.__module__.startswith("litellm"): - # Do not retry on litellm errors - crewai_event_bus.emit( - self, - event=AgentExecutionErrorEvent( - agent=self, - task=task, - error=str(e), - ), - ) - raise e - self._times_executed += 1 - if self._times_executed > self.max_retry_limit: - crewai_event_bus.emit( - self, - event=AgentExecutionErrorEvent( - agent=self, - task=task, - error=str(e), - ), - ) - raise e - result = self.execute_task(task, context, tools) - else: - # No timeout, execute normally - try: - result = self.agent_executor.invoke(invoke_params)["output"] - except Exception as e: - if e.__class__.__module__.startswith("litellm"): - # Do not retry on litellm errors - crewai_event_bus.emit( - self, - event=AgentExecutionErrorEvent( - agent=self, - task=task, - error=str(e), - ), - ) - raise e - self._times_executed += 1 - if self._times_executed > self.max_retry_limit: - crewai_event_bus.emit( - self, - event=AgentExecutionErrorEvent( - agent=self, - task=task, - error=str(e), - ), - ) - raise e - result = self.execute_task(task, context, tools) + # Execute with or without timeout based on configuration + try: + if self.max_execution_time is not None: + result = self._execute_with_timeout(invoke_params, task, context, tools) + else: + # No timeout, execute normally + try: + result = self.agent_executor.invoke(invoke_params)["output"] + except Exception as e: + result = self._handle_execution_error(e, task, context, tools) + except Exception as e: + # This will catch any unhandled exceptions including timeout errors + # that were re-raised from _execute_with_timeout + raise e if self.max_rpm and self._rpm_controller: self._rpm_controller.stop_rpm_counter() @@ -542,6 +463,105 @@ class Agent(BaseAgent): def __tools_names(tools) -> str: return ", ".join([t.name for t in tools]) + def _execute_with_timeout( + self, + invoke_params: Dict[str, Any], + task: Task, + context: Optional[str] = None, + tools: Optional[List[BaseTool]] = None + ) -> str: + """Execute a task with timeout handling. + + Args: + invoke_params: Parameters for agent executor invocation + task: Task to execute + context: Task context + tools: Available tools + + Returns: + Output of the agent execution + """ + try: + with concurrent.futures.ThreadPoolExecutor() as executor: + future = executor.submit(self.agent_executor.invoke, invoke_params) + try: + return future.result(timeout=self.max_execution_time)["output"] + except concurrent.futures.TimeoutError: + # Cancel the future to stop the execution + future.cancel() + + # Create a timeout error with context + error = AgentExecutionTimeoutError( + max_execution_time=self.max_execution_time, + agent_name=self.role, + task_description=task.description if task else None + ) + + # Emit the timeout error event + crewai_event_bus.emit( + self, + event=AgentExecutionErrorEvent( + agent=self, + task=task, + error=str(error), + ), + ) + + # Raise the timeout error + raise error + except AgentExecutionTimeoutError: + # Re-raise timeout errors directly + raise + except Exception as e: + # Handle other exceptions + return self._handle_execution_error(e, task, context, tools) + + def _handle_execution_error( + self, + error: Exception, + task: Task, + context: Optional[str] = None, + tools: Optional[List[BaseTool]] = None + ) -> Union[str, NoReturn]: + """Handle execution errors with consistent logic. + + Args: + error: The caught exception + task: Current task being executed + context: Task context + tools: Available tools + + Returns: + Retry result or re-raises the exception + """ + # Do not retry on litellm errors + if error.__class__.__module__.startswith("litellm"): + crewai_event_bus.emit( + self, + event=AgentExecutionErrorEvent( + agent=self, + task=task, + error=str(error), + ), + ) + raise error + + # For other errors, follow retry logic + self._times_executed += 1 + if self._times_executed > self.max_retry_limit: + crewai_event_bus.emit( + self, + event=AgentExecutionErrorEvent( + agent=self, + task=task, + error=str(error), + ), + ) + raise error + + # Retry execution + return self.execute_task(task, context, tools) + def __repr__(self): return f"Agent(role={self.role}, goal={self.goal}, backstory={self.backstory})" diff --git a/src/crewai/utilities/exceptions/agent_execution_timeout_error.py b/src/crewai/utilities/exceptions/agent_execution_timeout_error.py index 8435dbc6f..d92ed2b3c 100644 --- a/src/crewai/utilities/exceptions/agent_execution_timeout_error.py +++ b/src/crewai/utilities/exceptions/agent_execution_timeout_error.py @@ -1,7 +1,24 @@ class AgentExecutionTimeoutError(Exception): """Exception raised when an agent execution exceeds the maximum allowed time.""" - def __init__(self, max_execution_time: int, message: str | None = None): + def __init__( + self, + max_execution_time: int, + agent_name: str | None = None, + task_description: str | None = None, + message: str | None = None + ): self.max_execution_time = max_execution_time - self.message = message or f"Agent execution exceeded maximum allowed time of {max_execution_time} seconds" + self.agent_name = agent_name + self.task_description = task_description + + # Generate a detailed error message if not provided + if not message: + message = f"Agent execution exceeded maximum allowed time of {max_execution_time} seconds" + if agent_name: + message += f" for agent: {agent_name}" + if task_description: + message += f" while executing task: {task_description}" + + self.message = message super().__init__(self.message) diff --git a/tests/test_timeout/test_agent_timeout.py b/tests/test_timeout/test_agent_timeout.py index 967a75925..519b965eb 100644 --- a/tests/test_timeout/test_agent_timeout.py +++ b/tests/test_timeout/test_agent_timeout.py @@ -4,6 +4,7 @@ from unittest.mock import MagicMock, patch import pytest from crewai import Agent, Task +from crewai.utilities import AgentExecutionTimeoutError def test_agent_max_execution_time(): @@ -54,8 +55,53 @@ def test_agent_max_execution_time(): execution_time = time.time() - start_time assert execution_time <= 2.1, f"Execution took {execution_time:.2f} seconds, expected ~1 second" - # Check that the exception message mentions timeout - assert "timeout" in str(excinfo.value).lower() or "execution time" in str(excinfo.value).lower() + # Check that the exception message mentions timeout or execution time + error_message = str(excinfo.value).lower() + assert any(term in error_message for term in ["timeout", "execution time", "exceeded maximum"]) # Run the test function test_timeout() + + +def test_agent_timeout_error_message(): + """Test that the timeout error message includes agent and task information.""" + # Create an agent with a very short timeout + with patch('crewai.agent.Agent.create_agent_executor'): + agent = Agent( + role="Test Agent", + goal="Test timeout error messaging", + backstory="I am testing the timeout error messaging", + max_execution_time=1, # Short timeout + verbose=True + ) + + # Create a task + task = Task( + description="This task should timeout quickly", + expected_output="This should never be returned", + agent=agent + ) + + # Mock the agent_executor + mock_executor = MagicMock() + def side_effect(*args, **kwargs): + # Sleep to trigger timeout + time.sleep(2) + return {"output": "This should never be returned"} + + mock_executor.invoke.side_effect = side_effect + mock_executor.tools_names = [] + mock_executor.tools_description = [] + + # Replace the agent's executor + agent.agent_executor = mock_executor + + # Execute the task and expect an exception + with patch('crewai.agent.crewai_event_bus'): + with pytest.raises(Exception) as excinfo: + agent.execute_task(task) + + # Verify error message contains agent name and task description + error_message = str(excinfo.value).lower() + assert "test agent" in error_message or "agent: test agent" in error_message + assert "this task should timeout" in error_message or "task: this task should timeout" in error_message