Compare commits

...

4 Commits

Author SHA1 Message Date
Devin AI
6fa8f3b73f Refactor timeout mechanism to improve code quality and error handling
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-03-16 22:51:34 +00:00
Devin AI
4487d2538a Fix import sorting in test file
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-03-16 22:40:53 +00:00
Devin AI
88c5a0dd01 Fix linting and type checking issues
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-03-16 22:37:48 +00:00
Devin AI
91675a4298 Fix issue #2379: Implement timeout mechanism for max_execution_time
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-03-16 22:35:58 +00:00
6 changed files with 268 additions and 42 deletions

View File

@@ -1,7 +1,8 @@
import concurrent.futures
import re
import shutil
import subprocess
from typing import Any, Dict, List, Literal, Optional, Sequence, Union
from typing import Any, Dict, List, Literal, NoReturn, Optional, Sequence, Union
from pydantic import Field, InstanceOf, PrivateAttr, model_validator
@@ -17,7 +18,7 @@ from crewai.security import Fingerprint
from crewai.task import Task
from crewai.tools import BaseTool
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.utilities import Converter, Prompts
from crewai.utilities import AgentExecutionTimeoutError, Converter, Prompts
from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
from crewai.utilities.converter import generate_model_description
from crewai.utilities.events.agent_events import (
@@ -234,48 +235,39 @@ class Agent(BaseAgent):
else:
task_prompt = self._use_trained_data(task_prompt=task_prompt)
# Prepare the invoke parameters
invoke_params = {
"input": task_prompt,
"tool_names": self.agent_executor.tools_names,
"tools": self.agent_executor.tools_description,
"ask_for_human_input": task.human_input,
}
# Emit the execution started event
crewai_event_bus.emit(
self,
event=AgentExecutionStartedEvent(
agent=self,
tools=self.tools,
task_prompt=task_prompt,
task=task,
),
)
# Execute with or without timeout based on configuration
try:
crewai_event_bus.emit(
self,
event=AgentExecutionStartedEvent(
agent=self,
tools=self.tools,
task_prompt=task_prompt,
task=task,
),
)
result = self.agent_executor.invoke(
{
"input": task_prompt,
"tool_names": self.agent_executor.tools_names,
"tools": self.agent_executor.tools_description,
"ask_for_human_input": task.human_input,
}
)["output"]
if self.max_execution_time is not None:
result = self._execute_with_timeout(invoke_params, task, context, tools)
else:
# No timeout, execute normally
try:
result = self.agent_executor.invoke(invoke_params)["output"]
except Exception as e:
result = self._handle_execution_error(e, task, context, tools)
except Exception as e:
if e.__class__.__module__.startswith("litellm"):
# Do not retry on litellm errors
crewai_event_bus.emit(
self,
event=AgentExecutionErrorEvent(
agent=self,
task=task,
error=str(e),
),
)
raise e
self._times_executed += 1
if self._times_executed > self.max_retry_limit:
crewai_event_bus.emit(
self,
event=AgentExecutionErrorEvent(
agent=self,
task=task,
error=str(e),
),
)
raise e
result = self.execute_task(task, context, tools)
# This will catch any unhandled exceptions including timeout errors
# that were re-raised from _execute_with_timeout
raise e
if self.max_rpm and self._rpm_controller:
self._rpm_controller.stop_rpm_counter()
@@ -471,6 +463,105 @@ class Agent(BaseAgent):
def __tools_names(tools) -> str:
return ", ".join([t.name for t in tools])
def _execute_with_timeout(
self,
invoke_params: Dict[str, Any],
task: Task,
context: Optional[str] = None,
tools: Optional[List[BaseTool]] = None
) -> str:
"""Execute a task with timeout handling.
Args:
invoke_params: Parameters for agent executor invocation
task: Task to execute
context: Task context
tools: Available tools
Returns:
Output of the agent execution
"""
try:
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(self.agent_executor.invoke, invoke_params)
try:
return future.result(timeout=self.max_execution_time)["output"]
except concurrent.futures.TimeoutError:
# Cancel the future to stop the execution
future.cancel()
# Create a timeout error with context
error = AgentExecutionTimeoutError(
max_execution_time=self.max_execution_time,
agent_name=self.role,
task_description=task.description if task else None
)
# Emit the timeout error event
crewai_event_bus.emit(
self,
event=AgentExecutionErrorEvent(
agent=self,
task=task,
error=str(error),
),
)
# Raise the timeout error
raise error
except AgentExecutionTimeoutError:
# Re-raise timeout errors directly
raise
except Exception as e:
# Handle other exceptions
return self._handle_execution_error(e, task, context, tools)
def _handle_execution_error(
self,
error: Exception,
task: Task,
context: Optional[str] = None,
tools: Optional[List[BaseTool]] = None
) -> Union[str, NoReturn]:
"""Handle execution errors with consistent logic.
Args:
error: The caught exception
task: Current task being executed
context: Task context
tools: Available tools
Returns:
Retry result or re-raises the exception
"""
# Do not retry on litellm errors
if error.__class__.__module__.startswith("litellm"):
crewai_event_bus.emit(
self,
event=AgentExecutionErrorEvent(
agent=self,
task=task,
error=str(error),
),
)
raise error
# For other errors, follow retry logic
self._times_executed += 1
if self._times_executed > self.max_retry_limit:
crewai_event_bus.emit(
self,
event=AgentExecutionErrorEvent(
agent=self,
task=task,
error=str(error),
),
)
raise error
# Retry execution
return self.execute_task(task, context, tools)
def __repr__(self):
return f"Agent(role={self.role}, goal={self.goal}, backstory={self.backstory})"

View File

@@ -10,6 +10,7 @@ from .rpm_controller import RPMController
from .exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededException,
)
from .exceptions.agent_execution_timeout_error import AgentExecutionTimeoutError
from .embedding_configurator import EmbeddingConfigurator
__all__ = [
@@ -24,5 +25,6 @@ __all__ = [
"RPMController",
"YamlParser",
"LLMContextLengthExceededException",
"AgentExecutionTimeoutError",
"EmbeddingConfigurator",
]

View File

@@ -0,0 +1 @@
# This file is intentionally left empty to make the directory a Python package

View File

@@ -0,0 +1,24 @@
class AgentExecutionTimeoutError(Exception):
"""Exception raised when an agent execution exceeds the maximum allowed time."""
def __init__(
self,
max_execution_time: int,
agent_name: str | None = None,
task_description: str | None = None,
message: str | None = None
):
self.max_execution_time = max_execution_time
self.agent_name = agent_name
self.task_description = task_description
# Generate a detailed error message if not provided
if not message:
message = f"Agent execution exceeded maximum allowed time of {max_execution_time} seconds"
if agent_name:
message += f" for agent: {agent_name}"
if task_description:
message += f" while executing task: {task_description}"
self.message = message
super().__init__(self.message)

View File

@@ -0,0 +1 @@
# This file is intentionally left empty to make the directory a Python package

View File

@@ -0,0 +1,107 @@
import time
from unittest.mock import MagicMock, patch
import pytest
from crewai import Agent, Task
from crewai.utilities import AgentExecutionTimeoutError
def test_agent_max_execution_time():
"""Test that max_execution_time parameter is enforced."""
# Create a simple test function that will be used to simulate a long-running task
def test_timeout():
# Create an agent with a 1-second timeout
with patch('crewai.agent.Agent.create_agent_executor'):
agent = Agent(
role="Test Agent",
goal="Test timeout functionality",
backstory="I am testing the timeout functionality",
max_execution_time=1,
verbose=True
)
# Create a task that will take longer than 1 second
task = Task(
description="Sleep for 5 seconds and then return a result",
expected_output="The result after sleeping",
agent=agent
)
# Mock the agent_executor to simulate a long-running task
mock_executor = MagicMock()
def side_effect(*args, **kwargs):
# Sleep for longer than the timeout to trigger the timeout mechanism
time.sleep(2)
return {"output": "This should never be returned due to timeout"}
mock_executor.invoke.side_effect = side_effect
mock_executor.tools_names = []
mock_executor.tools_description = []
# Replace the agent's executor with our mock
agent.agent_executor = mock_executor
# Mock the event bus to avoid any real event emissions
with patch('crewai.agent.crewai_event_bus'):
# Execute the task and measure the time
start_time = time.time()
# We expect an Exception to be raised due to timeout
with pytest.raises(Exception) as excinfo:
agent.execute_task(task)
# Check that the execution time is close to 1 second (the timeout)
execution_time = time.time() - start_time
assert execution_time <= 2.1, f"Execution took {execution_time:.2f} seconds, expected ~1 second"
# Check that the exception message mentions timeout or execution time
error_message = str(excinfo.value).lower()
assert any(term in error_message for term in ["timeout", "execution time", "exceeded maximum"])
# Run the test function
test_timeout()
def test_agent_timeout_error_message():
"""Test that the timeout error message includes agent and task information."""
# Create an agent with a very short timeout
with patch('crewai.agent.Agent.create_agent_executor'):
agent = Agent(
role="Test Agent",
goal="Test timeout error messaging",
backstory="I am testing the timeout error messaging",
max_execution_time=1, # Short timeout
verbose=True
)
# Create a task
task = Task(
description="This task should timeout quickly",
expected_output="This should never be returned",
agent=agent
)
# Mock the agent_executor
mock_executor = MagicMock()
def side_effect(*args, **kwargs):
# Sleep to trigger timeout
time.sleep(2)
return {"output": "This should never be returned"}
mock_executor.invoke.side_effect = side_effect
mock_executor.tools_names = []
mock_executor.tools_description = []
# Replace the agent's executor
agent.agent_executor = mock_executor
# Execute the task and expect an exception
with patch('crewai.agent.crewai_event_bus'):
with pytest.raises(Exception) as excinfo:
agent.execute_task(task)
# Verify error message contains agent name and task description
error_message = str(excinfo.value).lower()
assert "test agent" in error_message or "agent: test agent" in error_message
assert "this task should timeout" in error_message or "task: this task should timeout" in error_message