diff --git a/src/crewai/agent.py b/src/crewai/agent.py index 038db5a06..ca7ffd680 100644 --- a/src/crewai/agent.py +++ b/src/crewai/agent.py @@ -3,7 +3,7 @@ import re import shutil import subprocess import threading -from typing import Any, Dict, List, Literal, Optional, Sequence, Union +from typing import Any, Dict, List, Literal, Optional, Sequence, TypeVar, Union from pydantic import Field, InstanceOf, PrivateAttr, model_validator @@ -59,6 +59,7 @@ class Agent(BaseAgent): """ _times_executed: int = PrivateAttr(default=0) + _times_executed_lock: threading.Lock = PrivateAttr(default_factory=threading.Lock) max_execution_time: Optional[int] = Field( default=None, description="Maximum execution time for an agent to execute a task", @@ -266,17 +267,18 @@ class Agent(BaseAgent): ), ) raise e - self._times_executed += 1 - if self._times_executed > self.max_retry_limit: - crewai_event_bus.emit( - self, - event=AgentExecutionErrorEvent( - agent=self, - task=task, - error=str(e), - ), - ) - raise e + with self._times_executed_lock: + self._times_executed += 1 + if self._times_executed > self.max_retry_limit: + crewai_event_bus.emit( + self, + event=AgentExecutionErrorEvent( + agent=self, + task=task, + error=str(e), + ), + ) + raise e result = self._execute_task_without_timeout(task, context, tools) if self.max_rpm and self._rpm_controller: @@ -299,7 +301,7 @@ class Agent(BaseAgent): task: Task, context: Optional[str] = None, tools: Optional[List[BaseTool]] = None, - ) -> str: + ) -> Union[str, Any]: """Execute a task with a timeout. Args: @@ -325,6 +327,7 @@ class Agent(BaseAgent): return future.result(timeout=self.max_execution_time) except concurrent.futures.TimeoutError: future.cancel() + self._cleanup_timeout_resources() self._logger.log( "warning", @@ -363,8 +366,10 @@ class Agent(BaseAgent): error_msg = ( f"Task '{task.description}' execution timed out after " - f"{self.max_execution_time} seconds. Consider increasing " - f"max_execution_time or optimizing the task." + f"{self.max_execution_time} seconds. Consider:\n" + f"1. Increasing max_execution_time\n" + f"2. Optimizing the task\n" + f"3. Breaking the task into smaller subtasks" ) raise TimeoutError(error_msg) @@ -388,7 +393,8 @@ class Agent(BaseAgent): TimeoutError: If the execution exceeds max_execution_time (if set) Exception: For other execution errors """ - self._times_executed = 0 + with self._times_executed_lock: + self._times_executed = 0 if self.max_execution_time is None: return self._execute_task_without_timeout(task, context, tools) @@ -516,6 +522,23 @@ class Agent(BaseAgent): return task_prompt + def _cleanup_timeout_resources(self) -> None: + """Clean up resources after a timeout occurs. + + This method is called when a task execution times out to ensure + that no resources are left in an inconsistent state. + """ + with self._times_executed_lock: + self._times_executed = 0 + + if self.max_rpm and self._rpm_controller: + self._rpm_controller.stop_rpm_counter() + + self._logger.log( + "info", + "Cleaned up resources after timeout" + ) + def _use_trained_data(self, task_prompt: str) -> str: """Use trained data for the agent task prompt to improve output.""" if data := CrewTrainingHandler(TRAINED_AGENTS_DATA_FILE).load():