From 1411c8c79488b8f1d1954dd717cd36408ed908c6 Mon Sep 17 00:00:00 2001 From: hafsatariq18 Date: Mon, 3 Feb 2025 16:25:43 +0530 Subject: [PATCH] feat: Add max_execution_time support for agent tasks --- src/crewai/agent.py | 66 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/src/crewai/agent.py b/src/crewai/agent.py index dec0effd7..66698c7a8 100644 --- a/src/crewai/agent.py +++ b/src/crewai/agent.py @@ -1,8 +1,10 @@ import shutil import subprocess +import threading from typing import Any, Dict, List, Literal, Optional, Union from pydantic import Field, InstanceOf, PrivateAttr, model_validator +import timeout_decorator from crewai.agents import CacheHandler from crewai.agents.agent_builder.base_agent import BaseAgent @@ -166,6 +168,42 @@ class Agent(BaseAgent): except (TypeError, ValueError) as e: raise ValueError(f"Invalid Knowledge Configuration: {str(e)}") + def _execute_with_timeout( + self, + task: Task, + context: Optional[str], + tools: Optional[List[BaseTool]], + timeout: int + ) -> str: + """Execute task with timeout using thread-based timeout.""" + result = None + error = None + + def target(): + nonlocal result, error + try: + result = self._execute_task_without_timeout(task, context, tools) + except Exception as e: + error = e + + thread = threading.Thread(target=target) + thread.daemon = True + thread.start() + thread.join(timeout=timeout) + + if thread.is_alive(): + if hasattr(self, 'agent_executor') and self.agent_executor: + self.agent_executor.llm = None + raise timeout_decorator.TimeoutError(f"Task execution timed out after {timeout} seconds") + + if error: + raise error + + if result is None: + raise timeout_decorator.TimeoutError(f"Task execution completed but returned no result") + + return result + def execute_task( self, task: Task, @@ -181,7 +219,35 @@ class Agent(BaseAgent): Returns: Output of the agent + + Raises: + TimeoutError: If the task execution exceeds max_execution_time (if set) + Exception: For other execution errors """ + if self.max_execution_time is None: + return self._execute_task_without_timeout(task, context, tools) + + original_llm_timeout = getattr(self.llm, 'timeout', None) + try: + if hasattr(self.llm, 'timeout'): + self.llm.timeout = self.max_execution_time + + return self._execute_with_timeout(task, context, tools, self.max_execution_time) + except timeout_decorator.TimeoutError: + error_msg = f"Task execution timed out after {self.max_execution_time} seconds" + self._logger.log("error", error_msg) + raise TimeoutError(error_msg) + finally: + if original_llm_timeout is not None and hasattr(self.llm, 'timeout'): + self.llm.timeout = original_llm_timeout + + def _execute_task_without_timeout( + self, + task: Task, + context: Optional[str] = None, + tools: Optional[List[BaseTool]] = None, + ) -> str: + """Execute task without timeout - contains the original execute_task logic.""" if self.tools_handler: self.tools_handler.last_used_tool = {} # type: ignore # Incompatible types in assignment (expression has type "dict[Never, Never]", variable has type "ToolCalling")