From 91675a42984a665e4bc858451ab714c5844ff7cc Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sun, 16 Mar 2025 22:35:58 +0000 Subject: [PATCH] Fix issue #2379: Implement timeout mechanism for max_execution_time Co-Authored-By: Joe Moura --- src/crewai/agent.py | 155 +++++++++++++----- src/crewai/utilities/__init__.py | 2 + src/crewai/utilities/exceptions/__init__.py | 1 + .../agent_execution_timeout_error.py | 7 + tests/test_timeout/__init__.py | 1 + tests/test_timeout/test_agent_timeout.py | 58 +++++++ 6 files changed, 182 insertions(+), 42 deletions(-) create mode 100644 src/crewai/utilities/exceptions/__init__.py create mode 100644 src/crewai/utilities/exceptions/agent_execution_timeout_error.py create mode 100644 tests/test_timeout/__init__.py create mode 100644 tests/test_timeout/test_agent_timeout.py diff --git a/src/crewai/agent.py b/src/crewai/agent.py index d10b768d4..a25fd0ef5 100644 --- a/src/crewai/agent.py +++ b/src/crewai/agent.py @@ -1,3 +1,4 @@ +import concurrent.futures import re import shutil import subprocess @@ -234,48 +235,118 @@ class Agent(BaseAgent): else: task_prompt = self._use_trained_data(task_prompt=task_prompt) - try: - crewai_event_bus.emit( - self, - event=AgentExecutionStartedEvent( - agent=self, - tools=self.tools, - task_prompt=task_prompt, - task=task, - ), - ) - result = self.agent_executor.invoke( - { - "input": task_prompt, - "tool_names": self.agent_executor.tools_names, - "tools": self.agent_executor.tools_description, - "ask_for_human_input": task.human_input, - } - )["output"] - except Exception as e: - if e.__class__.__module__.startswith("litellm"): - # Do not retry on litellm errors - crewai_event_bus.emit( - self, - event=AgentExecutionErrorEvent( - agent=self, - task=task, - error=str(e), - ), - ) - raise e - self._times_executed += 1 - if self._times_executed > self.max_retry_limit: - crewai_event_bus.emit( - self, - event=AgentExecutionErrorEvent( - agent=self, - task=task, - error=str(e), - ), - ) - raise e - result = self.execute_task(task, context, tools) + # Prepare the invoke parameters + invoke_params = { + "input": task_prompt, + "tool_names": self.agent_executor.tools_names, + "tools": self.agent_executor.tools_description, + "ask_for_human_input": task.human_input, + } + + # Emit the execution started event + crewai_event_bus.emit( + self, + event=AgentExecutionStartedEvent( + agent=self, + tools=self.tools, + task_prompt=task_prompt, + task=task, + ), + ) + + # If max_execution_time is set, use ThreadPoolExecutor with timeout + if self.max_execution_time is not None: + try: + with concurrent.futures.ThreadPoolExecutor() as executor: + future = executor.submit(self.agent_executor.invoke, invoke_params) + try: + result = future.result(timeout=self.max_execution_time)["output"] + except concurrent.futures.TimeoutError: + # Cancel the future to stop the execution + future.cancel() + # Define the timeout error message + error_message = f"Agent execution exceeded maximum time of {self.max_execution_time} seconds" + # Emit the timeout error event + crewai_event_bus.emit( + self, + event=AgentExecutionErrorEvent( + agent=self, + task=task, + error=error_message, + ), + ) + # Raise a standard Exception with the timeout message + # This avoids circular import issues while still providing the expected error message + raise Exception(f"Timeout Error: {error_message}") + except Exception as e: + # Re-raise any exceptions + if "Timeout Error:" in str(e): + raise + # For other exceptions, follow the normal retry logic + self._times_executed += 1 + if self._times_executed > self.max_retry_limit: + crewai_event_bus.emit( + self, + event=AgentExecutionErrorEvent( + agent=self, + task=task, + error=str(e), + ), + ) + raise e + return self.execute_task(task, context, tools) + except Exception as e: + if e.__class__.__module__.startswith("litellm"): + # Do not retry on litellm errors + crewai_event_bus.emit( + self, + event=AgentExecutionErrorEvent( + agent=self, + task=task, + error=str(e), + ), + ) + raise e + self._times_executed += 1 + if self._times_executed > self.max_retry_limit: + crewai_event_bus.emit( + self, + event=AgentExecutionErrorEvent( + agent=self, + task=task, + error=str(e), + ), + ) + raise e + result = self.execute_task(task, context, tools) + else: + # No timeout, execute normally + try: + result = self.agent_executor.invoke(invoke_params)["output"] + except Exception as e: + if e.__class__.__module__.startswith("litellm"): + # Do not retry on litellm errors + crewai_event_bus.emit( + self, + event=AgentExecutionErrorEvent( + agent=self, + task=task, + error=str(e), + ), + ) + raise e + self._times_executed += 1 + if self._times_executed > self.max_retry_limit: + crewai_event_bus.emit( + self, + event=AgentExecutionErrorEvent( + agent=self, + task=task, + error=str(e), + ), + ) + raise e + result = self.execute_task(task, context, tools) if self.max_rpm and self._rpm_controller: self._rpm_controller.stop_rpm_counter() diff --git a/src/crewai/utilities/__init__.py b/src/crewai/utilities/__init__.py index dd6d9fa44..7c13d7780 100644 --- a/src/crewai/utilities/__init__.py +++ b/src/crewai/utilities/__init__.py @@ -10,6 +10,7 @@ from .rpm_controller import RPMController from .exceptions.context_window_exceeding_exception import ( LLMContextLengthExceededException, ) +from .exceptions.agent_execution_timeout_error import AgentExecutionTimeoutError from .embedding_configurator import EmbeddingConfigurator __all__ = [ @@ -24,5 +25,6 @@ __all__ = [ "RPMController", "YamlParser", "LLMContextLengthExceededException", + "AgentExecutionTimeoutError", "EmbeddingConfigurator", ] diff --git a/src/crewai/utilities/exceptions/__init__.py b/src/crewai/utilities/exceptions/__init__.py new file mode 100644 index 000000000..06d391492 --- /dev/null +++ b/src/crewai/utilities/exceptions/__init__.py @@ -0,0 +1 @@ +# This file is intentionally left empty to make the directory a Python package diff --git a/src/crewai/utilities/exceptions/agent_execution_timeout_error.py b/src/crewai/utilities/exceptions/agent_execution_timeout_error.py new file mode 100644 index 000000000..f2c0fbba5 --- /dev/null +++ b/src/crewai/utilities/exceptions/agent_execution_timeout_error.py @@ -0,0 +1,7 @@ +class AgentExecutionTimeoutError(Exception): + """Exception raised when an agent execution exceeds the maximum allowed time.""" + + def __init__(self, max_execution_time: int, message: str = None): + self.max_execution_time = max_execution_time + self.message = message or f"Agent execution exceeded maximum allowed time of {max_execution_time} seconds" + super().__init__(self.message) diff --git a/tests/test_timeout/__init__.py b/tests/test_timeout/__init__.py new file mode 100644 index 000000000..06d391492 --- /dev/null +++ b/tests/test_timeout/__init__.py @@ -0,0 +1 @@ +# This file is intentionally left empty to make the directory a Python package diff --git a/tests/test_timeout/test_agent_timeout.py b/tests/test_timeout/test_agent_timeout.py new file mode 100644 index 000000000..26cfba5ab --- /dev/null +++ b/tests/test_timeout/test_agent_timeout.py @@ -0,0 +1,58 @@ +import time +import pytest +from unittest.mock import patch, MagicMock +from crewai import Agent, Task + +def test_agent_max_execution_time(): + """Test that max_execution_time parameter is enforced.""" + # Create a simple test function that will be used to simulate a long-running task + def test_timeout(): + # Create an agent with a 1-second timeout + with patch('crewai.agent.Agent.create_agent_executor'): + agent = Agent( + role="Test Agent", + goal="Test timeout functionality", + backstory="I am testing the timeout functionality", + max_execution_time=1, + verbose=True + ) + + # Create a task that will take longer than 1 second + task = Task( + description="Sleep for 5 seconds and then return a result", + expected_output="The result after sleeping", + agent=agent + ) + + # Mock the agent_executor to simulate a long-running task + mock_executor = MagicMock() + def side_effect(*args, **kwargs): + # Sleep for longer than the timeout to trigger the timeout mechanism + time.sleep(2) + return {"output": "This should never be returned due to timeout"} + + mock_executor.invoke.side_effect = side_effect + mock_executor.tools_names = [] + mock_executor.tools_description = [] + + # Replace the agent's executor with our mock + agent.agent_executor = mock_executor + + # Mock the event bus to avoid any real event emissions + with patch('crewai.agent.crewai_event_bus'): + # Execute the task and measure the time + start_time = time.time() + + # We expect an Exception to be raised due to timeout + with pytest.raises(Exception) as excinfo: + agent.execute_task(task) + + # Check that the execution time is close to 1 second (the timeout) + execution_time = time.time() - start_time + assert execution_time <= 2.1, f"Execution took {execution_time:.2f} seconds, expected ~1 second" + + # Check that the exception message mentions timeout + assert "timeout" in str(excinfo.value).lower() or "execution time" in str(excinfo.value).lower() + + # Run the test function + test_timeout()