Fix issue #2379: Implement timeout mechanism for max_execution_time

Co-Authored-By: Joe Moura <joao@crewai.com>
This commit is contained in:
Devin AI
2025-03-16 22:35:58 +00:00
parent 24f1a19310
commit 91675a4298
6 changed files with 182 additions and 42 deletions

View File

@@ -1,3 +1,4 @@
import concurrent.futures
import re
import shutil
import subprocess
@@ -234,48 +235,118 @@ class Agent(BaseAgent):
else:
task_prompt = self._use_trained_data(task_prompt=task_prompt)
try:
crewai_event_bus.emit(
self,
event=AgentExecutionStartedEvent(
agent=self,
tools=self.tools,
task_prompt=task_prompt,
task=task,
),
)
result = self.agent_executor.invoke(
{
"input": task_prompt,
"tool_names": self.agent_executor.tools_names,
"tools": self.agent_executor.tools_description,
"ask_for_human_input": task.human_input,
}
)["output"]
except Exception as e:
if e.__class__.__module__.startswith("litellm"):
# Do not retry on litellm errors
crewai_event_bus.emit(
self,
event=AgentExecutionErrorEvent(
agent=self,
task=task,
error=str(e),
),
)
raise e
self._times_executed += 1
if self._times_executed > self.max_retry_limit:
crewai_event_bus.emit(
self,
event=AgentExecutionErrorEvent(
agent=self,
task=task,
error=str(e),
),
)
raise e
result = self.execute_task(task, context, tools)
# Prepare the invoke parameters
invoke_params = {
"input": task_prompt,
"tool_names": self.agent_executor.tools_names,
"tools": self.agent_executor.tools_description,
"ask_for_human_input": task.human_input,
}
# Emit the execution started event
crewai_event_bus.emit(
self,
event=AgentExecutionStartedEvent(
agent=self,
tools=self.tools,
task_prompt=task_prompt,
task=task,
),
)
# If max_execution_time is set, use ThreadPoolExecutor with timeout
if self.max_execution_time is not None:
try:
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(self.agent_executor.invoke, invoke_params)
try:
result = future.result(timeout=self.max_execution_time)["output"]
except concurrent.futures.TimeoutError:
# Cancel the future to stop the execution
future.cancel()
# Define the timeout error message
error_message = f"Agent execution exceeded maximum time of {self.max_execution_time} seconds"
# Emit the timeout error event
crewai_event_bus.emit(
self,
event=AgentExecutionErrorEvent(
agent=self,
task=task,
error=error_message,
),
)
# Raise a standard Exception with the timeout message
# This avoids circular import issues while still providing the expected error message
raise Exception(f"Timeout Error: {error_message}")
except Exception as e:
# Re-raise any exceptions
if "Timeout Error:" in str(e):
raise
# For other exceptions, follow the normal retry logic
self._times_executed += 1
if self._times_executed > self.max_retry_limit:
crewai_event_bus.emit(
self,
event=AgentExecutionErrorEvent(
agent=self,
task=task,
error=str(e),
),
)
raise e
return self.execute_task(task, context, tools)
except Exception as e:
if e.__class__.__module__.startswith("litellm"):
# Do not retry on litellm errors
crewai_event_bus.emit(
self,
event=AgentExecutionErrorEvent(
agent=self,
task=task,
error=str(e),
),
)
raise e
self._times_executed += 1
if self._times_executed > self.max_retry_limit:
crewai_event_bus.emit(
self,
event=AgentExecutionErrorEvent(
agent=self,
task=task,
error=str(e),
),
)
raise e
result = self.execute_task(task, context, tools)
else:
# No timeout, execute normally
try:
result = self.agent_executor.invoke(invoke_params)["output"]
except Exception as e:
if e.__class__.__module__.startswith("litellm"):
# Do not retry on litellm errors
crewai_event_bus.emit(
self,
event=AgentExecutionErrorEvent(
agent=self,
task=task,
error=str(e),
),
)
raise e
self._times_executed += 1
if self._times_executed > self.max_retry_limit:
crewai_event_bus.emit(
self,
event=AgentExecutionErrorEvent(
agent=self,
task=task,
error=str(e),
),
)
raise e
result = self.execute_task(task, context, tools)
if self.max_rpm and self._rpm_controller:
self._rpm_controller.stop_rpm_counter()

View File

@@ -10,6 +10,7 @@ from .rpm_controller import RPMController
from .exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededException,
)
from .exceptions.agent_execution_timeout_error import AgentExecutionTimeoutError
from .embedding_configurator import EmbeddingConfigurator
__all__ = [
@@ -24,5 +25,6 @@ __all__ = [
"RPMController",
"YamlParser",
"LLMContextLengthExceededException",
"AgentExecutionTimeoutError",
"EmbeddingConfigurator",
]

View File

@@ -0,0 +1 @@
# This file is intentionally left empty to make the directory a Python package

View File

@@ -0,0 +1,7 @@
class AgentExecutionTimeoutError(Exception):
"""Exception raised when an agent execution exceeds the maximum allowed time."""
def __init__(self, max_execution_time: int, message: str = None):
self.max_execution_time = max_execution_time
self.message = message or f"Agent execution exceeded maximum allowed time of {max_execution_time} seconds"
super().__init__(self.message)

View File

@@ -0,0 +1 @@
# This file is intentionally left empty to make the directory a Python package

View File

@@ -0,0 +1,58 @@
import time
import pytest
from unittest.mock import patch, MagicMock
from crewai import Agent, Task
def test_agent_max_execution_time():
"""Test that max_execution_time parameter is enforced."""
# Create a simple test function that will be used to simulate a long-running task
def test_timeout():
# Create an agent with a 1-second timeout
with patch('crewai.agent.Agent.create_agent_executor'):
agent = Agent(
role="Test Agent",
goal="Test timeout functionality",
backstory="I am testing the timeout functionality",
max_execution_time=1,
verbose=True
)
# Create a task that will take longer than 1 second
task = Task(
description="Sleep for 5 seconds and then return a result",
expected_output="The result after sleeping",
agent=agent
)
# Mock the agent_executor to simulate a long-running task
mock_executor = MagicMock()
def side_effect(*args, **kwargs):
# Sleep for longer than the timeout to trigger the timeout mechanism
time.sleep(2)
return {"output": "This should never be returned due to timeout"}
mock_executor.invoke.side_effect = side_effect
mock_executor.tools_names = []
mock_executor.tools_description = []
# Replace the agent's executor with our mock
agent.agent_executor = mock_executor
# Mock the event bus to avoid any real event emissions
with patch('crewai.agent.crewai_event_bus'):
# Execute the task and measure the time
start_time = time.time()
# We expect an Exception to be raised due to timeout
with pytest.raises(Exception) as excinfo:
agent.execute_task(task)
# Check that the execution time is close to 1 second (the timeout)
execution_time = time.time() - start_time
assert execution_time <= 2.1, f"Execution took {execution_time:.2f} seconds, expected ~1 second"
# Check that the exception message mentions timeout
assert "timeout" in str(excinfo.value).lower() or "execution time" in str(excinfo.value).lower()
# Run the test function
test_timeout()