Compare commits

...

6 Commits

Author SHA1 Message Date
Devin AI
33da3e1797 feat: use force_final_answer prompt on timeout
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-09 23:45:32 +00:00
João Moura
495baeaf38 Merge branch 'main' into feature/max-execution-time 2025-02-09 20:30:53 -03:00
hafsatariq18
80a5018f6a Enhance task execution timeout handling with better error messages and resource cleanup 2025-02-04 14:27:31 +05:30
hafsatariq18
447fbec6f9 test: add task_max_execution_time test cases 2025-02-03 17:26:00 +05:30
hafsatariq18
b1f277cc3a chore: Add timeout-decorator dependency 2025-02-03 16:37:44 +05:30
hafsatariq18
1411c8c794 feat: Add max_execution_time support for agent tasks 2025-02-03 16:25:43 +05:30
3 changed files with 247 additions and 0 deletions

View File

@@ -13,6 +13,7 @@ dependencies = [
"openai>=1.13.3", "openai>=1.13.3",
"litellm==1.60.2", "litellm==1.60.2",
"instructor>=1.3.3", "instructor>=1.3.3",
"timeout-decorator>=0.5.0",
# Text Processing # Text Processing
"pdfplumber>=0.11.4", "pdfplumber>=0.11.4",
"regex>=2024.9.11", "regex>=2024.9.11",

View File

@@ -1,9 +1,11 @@
import re import re
import shutil import shutil
import subprocess import subprocess
import threading
from typing import Any, Dict, List, Literal, Optional, Sequence, Union from typing import Any, Dict, List, Literal, Optional, Sequence, Union
from pydantic import Field, InstanceOf, PrivateAttr, model_validator from pydantic import Field, InstanceOf, PrivateAttr, model_validator
import timeout_decorator
from crewai.agents import CacheHandler from crewai.agents import CacheHandler
from crewai.agents.agent_builder.base_agent import BaseAgent from crewai.agents.agent_builder.base_agent import BaseAgent
@@ -64,6 +66,7 @@ class Agent(BaseAgent):
""" """
_times_executed: int = PrivateAttr(default=0) _times_executed: int = PrivateAttr(default=0)
_have_forced_answer: bool = PrivateAttr(default=False)
max_execution_time: Optional[int] = Field( max_execution_time: Optional[int] = Field(
default=None, default=None,
description="Maximum execution time for an agent to execute a task", description="Maximum execution time for an agent to execute a task",
@@ -159,6 +162,77 @@ class Agent(BaseAgent):
except (TypeError, ValueError) as e: except (TypeError, ValueError) as e:
raise ValueError(f"Invalid Knowledge Configuration: {str(e)}") raise ValueError(f"Invalid Knowledge Configuration: {str(e)}")
def _execute_with_timeout(
    self,
    task: Task,
    context: Optional[str],
    tools: Optional[List[BaseTool]],
    timeout: int
) -> str:
    """Run the task on a worker thread and stop waiting after ``timeout`` seconds.

    The actual work is delegated to ``_execute_task_without_timeout`` on a
    daemon thread. If that thread has not signalled completion when the wait
    expires, ``_have_forced_answer`` is set and the ``force_final_answer``
    prompt is returned (prefixed by whatever result the worker managed to
    store, if any).

    Args:
        task: The task to execute.
        context: Optional context for the task.
        tools: Optional list of tools to use.
        timeout: Maximum time to wait, in seconds (must be > 0).

    Returns:
        The worker's result, or on timeout the (possibly empty) partial
        result followed by a newline and the ``force_final_answer`` prompt.

    Raises:
        ValueError: If ``timeout`` is not a positive integer.
        timeout_decorator.TimeoutError: If the worker finished without
            raising but produced no result.
        Exception: Any exception raised by the worker is re-raised here.
    """
    # Validate timeout before creating any resources.
    if not isinstance(timeout, int) or timeout <= 0:
        raise ValueError("Timeout must be a positive integer greater than zero")

    # Start every run with a clean flag; otherwise one timed-out run would
    # make every later successful run look like it had been forced as well.
    self._have_forced_answer = False

    completion_event = threading.Event()
    # One-slot lists let the nested function hand values back to this frame.
    result_container: List[Optional[str]] = [None]
    error_container: List[Optional[Exception]] = [None]

    def target() -> None:
        try:
            result_container[0] = self._execute_task_without_timeout(task, context, tools)
        except Exception as e:
            error_container[0] = e
        finally:
            # Always wake the waiter, whether the run succeeded or failed.
            completion_event.set()

    thread = threading.Thread(target=target)
    thread.daemon = True  # Ensures thread doesn't prevent program exit
    thread.start()

    # Wait for either completion or timeout.
    completed = completion_event.wait(timeout=timeout)

    if not completed:
        self._logger.log("warning", f"Task execution timed out after {timeout} seconds")
        # Brief grace period; the worker cannot be killed, only abandoned.
        thread.join(timeout=0.1)

        # Clean up resources.
        # NOTE(review): the worker may still be running and using the
        # executor when its llm is cleared here, and nothing visible in this
        # method restores it afterwards - confirm this is intended.
        if hasattr(self, 'agent_executor') and self.agent_executor:
            self.agent_executor.llm = None  # Release LLM resources
            if hasattr(self.agent_executor, 'close'):
                self.agent_executor.close()

        # Force final answer using the prompt.
        self._have_forced_answer = True
        forced_answer = self.i18n.errors("force_final_answer")
        partial_result = result_container[0] or ''
        return f"{partial_result}\n{forced_answer}"

    error = error_container[0]
    # Compare against None rather than relying on the exception's truthiness.
    if error is not None:
        self._logger.log("error", f"Task execution failed: {str(error)}")
        raise error

    if result_container[0] is None:
        self._logger.log("warning", "Task execution completed but returned no result")
        # Not a real timeout, but execute_task catches this exception type,
        # so the type is kept for compatibility.
        raise timeout_decorator.TimeoutError("Task execution completed but returned no result")

    return result_container[0]
def execute_task( def execute_task(
self, self,
task: Task, task: Task,
@@ -174,7 +248,42 @@ class Agent(BaseAgent):
Returns: Returns:
Output of the agent Output of the agent
Raises:
TimeoutError: If the task execution exceeds max_execution_time (if set)
Exception: For other execution errors
""" """
if self.max_execution_time is None:
return self._execute_task_without_timeout(task, context, tools)
original_llm_timeout = getattr(self.llm, 'timeout', None)
try:
if hasattr(self.llm, 'timeout'):
self.llm.timeout = self.max_execution_time
result = self._execute_with_timeout(task, context, tools, self.max_execution_time)
if self._have_forced_answer:
self._logger.log("warning", f"Task '{task.description}' execution timed out after {self.max_execution_time} seconds. Using forced answer.")
return result
except timeout_decorator.TimeoutError:
# This is a different kind of failure (e.g., no result at all)
error_msg = (
f"Task '{task.description}' execution timed out after {self.max_execution_time} seconds "
f"and produced no result. Consider increasing max_execution_time or optimizing the task."
)
self._logger.log("error", error_msg)
raise TimeoutError(error_msg)
finally:
if original_llm_timeout is not None and hasattr(self.llm, 'timeout'):
self.llm.timeout = original_llm_timeout
def _execute_task_without_timeout(
self,
task: Task,
context: Optional[str] = None,
tools: Optional[List[BaseTool]] = None,
) -> str:
"""Execute task without timeout - contains the original execute_task logic."""
if self.tools_handler: if self.tools_handler:
self.tools_handler.last_used_tool = {} # type: ignore # Incompatible types in assignment (expression has type "dict[Never, Never]", variable has type "ToolCalling") self.tools_handler.last_used_tool = {} # type: ignore # Incompatible types in assignment (expression has type "dict[Never, Never]", variable has type "ToolCalling")

View File

@@ -13,6 +13,7 @@ from crewai import Agent, Crew, Process, Task
from crewai.tasks.conditional_task import ConditionalTask from crewai.tasks.conditional_task import ConditionalTask
from crewai.tasks.task_output import TaskOutput from crewai.tasks.task_output import TaskOutput
from crewai.utilities.converter import Converter from crewai.utilities.converter import Converter
from concurrent.futures import TimeoutError as FuturesTimeoutError
def test_task_tool_reflect_agent_tools(): def test_task_tool_reflect_agent_tools():
@@ -1283,3 +1284,139 @@ def test_interpolate_valid_types():
assert parsed["optional"] is None assert parsed["optional"] is None
assert parsed["nested"]["flag"] is True assert parsed["nested"]["flag"] is True
assert parsed["nested"]["empty"] is None assert parsed["nested"]["empty"] is None
def test_task_completes_within_max_execution_time():
    """A task that finishes inside its time budget returns the agent's output."""
    agent = Agent(
        role="Researcher",
        goal="Test goal",
        backstory="Test backstory",
        max_execution_time=100,  # far more than the mocked run needs
    )
    task = Task(description="Test task", expected_output="Test output", agent=agent)

    # Stub the untimed execution path so no LLM is involved.
    stubbed_run = patch.object(
        Agent, '_execute_task_without_timeout', return_value="Success"
    )
    with stubbed_run as mock_execute:
        result = task.execute_sync(agent=agent)

    assert result.raw == "Success"
    mock_execute.assert_called_once()
def test_task_exceeds_max_execution_time():
    """Expect a TimeoutError mentioning "timed out" when the LLM call times out."""
    agent = Agent(
        role="Researcher",
        goal="Test goal",
        backstory="Test backstory",
        max_execution_time=1,  # very short budget
    )
    task = Task(description="Test task", expected_output="Test output", agent=agent)

    # Replace the completion call so the test needs no credentials; the
    # replacement raises as soon as it is invoked.
    request_timeout = TimeoutError("Request timed out")
    with patch('litellm.completion', side_effect=request_timeout):
        with pytest.raises(TimeoutError) as excinfo:
            task.execute_sync(agent=agent)

    assert "timed out" in str(excinfo.value)
def test_task_no_max_execution_time():
    """Without a max_execution_time the task simply returns the agent's output."""
    agent = Agent(
        role="Researcher",
        goal="Test goal",
        backstory="Test backstory",
        max_execution_time=None,  # no time limit configured
    )
    task = Task(description="Test task", expected_output="Test output", agent=agent)

    # The whole agent entry point is stubbed, so only the task plumbing runs.
    stubbed_entry = patch.object(Agent, 'execute_task', return_value="Success")
    with stubbed_entry as mock_execute:
        result = task.execute_sync(agent=agent)

    assert result.raw == "Success"
    mock_execute.assert_called_once()
def test_task_max_execution_time_zero():
    """A max_execution_time of 0 is rejected instead of being treated as a timeout.

    The timeout path validates its budget before starting any worker thread
    and raises ValueError for non-positive values, so no LLM mocking is
    needed here.
    """
    # NOTE(review): assumes Agent accepts 0 at construction time and that
    # execute_sync lets the agent's ValueError propagate unchanged - confirm.
    researcher = Agent(
        role="Researcher",
        goal="Test goal",
        backstory="Test backstory",
        max_execution_time=0,  # invalid: must be a positive integer
    )
    task = Task(
        description="Test task",
        expected_output="Test output",
        agent=researcher,
    )

    with pytest.raises(ValueError) as excinfo:
        task.execute_sync(agent=researcher)

    assert "positive integer" in str(excinfo.value)
def test_task_force_final_answer_on_timeout():
    """On timeout the output holds the partial result plus the forced-answer prompt."""
    researcher = Agent(
        role="Researcher",
        goal="Test goal",
        backstory="Test backstory",
        max_execution_time=1,  # very short budget
    )
    task = Task(
        description="Test task",
        expected_output="Test output",
        agent=researcher,
    )

    # Give the agent a translation stub whose errors() lookup yields a known prompt.
    i18n_stub = MagicMock()
    i18n_stub.errors.return_value = "MUST give your absolute best final answer"
    researcher.i18n = i18n_stub

    class MockThread:
        """Thread stand-in that runs its target synchronously inside start()."""

        def __init__(self, target, *args, **kwargs):
            self.target = target
            self.args = args
            self.kwargs = kwargs
            self.daemon = kwargs.get('daemon', False)

        def start(self):
            # Run the target right away so a result is stored before the
            # (patched) wait reports a timeout.
            self.target()

        def join(self, timeout=None):
            pass

    # Event.wait is forced to report "not completed" so the timeout branch is
    # taken even though the stand-in thread has already finished.
    with patch('threading.Thread', side_effect=MockThread), \
         patch('threading.Event.wait', return_value=False), \
         patch('litellm.completion'), \
         patch.object(Agent, '_execute_task_without_timeout', return_value="Partial result"):
        result = task.execute_sync(agent=researcher)

    assert "MUST give your absolute best final answer" in result.raw
    assert "Partial result" in result.raw  # partial result is preserved