This PR implements proper timeout handling for the max_execution_time parameter in the Agent class. When a task's execution exceeds max_execution_time (in seconds), the agent now stops waiting for the LLM response and either returns a partial result or raises a TimeoutError.

Key changes:
- Added ThreadPoolExecutor-based timeout mechanism
- Implemented proper error handling and resource cleanup
- Added comprehensive tests for timeout functionality
- Ensured compatibility with existing agent execution patterns

Co-Authored-By: Joe Moura <joao@crewai.com>
This commit is contained in:
Devin AI
2025-03-31 20:01:21 +00:00
parent 3c24350306
commit 142f3bbc60
2 changed files with 211 additions and 4 deletions

View File

@@ -1,6 +1,8 @@
import concurrent.futures
import re import re
import shutil import shutil
import subprocess import subprocess
import threading
from typing import Any, Dict, List, Literal, Optional, Sequence, Union from typing import Any, Dict, List, Literal, Optional, Sequence, Union
from pydantic import Field, InstanceOf, PrivateAttr, model_validator from pydantic import Field, InstanceOf, PrivateAttr, model_validator
@@ -154,13 +156,13 @@ class Agent(BaseAgent):
except (TypeError, ValueError) as e: except (TypeError, ValueError) as e:
raise ValueError(f"Invalid Knowledge Configuration: {str(e)}") raise ValueError(f"Invalid Knowledge Configuration: {str(e)}")
def execute_task( def _execute_task_without_timeout(
self, self,
task: Task, task: Task,
context: Optional[str] = None, context: Optional[str] = None,
tools: Optional[List[BaseTool]] = None, tools: Optional[List[BaseTool]] = None,
) -> str: ) -> str:
"""Execute a task with the agent. """Execute a task with the agent without timeout.
Args: Args:
task: Task to execute. task: Task to execute.
@@ -275,7 +277,7 @@ class Agent(BaseAgent):
), ),
) )
raise e raise e
result = self.execute_task(task, context, tools) result = self._execute_task_without_timeout(task, context, tools)
if self.max_rpm and self._rpm_controller: if self.max_rpm and self._rpm_controller:
self._rpm_controller.stop_rpm_counter() self._rpm_controller.stop_rpm_counter()
@@ -292,6 +294,118 @@ class Agent(BaseAgent):
) )
return result return result
def _execute_with_timeout(
    self,
    task: Task,
    context: Optional[str] = None,
    tools: Optional[List[BaseTool]] = None,
) -> str:
    """Execute a task, giving up after ``max_execution_time`` seconds.

    The task runs in a worker thread via ``_execute_task_without_timeout``.
    If it does not finish in time, one extra LLM call is made through the
    existing agent executor (when there is one) to request a final answer;
    if that yields an output it is returned as a partial result.

    Args:
        task: Task to execute.
        context: Context to execute the task in.
        tools: Tools to use for the task.

    Returns:
        Output of the agent, or the partial output obtained after a timeout.

    Raises:
        ValueError: If max_execution_time is not a positive integer.
        TimeoutError: If the execution exceeds max_execution_time and no
            partial result could be obtained.
    """
    if not isinstance(self.max_execution_time, int) or self.max_execution_time <= 0:
        raise ValueError("max_execution_time must be a positive integer")

    # Deliberately NOT a ``with`` block: leaving a ThreadPoolExecutor context
    # calls shutdown(wait=True), which would block the return/raise below
    # until the timed-out worker finishes and so defeat the timeout entirely.
    executor = concurrent.futures.ThreadPoolExecutor()
    try:
        future = executor.submit(
            self._execute_task_without_timeout, task, context, tools
        )
        try:
            return future.result(timeout=self.max_execution_time)
        except concurrent.futures.TimeoutError:
            # cancel() only prevents a future that has not started yet from
            # running; an already running call cannot be interrupted and
            # keeps going in the background until it returns on its own.
            future.cancel()
            self._logger.log(
                "warning",
                f"Task execution timed out after {self.max_execution_time} seconds"
            )

            if hasattr(self, 'agent_executor') and self.agent_executor:
                # Best effort: any failure here is logged and we fall through
                # to the TimeoutError below.
                try:
                    self._logger.log(
                        "info",
                        "Requesting final answer due to timeout"
                    )
                    force_final_answer_message = {
                        "role": "assistant",
                        "content": self.i18n.errors("force_final_answer")
                    }
                    if hasattr(self.agent_executor, 'messages'):
                        self.agent_executor.messages.append(force_final_answer_message)
                        final_answer = self.agent_executor.llm.call(
                            self.agent_executor.messages,
                            callbacks=self.agent_executor.callbacks,
                        )
                        if final_answer:
                            formatted_answer = self.agent_executor._format_answer(final_answer)
                            if hasattr(formatted_answer, 'output'):
                                return formatted_answer.output
                except Exception as e:
                    self._logger.log(
                        "error",
                        f"Failed to get partial result after timeout: {str(e)}"
                    )

            error_msg = (
                f"Task '{task.description}' execution timed out after "
                f"{self.max_execution_time} seconds. Consider increasing "
                f"max_execution_time or optimizing the task."
            )
            raise TimeoutError(error_msg)
    finally:
        # Release the executor without waiting for a possibly stuck worker.
        executor.shutdown(wait=False)
def execute_task(
    self,
    task: Task,
    context: Optional[str] = None,
    tools: Optional[List[BaseTool]] = None,
) -> str:
    """Run a task with the agent, applying max_execution_time when it is set.

    The execution counter ``_times_executed`` is reset to 0 first. Without a
    configured ``max_execution_time`` the task runs directly; otherwise it
    runs through ``_execute_with_timeout``.

    Args:
        task: Task to execute.
        context: Context to execute the task in.
        tools: Tools to use for the task.

    Returns:
        Output of the agent

    Raises:
        TimeoutError: If the execution exceeds max_execution_time (if set).
            An AgentExecutionErrorEvent is emitted before it is re-raised.
        Exception: For other execution errors
    """
    self._times_executed = 0

    if self.max_execution_time is not None:
        try:
            return self._execute_with_timeout(task, context, tools)
        except TimeoutError as timeout_error:
            # Report the timeout on the event bus, then let it propagate.
            failure_event = AgentExecutionErrorEvent(
                agent=self,
                task=task,
                error=str(timeout_error),
            )
            crewai_event_bus.emit(self, event=failure_event)
            raise

    # No time limit configured: run the task directly.
    return self._execute_task_without_timeout(task, context, tools)
def create_agent_executor( def create_agent_executor(
self, tools: Optional[List[BaseTool]] = None, task=None self, tools: Optional[List[BaseTool]] = None, task=None
) -> None: ) -> None:

View File

@@ -1,8 +1,9 @@
"""Test Agent creation and execution basic functionality.""" """Test Agent creation and execution basic functionality."""
import concurrent.futures
import os import os
from unittest import mock from unittest import mock
from unittest.mock import patch from unittest.mock import MagicMock, patch
import pytest import pytest
@@ -546,6 +547,98 @@ def test_agent_moved_on_after_max_iterations():
assert output == "42" assert output == "42"
def test_agent_timeout_with_max_execution_time():
    """An agent with max_execution_time raises TimeoutError when execution times out.

    The un-timed execution method is mocked to raise
    ``concurrent.futures.TimeoutError``, so the timeout path is exercised
    through the mock instead of by running a real slow call.
    """

    @tool
    def slow_tool() -> str:
        """A tool that takes a long time to execute."""
        import time

        time.sleep(2)  # Sleep for 2 seconds
        return "This is a slow response"

    simulated_timeout = concurrent.futures.TimeoutError()
    with patch.object(
        Agent, "_execute_task_without_timeout", side_effect=simulated_timeout
    ):
        timed_agent = Agent(
            role="test role",
            goal="test goal",
            backstory="test backstory",
            max_execution_time=1,  # Set timeout to 1 second
            allow_delegation=False,
        )
        slow_task = Task(
            description="Use the slow_tool and wait for its response.",
            expected_output="The response from the slow tool.",
        )

        with pytest.raises(TimeoutError):
            timed_agent.execute_task(task=slow_task, tools=[slow_tool])
def test_agent_partial_result_with_timeout():
    """Test that an agent with max_execution_time returns a partial result after a timeout.

    ``ThreadPoolExecutor.submit`` is patched to hand back a future whose
    ``result()`` raises ``concurrent.futures.TimeoutError``, and the agent's
    executor is replaced by a mock whose LLM call and answer formatting
    return a canned final answer. The test requires that answer to be
    returned; a TimeoutError from ``execute_task`` fails the test.
    """

    @tool
    def slow_tool() -> str:
        """A tool that takes a long time to execute."""
        import time

        time.sleep(0.1)  # Just a small delay
        return "This is a slow response"

    with patch("concurrent.futures.ThreadPoolExecutor.submit") as mock_submit:
        mock_future = MagicMock()
        mock_future.result.side_effect = concurrent.futures.TimeoutError()
        mock_submit.return_value = mock_future

        agent = Agent(
            role="test role",
            goal="test goal",
            backstory="test backstory",
            max_execution_time=1,  # Set timeout to 1 second
            allow_delegation=False,
        )

        # The executor is fully mocked, so class-level patches of LLM.call or
        # CrewAgentExecutor._format_answer would never be consulted here.
        agent.agent_executor = MagicMock()
        agent.agent_executor.messages = []
        agent.agent_executor.llm = MagicMock()
        agent.agent_executor.llm.call.return_value = "Partial result due to timeout"
        agent.agent_executor._format_answer.return_value = AgentFinish(
            thought="",
            output="Partial result due to timeout",
            text="Partial result due to timeout",
        )
        agent.agent_executor.callbacks = []

        task = Task(
            description="Use the slow_tool and wait for its response.",
            expected_output="The response from the slow tool.",
        )

        # No try/except: catching the exception and accepting TimeoutError
        # would let the test pass without any partial result being returned.
        result = agent.execute_task(
            task=task,
            tools=[slow_tool],
        )

    assert "Partial result" in result
@pytest.mark.vcr(filter_headers=["authorization"]) @pytest.mark.vcr(filter_headers=["authorization"])
def test_agent_respect_the_max_rpm_set(capsys): def test_agent_respect_the_max_rpm_set(capsys):
@tool @tool