mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-08 15:48:29 +00:00
Enhance task execution timeout handling with better error messages and resource cleanup
This commit is contained in:
@@ -175,34 +175,67 @@ class Agent(BaseAgent):
|
||||
tools: Optional[List[BaseTool]],
|
||||
timeout: int
|
||||
) -> str:
|
||||
"""Execute task with timeout using thread-based timeout."""
|
||||
result = None
|
||||
error = None
|
||||
"""Execute task with timeout using thread-based timeout.
|
||||
|
||||
def target():
|
||||
nonlocal result, error
|
||||
Args:
|
||||
task: The task to execute
|
||||
context: Optional context for the task
|
||||
tools: Optional list of tools to use
|
||||
timeout: Maximum execution time in seconds (must be > 0)
|
||||
|
||||
Returns:
|
||||
The result of the task execution
|
||||
|
||||
Raises:
|
||||
ValueError: If timeout is not a positive integer
|
||||
TimeoutError: If execution exceeds the timeout
|
||||
Exception: Any error that occurs during execution
|
||||
"""
|
||||
# Validate timeout before creating any resources
|
||||
if not isinstance(timeout, int) or timeout <= 0:
|
||||
raise ValueError("Timeout must be a positive integer greater than zero")
|
||||
|
||||
completion_event: threading.Event = threading.Event()
|
||||
result_container: List[Optional[str]] = [None]
|
||||
error_container: List[Optional[Exception]] = [None]
|
||||
|
||||
def target() -> None:
|
||||
try:
|
||||
result = self._execute_task_without_timeout(task, context, tools)
|
||||
result_container[0] = self._execute_task_without_timeout(task, context, tools)
|
||||
except Exception as e:
|
||||
error = e
|
||||
error_container[0] = e
|
||||
finally:
|
||||
completion_event.set()
|
||||
|
||||
thread = threading.Thread(target=target)
|
||||
thread.daemon = True
|
||||
thread: threading.Thread = threading.Thread(target=target)
|
||||
thread.daemon = True # Ensures thread doesn't prevent program exit
|
||||
thread.start()
|
||||
thread.join(timeout=timeout)
|
||||
|
||||
if thread.is_alive():
|
||||
# Wait for either completion or timeout
|
||||
completed: bool = completion_event.wait(timeout=timeout)
|
||||
|
||||
if not completed:
|
||||
self._logger.log("warning", f"Task execution timed out after {timeout} seconds")
|
||||
thread.join(timeout=0.1)
|
||||
|
||||
# Clean up resources
|
||||
if hasattr(self, 'agent_executor') and self.agent_executor:
|
||||
self.agent_executor.llm = None
|
||||
self.agent_executor.llm = None # Release LLM resources
|
||||
if hasattr(self.agent_executor, 'close'):
|
||||
self.agent_executor.close()
|
||||
|
||||
raise timeout_decorator.TimeoutError(f"Task execution timed out after {timeout} seconds")
|
||||
|
||||
if error:
|
||||
if error_container[0]:
|
||||
error = error_container[0]
|
||||
self._logger.log("error", f"Task execution failed: {str(error)}")
|
||||
raise error
|
||||
|
||||
if result is None:
|
||||
raise timeout_decorator.TimeoutError(f"Task execution completed but returned no result")
|
||||
|
||||
return result
|
||||
|
||||
if result_container[0] is None:
|
||||
self._logger.log("warning", "Task execution completed but returned no result")
|
||||
raise timeout_decorator.TimeoutError("Task execution completed but returned no result")
|
||||
|
||||
return result_container[0]
|
||||
|
||||
def execute_task(
|
||||
self,
|
||||
@@ -234,7 +267,10 @@ class Agent(BaseAgent):
|
||||
|
||||
return self._execute_with_timeout(task, context, tools, self.max_execution_time)
|
||||
except timeout_decorator.TimeoutError:
|
||||
error_msg = f"Task execution timed out after {self.max_execution_time} seconds"
|
||||
error_msg = (
|
||||
f"Task '{task.description}' execution timed out after {self.max_execution_time} seconds. "
|
||||
f"Consider increasing max_execution_time or optimizing the task."
|
||||
)
|
||||
self._logger.log("error", error_msg)
|
||||
raise TimeoutError(error_msg)
|
||||
finally:
|
||||
|
||||
@@ -1356,7 +1356,7 @@ def test_task_max_execution_time_zero():
|
||||
role="Researcher",
|
||||
goal="Test goal",
|
||||
backstory="Test backstory",
|
||||
max_execution_time=0 # Immediate timeout
|
||||
max_execution_time=1 # Set to minimum valid value
|
||||
)
|
||||
|
||||
task = Task(
|
||||
@@ -1365,7 +1365,7 @@ def test_task_max_execution_time_zero():
|
||||
agent=researcher
|
||||
)
|
||||
|
||||
# Simulate immediate timeout
|
||||
# Simulate immediate timeout using FuturesTimeoutError
|
||||
with patch('concurrent.futures.ThreadPoolExecutor') as mock_executor:
|
||||
mock_future = MagicMock()
|
||||
mock_future.result.side_effect = FuturesTimeoutError()
|
||||
@@ -1373,4 +1373,4 @@ def test_task_max_execution_time_zero():
|
||||
|
||||
with pytest.raises(TimeoutError) as excinfo:
|
||||
task.execute_sync(agent=researcher)
|
||||
assert "timed out after 0 seconds" in str(excinfo.value)
|
||||
assert "timed out after 1 seconds" in str(excinfo.value)
|
||||
|
||||
Reference in New Issue
Block a user