Compare commits

...

6 Commits

Author SHA1 Message Date
Devin AI
33da3e1797 feat: use force_final_answer prompt on timeout
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-09 23:45:32 +00:00
João Moura
495baeaf38 Merge branch 'main' into feature/max-execution-time 2025-02-09 20:30:53 -03:00
hafsatariq18
80a5018f6a Enhance task execution timeout handling with better error messages and resource cleanup 2025-02-04 14:27:31 +05:30
hafsatariq18
447fbec6f9 test: add task_max_execution_time test cases 2025-02-03 17:26:00 +05:30
hafsatariq18
b1f277cc3a chore: Add timeout-decorator dependency 2025-02-03 16:37:44 +05:30
hafsatariq18
1411c8c794 feat: Add max_execution_time support for agent tasks 2025-02-03 16:25:43 +05:30
3 changed files with 247 additions and 0 deletions

View File

@@ -13,6 +13,7 @@ dependencies = [
"openai>=1.13.3",
"litellm==1.60.2",
"instructor>=1.3.3",
"timeout-decorator>=0.5.0",
# Text Processing
"pdfplumber>=0.11.4",
"regex>=2024.9.11",

View File

@@ -1,9 +1,11 @@
import re
import shutil
import subprocess
import threading
from typing import Any, Dict, List, Literal, Optional, Sequence, Union
from pydantic import Field, InstanceOf, PrivateAttr, model_validator
import timeout_decorator
from crewai.agents import CacheHandler
from crewai.agents.agent_builder.base_agent import BaseAgent
@@ -64,6 +66,7 @@ class Agent(BaseAgent):
"""
_times_executed: int = PrivateAttr(default=0)
_have_forced_answer: bool = PrivateAttr(default=False)
max_execution_time: Optional[int] = Field(
default=None,
description="Maximum execution time for an agent to execute a task",
@@ -159,6 +162,77 @@ class Agent(BaseAgent):
except (TypeError, ValueError) as e:
raise ValueError(f"Invalid Knowledge Configuration: {str(e)}")
def _execute_with_timeout(
self,
task: Task,
context: Optional[str],
tools: Optional[List[BaseTool]],
timeout: int
) -> str:
"""Execute task with timeout using thread-based timeout.
Args:
task: The task to execute
context: Optional context for the task
tools: Optional list of tools to use
timeout: Maximum execution time in seconds (must be > 0)
Returns:
The result of the task execution, with force_final_answer prompt appended on timeout
Raises:
ValueError: If timeout is not a positive integer
Exception: Any error that occurs during execution
"""
# Validate timeout before creating any resources
if not isinstance(timeout, int) or timeout <= 0:
raise ValueError("Timeout must be a positive integer greater than zero")
completion_event: threading.Event = threading.Event()
result_container: List[Optional[str]] = [None]
error_container: List[Optional[Exception]] = [None]
def target() -> None:
try:
result_container[0] = self._execute_task_without_timeout(task, context, tools)
except Exception as e:
error_container[0] = e
finally:
completion_event.set()
thread: threading.Thread = threading.Thread(target=target)
thread.daemon = True # Ensures thread doesn't prevent program exit
thread.start()
# Wait for either completion or timeout
completed: bool = completion_event.wait(timeout=timeout)
if not completed:
self._logger.log("warning", f"Task execution timed out after {timeout} seconds")
thread.join(timeout=0.1)
# Clean up resources
if hasattr(self, 'agent_executor') and self.agent_executor:
self.agent_executor.llm = None # Release LLM resources
if hasattr(self.agent_executor, 'close'):
self.agent_executor.close()
# Force final answer using the prompt
self._have_forced_answer = True
forced_answer = self.i18n.errors("force_final_answer")
return f"{result_container[0] if result_container[0] else ''}\n{forced_answer}"
if error_container[0]:
error = error_container[0]
self._logger.log("error", f"Task execution failed: {str(error)}")
raise error
if result_container[0] is None:
self._logger.log("warning", "Task execution completed but returned no result")
raise timeout_decorator.TimeoutError("Task execution completed but returned no result") # This is a different kind of failure than timeout
return result_container[0]
def execute_task(
self,
task: Task,
@@ -174,7 +248,42 @@ class Agent(BaseAgent):
Returns:
Output of the agent
Raises:
TimeoutError: If the task execution exceeds max_execution_time (if set)
Exception: For other execution errors
"""
if self.max_execution_time is None:
return self._execute_task_without_timeout(task, context, tools)
original_llm_timeout = getattr(self.llm, 'timeout', None)
try:
if hasattr(self.llm, 'timeout'):
self.llm.timeout = self.max_execution_time
result = self._execute_with_timeout(task, context, tools, self.max_execution_time)
if self._have_forced_answer:
self._logger.log("warning", f"Task '{task.description}' execution timed out after {self.max_execution_time} seconds. Using forced answer.")
return result
except timeout_decorator.TimeoutError:
# This is a different kind of failure (e.g., no result at all)
error_msg = (
f"Task '{task.description}' execution timed out after {self.max_execution_time} seconds "
f"and produced no result. Consider increasing max_execution_time or optimizing the task."
)
self._logger.log("error", error_msg)
raise TimeoutError(error_msg)
finally:
if original_llm_timeout is not None and hasattr(self.llm, 'timeout'):
self.llm.timeout = original_llm_timeout
def _execute_task_without_timeout(
self,
task: Task,
context: Optional[str] = None,
tools: Optional[List[BaseTool]] = None,
) -> str:
"""Execute task without timeout - contains the original execute_task logic."""
if self.tools_handler:
self.tools_handler.last_used_tool = {} # type: ignore # Incompatible types in assignment (expression has type "dict[Never, Never]", variable has type "ToolCalling")

View File

@@ -13,6 +13,7 @@ from crewai import Agent, Crew, Process, Task
from crewai.tasks.conditional_task import ConditionalTask
from crewai.tasks.task_output import TaskOutput
from crewai.utilities.converter import Converter
from concurrent.futures import TimeoutError as FuturesTimeoutError
def test_task_tool_reflect_agent_tools():
@@ -1283,3 +1284,139 @@ def test_interpolate_valid_types():
assert parsed["optional"] is None
assert parsed["nested"]["flag"] is True
assert parsed["nested"]["empty"] is None
def test_task_completes_within_max_execution_time():
"""Test task completes successfully within specified timeout"""
researcher = Agent(
role="Researcher",
goal="Test goal",
backstory="Test backstory",
max_execution_time=100 # Ample time for completion
)
task = Task(
description="Test task",
expected_output="Test output",
agent=researcher
)
with patch.object(Agent, '_execute_task_without_timeout', return_value="Success") as mock_execute:
result = task.execute_sync(agent=researcher)
assert result.raw == "Success"
mock_execute.assert_called_once()
def test_task_exceeds_max_execution_time():
"""Test task raises TimeoutError when exceeding max execution time"""
researcher = Agent(
role="Researcher",
goal="Test goal",
backstory="Test backstory",
max_execution_time=1 # Very short timeout
)
task = Task(
description="Test task",
expected_output="Test output",
agent=researcher
)
# Mock the OpenAI API call to avoid authentication
with patch('litellm.completion', side_effect=TimeoutError("Request timed out")):
with pytest.raises(TimeoutError) as excinfo:
task.execute_sync(agent=researcher)
assert "timed out" in str(excinfo.value)
def test_task_no_max_execution_time():
"""Test task executes normally without timeout setting"""
researcher = Agent(
role="Researcher",
goal="Test goal",
backstory="Test backstory",
max_execution_time=None # No timeout
)
task = Task(
description="Test task",
expected_output="Test output",
agent=researcher
)
with patch.object(Agent, 'execute_task', return_value="Success") as mock_execute:
result = task.execute_sync(agent=researcher)
assert result.raw == "Success"
mock_execute.assert_called_once()
def test_task_max_execution_time_zero():
"""Test immediate timeout with max_execution_time=0"""
researcher = Agent(
role="Researcher",
goal="Test goal",
backstory="Test backstory",
max_execution_time=1 # Set to minimum valid value
)
task = Task(
description="Test task",
expected_output="Test output",
agent=researcher
)
# Simulate immediate timeout using FuturesTimeoutError
with patch('concurrent.futures.ThreadPoolExecutor') as mock_executor:
mock_future = MagicMock()
mock_future.result.side_effect = FuturesTimeoutError()
mock_executor.return_value.submit.return_value = mock_future
with pytest.raises(TimeoutError) as excinfo:
task.execute_sync(agent=researcher)
assert "timed out after 1 seconds" in str(excinfo.value)
def test_task_force_final_answer_on_timeout():
"""Test that force_final_answer is used when task times out"""
researcher = Agent(
role="Researcher",
goal="Test goal",
backstory="Test backstory",
max_execution_time=1 # Very short timeout
)
task = Task(
description="Test task",
expected_output="Test output",
agent=researcher
)
# Mock the task execution to simulate a partial result before timeout
mock_i18n = MagicMock()
mock_i18n.errors.return_value = "MUST give your absolute best final answer"
researcher.i18n = mock_i18n
class MockThread:
def __init__(self, target, *args, **kwargs):
self.target = target
self.daemon = kwargs.get('daemon', False)
self.args = args
self.kwargs = kwargs
def start(self):
# Execute the target function to set the result
self.target()
def join(self, timeout=None):
pass
def mock_thread(*args, **kwargs):
return MockThread(*args, **kwargs)
with patch('threading.Thread', side_effect=mock_thread), \
patch('threading.Event.wait', return_value=False), \
patch('litellm.completion'), \
patch.object(Agent, '_execute_task_without_timeout', return_value="Partial result"):
result = task.execute_sync(agent=researcher)
assert "MUST give your absolute best final answer" in result.raw
assert "Partial result" in result.raw # Should include partial result