mirror of
https://github.com/crewAIInc/crewAI.git
synced 2025-12-16 12:28:30 +00:00
Compare commits
6 Commits
devin/1760
...
pr-2024
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
33da3e1797 | ||
|
|
495baeaf38 | ||
|
|
80a5018f6a | ||
|
|
447fbec6f9 | ||
|
|
b1f277cc3a | ||
|
|
1411c8c794 |
@@ -13,6 +13,7 @@ dependencies = [
|
||||
"openai>=1.13.3",
|
||||
"litellm==1.60.2",
|
||||
"instructor>=1.3.3",
|
||||
"timeout-decorator>=0.5.0",
|
||||
# Text Processing
|
||||
"pdfplumber>=0.11.4",
|
||||
"regex>=2024.9.11",
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
import re
|
||||
import shutil
|
||||
import subprocess
|
||||
import threading
|
||||
from typing import Any, Dict, List, Literal, Optional, Sequence, Union
|
||||
|
||||
from pydantic import Field, InstanceOf, PrivateAttr, model_validator
|
||||
import timeout_decorator
|
||||
|
||||
from crewai.agents import CacheHandler
|
||||
from crewai.agents.agent_builder.base_agent import BaseAgent
|
||||
@@ -64,6 +66,7 @@ class Agent(BaseAgent):
|
||||
"""
|
||||
|
||||
_times_executed: int = PrivateAttr(default=0)
|
||||
_have_forced_answer: bool = PrivateAttr(default=False)
|
||||
max_execution_time: Optional[int] = Field(
|
||||
default=None,
|
||||
description="Maximum execution time for an agent to execute a task",
|
||||
@@ -159,6 +162,77 @@ class Agent(BaseAgent):
|
||||
except (TypeError, ValueError) as e:
|
||||
raise ValueError(f"Invalid Knowledge Configuration: {str(e)}")
|
||||
|
||||
def _execute_with_timeout(
|
||||
self,
|
||||
task: Task,
|
||||
context: Optional[str],
|
||||
tools: Optional[List[BaseTool]],
|
||||
timeout: int
|
||||
) -> str:
|
||||
"""Execute task with timeout using thread-based timeout.
|
||||
|
||||
Args:
|
||||
task: The task to execute
|
||||
context: Optional context for the task
|
||||
tools: Optional list of tools to use
|
||||
timeout: Maximum execution time in seconds (must be > 0)
|
||||
|
||||
Returns:
|
||||
The result of the task execution, with force_final_answer prompt appended on timeout
|
||||
|
||||
Raises:
|
||||
ValueError: If timeout is not a positive integer
|
||||
Exception: Any error that occurs during execution
|
||||
"""
|
||||
# Validate timeout before creating any resources
|
||||
if not isinstance(timeout, int) or timeout <= 0:
|
||||
raise ValueError("Timeout must be a positive integer greater than zero")
|
||||
|
||||
completion_event: threading.Event = threading.Event()
|
||||
result_container: List[Optional[str]] = [None]
|
||||
error_container: List[Optional[Exception]] = [None]
|
||||
|
||||
def target() -> None:
|
||||
try:
|
||||
result_container[0] = self._execute_task_without_timeout(task, context, tools)
|
||||
except Exception as e:
|
||||
error_container[0] = e
|
||||
finally:
|
||||
completion_event.set()
|
||||
|
||||
thread: threading.Thread = threading.Thread(target=target)
|
||||
thread.daemon = True # Ensures thread doesn't prevent program exit
|
||||
thread.start()
|
||||
|
||||
# Wait for either completion or timeout
|
||||
completed: bool = completion_event.wait(timeout=timeout)
|
||||
|
||||
if not completed:
|
||||
self._logger.log("warning", f"Task execution timed out after {timeout} seconds")
|
||||
thread.join(timeout=0.1)
|
||||
|
||||
# Clean up resources
|
||||
if hasattr(self, 'agent_executor') and self.agent_executor:
|
||||
self.agent_executor.llm = None # Release LLM resources
|
||||
if hasattr(self.agent_executor, 'close'):
|
||||
self.agent_executor.close()
|
||||
|
||||
# Force final answer using the prompt
|
||||
self._have_forced_answer = True
|
||||
forced_answer = self.i18n.errors("force_final_answer")
|
||||
return f"{result_container[0] if result_container[0] else ''}\n{forced_answer}"
|
||||
|
||||
if error_container[0]:
|
||||
error = error_container[0]
|
||||
self._logger.log("error", f"Task execution failed: {str(error)}")
|
||||
raise error
|
||||
|
||||
if result_container[0] is None:
|
||||
self._logger.log("warning", "Task execution completed but returned no result")
|
||||
raise timeout_decorator.TimeoutError("Task execution completed but returned no result") # This is a different kind of failure than timeout
|
||||
|
||||
return result_container[0]
|
||||
|
||||
def execute_task(
|
||||
self,
|
||||
task: Task,
|
||||
@@ -174,7 +248,42 @@ class Agent(BaseAgent):
|
||||
|
||||
Returns:
|
||||
Output of the agent
|
||||
|
||||
Raises:
|
||||
TimeoutError: If the task execution exceeds max_execution_time (if set)
|
||||
Exception: For other execution errors
|
||||
"""
|
||||
if self.max_execution_time is None:
|
||||
return self._execute_task_without_timeout(task, context, tools)
|
||||
|
||||
original_llm_timeout = getattr(self.llm, 'timeout', None)
|
||||
try:
|
||||
if hasattr(self.llm, 'timeout'):
|
||||
self.llm.timeout = self.max_execution_time
|
||||
|
||||
result = self._execute_with_timeout(task, context, tools, self.max_execution_time)
|
||||
if self._have_forced_answer:
|
||||
self._logger.log("warning", f"Task '{task.description}' execution timed out after {self.max_execution_time} seconds. Using forced answer.")
|
||||
return result
|
||||
except timeout_decorator.TimeoutError:
|
||||
# This is a different kind of failure (e.g., no result at all)
|
||||
error_msg = (
|
||||
f"Task '{task.description}' execution timed out after {self.max_execution_time} seconds "
|
||||
f"and produced no result. Consider increasing max_execution_time or optimizing the task."
|
||||
)
|
||||
self._logger.log("error", error_msg)
|
||||
raise TimeoutError(error_msg)
|
||||
finally:
|
||||
if original_llm_timeout is not None and hasattr(self.llm, 'timeout'):
|
||||
self.llm.timeout = original_llm_timeout
|
||||
|
||||
def _execute_task_without_timeout(
|
||||
self,
|
||||
task: Task,
|
||||
context: Optional[str] = None,
|
||||
tools: Optional[List[BaseTool]] = None,
|
||||
) -> str:
|
||||
"""Execute task without timeout - contains the original execute_task logic."""
|
||||
if self.tools_handler:
|
||||
self.tools_handler.last_used_tool = {} # type: ignore # Incompatible types in assignment (expression has type "dict[Never, Never]", variable has type "ToolCalling")
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@ from crewai import Agent, Crew, Process, Task
|
||||
from crewai.tasks.conditional_task import ConditionalTask
|
||||
from crewai.tasks.task_output import TaskOutput
|
||||
from crewai.utilities.converter import Converter
|
||||
from concurrent.futures import TimeoutError as FuturesTimeoutError
|
||||
|
||||
|
||||
def test_task_tool_reflect_agent_tools():
|
||||
@@ -1283,3 +1284,139 @@ def test_interpolate_valid_types():
|
||||
assert parsed["optional"] is None
|
||||
assert parsed["nested"]["flag"] is True
|
||||
assert parsed["nested"]["empty"] is None
|
||||
|
||||
|
||||
def test_task_completes_within_max_execution_time():
|
||||
"""Test task completes successfully within specified timeout"""
|
||||
researcher = Agent(
|
||||
role="Researcher",
|
||||
goal="Test goal",
|
||||
backstory="Test backstory",
|
||||
max_execution_time=100 # Ample time for completion
|
||||
)
|
||||
|
||||
task = Task(
|
||||
description="Test task",
|
||||
expected_output="Test output",
|
||||
agent=researcher
|
||||
)
|
||||
|
||||
with patch.object(Agent, '_execute_task_without_timeout', return_value="Success") as mock_execute:
|
||||
result = task.execute_sync(agent=researcher)
|
||||
assert result.raw == "Success"
|
||||
mock_execute.assert_called_once()
|
||||
|
||||
|
||||
def test_task_exceeds_max_execution_time():
|
||||
"""Test task raises TimeoutError when exceeding max execution time"""
|
||||
researcher = Agent(
|
||||
role="Researcher",
|
||||
goal="Test goal",
|
||||
backstory="Test backstory",
|
||||
max_execution_time=1 # Very short timeout
|
||||
)
|
||||
|
||||
task = Task(
|
||||
description="Test task",
|
||||
expected_output="Test output",
|
||||
agent=researcher
|
||||
)
|
||||
|
||||
# Mock the OpenAI API call to avoid authentication
|
||||
with patch('litellm.completion', side_effect=TimeoutError("Request timed out")):
|
||||
with pytest.raises(TimeoutError) as excinfo:
|
||||
task.execute_sync(agent=researcher)
|
||||
assert "timed out" in str(excinfo.value)
|
||||
|
||||
|
||||
def test_task_no_max_execution_time():
|
||||
"""Test task executes normally without timeout setting"""
|
||||
researcher = Agent(
|
||||
role="Researcher",
|
||||
goal="Test goal",
|
||||
backstory="Test backstory",
|
||||
max_execution_time=None # No timeout
|
||||
)
|
||||
|
||||
task = Task(
|
||||
description="Test task",
|
||||
expected_output="Test output",
|
||||
agent=researcher
|
||||
)
|
||||
|
||||
with patch.object(Agent, 'execute_task', return_value="Success") as mock_execute:
|
||||
result = task.execute_sync(agent=researcher)
|
||||
assert result.raw == "Success"
|
||||
mock_execute.assert_called_once()
|
||||
|
||||
|
||||
def test_task_max_execution_time_zero():
|
||||
"""Test immediate timeout with max_execution_time=0"""
|
||||
researcher = Agent(
|
||||
role="Researcher",
|
||||
goal="Test goal",
|
||||
backstory="Test backstory",
|
||||
max_execution_time=1 # Set to minimum valid value
|
||||
)
|
||||
|
||||
task = Task(
|
||||
description="Test task",
|
||||
expected_output="Test output",
|
||||
agent=researcher
|
||||
)
|
||||
|
||||
# Simulate immediate timeout using FuturesTimeoutError
|
||||
with patch('concurrent.futures.ThreadPoolExecutor') as mock_executor:
|
||||
mock_future = MagicMock()
|
||||
mock_future.result.side_effect = FuturesTimeoutError()
|
||||
mock_executor.return_value.submit.return_value = mock_future
|
||||
|
||||
with pytest.raises(TimeoutError) as excinfo:
|
||||
task.execute_sync(agent=researcher)
|
||||
assert "timed out after 1 seconds" in str(excinfo.value)
|
||||
|
||||
|
||||
def test_task_force_final_answer_on_timeout():
|
||||
"""Test that force_final_answer is used when task times out"""
|
||||
researcher = Agent(
|
||||
role="Researcher",
|
||||
goal="Test goal",
|
||||
backstory="Test backstory",
|
||||
max_execution_time=1 # Very short timeout
|
||||
)
|
||||
|
||||
task = Task(
|
||||
description="Test task",
|
||||
expected_output="Test output",
|
||||
agent=researcher
|
||||
)
|
||||
|
||||
# Mock the task execution to simulate a partial result before timeout
|
||||
mock_i18n = MagicMock()
|
||||
mock_i18n.errors.return_value = "MUST give your absolute best final answer"
|
||||
researcher.i18n = mock_i18n
|
||||
|
||||
class MockThread:
|
||||
def __init__(self, target, *args, **kwargs):
|
||||
self.target = target
|
||||
self.daemon = kwargs.get('daemon', False)
|
||||
self.args = args
|
||||
self.kwargs = kwargs
|
||||
|
||||
def start(self):
|
||||
# Execute the target function to set the result
|
||||
self.target()
|
||||
|
||||
def join(self, timeout=None):
|
||||
pass
|
||||
|
||||
def mock_thread(*args, **kwargs):
|
||||
return MockThread(*args, **kwargs)
|
||||
|
||||
with patch('threading.Thread', side_effect=mock_thread), \
|
||||
patch('threading.Event.wait', return_value=False), \
|
||||
patch('litellm.completion'), \
|
||||
patch.object(Agent, '_execute_task_without_timeout', return_value="Partial result"):
|
||||
result = task.execute_sync(agent=researcher)
|
||||
assert "MUST give your absolute best final answer" in result.raw
|
||||
assert "Partial result" in result.raw # Should include partial result
|
||||
|
||||
Reference in New Issue
Block a user