Address PR feedback: Add thread safety, improve error messages, enhance type hints, and implement resource cleanup

Co-Authored-By: Joe Moura <joao@crewai.com>
This commit is contained in:
Devin AI
2025-03-31 20:07:10 +00:00
parent 142f3bbc60
commit 1454af98d6

View File

@@ -3,7 +3,7 @@ import re
import shutil import shutil
import subprocess import subprocess
import threading import threading
from typing import Any, Dict, List, Literal, Optional, Sequence, Union from typing import Any, Dict, List, Literal, Optional, Sequence, TypeVar, Union
from pydantic import Field, InstanceOf, PrivateAttr, model_validator from pydantic import Field, InstanceOf, PrivateAttr, model_validator
@@ -59,6 +59,7 @@ class Agent(BaseAgent):
""" """
_times_executed: int = PrivateAttr(default=0) _times_executed: int = PrivateAttr(default=0)
_times_executed_lock: threading.Lock = PrivateAttr(default_factory=threading.Lock)
max_execution_time: Optional[int] = Field( max_execution_time: Optional[int] = Field(
default=None, default=None,
description="Maximum execution time for an agent to execute a task", description="Maximum execution time for an agent to execute a task",
@@ -266,17 +267,18 @@ class Agent(BaseAgent):
), ),
) )
raise e raise e
self._times_executed += 1 with self._times_executed_lock:
if self._times_executed > self.max_retry_limit: self._times_executed += 1
crewai_event_bus.emit( if self._times_executed > self.max_retry_limit:
self, crewai_event_bus.emit(
event=AgentExecutionErrorEvent( self,
agent=self, event=AgentExecutionErrorEvent(
task=task, agent=self,
error=str(e), task=task,
), error=str(e),
) ),
raise e )
raise e
result = self._execute_task_without_timeout(task, context, tools) result = self._execute_task_without_timeout(task, context, tools)
if self.max_rpm and self._rpm_controller: if self.max_rpm and self._rpm_controller:
@@ -299,7 +301,7 @@ class Agent(BaseAgent):
task: Task, task: Task,
context: Optional[str] = None, context: Optional[str] = None,
tools: Optional[List[BaseTool]] = None, tools: Optional[List[BaseTool]] = None,
) -> str: ) -> Union[str, Any]:
"""Execute a task with a timeout. """Execute a task with a timeout.
Args: Args:
@@ -325,6 +327,7 @@ class Agent(BaseAgent):
return future.result(timeout=self.max_execution_time) return future.result(timeout=self.max_execution_time)
except concurrent.futures.TimeoutError: except concurrent.futures.TimeoutError:
future.cancel() future.cancel()
self._cleanup_timeout_resources()
self._logger.log( self._logger.log(
"warning", "warning",
@@ -363,8 +366,10 @@ class Agent(BaseAgent):
error_msg = ( error_msg = (
f"Task '{task.description}' execution timed out after " f"Task '{task.description}' execution timed out after "
f"{self.max_execution_time} seconds. Consider increasing " f"{self.max_execution_time} seconds. Consider:\n"
f"max_execution_time or optimizing the task." f"1. Increasing max_execution_time\n"
f"2. Optimizing the task\n"
f"3. Breaking the task into smaller subtasks"
) )
raise TimeoutError(error_msg) raise TimeoutError(error_msg)
@@ -388,7 +393,8 @@ class Agent(BaseAgent):
TimeoutError: If the execution exceeds max_execution_time (if set) TimeoutError: If the execution exceeds max_execution_time (if set)
Exception: For other execution errors Exception: For other execution errors
""" """
self._times_executed = 0 with self._times_executed_lock:
self._times_executed = 0
if self.max_execution_time is None: if self.max_execution_time is None:
return self._execute_task_without_timeout(task, context, tools) return self._execute_task_without_timeout(task, context, tools)
@@ -516,6 +522,23 @@ class Agent(BaseAgent):
return task_prompt return task_prompt
def _cleanup_timeout_resources(self) -> None:
"""Clean up resources after a timeout occurs.
This method is called when a task execution times out to ensure
that no resources are left in an inconsistent state.
"""
with self._times_executed_lock:
self._times_executed = 0
if self.max_rpm and self._rpm_controller:
self._rpm_controller.stop_rpm_counter()
self._logger.log(
"info",
"Cleaned up resources after timeout"
)
def _use_trained_data(self, task_prompt: str) -> str: def _use_trained_data(self, task_prompt: str) -> str:
"""Use trained data for the agent task prompt to improve output.""" """Use trained data for the agent task prompt to improve output."""
if data := CrewTrainingHandler(TRAINED_AGENTS_DATA_FILE).load(): if data := CrewTrainingHandler(TRAINED_AGENTS_DATA_FILE).load():