mirror of
https://github.com/crewAIInc/crewAI.git
synced 2025-12-16 04:18:35 +00:00
Compare commits
4 Commits
1.2.0
...
devin/1742
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6fa8f3b73f | ||
|
|
4487d2538a | ||
|
|
88c5a0dd01 | ||
|
|
91675a4298 |
@@ -1,7 +1,8 @@
|
||||
import concurrent.futures
|
||||
import re
|
||||
import shutil
|
||||
import subprocess
|
||||
from typing import Any, Dict, List, Literal, Optional, Sequence, Union
|
||||
from typing import Any, Dict, List, Literal, NoReturn, Optional, Sequence, Union
|
||||
|
||||
from pydantic import Field, InstanceOf, PrivateAttr, model_validator
|
||||
|
||||
@@ -17,7 +18,7 @@ from crewai.security import Fingerprint
|
||||
from crewai.task import Task
|
||||
from crewai.tools import BaseTool
|
||||
from crewai.tools.agent_tools.agent_tools import AgentTools
|
||||
from crewai.utilities import Converter, Prompts
|
||||
from crewai.utilities import AgentExecutionTimeoutError, Converter, Prompts
|
||||
from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
|
||||
from crewai.utilities.converter import generate_model_description
|
||||
from crewai.utilities.events.agent_events import (
|
||||
@@ -234,48 +235,39 @@ class Agent(BaseAgent):
|
||||
else:
|
||||
task_prompt = self._use_trained_data(task_prompt=task_prompt)
|
||||
|
||||
# Prepare the invoke parameters
|
||||
invoke_params = {
|
||||
"input": task_prompt,
|
||||
"tool_names": self.agent_executor.tools_names,
|
||||
"tools": self.agent_executor.tools_description,
|
||||
"ask_for_human_input": task.human_input,
|
||||
}
|
||||
|
||||
# Emit the execution started event
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
event=AgentExecutionStartedEvent(
|
||||
agent=self,
|
||||
tools=self.tools,
|
||||
task_prompt=task_prompt,
|
||||
task=task,
|
||||
),
|
||||
)
|
||||
|
||||
# Execute with or without timeout based on configuration
|
||||
try:
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
event=AgentExecutionStartedEvent(
|
||||
agent=self,
|
||||
tools=self.tools,
|
||||
task_prompt=task_prompt,
|
||||
task=task,
|
||||
),
|
||||
)
|
||||
result = self.agent_executor.invoke(
|
||||
{
|
||||
"input": task_prompt,
|
||||
"tool_names": self.agent_executor.tools_names,
|
||||
"tools": self.agent_executor.tools_description,
|
||||
"ask_for_human_input": task.human_input,
|
||||
}
|
||||
)["output"]
|
||||
if self.max_execution_time is not None:
|
||||
result = self._execute_with_timeout(invoke_params, task, context, tools)
|
||||
else:
|
||||
# No timeout, execute normally
|
||||
try:
|
||||
result = self.agent_executor.invoke(invoke_params)["output"]
|
||||
except Exception as e:
|
||||
result = self._handle_execution_error(e, task, context, tools)
|
||||
except Exception as e:
|
||||
if e.__class__.__module__.startswith("litellm"):
|
||||
# Do not retry on litellm errors
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
event=AgentExecutionErrorEvent(
|
||||
agent=self,
|
||||
task=task,
|
||||
error=str(e),
|
||||
),
|
||||
)
|
||||
raise e
|
||||
self._times_executed += 1
|
||||
if self._times_executed > self.max_retry_limit:
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
event=AgentExecutionErrorEvent(
|
||||
agent=self,
|
||||
task=task,
|
||||
error=str(e),
|
||||
),
|
||||
)
|
||||
raise e
|
||||
result = self.execute_task(task, context, tools)
|
||||
# This will catch any unhandled exceptions including timeout errors
|
||||
# that were re-raised from _execute_with_timeout
|
||||
raise e
|
||||
|
||||
if self.max_rpm and self._rpm_controller:
|
||||
self._rpm_controller.stop_rpm_counter()
|
||||
@@ -471,6 +463,105 @@ class Agent(BaseAgent):
|
||||
def __tools_names(tools) -> str:
|
||||
return ", ".join([t.name for t in tools])
|
||||
|
||||
def _execute_with_timeout(
|
||||
self,
|
||||
invoke_params: Dict[str, Any],
|
||||
task: Task,
|
||||
context: Optional[str] = None,
|
||||
tools: Optional[List[BaseTool]] = None
|
||||
) -> str:
|
||||
"""Execute a task with timeout handling.
|
||||
|
||||
Args:
|
||||
invoke_params: Parameters for agent executor invocation
|
||||
task: Task to execute
|
||||
context: Task context
|
||||
tools: Available tools
|
||||
|
||||
Returns:
|
||||
Output of the agent execution
|
||||
"""
|
||||
try:
|
||||
with concurrent.futures.ThreadPoolExecutor() as executor:
|
||||
future = executor.submit(self.agent_executor.invoke, invoke_params)
|
||||
try:
|
||||
return future.result(timeout=self.max_execution_time)["output"]
|
||||
except concurrent.futures.TimeoutError:
|
||||
# Cancel the future to stop the execution
|
||||
future.cancel()
|
||||
|
||||
# Create a timeout error with context
|
||||
error = AgentExecutionTimeoutError(
|
||||
max_execution_time=self.max_execution_time,
|
||||
agent_name=self.role,
|
||||
task_description=task.description if task else None
|
||||
)
|
||||
|
||||
# Emit the timeout error event
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
event=AgentExecutionErrorEvent(
|
||||
agent=self,
|
||||
task=task,
|
||||
error=str(error),
|
||||
),
|
||||
)
|
||||
|
||||
# Raise the timeout error
|
||||
raise error
|
||||
except AgentExecutionTimeoutError:
|
||||
# Re-raise timeout errors directly
|
||||
raise
|
||||
except Exception as e:
|
||||
# Handle other exceptions
|
||||
return self._handle_execution_error(e, task, context, tools)
|
||||
|
||||
def _handle_execution_error(
|
||||
self,
|
||||
error: Exception,
|
||||
task: Task,
|
||||
context: Optional[str] = None,
|
||||
tools: Optional[List[BaseTool]] = None
|
||||
) -> Union[str, NoReturn]:
|
||||
"""Handle execution errors with consistent logic.
|
||||
|
||||
Args:
|
||||
error: The caught exception
|
||||
task: Current task being executed
|
||||
context: Task context
|
||||
tools: Available tools
|
||||
|
||||
Returns:
|
||||
Retry result or re-raises the exception
|
||||
"""
|
||||
# Do not retry on litellm errors
|
||||
if error.__class__.__module__.startswith("litellm"):
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
event=AgentExecutionErrorEvent(
|
||||
agent=self,
|
||||
task=task,
|
||||
error=str(error),
|
||||
),
|
||||
)
|
||||
raise error
|
||||
|
||||
# For other errors, follow retry logic
|
||||
self._times_executed += 1
|
||||
if self._times_executed > self.max_retry_limit:
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
event=AgentExecutionErrorEvent(
|
||||
agent=self,
|
||||
task=task,
|
||||
error=str(error),
|
||||
),
|
||||
)
|
||||
raise error
|
||||
|
||||
# Retry execution
|
||||
return self.execute_task(task, context, tools)
|
||||
|
||||
def __repr__(self):
|
||||
return f"Agent(role={self.role}, goal={self.goal}, backstory={self.backstory})"
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ from .rpm_controller import RPMController
|
||||
from .exceptions.context_window_exceeding_exception import (
|
||||
LLMContextLengthExceededException,
|
||||
)
|
||||
from .exceptions.agent_execution_timeout_error import AgentExecutionTimeoutError
|
||||
from .embedding_configurator import EmbeddingConfigurator
|
||||
|
||||
__all__ = [
|
||||
@@ -24,5 +25,6 @@ __all__ = [
|
||||
"RPMController",
|
||||
"YamlParser",
|
||||
"LLMContextLengthExceededException",
|
||||
"AgentExecutionTimeoutError",
|
||||
"EmbeddingConfigurator",
|
||||
]
|
||||
|
||||
1
src/crewai/utilities/exceptions/__init__.py
Normal file
1
src/crewai/utilities/exceptions/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
# This file is intentionally left empty to make the directory a Python package
|
||||
@@ -0,0 +1,24 @@
|
||||
class AgentExecutionTimeoutError(Exception):
|
||||
"""Exception raised when an agent execution exceeds the maximum allowed time."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
max_execution_time: int,
|
||||
agent_name: str | None = None,
|
||||
task_description: str | None = None,
|
||||
message: str | None = None
|
||||
):
|
||||
self.max_execution_time = max_execution_time
|
||||
self.agent_name = agent_name
|
||||
self.task_description = task_description
|
||||
|
||||
# Generate a detailed error message if not provided
|
||||
if not message:
|
||||
message = f"Agent execution exceeded maximum allowed time of {max_execution_time} seconds"
|
||||
if agent_name:
|
||||
message += f" for agent: {agent_name}"
|
||||
if task_description:
|
||||
message += f" while executing task: {task_description}"
|
||||
|
||||
self.message = message
|
||||
super().__init__(self.message)
|
||||
1
tests/test_timeout/__init__.py
Normal file
1
tests/test_timeout/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
# This file is intentionally left empty to make the directory a Python package
|
||||
107
tests/test_timeout/test_agent_timeout.py
Normal file
107
tests/test_timeout/test_agent_timeout.py
Normal file
@@ -0,0 +1,107 @@
|
||||
import time
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from crewai import Agent, Task
|
||||
from crewai.utilities import AgentExecutionTimeoutError
|
||||
|
||||
|
||||
def test_agent_max_execution_time():
|
||||
"""Test that max_execution_time parameter is enforced."""
|
||||
# Create a simple test function that will be used to simulate a long-running task
|
||||
def test_timeout():
|
||||
# Create an agent with a 1-second timeout
|
||||
with patch('crewai.agent.Agent.create_agent_executor'):
|
||||
agent = Agent(
|
||||
role="Test Agent",
|
||||
goal="Test timeout functionality",
|
||||
backstory="I am testing the timeout functionality",
|
||||
max_execution_time=1,
|
||||
verbose=True
|
||||
)
|
||||
|
||||
# Create a task that will take longer than 1 second
|
||||
task = Task(
|
||||
description="Sleep for 5 seconds and then return a result",
|
||||
expected_output="The result after sleeping",
|
||||
agent=agent
|
||||
)
|
||||
|
||||
# Mock the agent_executor to simulate a long-running task
|
||||
mock_executor = MagicMock()
|
||||
def side_effect(*args, **kwargs):
|
||||
# Sleep for longer than the timeout to trigger the timeout mechanism
|
||||
time.sleep(2)
|
||||
return {"output": "This should never be returned due to timeout"}
|
||||
|
||||
mock_executor.invoke.side_effect = side_effect
|
||||
mock_executor.tools_names = []
|
||||
mock_executor.tools_description = []
|
||||
|
||||
# Replace the agent's executor with our mock
|
||||
agent.agent_executor = mock_executor
|
||||
|
||||
# Mock the event bus to avoid any real event emissions
|
||||
with patch('crewai.agent.crewai_event_bus'):
|
||||
# Execute the task and measure the time
|
||||
start_time = time.time()
|
||||
|
||||
# We expect an Exception to be raised due to timeout
|
||||
with pytest.raises(Exception) as excinfo:
|
||||
agent.execute_task(task)
|
||||
|
||||
# Check that the execution time is close to 1 second (the timeout)
|
||||
execution_time = time.time() - start_time
|
||||
assert execution_time <= 2.1, f"Execution took {execution_time:.2f} seconds, expected ~1 second"
|
||||
|
||||
# Check that the exception message mentions timeout or execution time
|
||||
error_message = str(excinfo.value).lower()
|
||||
assert any(term in error_message for term in ["timeout", "execution time", "exceeded maximum"])
|
||||
|
||||
# Run the test function
|
||||
test_timeout()
|
||||
|
||||
|
||||
def test_agent_timeout_error_message():
|
||||
"""Test that the timeout error message includes agent and task information."""
|
||||
# Create an agent with a very short timeout
|
||||
with patch('crewai.agent.Agent.create_agent_executor'):
|
||||
agent = Agent(
|
||||
role="Test Agent",
|
||||
goal="Test timeout error messaging",
|
||||
backstory="I am testing the timeout error messaging",
|
||||
max_execution_time=1, # Short timeout
|
||||
verbose=True
|
||||
)
|
||||
|
||||
# Create a task
|
||||
task = Task(
|
||||
description="This task should timeout quickly",
|
||||
expected_output="This should never be returned",
|
||||
agent=agent
|
||||
)
|
||||
|
||||
# Mock the agent_executor
|
||||
mock_executor = MagicMock()
|
||||
def side_effect(*args, **kwargs):
|
||||
# Sleep to trigger timeout
|
||||
time.sleep(2)
|
||||
return {"output": "This should never be returned"}
|
||||
|
||||
mock_executor.invoke.side_effect = side_effect
|
||||
mock_executor.tools_names = []
|
||||
mock_executor.tools_description = []
|
||||
|
||||
# Replace the agent's executor
|
||||
agent.agent_executor = mock_executor
|
||||
|
||||
# Execute the task and expect an exception
|
||||
with patch('crewai.agent.crewai_event_bus'):
|
||||
with pytest.raises(Exception) as excinfo:
|
||||
agent.execute_task(task)
|
||||
|
||||
# Verify error message contains agent name and task description
|
||||
error_message = str(excinfo.value).lower()
|
||||
assert "test agent" in error_message or "agent: test agent" in error_message
|
||||
assert "this task should timeout" in error_message or "task: this task should timeout" in error_message
|
||||
Reference in New Issue
Block a user