mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-08 15:48:29 +00:00
test: add task_max_execution_time test cases
This commit is contained in:
@@ -13,6 +13,7 @@ from crewai import Agent, Crew, Process, Task
|
||||
from crewai.tasks.conditional_task import ConditionalTask
|
||||
from crewai.tasks.task_output import TaskOutput
|
||||
from crewai.utilities.converter import Converter
|
||||
from concurrent.futures import TimeoutError as FuturesTimeoutError
|
||||
|
||||
|
||||
def test_task_tool_reflect_agent_tools():
|
||||
@@ -1283,3 +1284,93 @@ def test_interpolate_valid_types():
|
||||
assert parsed["optional"] is None
|
||||
assert parsed["nested"]["flag"] is True
|
||||
assert parsed["nested"]["empty"] is None
|
||||
|
||||
|
||||
def test_task_completes_within_max_execution_time():
|
||||
"""Test task completes successfully within specified timeout"""
|
||||
researcher = Agent(
|
||||
role="Researcher",
|
||||
goal="Test goal",
|
||||
backstory="Test backstory",
|
||||
max_execution_time=100 # Ample time for completion
|
||||
)
|
||||
|
||||
task = Task(
|
||||
description="Test task",
|
||||
expected_output="Test output",
|
||||
agent=researcher
|
||||
)
|
||||
|
||||
with patch.object(Agent, '_execute_task_without_timeout', return_value="Success") as mock_execute:
|
||||
result = task.execute_sync(agent=researcher)
|
||||
assert result.raw == "Success"
|
||||
mock_execute.assert_called_once()
|
||||
|
||||
|
||||
def test_task_exceeds_max_execution_time():
|
||||
"""Test task raises TimeoutError when exceeding max execution time"""
|
||||
researcher = Agent(
|
||||
role="Researcher",
|
||||
goal="Test goal",
|
||||
backstory="Test backstory",
|
||||
max_execution_time=1 # Very short timeout
|
||||
)
|
||||
|
||||
task = Task(
|
||||
description="Test task",
|
||||
expected_output="Test output",
|
||||
agent=researcher
|
||||
)
|
||||
|
||||
# Mock the OpenAI API call to avoid authentication
|
||||
with patch('litellm.completion', side_effect=TimeoutError("Request timed out")):
|
||||
with pytest.raises(TimeoutError) as excinfo:
|
||||
task.execute_sync(agent=researcher)
|
||||
assert "timed out" in str(excinfo.value)
|
||||
|
||||
|
||||
def test_task_no_max_execution_time():
|
||||
"""Test task executes normally without timeout setting"""
|
||||
researcher = Agent(
|
||||
role="Researcher",
|
||||
goal="Test goal",
|
||||
backstory="Test backstory",
|
||||
max_execution_time=None # No timeout
|
||||
)
|
||||
|
||||
task = Task(
|
||||
description="Test task",
|
||||
expected_output="Test output",
|
||||
agent=researcher
|
||||
)
|
||||
|
||||
with patch.object(Agent, 'execute_task', return_value="Success") as mock_execute:
|
||||
result = task.execute_sync(agent=researcher)
|
||||
assert result.raw == "Success"
|
||||
mock_execute.assert_called_once()
|
||||
|
||||
|
||||
def test_task_max_execution_time_zero():
|
||||
"""Test immediate timeout with max_execution_time=0"""
|
||||
researcher = Agent(
|
||||
role="Researcher",
|
||||
goal="Test goal",
|
||||
backstory="Test backstory",
|
||||
max_execution_time=0 # Immediate timeout
|
||||
)
|
||||
|
||||
task = Task(
|
||||
description="Test task",
|
||||
expected_output="Test output",
|
||||
agent=researcher
|
||||
)
|
||||
|
||||
# Simulate immediate timeout
|
||||
with patch('concurrent.futures.ThreadPoolExecutor') as mock_executor:
|
||||
mock_future = MagicMock()
|
||||
mock_future.result.side_effect = FuturesTimeoutError()
|
||||
mock_executor.return_value.submit.return_value = mock_future
|
||||
|
||||
with pytest.raises(TimeoutError) as excinfo:
|
||||
task.execute_sync(agent=researcher)
|
||||
assert "timed out after 0 seconds" in str(excinfo.value)
|
||||
|
||||
Reference in New Issue
Block a user