Compare commits

..

5 Commits

Author SHA1 Message Date
Brandon Hancock (bhancock_ai)
9aad450b4a Merge branch 'main' into brandon/gemini-google-docs 2025-02-10 12:11:16 -05:00
Brandon Hancock (bhancock_ai)
c408368267 fix linting issues in new tests (#2089)
Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
2025-02-10 12:10:53 -05:00
Brandon Hancock (bhancock_ai)
612119f114 Merge branch 'main' into brandon/gemini-google-docs 2025-02-10 12:01:33 -05:00
Kevin King
90b3145e92 Updated excel_knowledge_source.py to account for excel files with multiple tabs. (#1921)
* Updated excel_knowledge_source.py to account for excel sheets that have multiple tabs. The old implementation contained a single df=pd.read_excel(excel_file_path), which only reads the first or most recently used excel sheet. The updated functionality reads all sheets in the excel workbook.

* updated load_content() function in excel_knowledge_source.py to reduce memory usage and provide better documentation

* accidentally didn't delete the old load_content() function in last commit - corrected this

* Added an override for the content field from the inheritted BaseFileKnowledgeSource to account for the change in the load_content method to support excel files with multiple tabs/sheets. This change should ensure it passes the type check test, as it failed before since content was assigned a different type in BaseFileKnowledgeSource

* Now removed the commented out imports in _import_dependencies, as requested

* Updated excel_knowledge_source to fix linter errors and type errors. Changed inheritence from basefileknowledgesource to baseknowledgesource because basefileknowledgesource's types conflicted (in particular the load_content function and the content class variable.

---------

Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
2025-02-10 08:56:32 -08:00
Brandon Hancock
5b1d200b5b incorporate Small update in memory.mdx, fixing Google AI parameters #2008 2025-02-10 10:13:46 -05:00
6 changed files with 241 additions and 343 deletions

View File

@@ -282,6 +282,19 @@ my_crew = Crew(
### Using Google AI embeddings
#### Prerequisites
Before using Google AI embeddings, ensure you have:
- Access to the Gemini API
- The necessary API keys and permissions
You will need to update your *pyproject.toml* dependencies:
```YAML
dependencies = [
"google-generativeai>=0.8.4", #main version in January/2025 - crewai v.0.100.0 and crewai-tools 0.33.0
"crewai[tools]>=0.100.0,<1.0.0"
]
```
```python Code
from crewai import Crew, Agent, Task, Process

View File

@@ -13,7 +13,6 @@ dependencies = [
"openai>=1.13.3",
"litellm==1.60.2",
"instructor>=1.3.3",
"timeout-decorator>=0.5.0",
# Text Processing
"pdfplumber>=0.11.4",
"regex>=2024.9.11",

View File

@@ -1,11 +1,9 @@
import re
import shutil
import subprocess
import threading
from typing import Any, Dict, List, Literal, Optional, Sequence, Union
from pydantic import Field, InstanceOf, PrivateAttr, model_validator
import timeout_decorator
from crewai.agents import CacheHandler
from crewai.agents.agent_builder.base_agent import BaseAgent
@@ -66,7 +64,6 @@ class Agent(BaseAgent):
"""
_times_executed: int = PrivateAttr(default=0)
_have_forced_answer: bool = PrivateAttr(default=False)
max_execution_time: Optional[int] = Field(
default=None,
description="Maximum execution time for an agent to execute a task",
@@ -162,77 +159,6 @@ class Agent(BaseAgent):
except (TypeError, ValueError) as e:
raise ValueError(f"Invalid Knowledge Configuration: {str(e)}")
def _execute_with_timeout(
self,
task: Task,
context: Optional[str],
tools: Optional[List[BaseTool]],
timeout: int
) -> str:
"""Execute task with timeout using thread-based timeout.
Args:
task: The task to execute
context: Optional context for the task
tools: Optional list of tools to use
timeout: Maximum execution time in seconds (must be > 0)
Returns:
The result of the task execution, with force_final_answer prompt appended on timeout
Raises:
ValueError: If timeout is not a positive integer
Exception: Any error that occurs during execution
"""
# Validate timeout before creating any resources
if not isinstance(timeout, int) or timeout <= 0:
raise ValueError("Timeout must be a positive integer greater than zero")
completion_event: threading.Event = threading.Event()
result_container: List[Optional[str]] = [None]
error_container: List[Optional[Exception]] = [None]
def target() -> None:
try:
result_container[0] = self._execute_task_without_timeout(task, context, tools)
except Exception as e:
error_container[0] = e
finally:
completion_event.set()
thread: threading.Thread = threading.Thread(target=target)
thread.daemon = True # Ensures thread doesn't prevent program exit
thread.start()
# Wait for either completion or timeout
completed: bool = completion_event.wait(timeout=timeout)
if not completed:
self._logger.log("warning", f"Task execution timed out after {timeout} seconds")
thread.join(timeout=0.1)
# Clean up resources
if hasattr(self, 'agent_executor') and self.agent_executor:
self.agent_executor.llm = None # Release LLM resources
if hasattr(self.agent_executor, 'close'):
self.agent_executor.close()
# Force final answer using the prompt
self._have_forced_answer = True
forced_answer = self.i18n.errors("force_final_answer")
return f"{result_container[0] if result_container[0] else ''}\n{forced_answer}"
if error_container[0]:
error = error_container[0]
self._logger.log("error", f"Task execution failed: {str(error)}")
raise error
if result_container[0] is None:
self._logger.log("warning", "Task execution completed but returned no result")
raise timeout_decorator.TimeoutError("Task execution completed but returned no result") # This is a different kind of failure than timeout
return result_container[0]
def execute_task(
self,
task: Task,
@@ -248,42 +174,7 @@ class Agent(BaseAgent):
Returns:
Output of the agent
Raises:
TimeoutError: If the task execution exceeds max_execution_time (if set)
Exception: For other execution errors
"""
if self.max_execution_time is None:
return self._execute_task_without_timeout(task, context, tools)
original_llm_timeout = getattr(self.llm, 'timeout', None)
try:
if hasattr(self.llm, 'timeout'):
self.llm.timeout = self.max_execution_time
result = self._execute_with_timeout(task, context, tools, self.max_execution_time)
if self._have_forced_answer:
self._logger.log("warning", f"Task '{task.description}' execution timed out after {self.max_execution_time} seconds. Using forced answer.")
return result
except timeout_decorator.TimeoutError:
# This is a different kind of failure (e.g., no result at all)
error_msg = (
f"Task '{task.description}' execution timed out after {self.max_execution_time} seconds "
f"and produced no result. Consider increasing max_execution_time or optimizing the task."
)
self._logger.log("error", error_msg)
raise TimeoutError(error_msg)
finally:
if original_llm_timeout is not None and hasattr(self.llm, 'timeout'):
self.llm.timeout = original_llm_timeout
def _execute_task_without_timeout(
self,
task: Task,
context: Optional[str] = None,
tools: Optional[List[BaseTool]] = None,
) -> str:
"""Execute task without timeout - contains the original execute_task logic."""
if self.tools_handler:
self.tools_handler.last_used_tool = {} # type: ignore # Incompatible types in assignment (expression has type "dict[Never, Never]", variable has type "ToolCalling")

View File

@@ -1,28 +1,138 @@
from pathlib import Path
from typing import Dict, List
from typing import Dict, Iterator, List, Optional, Union
from urllib.parse import urlparse
from crewai.knowledge.source.base_file_knowledge_source import BaseFileKnowledgeSource
from pydantic import Field, field_validator
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.utilities.constants import KNOWLEDGE_DIRECTORY
from crewai.utilities.logger import Logger
class ExcelKnowledgeSource(BaseFileKnowledgeSource):
class ExcelKnowledgeSource(BaseKnowledgeSource):
"""A knowledge source that stores and queries Excel file content using embeddings."""
def load_content(self) -> Dict[Path, str]:
"""Load and preprocess Excel file content."""
pd = self._import_dependencies()
# override content to be a dict of file paths to sheet names to csv content
_logger: Logger = Logger(verbose=True)
file_path: Optional[Union[Path, List[Path], str, List[str]]] = Field(
default=None,
description="[Deprecated] The path to the file. Use file_paths instead.",
)
file_paths: Optional[Union[Path, List[Path], str, List[str]]] = Field(
default_factory=list, description="The path to the file"
)
chunks: List[str] = Field(default_factory=list)
content: Dict[Path, Dict[str, str]] = Field(default_factory=dict)
safe_file_paths: List[Path] = Field(default_factory=list)
@field_validator("file_path", "file_paths", mode="before")
def validate_file_path(cls, v, info):
"""Validate that at least one of file_path or file_paths is provided."""
# Single check if both are None, O(1) instead of nested conditions
if (
v is None
and info.data.get(
"file_path" if info.field_name == "file_paths" else "file_paths"
)
is None
):
raise ValueError("Either file_path or file_paths must be provided")
return v
def _process_file_paths(self) -> List[Path]:
"""Convert file_path to a list of Path objects."""
if hasattr(self, "file_path") and self.file_path is not None:
self._logger.log(
"warning",
"The 'file_path' attribute is deprecated and will be removed in a future version. Please use 'file_paths' instead.",
color="yellow",
)
self.file_paths = self.file_path
if self.file_paths is None:
raise ValueError("Your source must be provided with a file_paths: []")
# Convert single path to list
path_list: List[Union[Path, str]] = (
[self.file_paths]
if isinstance(self.file_paths, (str, Path))
else list(self.file_paths)
if isinstance(self.file_paths, list)
else []
)
if not path_list:
raise ValueError(
"file_path/file_paths must be a Path, str, or a list of these types"
)
return [self.convert_to_path(path) for path in path_list]
def validate_content(self):
"""Validate the paths."""
for path in self.safe_file_paths:
if not path.exists():
self._logger.log(
"error",
f"File not found: {path}. Try adding sources to the knowledge directory. If it's inside the knowledge directory, use the relative path.",
color="red",
)
raise FileNotFoundError(f"File not found: {path}")
if not path.is_file():
self._logger.log(
"error",
f"Path is not a file: {path}",
color="red",
)
def model_post_init(self, _) -> None:
if self.file_path:
self._logger.log(
"warning",
"The 'file_path' attribute is deprecated and will be removed in a future version. Please use 'file_paths' instead.",
color="yellow",
)
self.file_paths = self.file_path
self.safe_file_paths = self._process_file_paths()
self.validate_content()
self.content = self._load_content()
def _load_content(self) -> Dict[Path, Dict[str, str]]:
"""Load and preprocess Excel file content from multiple sheets.
Each sheet's content is converted to CSV format and stored.
Returns:
Dict[Path, Dict[str, str]]: A mapping of file paths to their respective sheet contents.
Raises:
ImportError: If required dependencies are missing.
FileNotFoundError: If the specified Excel file cannot be opened.
"""
pd = self._import_dependencies()
content_dict = {}
for file_path in self.safe_file_paths:
file_path = self.convert_to_path(file_path)
df = pd.read_excel(file_path)
content = df.to_csv(index=False)
content_dict[file_path] = content
with pd.ExcelFile(file_path) as xl:
sheet_dict = {
str(sheet_name): str(
pd.read_excel(xl, sheet_name).to_csv(index=False)
)
for sheet_name in xl.sheet_names
}
content_dict[file_path] = sheet_dict
return content_dict
def convert_to_path(self, path: Union[Path, str]) -> Path:
"""Convert a path to a Path object."""
return Path(KNOWLEDGE_DIRECTORY + "/" + path) if isinstance(path, str) else path
def _import_dependencies(self):
"""Dynamically import dependencies."""
try:
import openpyxl # noqa
import pandas as pd
return pd
@@ -38,10 +148,14 @@ class ExcelKnowledgeSource(BaseFileKnowledgeSource):
and save the embeddings.
"""
# Convert dictionary values to a single string if content is a dictionary
if isinstance(self.content, dict):
content_str = "\n".join(str(value) for value in self.content.values())
else:
content_str = str(self.content)
# Updated to account for .xlsx workbooks with multiple tabs/sheets
content_str = ""
for value in self.content.values():
if isinstance(value, dict):
for sheet_value in value.values():
content_str += str(sheet_value) + "\n"
else:
content_str += str(value) + "\n"
new_chunks = self._chunk_text(content_str)
self.chunks.extend(new_chunks)

View File

@@ -51,6 +51,7 @@ writer = Agent(
def test_crew_with_only_conditional_tasks_raises_error():
"""Test that creating a crew with only conditional tasks raises an error."""
def condition_func(task_output: TaskOutput) -> bool:
return True
@@ -82,6 +83,7 @@ def test_crew_with_only_conditional_tasks_raises_error():
tasks=[conditional1, conditional2, conditional3],
)
def test_crew_config_conditional_requirement():
with pytest.raises(ValueError):
Crew(process=Process.sequential)
@@ -589,12 +591,12 @@ def test_crew_with_delegating_agents_should_not_override_task_tools():
_, kwargs = mock_execute_sync.call_args
tools = kwargs["tools"]
assert any(isinstance(tool, TestTool) for tool in tools), (
"TestTool should be present"
)
assert any("delegate" in tool.name.lower() for tool in tools), (
"Delegation tool should be present"
)
assert any(
isinstance(tool, TestTool) for tool in tools
), "TestTool should be present"
assert any(
"delegate" in tool.name.lower() for tool in tools
), "Delegation tool should be present"
@pytest.mark.vcr(filter_headers=["authorization"])
@@ -653,12 +655,12 @@ def test_crew_with_delegating_agents_should_not_override_agent_tools():
_, kwargs = mock_execute_sync.call_args
tools = kwargs["tools"]
assert any(isinstance(tool, TestTool) for tool in new_ceo.tools), (
"TestTool should be present"
)
assert any("delegate" in tool.name.lower() for tool in tools), (
"Delegation tool should be present"
)
assert any(
isinstance(tool, TestTool) for tool in new_ceo.tools
), "TestTool should be present"
assert any(
"delegate" in tool.name.lower() for tool in tools
), "Delegation tool should be present"
@pytest.mark.vcr(filter_headers=["authorization"])
@@ -782,17 +784,17 @@ def test_task_tools_override_agent_tools_with_allow_delegation():
used_tools = kwargs["tools"]
# Confirm AnotherTestTool is present but TestTool is not
assert any(isinstance(tool, AnotherTestTool) for tool in used_tools), (
"AnotherTestTool should be present"
)
assert not any(isinstance(tool, TestTool) for tool in used_tools), (
"TestTool should not be present among used tools"
)
assert any(
isinstance(tool, AnotherTestTool) for tool in used_tools
), "AnotherTestTool should be present"
assert not any(
isinstance(tool, TestTool) for tool in used_tools
), "TestTool should not be present among used tools"
# Confirm delegation tool(s) are present
assert any("delegate" in tool.name.lower() for tool in used_tools), (
"Delegation tool should be present"
)
assert any(
"delegate" in tool.name.lower() for tool in used_tools
), "Delegation tool should be present"
# Finally, make sure the agent's original tools remain unchanged
assert len(researcher_with_delegation.tools) == 1
@@ -1593,9 +1595,9 @@ def test_code_execution_flag_adds_code_tool_upon_kickoff():
# Verify that exactly one tool was used and it was a CodeInterpreterTool
assert len(used_tools) == 1, "Should have exactly one tool"
assert isinstance(used_tools[0], CodeInterpreterTool), (
"Tool should be CodeInterpreterTool"
)
assert isinstance(
used_tools[0], CodeInterpreterTool
), "Tool should be CodeInterpreterTool"
@pytest.mark.vcr(filter_headers=["authorization"])
@@ -1952,6 +1954,7 @@ def test_task_callback_on_crew():
def test_task_callback_both_on_task_and_crew():
from unittest.mock import MagicMock, patch
mock_callback_on_task = MagicMock()
mock_callback_on_crew = MagicMock()
@@ -2101,21 +2104,22 @@ def test_conditional_task_uses_last_output():
expected_output="First output",
agent=researcher,
)
def condition_fails(task_output: TaskOutput) -> bool:
# This condition will never be met
return "never matches" in task_output.raw.lower()
def condition_succeeds(task_output: TaskOutput) -> bool:
# This condition will match first task's output
return "first success" in task_output.raw.lower()
conditional_task1 = ConditionalTask(
description="Second task - conditional that fails condition",
expected_output="Second output",
agent=researcher,
condition=condition_fails,
)
conditional_task2 = ConditionalTask(
description="Third task - conditional that succeeds using first task output",
expected_output="Third output",
@@ -2134,35 +2138,37 @@ def test_conditional_task_uses_last_output():
raw="First success output", # Will be used by third task's condition
agent=researcher.role,
)
mock_skipped = TaskOutput(
description="Second task output",
raw="", # Empty output since condition fails
agent=researcher.role,
)
mock_third = TaskOutput(
description="Third task output",
raw="Third task executed", # Output when condition succeeds using first task output
agent=writer.role,
)
# Set up mocks for task execution and conditional logic
with patch.object(ConditionalTask, "should_execute") as mock_should_execute:
# First conditional fails, second succeeds
mock_should_execute.side_effect = [False, True]
with patch.object(Task, "execute_sync") as mock_execute:
mock_execute.side_effect = [mock_first, mock_third]
result = crew.kickoff()
# Verify execution behavior
assert mock_execute.call_count == 2 # Only first and third tasks execute
assert mock_should_execute.call_count == 2 # Both conditionals checked
# Verify outputs collection
# Verify outputs collection:
# First executed task output, followed by an automatically generated (skipped) output, then the conditional execution
assert len(result.tasks_output) == 3
assert result.tasks_output[0].raw == "First success output" # First task succeeded
assert result.tasks_output[1].raw == "" # Second task skipped (condition failed)
assert result.tasks_output[2].raw == "Third task executed" # Third task used first task's output
assert (
result.tasks_output[0].raw == "First success output"
) # First task succeeded
assert (
result.tasks_output[1].raw == ""
) # Second task skipped (condition failed)
assert (
result.tasks_output[2].raw == "Third task executed"
) # Third task used first task's output
@pytest.mark.vcr(filter_headers=["authorization"])
def test_conditional_tasks_result_collection():
@@ -2172,20 +2178,20 @@ def test_conditional_tasks_result_collection():
expected_output="First output",
agent=researcher,
)
def condition_never_met(task_output: TaskOutput) -> bool:
return "never matches" in task_output.raw.lower()
def condition_always_met(task_output: TaskOutput) -> bool:
return "success" in task_output.raw.lower()
task2 = ConditionalTask(
description="Conditional task that never executes",
expected_output="Second output",
agent=researcher,
condition=condition_never_met,
)
task3 = ConditionalTask(
description="Conditional task that always executes",
expected_output="Third output",
@@ -2204,35 +2210,46 @@ def test_conditional_tasks_result_collection():
raw="Success output", # Triggers third task's condition
agent=researcher.role,
)
mock_skipped = TaskOutput(
description="Skipped output",
raw="", # Empty output for skipped task
agent=researcher.role,
)
mock_conditional = TaskOutput(
description="Conditional output",
raw="Conditional task executed",
agent=writer.role,
)
# Set up mocks for task execution and conditional logic
with patch.object(ConditionalTask, "should_execute") as mock_should_execute:
# First conditional fails, second succeeds
mock_should_execute.side_effect = [False, True]
with patch.object(Task, "execute_sync") as mock_execute:
mock_execute.side_effect = [mock_success, mock_conditional]
result = crew.kickoff()
# Verify execution behavior
assert mock_execute.call_count == 2 # Only first and third tasks execute
assert mock_should_execute.call_count == 2 # Both conditionals checked
# Verify task output collection:
# There should be three outputs: normal task, skipped conditional task (empty output),
# and the conditional task that executed.
assert len(result.tasks_output) == 3
assert (
result.tasks_output[0].raw == "Success output"
) # Normal task executed
assert result.tasks_output[1].raw == "" # Second task skipped
assert (
result.tasks_output[2].raw == "Conditional task executed"
) # Third task executed
# Verify task output collection
assert len(result.tasks_output) == 3
assert result.tasks_output[0].raw == "Success output" # Normal task executed
assert result.tasks_output[1].raw == "" # Second task skipped
assert result.tasks_output[2].raw == "Conditional task executed" # Third task executed
assert (
result.tasks_output[0].raw == "Success output"
) # Normal task executed
assert result.tasks_output[1].raw == "" # Second task skipped
assert (
result.tasks_output[2].raw == "Conditional task executed"
) # Third task executed
@pytest.mark.vcr(filter_headers=["authorization"])
def test_multiple_conditional_tasks():
@@ -2242,20 +2259,20 @@ def test_multiple_conditional_tasks():
expected_output="Research output",
agent=researcher,
)
def condition1(task_output: TaskOutput) -> bool:
return "success" in task_output.raw.lower()
def condition2(task_output: TaskOutput) -> bool:
return "proceed" in task_output.raw.lower()
task2 = ConditionalTask(
description="First conditional task",
expected_output="Conditional output 1",
agent=writer,
condition=condition1,
)
task3 = ConditionalTask(
description="Second conditional task",
expected_output="Conditional output 2",
@@ -2274,7 +2291,7 @@ def test_multiple_conditional_tasks():
raw="Success and proceed output",
agent=researcher.role,
)
# Set up mocks for task execution
with patch.object(Task, "execute_sync", return_value=mock_success) as mock_execute:
result = crew.kickoff()
@@ -2282,6 +2299,7 @@ def test_multiple_conditional_tasks():
assert mock_execute.call_count == 3
assert len(result.tasks_output) == 3
@pytest.mark.vcr(filter_headers=["authorization"])
def test_using_contextual_memory():
from unittest.mock import patch
@@ -3400,9 +3418,9 @@ def test_fetch_inputs():
expected_placeholders = {"role_detail", "topic", "field"}
actual_placeholders = crew.fetch_inputs()
assert actual_placeholders == expected_placeholders, (
f"Expected {expected_placeholders}, but got {actual_placeholders}"
)
assert (
actual_placeholders == expected_placeholders
), f"Expected {expected_placeholders}, but got {actual_placeholders}"
def test_task_tools_preserve_code_execution_tools():
@@ -3475,20 +3493,20 @@ def test_task_tools_preserve_code_execution_tools():
used_tools = kwargs["tools"]
# Verify all expected tools are present
assert any(isinstance(tool, TestTool) for tool in used_tools), (
"Task's TestTool should be present"
)
assert any(isinstance(tool, CodeInterpreterTool) for tool in used_tools), (
"CodeInterpreterTool should be present"
)
assert any("delegate" in tool.name.lower() for tool in used_tools), (
"Delegation tool should be present"
)
assert any(
isinstance(tool, TestTool) for tool in used_tools
), "Task's TestTool should be present"
assert any(
isinstance(tool, CodeInterpreterTool) for tool in used_tools
), "CodeInterpreterTool should be present"
assert any(
"delegate" in tool.name.lower() for tool in used_tools
), "Delegation tool should be present"
# Verify the total number of tools (TestTool + CodeInterpreter + 2 delegation tools)
assert len(used_tools) == 4, (
"Should have TestTool, CodeInterpreter, and 2 delegation tools"
)
assert (
len(used_tools) == 4
), "Should have TestTool, CodeInterpreter, and 2 delegation tools"
@pytest.mark.vcr(filter_headers=["authorization"])
@@ -3532,9 +3550,9 @@ def test_multimodal_flag_adds_multimodal_tools():
used_tools = kwargs["tools"]
# Check that the multimodal tool was added
assert any(isinstance(tool, AddImageTool) for tool in used_tools), (
"AddImageTool should be present when agent is multimodal"
)
assert any(
isinstance(tool, AddImageTool) for tool in used_tools
), "AddImageTool should be present when agent is multimodal"
# Verify we have exactly one tool (just the AddImageTool)
assert len(used_tools) == 1, "Should only have the AddImageTool"
@@ -3760,9 +3778,9 @@ def test_crew_guardrail_feedback_in_context():
assert len(execution_contexts) > 1, "Task should have been executed multiple times"
# Verify that the second execution included the guardrail feedback
assert "Output must contain the keyword 'IMPORTANT'" in execution_contexts[1], (
"Guardrail feedback should be included in retry context"
)
assert (
"Output must contain the keyword 'IMPORTANT'" in execution_contexts[1]
), "Guardrail feedback should be included in retry context"
# Verify final output meets guardrail requirements
assert "IMPORTANT" in result.raw, "Final output should contain required keyword"

View File

@@ -13,7 +13,6 @@ from crewai import Agent, Crew, Process, Task
from crewai.tasks.conditional_task import ConditionalTask
from crewai.tasks.task_output import TaskOutput
from crewai.utilities.converter import Converter
from concurrent.futures import TimeoutError as FuturesTimeoutError
def test_task_tool_reflect_agent_tools():
@@ -1284,139 +1283,3 @@ def test_interpolate_valid_types():
assert parsed["optional"] is None
assert parsed["nested"]["flag"] is True
assert parsed["nested"]["empty"] is None
def test_task_completes_within_max_execution_time():
"""Test task completes successfully within specified timeout"""
researcher = Agent(
role="Researcher",
goal="Test goal",
backstory="Test backstory",
max_execution_time=100 # Ample time for completion
)
task = Task(
description="Test task",
expected_output="Test output",
agent=researcher
)
with patch.object(Agent, '_execute_task_without_timeout', return_value="Success") as mock_execute:
result = task.execute_sync(agent=researcher)
assert result.raw == "Success"
mock_execute.assert_called_once()
def test_task_exceeds_max_execution_time():
"""Test task raises TimeoutError when exceeding max execution time"""
researcher = Agent(
role="Researcher",
goal="Test goal",
backstory="Test backstory",
max_execution_time=1 # Very short timeout
)
task = Task(
description="Test task",
expected_output="Test output",
agent=researcher
)
# Mock the OpenAI API call to avoid authentication
with patch('litellm.completion', side_effect=TimeoutError("Request timed out")):
with pytest.raises(TimeoutError) as excinfo:
task.execute_sync(agent=researcher)
assert "timed out" in str(excinfo.value)
def test_task_no_max_execution_time():
"""Test task executes normally without timeout setting"""
researcher = Agent(
role="Researcher",
goal="Test goal",
backstory="Test backstory",
max_execution_time=None # No timeout
)
task = Task(
description="Test task",
expected_output="Test output",
agent=researcher
)
with patch.object(Agent, 'execute_task', return_value="Success") as mock_execute:
result = task.execute_sync(agent=researcher)
assert result.raw == "Success"
mock_execute.assert_called_once()
def test_task_max_execution_time_zero():
"""Test immediate timeout with max_execution_time=0"""
researcher = Agent(
role="Researcher",
goal="Test goal",
backstory="Test backstory",
max_execution_time=1 # Set to minimum valid value
)
task = Task(
description="Test task",
expected_output="Test output",
agent=researcher
)
# Simulate immediate timeout using FuturesTimeoutError
with patch('concurrent.futures.ThreadPoolExecutor') as mock_executor:
mock_future = MagicMock()
mock_future.result.side_effect = FuturesTimeoutError()
mock_executor.return_value.submit.return_value = mock_future
with pytest.raises(TimeoutError) as excinfo:
task.execute_sync(agent=researcher)
assert "timed out after 1 seconds" in str(excinfo.value)
def test_task_force_final_answer_on_timeout():
"""Test that force_final_answer is used when task times out"""
researcher = Agent(
role="Researcher",
goal="Test goal",
backstory="Test backstory",
max_execution_time=1 # Very short timeout
)
task = Task(
description="Test task",
expected_output="Test output",
agent=researcher
)
# Mock the task execution to simulate a partial result before timeout
mock_i18n = MagicMock()
mock_i18n.errors.return_value = "MUST give your absolute best final answer"
researcher.i18n = mock_i18n
class MockThread:
def __init__(self, target, *args, **kwargs):
self.target = target
self.daemon = kwargs.get('daemon', False)
self.args = args
self.kwargs = kwargs
def start(self):
# Execute the target function to set the result
self.target()
def join(self, timeout=None):
pass
def mock_thread(*args, **kwargs):
return MockThread(*args, **kwargs)
with patch('threading.Thread', side_effect=mock_thread), \
patch('threading.Event.wait', return_value=False), \
patch('litellm.completion'), \
patch.object(Agent, '_execute_task_without_timeout', return_value="Partial result"):
result = task.execute_sync(agent=researcher)
assert "MUST give your absolute best final answer" in result.raw
assert "Partial result" in result.raw # Should include partial result