diff --git a/src/crewai/agent.py b/src/crewai/agent.py index a40841db1..038db5a06 100644 --- a/src/crewai/agent.py +++ b/src/crewai/agent.py @@ -1,6 +1,8 @@ +import concurrent.futures import re import shutil import subprocess +import threading from typing import Any, Dict, List, Literal, Optional, Sequence, Union from pydantic import Field, InstanceOf, PrivateAttr, model_validator @@ -154,13 +156,13 @@ class Agent(BaseAgent): except (TypeError, ValueError) as e: raise ValueError(f"Invalid Knowledge Configuration: {str(e)}") - def execute_task( + def _execute_task_without_timeout( self, task: Task, context: Optional[str] = None, tools: Optional[List[BaseTool]] = None, ) -> str: - """Execute a task with the agent. + """Execute a task with the agent without timeout. Args: task: Task to execute. @@ -275,7 +277,7 @@ class Agent(BaseAgent): ), ) raise e - result = self.execute_task(task, context, tools) + result = self._execute_task_without_timeout(task, context, tools) if self.max_rpm and self._rpm_controller: self._rpm_controller.stop_rpm_counter() @@ -291,6 +293,118 @@ class Agent(BaseAgent): event=AgentExecutionCompletedEvent(agent=self, task=task, output=result), ) return result + + def _execute_with_timeout( + self, + task: Task, + context: Optional[str] = None, + tools: Optional[List[BaseTool]] = None, + ) -> str: + """Execute a task with a timeout. + + Args: + task: Task to execute. + context: Context to execute the task in. + tools: Tools to use for the task. + + Returns: + Output of the agent + + Raises: + TimeoutError: If the execution exceeds max_execution_time + """ + if not isinstance(self.max_execution_time, int) or self.max_execution_time <= 0: + raise ValueError("max_execution_time must be a positive integer") + + with concurrent.futures.ThreadPoolExecutor() as executor: + future = executor.submit( + self._execute_task_without_timeout, task, context, tools + ) + + try: + return future.result(timeout=self.max_execution_time) + except concurrent.futures.TimeoutError: + future.cancel() + + self._logger.log( + "warning", + f"Task execution timed out after {self.max_execution_time} seconds" + ) + + if hasattr(self, 'agent_executor') and self.agent_executor: + try: + self._logger.log( + "info", + "Requesting final answer due to timeout" + ) + + force_final_answer_message = { + "role": "assistant", + "content": self.i18n.errors("force_final_answer") + } + + if hasattr(self.agent_executor, 'messages'): + self.agent_executor.messages.append(force_final_answer_message) + + final_answer = self.agent_executor.llm.call( + self.agent_executor.messages, + callbacks=self.agent_executor.callbacks, + ) + + if final_answer: + formatted_answer = self.agent_executor._format_answer(final_answer) + if hasattr(formatted_answer, 'output'): + return formatted_answer.output + except Exception as e: + self._logger.log( + "error", + f"Failed to get partial result after timeout: {str(e)}" + ) + + error_msg = ( + f"Task '{task.description}' execution timed out after " + f"{self.max_execution_time} seconds. Consider increasing " + f"max_execution_time or optimizing the task." + ) + raise TimeoutError(error_msg) + + def execute_task( + self, + task: Task, + context: Optional[str] = None, + tools: Optional[List[BaseTool]] = None, + ) -> str: + """Execute a task with the agent. + + Args: + task: Task to execute. + context: Context to execute the task in. + tools: Tools to use for the task. + + Returns: + Output of the agent + + Raises: + TimeoutError: If the execution exceeds max_execution_time (if set) + Exception: For other execution errors + """ + self._times_executed = 0 + + if self.max_execution_time is None: + return self._execute_task_without_timeout(task, context, tools) + + try: + return self._execute_with_timeout(task, context, tools) + except TimeoutError as e: + crewai_event_bus.emit( + self, + event=AgentExecutionErrorEvent( + agent=self, + task=task, + error=str(e), + ), + ) + raise def create_agent_executor( self, tools: Optional[List[BaseTool]] = None, task=None diff --git a/tests/agent_test.py b/tests/agent_test.py index 9abc84137..6f0c4eb16 100644 --- a/tests/agent_test.py +++ b/tests/agent_test.py @@ -1,8 +1,9 @@ """Test Agent creation and execution basic functionality.""" +import concurrent.futures import os from unittest import mock -from unittest.mock import patch +from unittest.mock import MagicMock, patch import pytest @@ -546,6 +547,98 @@ def test_agent_moved_on_after_max_iterations(): assert output == "42" +def test_agent_timeout_with_max_execution_time(): + """Test that an agent with max_execution_time raises a TimeoutError when execution takes too long.""" + + @tool + def slow_tool() -> str: + """A tool that takes a long time to execute.""" + import time + time.sleep(2) # Sleep for 2 seconds + return "This is a slow response" + + with patch.object(Agent, "_execute_task_without_timeout") as mock_execute: + mock_execute.side_effect = concurrent.futures.TimeoutError() + + agent = Agent( + role="test role", + goal="test goal", + backstory="test backstory", + max_execution_time=1, # Set timeout to 1 second + allow_delegation=False, + ) + + task = Task( + description="Use the slow_tool and wait for its response.", + expected_output="The response from the slow tool.", + ) + + with pytest.raises(TimeoutError): + agent.execute_task( + task=task, + tools=[slow_tool], + ) + + +def test_agent_partial_result_with_timeout(): + """Test that an agent with max_execution_time can return a partial result before timeout.""" + + @tool + def slow_tool() -> str: + """A tool that takes a long time to execute.""" + import time + time.sleep(0.1) # Just a small delay + return "This is a slow response" + + with patch("concurrent.futures.ThreadPoolExecutor.submit") as mock_submit: + mock_future = MagicMock() + mock_future.result.side_effect = concurrent.futures.TimeoutError() + mock_submit.return_value = mock_future + + with patch.object(LLM, "call") as mock_llm_call: + mock_llm_call.return_value = "Partial result due to timeout" + + with patch.object(CrewAgentExecutor, "_format_answer") as mock_format_answer: + mock_format_answer.return_value = AgentFinish( + thought="", + output="Partial result due to timeout", + text="Partial result due to timeout", + ) + + agent = Agent( + role="test role", + goal="test goal", + backstory="test backstory", + max_execution_time=1, # Set timeout to 1 second + allow_delegation=False, + ) + + agent.agent_executor = MagicMock() + agent.agent_executor.messages = [] + agent.agent_executor.llm = MagicMock() + agent.agent_executor.llm.call.return_value = "Partial result due to timeout" + agent.agent_executor._format_answer.return_value = AgentFinish( + thought="", + output="Partial result due to timeout", + text="Partial result due to timeout", + ) + agent.agent_executor.callbacks = [] + + task = Task( + description="Use the slow_tool and wait for its response.", + expected_output="The response from the slow tool.", + ) + + try: + result = agent.execute_task( + task=task, + tools=[slow_tool], + ) + assert "Partial result" in result + except Exception as e: + assert isinstance(e, TimeoutError) + + @pytest.mark.vcr(filter_headers=["authorization"]) def test_agent_respect_the_max_rpm_set(capsys): @tool