Compare commits

..

6 Commits

Author SHA1 Message Date
Devin AI
33da3e1797 feat: use force_final_answer prompt on timeout
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-09 23:45:32 +00:00
João Moura
495baeaf38 Merge branch 'main' into feature/max-execution-time 2025-02-09 20:30:53 -03:00
hafsatariq18
80a5018f6a Enhance task execution timeout handling with better error messages and resource cleanup 2025-02-04 14:27:31 +05:30
hafsatariq18
447fbec6f9 test: add task_max_execution_time test cases 2025-02-03 17:26:00 +05:30
hafsatariq18
b1f277cc3a chore: Add timeout-decorator dependency 2025-02-03 16:37:44 +05:30
hafsatariq18
1411c8c794 feat: Add max_execution_time support for agent tasks 2025-02-03 16:25:43 +05:30
6 changed files with 343 additions and 241 deletions

View File

@@ -282,19 +282,6 @@ my_crew = Crew(
### Using Google AI embeddings
#### Prerequisites
Before using Google AI embeddings, ensure you have:
- Access to the Gemini API
- The necessary API keys and permissions
You will need to update your *pyproject.toml* dependencies:
```YAML
dependencies = [
"google-generativeai>=0.8.4", #main version in January/2025 - crewai v.0.100.0 and crewai-tools 0.33.0
"crewai[tools]>=0.100.0,<1.0.0"
]
```
```python Code
from crewai import Crew, Agent, Task, Process

View File

@@ -13,6 +13,7 @@ dependencies = [
"openai>=1.13.3",
"litellm==1.60.2",
"instructor>=1.3.3",
"timeout-decorator>=0.5.0",
# Text Processing
"pdfplumber>=0.11.4",
"regex>=2024.9.11",

View File

@@ -1,9 +1,11 @@
import re
import shutil
import subprocess
import threading
from typing import Any, Dict, List, Literal, Optional, Sequence, Union
from pydantic import Field, InstanceOf, PrivateAttr, model_validator
import timeout_decorator
from crewai.agents import CacheHandler
from crewai.agents.agent_builder.base_agent import BaseAgent
@@ -64,6 +66,7 @@ class Agent(BaseAgent):
"""
_times_executed: int = PrivateAttr(default=0)
_have_forced_answer: bool = PrivateAttr(default=False)
max_execution_time: Optional[int] = Field(
default=None,
description="Maximum execution time for an agent to execute a task",
@@ -159,6 +162,77 @@ class Agent(BaseAgent):
except (TypeError, ValueError) as e:
raise ValueError(f"Invalid Knowledge Configuration: {str(e)}")
def _execute_with_timeout(
self,
task: Task,
context: Optional[str],
tools: Optional[List[BaseTool]],
timeout: int
) -> str:
"""Execute task with timeout using thread-based timeout.
Args:
task: The task to execute
context: Optional context for the task
tools: Optional list of tools to use
timeout: Maximum execution time in seconds (must be > 0)
Returns:
The result of the task execution, with force_final_answer prompt appended on timeout
Raises:
ValueError: If timeout is not a positive integer
Exception: Any error that occurs during execution
"""
# Validate timeout before creating any resources
if not isinstance(timeout, int) or timeout <= 0:
raise ValueError("Timeout must be a positive integer greater than zero")
completion_event: threading.Event = threading.Event()
result_container: List[Optional[str]] = [None]
error_container: List[Optional[Exception]] = [None]
def target() -> None:
try:
result_container[0] = self._execute_task_without_timeout(task, context, tools)
except Exception as e:
error_container[0] = e
finally:
completion_event.set()
thread: threading.Thread = threading.Thread(target=target)
thread.daemon = True # Ensures thread doesn't prevent program exit
thread.start()
# Wait for either completion or timeout
completed: bool = completion_event.wait(timeout=timeout)
if not completed:
self._logger.log("warning", f"Task execution timed out after {timeout} seconds")
thread.join(timeout=0.1)
# Clean up resources
if hasattr(self, 'agent_executor') and self.agent_executor:
self.agent_executor.llm = None # Release LLM resources
if hasattr(self.agent_executor, 'close'):
self.agent_executor.close()
# Force final answer using the prompt
self._have_forced_answer = True
forced_answer = self.i18n.errors("force_final_answer")
return f"{result_container[0] if result_container[0] else ''}\n{forced_answer}"
if error_container[0]:
error = error_container[0]
self._logger.log("error", f"Task execution failed: {str(error)}")
raise error
if result_container[0] is None:
self._logger.log("warning", "Task execution completed but returned no result")
raise timeout_decorator.TimeoutError("Task execution completed but returned no result") # This is a different kind of failure than timeout
return result_container[0]
def execute_task(
self,
task: Task,
@@ -174,7 +248,42 @@ class Agent(BaseAgent):
Returns:
Output of the agent
Raises:
TimeoutError: If the task execution exceeds max_execution_time (if set)
Exception: For other execution errors
"""
if self.max_execution_time is None:
return self._execute_task_without_timeout(task, context, tools)
original_llm_timeout = getattr(self.llm, 'timeout', None)
try:
if hasattr(self.llm, 'timeout'):
self.llm.timeout = self.max_execution_time
result = self._execute_with_timeout(task, context, tools, self.max_execution_time)
if self._have_forced_answer:
self._logger.log("warning", f"Task '{task.description}' execution timed out after {self.max_execution_time} seconds. Using forced answer.")
return result
except timeout_decorator.TimeoutError:
# This is a different kind of failure (e.g., no result at all)
error_msg = (
f"Task '{task.description}' execution timed out after {self.max_execution_time} seconds "
f"and produced no result. Consider increasing max_execution_time or optimizing the task."
)
self._logger.log("error", error_msg)
raise TimeoutError(error_msg)
finally:
if original_llm_timeout is not None and hasattr(self.llm, 'timeout'):
self.llm.timeout = original_llm_timeout
def _execute_task_without_timeout(
self,
task: Task,
context: Optional[str] = None,
tools: Optional[List[BaseTool]] = None,
) -> str:
"""Execute task without timeout - contains the original execute_task logic."""
if self.tools_handler:
self.tools_handler.last_used_tool = {} # type: ignore # Incompatible types in assignment (expression has type "dict[Never, Never]", variable has type "ToolCalling")

View File

@@ -1,138 +1,28 @@
from pathlib import Path
from typing import Dict, Iterator, List, Optional, Union
from urllib.parse import urlparse
from typing import Dict, List
from pydantic import Field, field_validator
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.utilities.constants import KNOWLEDGE_DIRECTORY
from crewai.utilities.logger import Logger
from crewai.knowledge.source.base_file_knowledge_source import BaseFileKnowledgeSource
class ExcelKnowledgeSource(BaseKnowledgeSource):
class ExcelKnowledgeSource(BaseFileKnowledgeSource):
"""A knowledge source that stores and queries Excel file content using embeddings."""
# override content to be a dict of file paths to sheet names to csv content
_logger: Logger = Logger(verbose=True)
file_path: Optional[Union[Path, List[Path], str, List[str]]] = Field(
default=None,
description="[Deprecated] The path to the file. Use file_paths instead.",
)
file_paths: Optional[Union[Path, List[Path], str, List[str]]] = Field(
default_factory=list, description="The path to the file"
)
chunks: List[str] = Field(default_factory=list)
content: Dict[Path, Dict[str, str]] = Field(default_factory=dict)
safe_file_paths: List[Path] = Field(default_factory=list)
@field_validator("file_path", "file_paths", mode="before")
def validate_file_path(cls, v, info):
"""Validate that at least one of file_path or file_paths is provided."""
# Single check if both are None, O(1) instead of nested conditions
if (
v is None
and info.data.get(
"file_path" if info.field_name == "file_paths" else "file_paths"
)
is None
):
raise ValueError("Either file_path or file_paths must be provided")
return v
def _process_file_paths(self) -> List[Path]:
"""Convert file_path to a list of Path objects."""
if hasattr(self, "file_path") and self.file_path is not None:
self._logger.log(
"warning",
"The 'file_path' attribute is deprecated and will be removed in a future version. Please use 'file_paths' instead.",
color="yellow",
)
self.file_paths = self.file_path
if self.file_paths is None:
raise ValueError("Your source must be provided with a file_paths: []")
# Convert single path to list
path_list: List[Union[Path, str]] = (
[self.file_paths]
if isinstance(self.file_paths, (str, Path))
else list(self.file_paths)
if isinstance(self.file_paths, list)
else []
)
if not path_list:
raise ValueError(
"file_path/file_paths must be a Path, str, or a list of these types"
)
return [self.convert_to_path(path) for path in path_list]
def validate_content(self):
"""Validate the paths."""
for path in self.safe_file_paths:
if not path.exists():
self._logger.log(
"error",
f"File not found: {path}. Try adding sources to the knowledge directory. If it's inside the knowledge directory, use the relative path.",
color="red",
)
raise FileNotFoundError(f"File not found: {path}")
if not path.is_file():
self._logger.log(
"error",
f"Path is not a file: {path}",
color="red",
)
def model_post_init(self, _) -> None:
if self.file_path:
self._logger.log(
"warning",
"The 'file_path' attribute is deprecated and will be removed in a future version. Please use 'file_paths' instead.",
color="yellow",
)
self.file_paths = self.file_path
self.safe_file_paths = self._process_file_paths()
self.validate_content()
self.content = self._load_content()
def _load_content(self) -> Dict[Path, Dict[str, str]]:
"""Load and preprocess Excel file content from multiple sheets.
Each sheet's content is converted to CSV format and stored.
Returns:
Dict[Path, Dict[str, str]]: A mapping of file paths to their respective sheet contents.
Raises:
ImportError: If required dependencies are missing.
FileNotFoundError: If the specified Excel file cannot be opened.
"""
def load_content(self) -> Dict[Path, str]:
"""Load and preprocess Excel file content."""
pd = self._import_dependencies()
content_dict = {}
for file_path in self.safe_file_paths:
file_path = self.convert_to_path(file_path)
with pd.ExcelFile(file_path) as xl:
sheet_dict = {
str(sheet_name): str(
pd.read_excel(xl, sheet_name).to_csv(index=False)
)
for sheet_name in xl.sheet_names
}
content_dict[file_path] = sheet_dict
df = pd.read_excel(file_path)
content = df.to_csv(index=False)
content_dict[file_path] = content
return content_dict
def convert_to_path(self, path: Union[Path, str]) -> Path:
"""Convert a path to a Path object."""
return Path(KNOWLEDGE_DIRECTORY + "/" + path) if isinstance(path, str) else path
def _import_dependencies(self):
"""Dynamically import dependencies."""
try:
import openpyxl # noqa
import pandas as pd
return pd
@@ -148,14 +38,10 @@ class ExcelKnowledgeSource(BaseKnowledgeSource):
and save the embeddings.
"""
# Convert dictionary values to a single string if content is a dictionary
# Updated to account for .xlsx workbooks with multiple tabs/sheets
content_str = ""
for value in self.content.values():
if isinstance(value, dict):
for sheet_value in value.values():
content_str += str(sheet_value) + "\n"
else:
content_str += str(value) + "\n"
if isinstance(self.content, dict):
content_str = "\n".join(str(value) for value in self.content.values())
else:
content_str = str(self.content)
new_chunks = self._chunk_text(content_str)
self.chunks.extend(new_chunks)

View File

@@ -51,7 +51,6 @@ writer = Agent(
def test_crew_with_only_conditional_tasks_raises_error():
"""Test that creating a crew with only conditional tasks raises an error."""
def condition_func(task_output: TaskOutput) -> bool:
return True
@@ -83,7 +82,6 @@ def test_crew_with_only_conditional_tasks_raises_error():
tasks=[conditional1, conditional2, conditional3],
)
def test_crew_config_conditional_requirement():
with pytest.raises(ValueError):
Crew(process=Process.sequential)
@@ -591,12 +589,12 @@ def test_crew_with_delegating_agents_should_not_override_task_tools():
_, kwargs = mock_execute_sync.call_args
tools = kwargs["tools"]
assert any(
isinstance(tool, TestTool) for tool in tools
), "TestTool should be present"
assert any(
"delegate" in tool.name.lower() for tool in tools
), "Delegation tool should be present"
assert any(isinstance(tool, TestTool) for tool in tools), (
"TestTool should be present"
)
assert any("delegate" in tool.name.lower() for tool in tools), (
"Delegation tool should be present"
)
@pytest.mark.vcr(filter_headers=["authorization"])
@@ -655,12 +653,12 @@ def test_crew_with_delegating_agents_should_not_override_agent_tools():
_, kwargs = mock_execute_sync.call_args
tools = kwargs["tools"]
assert any(
isinstance(tool, TestTool) for tool in new_ceo.tools
), "TestTool should be present"
assert any(
"delegate" in tool.name.lower() for tool in tools
), "Delegation tool should be present"
assert any(isinstance(tool, TestTool) for tool in new_ceo.tools), (
"TestTool should be present"
)
assert any("delegate" in tool.name.lower() for tool in tools), (
"Delegation tool should be present"
)
@pytest.mark.vcr(filter_headers=["authorization"])
@@ -784,17 +782,17 @@ def test_task_tools_override_agent_tools_with_allow_delegation():
used_tools = kwargs["tools"]
# Confirm AnotherTestTool is present but TestTool is not
assert any(
isinstance(tool, AnotherTestTool) for tool in used_tools
), "AnotherTestTool should be present"
assert not any(
isinstance(tool, TestTool) for tool in used_tools
), "TestTool should not be present among used tools"
assert any(isinstance(tool, AnotherTestTool) for tool in used_tools), (
"AnotherTestTool should be present"
)
assert not any(isinstance(tool, TestTool) for tool in used_tools), (
"TestTool should not be present among used tools"
)
# Confirm delegation tool(s) are present
assert any(
"delegate" in tool.name.lower() for tool in used_tools
), "Delegation tool should be present"
assert any("delegate" in tool.name.lower() for tool in used_tools), (
"Delegation tool should be present"
)
# Finally, make sure the agent's original tools remain unchanged
assert len(researcher_with_delegation.tools) == 1
@@ -1595,9 +1593,9 @@ def test_code_execution_flag_adds_code_tool_upon_kickoff():
# Verify that exactly one tool was used and it was a CodeInterpreterTool
assert len(used_tools) == 1, "Should have exactly one tool"
assert isinstance(
used_tools[0], CodeInterpreterTool
), "Tool should be CodeInterpreterTool"
assert isinstance(used_tools[0], CodeInterpreterTool), (
"Tool should be CodeInterpreterTool"
)
@pytest.mark.vcr(filter_headers=["authorization"])
@@ -1954,7 +1952,6 @@ def test_task_callback_on_crew():
def test_task_callback_both_on_task_and_crew():
from unittest.mock import MagicMock, patch
mock_callback_on_task = MagicMock()
mock_callback_on_crew = MagicMock()
@@ -2104,22 +2101,21 @@ def test_conditional_task_uses_last_output():
expected_output="First output",
agent=researcher,
)
def condition_fails(task_output: TaskOutput) -> bool:
# This condition will never be met
return "never matches" in task_output.raw.lower()
def condition_succeeds(task_output: TaskOutput) -> bool:
# This condition will match first task's output
return "first success" in task_output.raw.lower()
conditional_task1 = ConditionalTask(
description="Second task - conditional that fails condition",
expected_output="Second output",
agent=researcher,
condition=condition_fails,
)
conditional_task2 = ConditionalTask(
description="Third task - conditional that succeeds using first task output",
expected_output="Third output",
@@ -2138,37 +2134,35 @@ def test_conditional_task_uses_last_output():
raw="First success output", # Will be used by third task's condition
agent=researcher.role,
)
mock_skipped = TaskOutput(
description="Second task output",
raw="", # Empty output since condition fails
agent=researcher.role,
)
mock_third = TaskOutput(
description="Third task output",
raw="Third task executed", # Output when condition succeeds using first task output
agent=writer.role,
)
# Set up mocks for task execution and conditional logic
with patch.object(ConditionalTask, "should_execute") as mock_should_execute:
# First conditional fails, second succeeds
mock_should_execute.side_effect = [False, True]
with patch.object(Task, "execute_sync") as mock_execute:
mock_execute.side_effect = [mock_first, mock_third]
result = crew.kickoff()
# Verify execution behavior
assert mock_execute.call_count == 2 # Only first and third tasks execute
assert mock_should_execute.call_count == 2 # Both conditionals checked
# Verify outputs collection:
# First executed task output, followed by an automatically generated (skipped) output, then the conditional execution
# Verify outputs collection
assert len(result.tasks_output) == 3
assert (
result.tasks_output[0].raw == "First success output"
) # First task succeeded
assert (
result.tasks_output[1].raw == ""
) # Second task skipped (condition failed)
assert (
result.tasks_output[2].raw == "Third task executed"
) # Third task used first task's output
assert result.tasks_output[0].raw == "First success output" # First task succeeded
assert result.tasks_output[1].raw == "" # Second task skipped (condition failed)
assert result.tasks_output[2].raw == "Third task executed" # Third task used first task's output
@pytest.mark.vcr(filter_headers=["authorization"])
def test_conditional_tasks_result_collection():
@@ -2178,20 +2172,20 @@ def test_conditional_tasks_result_collection():
expected_output="First output",
agent=researcher,
)
def condition_never_met(task_output: TaskOutput) -> bool:
return "never matches" in task_output.raw.lower()
def condition_always_met(task_output: TaskOutput) -> bool:
return "success" in task_output.raw.lower()
task2 = ConditionalTask(
description="Conditional task that never executes",
expected_output="Second output",
agent=researcher,
condition=condition_never_met,
)
task3 = ConditionalTask(
description="Conditional task that always executes",
expected_output="Third output",
@@ -2210,46 +2204,35 @@ def test_conditional_tasks_result_collection():
raw="Success output", # Triggers third task's condition
agent=researcher.role,
)
mock_skipped = TaskOutput(
description="Skipped output",
raw="", # Empty output for skipped task
agent=researcher.role,
)
mock_conditional = TaskOutput(
description="Conditional output",
raw="Conditional task executed",
agent=writer.role,
)
# Set up mocks for task execution and conditional logic
with patch.object(ConditionalTask, "should_execute") as mock_should_execute:
# First conditional fails, second succeeds
mock_should_execute.side_effect = [False, True]
with patch.object(Task, "execute_sync") as mock_execute:
mock_execute.side_effect = [mock_success, mock_conditional]
result = crew.kickoff()
# Verify execution behavior
assert mock_execute.call_count == 2 # Only first and third tasks execute
assert mock_should_execute.call_count == 2 # Both conditionals checked
# Verify task output collection:
# There should be three outputs: normal task, skipped conditional task (empty output),
# and the conditional task that executed.
assert len(result.tasks_output) == 3
assert (
result.tasks_output[0].raw == "Success output"
) # Normal task executed
assert result.tasks_output[1].raw == "" # Second task skipped
assert (
result.tasks_output[2].raw == "Conditional task executed"
) # Third task executed
# Verify task output collection
assert len(result.tasks_output) == 3
assert (
result.tasks_output[0].raw == "Success output"
) # Normal task executed
assert result.tasks_output[1].raw == "" # Second task skipped
assert (
result.tasks_output[2].raw == "Conditional task executed"
) # Third task executed
assert result.tasks_output[0].raw == "Success output" # Normal task executed
assert result.tasks_output[1].raw == "" # Second task skipped
assert result.tasks_output[2].raw == "Conditional task executed" # Third task executed
@pytest.mark.vcr(filter_headers=["authorization"])
def test_multiple_conditional_tasks():
@@ -2259,20 +2242,20 @@ def test_multiple_conditional_tasks():
expected_output="Research output",
agent=researcher,
)
def condition1(task_output: TaskOutput) -> bool:
return "success" in task_output.raw.lower()
def condition2(task_output: TaskOutput) -> bool:
return "proceed" in task_output.raw.lower()
task2 = ConditionalTask(
description="First conditional task",
expected_output="Conditional output 1",
agent=writer,
condition=condition1,
)
task3 = ConditionalTask(
description="Second conditional task",
expected_output="Conditional output 2",
@@ -2291,7 +2274,7 @@ def test_multiple_conditional_tasks():
raw="Success and proceed output",
agent=researcher.role,
)
# Set up mocks for task execution
with patch.object(Task, "execute_sync", return_value=mock_success) as mock_execute:
result = crew.kickoff()
@@ -2299,7 +2282,6 @@ def test_multiple_conditional_tasks():
assert mock_execute.call_count == 3
assert len(result.tasks_output) == 3
@pytest.mark.vcr(filter_headers=["authorization"])
def test_using_contextual_memory():
from unittest.mock import patch
@@ -3418,9 +3400,9 @@ def test_fetch_inputs():
expected_placeholders = {"role_detail", "topic", "field"}
actual_placeholders = crew.fetch_inputs()
assert (
actual_placeholders == expected_placeholders
), f"Expected {expected_placeholders}, but got {actual_placeholders}"
assert actual_placeholders == expected_placeholders, (
f"Expected {expected_placeholders}, but got {actual_placeholders}"
)
def test_task_tools_preserve_code_execution_tools():
@@ -3493,20 +3475,20 @@ def test_task_tools_preserve_code_execution_tools():
used_tools = kwargs["tools"]
# Verify all expected tools are present
assert any(
isinstance(tool, TestTool) for tool in used_tools
), "Task's TestTool should be present"
assert any(
isinstance(tool, CodeInterpreterTool) for tool in used_tools
), "CodeInterpreterTool should be present"
assert any(
"delegate" in tool.name.lower() for tool in used_tools
), "Delegation tool should be present"
assert any(isinstance(tool, TestTool) for tool in used_tools), (
"Task's TestTool should be present"
)
assert any(isinstance(tool, CodeInterpreterTool) for tool in used_tools), (
"CodeInterpreterTool should be present"
)
assert any("delegate" in tool.name.lower() for tool in used_tools), (
"Delegation tool should be present"
)
# Verify the total number of tools (TestTool + CodeInterpreter + 2 delegation tools)
assert (
len(used_tools) == 4
), "Should have TestTool, CodeInterpreter, and 2 delegation tools"
assert len(used_tools) == 4, (
"Should have TestTool, CodeInterpreter, and 2 delegation tools"
)
@pytest.mark.vcr(filter_headers=["authorization"])
@@ -3550,9 +3532,9 @@ def test_multimodal_flag_adds_multimodal_tools():
used_tools = kwargs["tools"]
# Check that the multimodal tool was added
assert any(
isinstance(tool, AddImageTool) for tool in used_tools
), "AddImageTool should be present when agent is multimodal"
assert any(isinstance(tool, AddImageTool) for tool in used_tools), (
"AddImageTool should be present when agent is multimodal"
)
# Verify we have exactly one tool (just the AddImageTool)
assert len(used_tools) == 1, "Should only have the AddImageTool"
@@ -3778,9 +3760,9 @@ def test_crew_guardrail_feedback_in_context():
assert len(execution_contexts) > 1, "Task should have been executed multiple times"
# Verify that the second execution included the guardrail feedback
assert (
"Output must contain the keyword 'IMPORTANT'" in execution_contexts[1]
), "Guardrail feedback should be included in retry context"
assert "Output must contain the keyword 'IMPORTANT'" in execution_contexts[1], (
"Guardrail feedback should be included in retry context"
)
# Verify final output meets guardrail requirements
assert "IMPORTANT" in result.raw, "Final output should contain required keyword"

View File

@@ -13,6 +13,7 @@ from crewai import Agent, Crew, Process, Task
from crewai.tasks.conditional_task import ConditionalTask
from crewai.tasks.task_output import TaskOutput
from crewai.utilities.converter import Converter
from concurrent.futures import TimeoutError as FuturesTimeoutError
def test_task_tool_reflect_agent_tools():
@@ -1283,3 +1284,139 @@ def test_interpolate_valid_types():
assert parsed["optional"] is None
assert parsed["nested"]["flag"] is True
assert parsed["nested"]["empty"] is None
def test_task_completes_within_max_execution_time():
"""Test task completes successfully within specified timeout"""
researcher = Agent(
role="Researcher",
goal="Test goal",
backstory="Test backstory",
max_execution_time=100 # Ample time for completion
)
task = Task(
description="Test task",
expected_output="Test output",
agent=researcher
)
with patch.object(Agent, '_execute_task_without_timeout', return_value="Success") as mock_execute:
result = task.execute_sync(agent=researcher)
assert result.raw == "Success"
mock_execute.assert_called_once()
def test_task_exceeds_max_execution_time():
"""Test task raises TimeoutError when exceeding max execution time"""
researcher = Agent(
role="Researcher",
goal="Test goal",
backstory="Test backstory",
max_execution_time=1 # Very short timeout
)
task = Task(
description="Test task",
expected_output="Test output",
agent=researcher
)
# Mock the OpenAI API call to avoid authentication
with patch('litellm.completion', side_effect=TimeoutError("Request timed out")):
with pytest.raises(TimeoutError) as excinfo:
task.execute_sync(agent=researcher)
assert "timed out" in str(excinfo.value)
def test_task_no_max_execution_time():
"""Test task executes normally without timeout setting"""
researcher = Agent(
role="Researcher",
goal="Test goal",
backstory="Test backstory",
max_execution_time=None # No timeout
)
task = Task(
description="Test task",
expected_output="Test output",
agent=researcher
)
with patch.object(Agent, 'execute_task', return_value="Success") as mock_execute:
result = task.execute_sync(agent=researcher)
assert result.raw == "Success"
mock_execute.assert_called_once()
def test_task_max_execution_time_zero():
"""Test immediate timeout with max_execution_time=0"""
researcher = Agent(
role="Researcher",
goal="Test goal",
backstory="Test backstory",
max_execution_time=1 # Set to minimum valid value
)
task = Task(
description="Test task",
expected_output="Test output",
agent=researcher
)
# Simulate immediate timeout using FuturesTimeoutError
with patch('concurrent.futures.ThreadPoolExecutor') as mock_executor:
mock_future = MagicMock()
mock_future.result.side_effect = FuturesTimeoutError()
mock_executor.return_value.submit.return_value = mock_future
with pytest.raises(TimeoutError) as excinfo:
task.execute_sync(agent=researcher)
assert "timed out after 1 seconds" in str(excinfo.value)
def test_task_force_final_answer_on_timeout():
"""Test that force_final_answer is used when task times out"""
researcher = Agent(
role="Researcher",
goal="Test goal",
backstory="Test backstory",
max_execution_time=1 # Very short timeout
)
task = Task(
description="Test task",
expected_output="Test output",
agent=researcher
)
# Mock the task execution to simulate a partial result before timeout
mock_i18n = MagicMock()
mock_i18n.errors.return_value = "MUST give your absolute best final answer"
researcher.i18n = mock_i18n
class MockThread:
def __init__(self, target, *args, **kwargs):
self.target = target
self.daemon = kwargs.get('daemon', False)
self.args = args
self.kwargs = kwargs
def start(self):
# Execute the target function to set the result
self.target()
def join(self, timeout=None):
pass
def mock_thread(*args, **kwargs):
return MockThread(*args, **kwargs)
with patch('threading.Thread', side_effect=mock_thread), \
patch('threading.Event.wait', return_value=False), \
patch('litellm.completion'), \
patch.object(Agent, '_execute_task_without_timeout', return_value="Partial result"):
result = task.execute_sync(agent=researcher)
assert "MUST give your absolute best final answer" in result.raw
assert "Partial result" in result.raw # Should include partial result