diff --git a/src/crewai/agent.py b/src/crewai/agent.py index 66698c7a8..04cf7d871 100644 --- a/src/crewai/agent.py +++ b/src/crewai/agent.py @@ -175,34 +175,67 @@ class Agent(BaseAgent): tools: Optional[List[BaseTool]], timeout: int ) -> str: - """Execute task with timeout using thread-based timeout.""" - result = None - error = None + """Execute task with timeout using thread-based timeout. - def target(): - nonlocal result, error + Args: + task: The task to execute + context: Optional context for the task + tools: Optional list of tools to use + timeout: Maximum execution time in seconds (must be > 0) + + Returns: + The result of the task execution + + Raises: + ValueError: If timeout is not a positive integer + TimeoutError: If execution exceeds the timeout + Exception: Any error that occurs during execution + """ + # Validate timeout before creating any resources + if not isinstance(timeout, int) or timeout <= 0: + raise ValueError("Timeout must be a positive integer greater than zero") + + completion_event: threading.Event = threading.Event() + result_container: List[Optional[str]] = [None] + error_container: List[Optional[Exception]] = [None] + + def target() -> None: try: - result = self._execute_task_without_timeout(task, context, tools) + result_container[0] = self._execute_task_without_timeout(task, context, tools) except Exception as e: - error = e + error_container[0] = e + finally: + completion_event.set() - thread = threading.Thread(target=target) - thread.daemon = True + thread: threading.Thread = threading.Thread(target=target) + thread.daemon = True # Ensures thread doesn't prevent program exit thread.start() - thread.join(timeout=timeout) - if thread.is_alive(): + # Wait for either completion or timeout + completed: bool = completion_event.wait(timeout=timeout) + + if not completed: + self._logger.log("warning", f"Task execution timed out after {timeout} seconds") + thread.join(timeout=0.1) + + # Clean up resources if hasattr(self, 'agent_executor') and self.agent_executor: - self.agent_executor.llm = None + self.agent_executor.llm = None # Release LLM resources + if hasattr(self.agent_executor, 'close'): + self.agent_executor.close() + raise timeout_decorator.TimeoutError(f"Task execution timed out after {timeout} seconds") - if error: + if error_container[0]: + error = error_container[0] + self._logger.log("error", f"Task execution failed: {str(error)}") raise error - - if result is None: - raise timeout_decorator.TimeoutError(f"Task execution completed but returned no result") - - return result + + if result_container[0] is None: + self._logger.log("warning", "Task execution completed but returned no result") + raise timeout_decorator.TimeoutError("Task execution completed but returned no result") + + return result_container[0] def execute_task( self, @@ -234,7 +267,10 @@ class Agent(BaseAgent): return self._execute_with_timeout(task, context, tools, self.max_execution_time) except timeout_decorator.TimeoutError: - error_msg = f"Task execution timed out after {self.max_execution_time} seconds" + error_msg = ( + f"Task '{task.description}' execution timed out after {self.max_execution_time} seconds. " + f"Consider increasing max_execution_time or optimizing the task." + ) self._logger.log("error", error_msg) raise TimeoutError(error_msg) finally: diff --git a/tests/task_test.py b/tests/task_test.py index c2599d85b..12cece5db 100644 --- a/tests/task_test.py +++ b/tests/task_test.py @@ -1356,7 +1356,7 @@ def test_task_max_execution_time_zero(): role="Researcher", goal="Test goal", backstory="Test backstory", - max_execution_time=0 # Immediate timeout + max_execution_time=1 # Set to minimum valid value ) task = Task( @@ -1365,7 +1365,7 @@ def test_task_max_execution_time_zero(): agent=researcher ) - # Simulate immediate timeout + # Simulate immediate timeout using FuturesTimeoutError with patch('concurrent.futures.ThreadPoolExecutor') as mock_executor: mock_future = MagicMock() mock_future.result.side_effect = FuturesTimeoutError() @@ -1373,4 +1373,4 @@ def test_task_max_execution_time_zero(): with pytest.raises(TimeoutError) as excinfo: task.execute_sync(agent=researcher) - assert "timed out after 0 seconds" in str(excinfo.value) + assert "timed out after 1 seconds" in str(excinfo.value)