Refactor timeout mechanism to improve code quality and error handling

Co-Authored-By: Joe Moura <joao@crewai.com>
This commit is contained in:
Devin AI
2025-03-16 22:51:34 +00:00
parent 4487d2538a
commit 6fa8f3b73f
3 changed files with 182 additions and 99 deletions

View File

@@ -2,7 +2,7 @@ import concurrent.futures
import re
import shutil
import subprocess
from typing import Any, Dict, List, Literal, Optional, Sequence, Union
from typing import Any, Dict, List, Literal, NoReturn, Optional, Sequence, Union
from pydantic import Field, InstanceOf, PrivateAttr, model_validator
@@ -18,7 +18,7 @@ from crewai.security import Fingerprint
from crewai.task import Task
from crewai.tools import BaseTool
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.utilities import Converter, Prompts
from crewai.utilities import AgentExecutionTimeoutError, Converter, Prompts
from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
from crewai.utilities.converter import generate_model_description
from crewai.utilities.events.agent_events import (
@@ -254,99 +254,20 @@ class Agent(BaseAgent):
),
)
# If max_execution_time is set, use ThreadPoolExecutor with timeout
if self.max_execution_time is not None:
try:
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(self.agent_executor.invoke, invoke_params)
try:
result = future.result(timeout=self.max_execution_time)["output"]
except concurrent.futures.TimeoutError:
# Cancel the future to stop the execution
future.cancel()
# Define the timeout error message
error_message = f"Agent execution exceeded maximum time of {self.max_execution_time} seconds"
# Emit the timeout error event
crewai_event_bus.emit(
self,
event=AgentExecutionErrorEvent(
agent=self,
task=task,
error=error_message,
),
)
# Raise a standard Exception with the timeout message
# This avoids circular import issues while still providing the expected error message
raise Exception(f"Timeout Error: {error_message}")
except Exception as e:
# Re-raise any exceptions
if "Timeout Error:" in str(e):
raise
# For other exceptions, follow the normal retry logic
self._times_executed += 1
if self._times_executed > self.max_retry_limit:
crewai_event_bus.emit(
self,
event=AgentExecutionErrorEvent(
agent=self,
task=task,
error=str(e),
),
)
raise e
return self.execute_task(task, context, tools)
except Exception as e:
if e.__class__.__module__.startswith("litellm"):
# Do not retry on litellm errors
crewai_event_bus.emit(
self,
event=AgentExecutionErrorEvent(
agent=self,
task=task,
error=str(e),
),
)
raise e
self._times_executed += 1
if self._times_executed > self.max_retry_limit:
crewai_event_bus.emit(
self,
event=AgentExecutionErrorEvent(
agent=self,
task=task,
error=str(e),
),
)
raise e
result = self.execute_task(task, context, tools)
else:
# No timeout, execute normally
try:
result = self.agent_executor.invoke(invoke_params)["output"]
except Exception as e:
if e.__class__.__module__.startswith("litellm"):
# Do not retry on litellm errors
crewai_event_bus.emit(
self,
event=AgentExecutionErrorEvent(
agent=self,
task=task,
error=str(e),
),
)
raise e
self._times_executed += 1
if self._times_executed > self.max_retry_limit:
crewai_event_bus.emit(
self,
event=AgentExecutionErrorEvent(
agent=self,
task=task,
error=str(e),
),
)
raise e
result = self.execute_task(task, context, tools)
# Execute with or without timeout based on configuration
try:
if self.max_execution_time is not None:
result = self._execute_with_timeout(invoke_params, task, context, tools)
else:
# No timeout, execute normally
try:
result = self.agent_executor.invoke(invoke_params)["output"]
except Exception as e:
result = self._handle_execution_error(e, task, context, tools)
except Exception as e:
# This will catch any unhandled exceptions including timeout errors
# that were re-raised from _execute_with_timeout
raise e
if self.max_rpm and self._rpm_controller:
self._rpm_controller.stop_rpm_counter()
@@ -542,6 +463,105 @@ class Agent(BaseAgent):
def __tools_names(tools) -> str:
return ", ".join([t.name for t in tools])
def _execute_with_timeout(
self,
invoke_params: Dict[str, Any],
task: Task,
context: Optional[str] = None,
tools: Optional[List[BaseTool]] = None
) -> str:
"""Execute a task with timeout handling.
Args:
invoke_params: Parameters for agent executor invocation
task: Task to execute
context: Task context
tools: Available tools
Returns:
Output of the agent execution
"""
try:
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(self.agent_executor.invoke, invoke_params)
try:
return future.result(timeout=self.max_execution_time)["output"]
except concurrent.futures.TimeoutError:
# Cancel the future to stop the execution
future.cancel()
# Create a timeout error with context
error = AgentExecutionTimeoutError(
max_execution_time=self.max_execution_time,
agent_name=self.role,
task_description=task.description if task else None
)
# Emit the timeout error event
crewai_event_bus.emit(
self,
event=AgentExecutionErrorEvent(
agent=self,
task=task,
error=str(error),
),
)
# Raise the timeout error
raise error
except AgentExecutionTimeoutError:
# Re-raise timeout errors directly
raise
except Exception as e:
# Handle other exceptions
return self._handle_execution_error(e, task, context, tools)
def _handle_execution_error(
self,
error: Exception,
task: Task,
context: Optional[str] = None,
tools: Optional[List[BaseTool]] = None
) -> Union[str, NoReturn]:
"""Handle execution errors with consistent logic.
Args:
error: The caught exception
task: Current task being executed
context: Task context
tools: Available tools
Returns:
Retry result or re-raises the exception
"""
# Do not retry on litellm errors
if error.__class__.__module__.startswith("litellm"):
crewai_event_bus.emit(
self,
event=AgentExecutionErrorEvent(
agent=self,
task=task,
error=str(error),
),
)
raise error
# For other errors, follow retry logic
self._times_executed += 1
if self._times_executed > self.max_retry_limit:
crewai_event_bus.emit(
self,
event=AgentExecutionErrorEvent(
agent=self,
task=task,
error=str(error),
),
)
raise error
# Retry execution
return self.execute_task(task, context, tools)
def __repr__(self):
return f"Agent(role={self.role}, goal={self.goal}, backstory={self.backstory})"

View File

@@ -1,7 +1,24 @@
class AgentExecutionTimeoutError(Exception):
"""Exception raised when an agent execution exceeds the maximum allowed time."""
def __init__(self, max_execution_time: int, message: str | None = None):
def __init__(
self,
max_execution_time: int,
agent_name: str | None = None,
task_description: str | None = None,
message: str | None = None
):
self.max_execution_time = max_execution_time
self.message = message or f"Agent execution exceeded maximum allowed time of {max_execution_time} seconds"
self.agent_name = agent_name
self.task_description = task_description
# Generate a detailed error message if not provided
if not message:
message = f"Agent execution exceeded maximum allowed time of {max_execution_time} seconds"
if agent_name:
message += f" for agent: {agent_name}"
if task_description:
message += f" while executing task: {task_description}"
self.message = message
super().__init__(self.message)

View File

@@ -4,6 +4,7 @@ from unittest.mock import MagicMock, patch
import pytest
from crewai import Agent, Task
from crewai.utilities import AgentExecutionTimeoutError
def test_agent_max_execution_time():
@@ -54,8 +55,53 @@ def test_agent_max_execution_time():
execution_time = time.time() - start_time
assert execution_time <= 2.1, f"Execution took {execution_time:.2f} seconds, expected ~1 second"
# Check that the exception message mentions timeout
assert "timeout" in str(excinfo.value).lower() or "execution time" in str(excinfo.value).lower()
# Check that the exception message mentions timeout or execution time
error_message = str(excinfo.value).lower()
assert any(term in error_message for term in ["timeout", "execution time", "exceeded maximum"])
# Run the test function
test_timeout()
def test_agent_timeout_error_message():
"""Test that the timeout error message includes agent and task information."""
# Create an agent with a very short timeout
with patch('crewai.agent.Agent.create_agent_executor'):
agent = Agent(
role="Test Agent",
goal="Test timeout error messaging",
backstory="I am testing the timeout error messaging",
max_execution_time=1, # Short timeout
verbose=True
)
# Create a task
task = Task(
description="This task should timeout quickly",
expected_output="This should never be returned",
agent=agent
)
# Mock the agent_executor
mock_executor = MagicMock()
def side_effect(*args, **kwargs):
# Sleep to trigger timeout
time.sleep(2)
return {"output": "This should never be returned"}
mock_executor.invoke.side_effect = side_effect
mock_executor.tools_names = []
mock_executor.tools_description = []
# Replace the agent's executor
agent.agent_executor = mock_executor
# Execute the task and expect an exception
with patch('crewai.agent.crewai_event_bus'):
with pytest.raises(Exception) as excinfo:
agent.execute_task(task)
# Verify error message contains agent name and task description
error_message = str(excinfo.value).lower()
assert "test agent" in error_message or "agent: test agent" in error_message
assert "this task should timeout" in error_message or "task: this task should timeout" in error_message