mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-08 07:38:29 +00:00
Compare commits
4 Commits
bugfix/flo
...
devin/1742
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6fa8f3b73f | ||
|
|
4487d2538a | ||
|
|
88c5a0dd01 | ||
|
|
91675a4298 |
@@ -1,7 +1,8 @@
|
|||||||
|
import concurrent.futures
|
||||||
import re
|
import re
|
||||||
import shutil
|
import shutil
|
||||||
import subprocess
|
import subprocess
|
||||||
from typing import Any, Dict, List, Literal, Optional, Sequence, Union
|
from typing import Any, Dict, List, Literal, NoReturn, Optional, Sequence, Union
|
||||||
|
|
||||||
from pydantic import Field, InstanceOf, PrivateAttr, model_validator
|
from pydantic import Field, InstanceOf, PrivateAttr, model_validator
|
||||||
|
|
||||||
@@ -17,7 +18,7 @@ from crewai.security import Fingerprint
|
|||||||
from crewai.task import Task
|
from crewai.task import Task
|
||||||
from crewai.tools import BaseTool
|
from crewai.tools import BaseTool
|
||||||
from crewai.tools.agent_tools.agent_tools import AgentTools
|
from crewai.tools.agent_tools.agent_tools import AgentTools
|
||||||
from crewai.utilities import Converter, Prompts
|
from crewai.utilities import AgentExecutionTimeoutError, Converter, Prompts
|
||||||
from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
|
from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
|
||||||
from crewai.utilities.converter import generate_model_description
|
from crewai.utilities.converter import generate_model_description
|
||||||
from crewai.utilities.events.agent_events import (
|
from crewai.utilities.events.agent_events import (
|
||||||
@@ -234,48 +235,39 @@ class Agent(BaseAgent):
|
|||||||
else:
|
else:
|
||||||
task_prompt = self._use_trained_data(task_prompt=task_prompt)
|
task_prompt = self._use_trained_data(task_prompt=task_prompt)
|
||||||
|
|
||||||
|
# Prepare the invoke parameters
|
||||||
|
invoke_params = {
|
||||||
|
"input": task_prompt,
|
||||||
|
"tool_names": self.agent_executor.tools_names,
|
||||||
|
"tools": self.agent_executor.tools_description,
|
||||||
|
"ask_for_human_input": task.human_input,
|
||||||
|
}
|
||||||
|
|
||||||
|
# Emit the execution started event
|
||||||
|
crewai_event_bus.emit(
|
||||||
|
self,
|
||||||
|
event=AgentExecutionStartedEvent(
|
||||||
|
agent=self,
|
||||||
|
tools=self.tools,
|
||||||
|
task_prompt=task_prompt,
|
||||||
|
task=task,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
# Execute with or without timeout based on configuration
|
||||||
try:
|
try:
|
||||||
crewai_event_bus.emit(
|
if self.max_execution_time is not None:
|
||||||
self,
|
result = self._execute_with_timeout(invoke_params, task, context, tools)
|
||||||
event=AgentExecutionStartedEvent(
|
else:
|
||||||
agent=self,
|
# No timeout, execute normally
|
||||||
tools=self.tools,
|
try:
|
||||||
task_prompt=task_prompt,
|
result = self.agent_executor.invoke(invoke_params)["output"]
|
||||||
task=task,
|
except Exception as e:
|
||||||
),
|
result = self._handle_execution_error(e, task, context, tools)
|
||||||
)
|
|
||||||
result = self.agent_executor.invoke(
|
|
||||||
{
|
|
||||||
"input": task_prompt,
|
|
||||||
"tool_names": self.agent_executor.tools_names,
|
|
||||||
"tools": self.agent_executor.tools_description,
|
|
||||||
"ask_for_human_input": task.human_input,
|
|
||||||
}
|
|
||||||
)["output"]
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
if e.__class__.__module__.startswith("litellm"):
|
# This will catch any unhandled exceptions including timeout errors
|
||||||
# Do not retry on litellm errors
|
# that were re-raised from _execute_with_timeout
|
||||||
crewai_event_bus.emit(
|
raise e
|
||||||
self,
|
|
||||||
event=AgentExecutionErrorEvent(
|
|
||||||
agent=self,
|
|
||||||
task=task,
|
|
||||||
error=str(e),
|
|
||||||
),
|
|
||||||
)
|
|
||||||
raise e
|
|
||||||
self._times_executed += 1
|
|
||||||
if self._times_executed > self.max_retry_limit:
|
|
||||||
crewai_event_bus.emit(
|
|
||||||
self,
|
|
||||||
event=AgentExecutionErrorEvent(
|
|
||||||
agent=self,
|
|
||||||
task=task,
|
|
||||||
error=str(e),
|
|
||||||
),
|
|
||||||
)
|
|
||||||
raise e
|
|
||||||
result = self.execute_task(task, context, tools)
|
|
||||||
|
|
||||||
if self.max_rpm and self._rpm_controller:
|
if self.max_rpm and self._rpm_controller:
|
||||||
self._rpm_controller.stop_rpm_counter()
|
self._rpm_controller.stop_rpm_counter()
|
||||||
@@ -471,6 +463,105 @@ class Agent(BaseAgent):
|
|||||||
def __tools_names(tools) -> str:
|
def __tools_names(tools) -> str:
|
||||||
return ", ".join([t.name for t in tools])
|
return ", ".join([t.name for t in tools])
|
||||||
|
|
||||||
|
def _execute_with_timeout(
|
||||||
|
self,
|
||||||
|
invoke_params: Dict[str, Any],
|
||||||
|
task: Task,
|
||||||
|
context: Optional[str] = None,
|
||||||
|
tools: Optional[List[BaseTool]] = None
|
||||||
|
) -> str:
|
||||||
|
"""Execute a task with timeout handling.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
invoke_params: Parameters for agent executor invocation
|
||||||
|
task: Task to execute
|
||||||
|
context: Task context
|
||||||
|
tools: Available tools
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Output of the agent execution
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
with concurrent.futures.ThreadPoolExecutor() as executor:
|
||||||
|
future = executor.submit(self.agent_executor.invoke, invoke_params)
|
||||||
|
try:
|
||||||
|
return future.result(timeout=self.max_execution_time)["output"]
|
||||||
|
except concurrent.futures.TimeoutError:
|
||||||
|
# Cancel the future to stop the execution
|
||||||
|
future.cancel()
|
||||||
|
|
||||||
|
# Create a timeout error with context
|
||||||
|
error = AgentExecutionTimeoutError(
|
||||||
|
max_execution_time=self.max_execution_time,
|
||||||
|
agent_name=self.role,
|
||||||
|
task_description=task.description if task else None
|
||||||
|
)
|
||||||
|
|
||||||
|
# Emit the timeout error event
|
||||||
|
crewai_event_bus.emit(
|
||||||
|
self,
|
||||||
|
event=AgentExecutionErrorEvent(
|
||||||
|
agent=self,
|
||||||
|
task=task,
|
||||||
|
error=str(error),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
# Raise the timeout error
|
||||||
|
raise error
|
||||||
|
except AgentExecutionTimeoutError:
|
||||||
|
# Re-raise timeout errors directly
|
||||||
|
raise
|
||||||
|
except Exception as e:
|
||||||
|
# Handle other exceptions
|
||||||
|
return self._handle_execution_error(e, task, context, tools)
|
||||||
|
|
||||||
|
def _handle_execution_error(
|
||||||
|
self,
|
||||||
|
error: Exception,
|
||||||
|
task: Task,
|
||||||
|
context: Optional[str] = None,
|
||||||
|
tools: Optional[List[BaseTool]] = None
|
||||||
|
) -> Union[str, NoReturn]:
|
||||||
|
"""Handle execution errors with consistent logic.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
error: The caught exception
|
||||||
|
task: Current task being executed
|
||||||
|
context: Task context
|
||||||
|
tools: Available tools
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Retry result or re-raises the exception
|
||||||
|
"""
|
||||||
|
# Do not retry on litellm errors
|
||||||
|
if error.__class__.__module__.startswith("litellm"):
|
||||||
|
crewai_event_bus.emit(
|
||||||
|
self,
|
||||||
|
event=AgentExecutionErrorEvent(
|
||||||
|
agent=self,
|
||||||
|
task=task,
|
||||||
|
error=str(error),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
raise error
|
||||||
|
|
||||||
|
# For other errors, follow retry logic
|
||||||
|
self._times_executed += 1
|
||||||
|
if self._times_executed > self.max_retry_limit:
|
||||||
|
crewai_event_bus.emit(
|
||||||
|
self,
|
||||||
|
event=AgentExecutionErrorEvent(
|
||||||
|
agent=self,
|
||||||
|
task=task,
|
||||||
|
error=str(error),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
raise error
|
||||||
|
|
||||||
|
# Retry execution
|
||||||
|
return self.execute_task(task, context, tools)
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return f"Agent(role={self.role}, goal={self.goal}, backstory={self.backstory})"
|
return f"Agent(role={self.role}, goal={self.goal}, backstory={self.backstory})"
|
||||||
|
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ from .rpm_controller import RPMController
|
|||||||
from .exceptions.context_window_exceeding_exception import (
|
from .exceptions.context_window_exceeding_exception import (
|
||||||
LLMContextLengthExceededException,
|
LLMContextLengthExceededException,
|
||||||
)
|
)
|
||||||
|
from .exceptions.agent_execution_timeout_error import AgentExecutionTimeoutError
|
||||||
from .embedding_configurator import EmbeddingConfigurator
|
from .embedding_configurator import EmbeddingConfigurator
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
@@ -24,5 +25,6 @@ __all__ = [
|
|||||||
"RPMController",
|
"RPMController",
|
||||||
"YamlParser",
|
"YamlParser",
|
||||||
"LLMContextLengthExceededException",
|
"LLMContextLengthExceededException",
|
||||||
|
"AgentExecutionTimeoutError",
|
||||||
"EmbeddingConfigurator",
|
"EmbeddingConfigurator",
|
||||||
]
|
]
|
||||||
|
|||||||
1
src/crewai/utilities/exceptions/__init__.py
Normal file
1
src/crewai/utilities/exceptions/__init__.py
Normal file
@@ -0,0 +1 @@
|
|||||||
|
# This file is intentionally left empty to make the directory a Python package
|
||||||
@@ -0,0 +1,24 @@
|
|||||||
|
class AgentExecutionTimeoutError(Exception):
|
||||||
|
"""Exception raised when an agent execution exceeds the maximum allowed time."""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
max_execution_time: int,
|
||||||
|
agent_name: str | None = None,
|
||||||
|
task_description: str | None = None,
|
||||||
|
message: str | None = None
|
||||||
|
):
|
||||||
|
self.max_execution_time = max_execution_time
|
||||||
|
self.agent_name = agent_name
|
||||||
|
self.task_description = task_description
|
||||||
|
|
||||||
|
# Generate a detailed error message if not provided
|
||||||
|
if not message:
|
||||||
|
message = f"Agent execution exceeded maximum allowed time of {max_execution_time} seconds"
|
||||||
|
if agent_name:
|
||||||
|
message += f" for agent: {agent_name}"
|
||||||
|
if task_description:
|
||||||
|
message += f" while executing task: {task_description}"
|
||||||
|
|
||||||
|
self.message = message
|
||||||
|
super().__init__(self.message)
|
||||||
1
tests/test_timeout/__init__.py
Normal file
1
tests/test_timeout/__init__.py
Normal file
@@ -0,0 +1 @@
|
|||||||
|
# This file is intentionally left empty to make the directory a Python package
|
||||||
107
tests/test_timeout/test_agent_timeout.py
Normal file
107
tests/test_timeout/test_agent_timeout.py
Normal file
@@ -0,0 +1,107 @@
|
|||||||
|
import time
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from crewai import Agent, Task
|
||||||
|
from crewai.utilities import AgentExecutionTimeoutError
|
||||||
|
|
||||||
|
|
||||||
|
def test_agent_max_execution_time():
|
||||||
|
"""Test that max_execution_time parameter is enforced."""
|
||||||
|
# Create a simple test function that will be used to simulate a long-running task
|
||||||
|
def test_timeout():
|
||||||
|
# Create an agent with a 1-second timeout
|
||||||
|
with patch('crewai.agent.Agent.create_agent_executor'):
|
||||||
|
agent = Agent(
|
||||||
|
role="Test Agent",
|
||||||
|
goal="Test timeout functionality",
|
||||||
|
backstory="I am testing the timeout functionality",
|
||||||
|
max_execution_time=1,
|
||||||
|
verbose=True
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create a task that will take longer than 1 second
|
||||||
|
task = Task(
|
||||||
|
description="Sleep for 5 seconds and then return a result",
|
||||||
|
expected_output="The result after sleeping",
|
||||||
|
agent=agent
|
||||||
|
)
|
||||||
|
|
||||||
|
# Mock the agent_executor to simulate a long-running task
|
||||||
|
mock_executor = MagicMock()
|
||||||
|
def side_effect(*args, **kwargs):
|
||||||
|
# Sleep for longer than the timeout to trigger the timeout mechanism
|
||||||
|
time.sleep(2)
|
||||||
|
return {"output": "This should never be returned due to timeout"}
|
||||||
|
|
||||||
|
mock_executor.invoke.side_effect = side_effect
|
||||||
|
mock_executor.tools_names = []
|
||||||
|
mock_executor.tools_description = []
|
||||||
|
|
||||||
|
# Replace the agent's executor with our mock
|
||||||
|
agent.agent_executor = mock_executor
|
||||||
|
|
||||||
|
# Mock the event bus to avoid any real event emissions
|
||||||
|
with patch('crewai.agent.crewai_event_bus'):
|
||||||
|
# Execute the task and measure the time
|
||||||
|
start_time = time.time()
|
||||||
|
|
||||||
|
# We expect an Exception to be raised due to timeout
|
||||||
|
with pytest.raises(Exception) as excinfo:
|
||||||
|
agent.execute_task(task)
|
||||||
|
|
||||||
|
# Check that the execution time is close to 1 second (the timeout)
|
||||||
|
execution_time = time.time() - start_time
|
||||||
|
assert execution_time <= 2.1, f"Execution took {execution_time:.2f} seconds, expected ~1 second"
|
||||||
|
|
||||||
|
# Check that the exception message mentions timeout or execution time
|
||||||
|
error_message = str(excinfo.value).lower()
|
||||||
|
assert any(term in error_message for term in ["timeout", "execution time", "exceeded maximum"])
|
||||||
|
|
||||||
|
# Run the test function
|
||||||
|
test_timeout()
|
||||||
|
|
||||||
|
|
||||||
|
def test_agent_timeout_error_message():
|
||||||
|
"""Test that the timeout error message includes agent and task information."""
|
||||||
|
# Create an agent with a very short timeout
|
||||||
|
with patch('crewai.agent.Agent.create_agent_executor'):
|
||||||
|
agent = Agent(
|
||||||
|
role="Test Agent",
|
||||||
|
goal="Test timeout error messaging",
|
||||||
|
backstory="I am testing the timeout error messaging",
|
||||||
|
max_execution_time=1, # Short timeout
|
||||||
|
verbose=True
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create a task
|
||||||
|
task = Task(
|
||||||
|
description="This task should timeout quickly",
|
||||||
|
expected_output="This should never be returned",
|
||||||
|
agent=agent
|
||||||
|
)
|
||||||
|
|
||||||
|
# Mock the agent_executor
|
||||||
|
mock_executor = MagicMock()
|
||||||
|
def side_effect(*args, **kwargs):
|
||||||
|
# Sleep to trigger timeout
|
||||||
|
time.sleep(2)
|
||||||
|
return {"output": "This should never be returned"}
|
||||||
|
|
||||||
|
mock_executor.invoke.side_effect = side_effect
|
||||||
|
mock_executor.tools_names = []
|
||||||
|
mock_executor.tools_description = []
|
||||||
|
|
||||||
|
# Replace the agent's executor
|
||||||
|
agent.agent_executor = mock_executor
|
||||||
|
|
||||||
|
# Execute the task and expect an exception
|
||||||
|
with patch('crewai.agent.crewai_event_bus'):
|
||||||
|
with pytest.raises(Exception) as excinfo:
|
||||||
|
agent.execute_task(task)
|
||||||
|
|
||||||
|
# Verify error message contains agent name and task description
|
||||||
|
error_message = str(excinfo.value).lower()
|
||||||
|
assert "test agent" in error_message or "agent: test agent" in error_message
|
||||||
|
assert "this task should timeout" in error_message or "task: this task should timeout" in error_message
|
||||||
Reference in New Issue
Block a user