mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-10 16:48:30 +00:00
feat: Add max_execution_time support for agent tasks
This commit is contained in:
@@ -1,8 +1,10 @@
|
||||
import shutil
|
||||
import subprocess
|
||||
import threading
|
||||
from typing import Any, Dict, List, Literal, Optional, Union
|
||||
|
||||
from pydantic import Field, InstanceOf, PrivateAttr, model_validator
|
||||
import timeout_decorator
|
||||
|
||||
from crewai.agents import CacheHandler
|
||||
from crewai.agents.agent_builder.base_agent import BaseAgent
|
||||
@@ -166,6 +168,42 @@ class Agent(BaseAgent):
|
||||
except (TypeError, ValueError) as e:
|
||||
raise ValueError(f"Invalid Knowledge Configuration: {str(e)}")
|
||||
|
||||
def _execute_with_timeout(
|
||||
self,
|
||||
task: Task,
|
||||
context: Optional[str],
|
||||
tools: Optional[List[BaseTool]],
|
||||
timeout: int
|
||||
) -> str:
|
||||
"""Execute task with timeout using thread-based timeout."""
|
||||
result = None
|
||||
error = None
|
||||
|
||||
def target():
|
||||
nonlocal result, error
|
||||
try:
|
||||
result = self._execute_task_without_timeout(task, context, tools)
|
||||
except Exception as e:
|
||||
error = e
|
||||
|
||||
thread = threading.Thread(target=target)
|
||||
thread.daemon = True
|
||||
thread.start()
|
||||
thread.join(timeout=timeout)
|
||||
|
||||
if thread.is_alive():
|
||||
if hasattr(self, 'agent_executor') and self.agent_executor:
|
||||
self.agent_executor.llm = None
|
||||
raise timeout_decorator.TimeoutError(f"Task execution timed out after {timeout} seconds")
|
||||
|
||||
if error:
|
||||
raise error
|
||||
|
||||
if result is None:
|
||||
raise timeout_decorator.TimeoutError(f"Task execution completed but returned no result")
|
||||
|
||||
return result
|
||||
|
||||
def execute_task(
|
||||
self,
|
||||
task: Task,
|
||||
@@ -181,7 +219,35 @@ class Agent(BaseAgent):
|
||||
|
||||
Returns:
|
||||
Output of the agent
|
||||
|
||||
Raises:
|
||||
TimeoutError: If the task execution exceeds max_execution_time (if set)
|
||||
Exception: For other execution errors
|
||||
"""
|
||||
if self.max_execution_time is None:
|
||||
return self._execute_task_without_timeout(task, context, tools)
|
||||
|
||||
original_llm_timeout = getattr(self.llm, 'timeout', None)
|
||||
try:
|
||||
if hasattr(self.llm, 'timeout'):
|
||||
self.llm.timeout = self.max_execution_time
|
||||
|
||||
return self._execute_with_timeout(task, context, tools, self.max_execution_time)
|
||||
except timeout_decorator.TimeoutError:
|
||||
error_msg = f"Task execution timed out after {self.max_execution_time} seconds"
|
||||
self._logger.log("error", error_msg)
|
||||
raise TimeoutError(error_msg)
|
||||
finally:
|
||||
if original_llm_timeout is not None and hasattr(self.llm, 'timeout'):
|
||||
self.llm.timeout = original_llm_timeout
|
||||
|
||||
def _execute_task_without_timeout(
|
||||
self,
|
||||
task: Task,
|
||||
context: Optional[str] = None,
|
||||
tools: Optional[List[BaseTool]] = None,
|
||||
) -> str:
|
||||
"""Execute task without timeout - contains the original execute_task logic."""
|
||||
if self.tools_handler:
|
||||
self.tools_handler.last_used_tool = {} # type: ignore # Incompatible types in assignment (expression has type "dict[Never, Never]", variable has type "ToolCalling")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user