mirror of https://github.com/crewAIInc/crewAI.git
synced 2026-04-10 04:52:40 +00:00
Compare commits
6 Commits
brandon/ge ... pr-2024
- 33da3e1797
- 495baeaf38
- 80a5018f6a
- 447fbec6f9
- b1f277cc3a
- 1411c8c794
@@ -282,19 +282,6 @@ my_crew = Crew(

 ### Using Google AI embeddings

-#### Prerequisites
-
-Before using Google AI embeddings, ensure you have:
-- Access to the Gemini API
-- The necessary API keys and permissions
-
-You will need to update your *pyproject.toml* dependencies:
-
-```YAML
-dependencies = [
-    "google-generativeai>=0.8.4", #main version in January/2025 - crewai v.0.100.0 and crewai-tools 0.33.0
-    "crewai[tools]>=0.100.0,<1.0.0"
-]
-```

 ```python Code
 from crewai import Crew, Agent, Task, Process
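For reference, the section removed above documented the prerequisites for Google AI embeddings. A minimal sketch of what wiring those embeddings into a crew typically looks like, assuming crewAI's `embedder={"provider": ..., "config": ...}` convention; the model name and the `GEMINI_API_KEY` environment variable are illustrative assumptions, not taken from this diff:

```python
import os

from crewai import Agent, Crew, Process, Task

# Minimal agent and task so the snippet stands alone; values are illustrative.
analyst = Agent(role="Analyst", goal="Summarize notes", backstory="Detail-oriented")
summary = Task(description="Summarize the notes", expected_output="A short summary", agent=analyst)

my_crew = Crew(
    agents=[analyst],
    tasks=[summary],
    process=Process.sequential,
    memory=True,  # embeddings are used by the memory/knowledge features
    embedder={
        "provider": "google",
        "config": {
            "api_key": os.environ["GEMINI_API_KEY"],  # assumed env var
            "model": "models/text-embedding-004",     # assumed model name
        },
    },
)
```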
@@ -13,6 +13,7 @@ dependencies = [
     "openai>=1.13.3",
     "litellm==1.60.2",
     "instructor>=1.3.3",
+    "timeout-decorator>=0.5.0",
     # Text Processing
     "pdfplumber>=0.11.4",
     "regex>=2024.9.11",
@@ -1,9 +1,11 @@
 import re
 import shutil
 import subprocess
+import threading
 from typing import Any, Dict, List, Literal, Optional, Sequence, Union

 from pydantic import Field, InstanceOf, PrivateAttr, model_validator
+import timeout_decorator

 from crewai.agents import CacheHandler
 from crewai.agents.agent_builder.base_agent import BaseAgent
@@ -64,6 +66,7 @@ class Agent(BaseAgent):
     """

     _times_executed: int = PrivateAttr(default=0)
+    _have_forced_answer: bool = PrivateAttr(default=False)
     max_execution_time: Optional[int] = Field(
         default=None,
         description="Maximum execution time for an agent to execute a task",
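The `_have_forced_answer` flag added here, together with the existing `max_execution_time` field, drives the timeout handling introduced below. A minimal sketch of opting in, with illustrative agent values (the same pattern appears in the new tests at the end of this comparison):

```python
from crewai import Agent

# Illustrative values; max_execution_time is the only setting of interest here.
agent = Agent(
    role="Researcher",
    goal="Answer questions quickly",
    backstory="Thorough but time-boxed",
    max_execution_time=60,  # seconds; None (the default) skips the timeout path
)
```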
@@ -159,6 +162,77 @@ class Agent(BaseAgent):
         except (TypeError, ValueError) as e:
             raise ValueError(f"Invalid Knowledge Configuration: {str(e)}")

+    def _execute_with_timeout(
+        self,
+        task: Task,
+        context: Optional[str],
+        tools: Optional[List[BaseTool]],
+        timeout: int
+    ) -> str:
+        """Execute task with timeout using thread-based timeout.
+
+        Args:
+            task: The task to execute
+            context: Optional context for the task
+            tools: Optional list of tools to use
+            timeout: Maximum execution time in seconds (must be > 0)
+
+        Returns:
+            The result of the task execution, with force_final_answer prompt appended on timeout
+
+        Raises:
+            ValueError: If timeout is not a positive integer
+            Exception: Any error that occurs during execution
+        """
+        # Validate timeout before creating any resources
+        if not isinstance(timeout, int) or timeout <= 0:
+            raise ValueError("Timeout must be a positive integer greater than zero")
+
+        completion_event: threading.Event = threading.Event()
+        result_container: List[Optional[str]] = [None]
+        error_container: List[Optional[Exception]] = [None]
+
+        def target() -> None:
+            try:
+                result_container[0] = self._execute_task_without_timeout(task, context, tools)
+            except Exception as e:
+                error_container[0] = e
+            finally:
+                completion_event.set()
+
+        thread: threading.Thread = threading.Thread(target=target)
+        thread.daemon = True  # Ensures thread doesn't prevent program exit
+        thread.start()
+
+        # Wait for either completion or timeout
+        completed: bool = completion_event.wait(timeout=timeout)
+
+        if not completed:
+            self._logger.log("warning", f"Task execution timed out after {timeout} seconds")
+            thread.join(timeout=0.1)
+
+            # Clean up resources
+            if hasattr(self, 'agent_executor') and self.agent_executor:
+                self.agent_executor.llm = None  # Release LLM resources
+                if hasattr(self.agent_executor, 'close'):
+                    self.agent_executor.close()
+
+            # Force final answer using the prompt
+            self._have_forced_answer = True
+            forced_answer = self.i18n.errors("force_final_answer")
+            return f"{result_container[0] if result_container[0] else ''}\n{forced_answer}"
+
+        if error_container[0]:
+            error = error_container[0]
+            self._logger.log("error", f"Task execution failed: {str(error)}")
+            raise error
+
+        if result_container[0] is None:
+            self._logger.log("warning", "Task execution completed but returned no result")
+            raise timeout_decorator.TimeoutError("Task execution completed but returned no result")  # This is a different kind of failure than timeout
+
+        return result_container[0]
+
     def execute_task(
         self,
         task: Task,
@@ -174,7 +248,42 @@ class Agent(BaseAgent):

         Returns:
             Output of the agent

+        Raises:
+            TimeoutError: If the task execution exceeds max_execution_time (if set)
+            Exception: For other execution errors
         """
+        if self.max_execution_time is None:
+            return self._execute_task_without_timeout(task, context, tools)
+
+        original_llm_timeout = getattr(self.llm, 'timeout', None)
+        try:
+            if hasattr(self.llm, 'timeout'):
+                self.llm.timeout = self.max_execution_time
+
+            result = self._execute_with_timeout(task, context, tools, self.max_execution_time)
+            if self._have_forced_answer:
+                self._logger.log("warning", f"Task '{task.description}' execution timed out after {self.max_execution_time} seconds. Using forced answer.")
+            return result
+        except timeout_decorator.TimeoutError:
+            # This is a different kind of failure (e.g., no result at all)
+            error_msg = (
+                f"Task '{task.description}' execution timed out after {self.max_execution_time} seconds "
+                f"and produced no result. Consider increasing max_execution_time or optimizing the task."
+            )
+            self._logger.log("error", error_msg)
+            raise TimeoutError(error_msg)
+        finally:
+            if original_llm_timeout is not None and hasattr(self.llm, 'timeout'):
+                self.llm.timeout = original_llm_timeout
+
+    def _execute_task_without_timeout(
+        self,
+        task: Task,
+        context: Optional[str] = None,
+        tools: Optional[List[BaseTool]] = None,
+    ) -> str:
+        """Execute task without timeout - contains the original execute_task logic."""
         if self.tools_handler:
             self.tools_handler.last_used_tool = {}  # type: ignore # Incompatible types in assignment (expression has type "dict[Never, Never]", variable has type "ToolCalling")
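Note that this timeout never forcibly kills the worker: on expiry the daemon thread is abandoned, resources are released where possible, and the force_final_answer prompt is appended to any partial result. A standalone sketch of the same event-based pattern, with illustrative names not taken from this diff:

```python
import threading
from typing import Callable, List, Optional


def run_with_timeout(fn: Callable[[], str], timeout: float) -> Optional[str]:
    """Run fn in a daemon thread; return its result, or None on timeout."""
    done = threading.Event()
    result: List[Optional[str]] = [None]

    def target() -> None:
        try:
            result[0] = fn()
        finally:
            done.set()  # Signal completion even if fn raised

    worker = threading.Thread(target=target, daemon=True)
    worker.start()
    if not done.wait(timeout=timeout):
        return None  # Timed out; the daemon thread is abandoned, not killed
    return result[0]


print(run_with_timeout(lambda: "ok", timeout=2.0))  # -> ok
```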
@@ -1,138 +1,28 @@
 from pathlib import Path
-from typing import Dict, Iterator, List, Optional, Union
-from urllib.parse import urlparse
+from typing import Dict, List

 from pydantic import Field, field_validator

-from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
 from crewai.utilities.constants import KNOWLEDGE_DIRECTORY
 from crewai.utilities.logger import Logger
+from crewai.knowledge.source.base_file_knowledge_source import BaseFileKnowledgeSource


-class ExcelKnowledgeSource(BaseKnowledgeSource):
+class ExcelKnowledgeSource(BaseFileKnowledgeSource):
     """A knowledge source that stores and queries Excel file content using embeddings."""

-    # override content to be a dict of file paths to sheet names to csv content

     _logger: Logger = Logger(verbose=True)

-    file_path: Optional[Union[Path, List[Path], str, List[str]]] = Field(
-        default=None,
-        description="[Deprecated] The path to the file. Use file_paths instead.",
-    )
-    file_paths: Optional[Union[Path, List[Path], str, List[str]]] = Field(
-        default_factory=list, description="The path to the file"
-    )
-    chunks: List[str] = Field(default_factory=list)
-    content: Dict[Path, Dict[str, str]] = Field(default_factory=dict)
-    safe_file_paths: List[Path] = Field(default_factory=list)
-
-    @field_validator("file_path", "file_paths", mode="before")
-    def validate_file_path(cls, v, info):
-        """Validate that at least one of file_path or file_paths is provided."""
-        # Single check if both are None, O(1) instead of nested conditions
-        if (
-            v is None
-            and info.data.get(
-                "file_path" if info.field_name == "file_paths" else "file_paths"
-            )
-            is None
-        ):
-            raise ValueError("Either file_path or file_paths must be provided")
-        return v
-
-    def _process_file_paths(self) -> List[Path]:
-        """Convert file_path to a list of Path objects."""
-
-        if hasattr(self, "file_path") and self.file_path is not None:
-            self._logger.log(
-                "warning",
-                "The 'file_path' attribute is deprecated and will be removed in a future version. Please use 'file_paths' instead.",
-                color="yellow",
-            )
-            self.file_paths = self.file_path
-
-        if self.file_paths is None:
-            raise ValueError("Your source must be provided with a file_paths: []")
-
-        # Convert single path to list
-        path_list: List[Union[Path, str]] = (
-            [self.file_paths]
-            if isinstance(self.file_paths, (str, Path))
-            else list(self.file_paths)
-            if isinstance(self.file_paths, list)
-            else []
-        )
-
-        if not path_list:
-            raise ValueError(
-                "file_path/file_paths must be a Path, str, or a list of these types"
-            )
-
-        return [self.convert_to_path(path) for path in path_list]
-
-    def validate_content(self):
-        """Validate the paths."""
-        for path in self.safe_file_paths:
-            if not path.exists():
-                self._logger.log(
-                    "error",
-                    f"File not found: {path}. Try adding sources to the knowledge directory. If it's inside the knowledge directory, use the relative path.",
-                    color="red",
-                )
-                raise FileNotFoundError(f"File not found: {path}")
-            if not path.is_file():
-                self._logger.log(
-                    "error",
-                    f"Path is not a file: {path}",
-                    color="red",
-                )
-
-    def model_post_init(self, _) -> None:
-        if self.file_path:
-            self._logger.log(
-                "warning",
-                "The 'file_path' attribute is deprecated and will be removed in a future version. Please use 'file_paths' instead.",
-                color="yellow",
-            )
-            self.file_paths = self.file_path
-        self.safe_file_paths = self._process_file_paths()
-        self.validate_content()
-        self.content = self._load_content()
-
-    def _load_content(self) -> Dict[Path, Dict[str, str]]:
-        """Load and preprocess Excel file content from multiple sheets.
-
-        Each sheet's content is converted to CSV format and stored.
-
-        Returns:
-            Dict[Path, Dict[str, str]]: A mapping of file paths to their respective sheet contents.
-
-        Raises:
-            ImportError: If required dependencies are missing.
-            FileNotFoundError: If the specified Excel file cannot be opened.
-        """
+    def load_content(self) -> Dict[Path, str]:
+        """Load and preprocess Excel file content."""
         pd = self._import_dependencies()

         content_dict = {}
         for file_path in self.safe_file_paths:
             file_path = self.convert_to_path(file_path)
-            with pd.ExcelFile(file_path) as xl:
-                sheet_dict = {
-                    str(sheet_name): str(
-                        pd.read_excel(xl, sheet_name).to_csv(index=False)
-                    )
-                    for sheet_name in xl.sheet_names
-                }
-            content_dict[file_path] = sheet_dict
+            df = pd.read_excel(file_path)
+            content = df.to_csv(index=False)
+            content_dict[file_path] = content
         return content_dict

     def convert_to_path(self, path: Union[Path, str]) -> Path:
         """Convert a path to a Path object."""
         return Path(KNOWLEDGE_DIRECTORY + "/" + path) if isinstance(path, str) else path

     def _import_dependencies(self):
         """Dynamically import dependencies."""
         try:
             import openpyxl  # noqa
             import pandas as pd

             return pd
@@ -148,14 +38,10 @@ class ExcelKnowledgeSource(BaseKnowledgeSource):
         and save the embeddings.
         """
         # Convert dictionary values to a single string if content is a dictionary
-        # Updated to account for .xlsx workbooks with multiple tabs/sheets
-        content_str = ""
-        for value in self.content.values():
-            if isinstance(value, dict):
-                for sheet_value in value.values():
-                    content_str += str(sheet_value) + "\n"
-            else:
-                content_str += str(value) + "\n"
+        if isinstance(self.content, dict):
+            content_str = "\n".join(str(value) for value in self.content.values())
+        else:
+            content_str = str(self.content)

         new_chunks = self._chunk_text(content_str)
         self.chunks.extend(new_chunks)
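A hedged usage sketch for this knowledge source; the workbook name is illustrative and assumes the file sits in the directory that `KNOWLEDGE_DIRECTORY` points to, and that `add()` is the inherited entry point that loads, chunks, and embeds the content (per the docstring above):

```python
from crewai.knowledge.source.excel_knowledge_source import ExcelKnowledgeSource

# Illustrative file name, resolved relative to the knowledge directory.
source = ExcelKnowledgeSource(file_paths=["products.xlsx"])

# Load the spreadsheet as CSV text, chunk it, and save the embeddings.
source.add()
```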
@@ -51,7 +51,6 @@ writer = Agent(

 def test_crew_with_only_conditional_tasks_raises_error():
     """Test that creating a crew with only conditional tasks raises an error."""

     def condition_func(task_output: TaskOutput) -> bool:
         return True
@@ -83,7 +82,6 @@ def test_crew_with_only_conditional_tasks_raises_error():
         tasks=[conditional1, conditional2, conditional3],
     )


 def test_crew_config_conditional_requirement():
     with pytest.raises(ValueError):
         Crew(process=Process.sequential)
@@ -591,12 +589,12 @@ def test_crew_with_delegating_agents_should_not_override_task_tools():
         _, kwargs = mock_execute_sync.call_args
         tools = kwargs["tools"]

-        assert any(
-            isinstance(tool, TestTool) for tool in tools
-        ), "TestTool should be present"
-        assert any(
-            "delegate" in tool.name.lower() for tool in tools
-        ), "Delegation tool should be present"
+        assert any(isinstance(tool, TestTool) for tool in tools), (
+            "TestTool should be present"
+        )
+        assert any("delegate" in tool.name.lower() for tool in tools), (
+            "Delegation tool should be present"
+        )


 @pytest.mark.vcr(filter_headers=["authorization"])
@@ -655,12 +653,12 @@ def test_crew_with_delegating_agents_should_not_override_agent_tools():
         _, kwargs = mock_execute_sync.call_args
         tools = kwargs["tools"]

-        assert any(
-            isinstance(tool, TestTool) for tool in new_ceo.tools
-        ), "TestTool should be present"
-        assert any(
-            "delegate" in tool.name.lower() for tool in tools
-        ), "Delegation tool should be present"
+        assert any(isinstance(tool, TestTool) for tool in new_ceo.tools), (
+            "TestTool should be present"
+        )
+        assert any("delegate" in tool.name.lower() for tool in tools), (
+            "Delegation tool should be present"
+        )


 @pytest.mark.vcr(filter_headers=["authorization"])
@@ -784,17 +782,17 @@ def test_task_tools_override_agent_tools_with_allow_delegation():
         used_tools = kwargs["tools"]

         # Confirm AnotherTestTool is present but TestTool is not
-        assert any(
-            isinstance(tool, AnotherTestTool) for tool in used_tools
-        ), "AnotherTestTool should be present"
-        assert not any(
-            isinstance(tool, TestTool) for tool in used_tools
-        ), "TestTool should not be present among used tools"
+        assert any(isinstance(tool, AnotherTestTool) for tool in used_tools), (
+            "AnotherTestTool should be present"
+        )
+        assert not any(isinstance(tool, TestTool) for tool in used_tools), (
+            "TestTool should not be present among used tools"
+        )

         # Confirm delegation tool(s) are present
-        assert any(
-            "delegate" in tool.name.lower() for tool in used_tools
-        ), "Delegation tool should be present"
+        assert any("delegate" in tool.name.lower() for tool in used_tools), (
+            "Delegation tool should be present"
+        )

         # Finally, make sure the agent's original tools remain unchanged
         assert len(researcher_with_delegation.tools) == 1
@@ -1595,9 +1593,9 @@ def test_code_execution_flag_adds_code_tool_upon_kickoff():

         # Verify that exactly one tool was used and it was a CodeInterpreterTool
         assert len(used_tools) == 1, "Should have exactly one tool"
-        assert isinstance(
-            used_tools[0], CodeInterpreterTool
-        ), "Tool should be CodeInterpreterTool"
+        assert isinstance(used_tools[0], CodeInterpreterTool), (
+            "Tool should be CodeInterpreterTool"
+        )


 @pytest.mark.vcr(filter_headers=["authorization"])
@@ -1954,7 +1952,6 @@ def test_task_callback_on_crew():

 def test_task_callback_both_on_task_and_crew():
     from unittest.mock import MagicMock, patch

     mock_callback_on_task = MagicMock()
     mock_callback_on_crew = MagicMock()
@@ -2104,22 +2101,21 @@ def test_conditional_task_uses_last_output():
         expected_output="First output",
         agent=researcher,
     )

     def condition_fails(task_output: TaskOutput) -> bool:
         # This condition will never be met
         return "never matches" in task_output.raw.lower()

     def condition_succeeds(task_output: TaskOutput) -> bool:
         # This condition will match first task's output
         return "first success" in task_output.raw.lower()

     conditional_task1 = ConditionalTask(
         description="Second task - conditional that fails condition",
         expected_output="Second output",
         agent=researcher,
         condition=condition_fails,
     )

     conditional_task2 = ConditionalTask(
         description="Third task - conditional that succeeds using first task output",
         expected_output="Third output",
@@ -2138,37 +2134,35 @@ def test_conditional_task_uses_last_output():
         raw="First success output",  # Will be used by third task's condition
         agent=researcher.role,
     )
     mock_skipped = TaskOutput(
         description="Second task output",
         raw="",  # Empty output since condition fails
         agent=researcher.role,
     )
     mock_third = TaskOutput(
         description="Third task output",
         raw="Third task executed",  # Output when condition succeeds using first task output
         agent=writer.role,
     )

     # Set up mocks for task execution and conditional logic
     with patch.object(ConditionalTask, "should_execute") as mock_should_execute:
         # First conditional fails, second succeeds
         mock_should_execute.side_effect = [False, True]

         with patch.object(Task, "execute_sync") as mock_execute:
             mock_execute.side_effect = [mock_first, mock_third]
             result = crew.kickoff()

     # Verify execution behavior
     assert mock_execute.call_count == 2  # Only first and third tasks execute
     assert mock_should_execute.call_count == 2  # Both conditionals checked

-    # Verify outputs collection:
-    # First executed task output, followed by an automatically generated (skipped) output, then the conditional execution
+    # Verify outputs collection
     assert len(result.tasks_output) == 3
-    assert (
-        result.tasks_output[0].raw == "First success output"
-    )  # First task succeeded
-    assert (
-        result.tasks_output[1].raw == ""
-    )  # Second task skipped (condition failed)
-    assert (
-        result.tasks_output[2].raw == "Third task executed"
-    )  # Third task used first task's output
+    assert result.tasks_output[0].raw == "First success output"  # First task succeeded
+    assert result.tasks_output[1].raw == ""  # Second task skipped (condition failed)
+    assert result.tasks_output[2].raw == "Third task executed"  # Third task used first task's output


 @pytest.mark.vcr(filter_headers=["authorization"])
 def test_conditional_tasks_result_collection():
@@ -2178,20 +2172,20 @@ def test_conditional_tasks_result_collection():
         expected_output="First output",
         agent=researcher,
     )

     def condition_never_met(task_output: TaskOutput) -> bool:
         return "never matches" in task_output.raw.lower()

     def condition_always_met(task_output: TaskOutput) -> bool:
         return "success" in task_output.raw.lower()

     task2 = ConditionalTask(
         description="Conditional task that never executes",
         expected_output="Second output",
         agent=researcher,
         condition=condition_never_met,
     )

     task3 = ConditionalTask(
         description="Conditional task that always executes",
         expected_output="Third output",
@@ -2210,46 +2204,35 @@ def test_conditional_tasks_result_collection():
         raw="Success output",  # Triggers third task's condition
         agent=researcher.role,
     )
     mock_skipped = TaskOutput(
         description="Skipped output",
         raw="",  # Empty output for skipped task
         agent=researcher.role,
     )
     mock_conditional = TaskOutput(
         description="Conditional output",
         raw="Conditional task executed",
         agent=writer.role,
     )

     # Set up mocks for task execution and conditional logic
     with patch.object(ConditionalTask, "should_execute") as mock_should_execute:
         # First conditional fails, second succeeds
         mock_should_execute.side_effect = [False, True]

         with patch.object(Task, "execute_sync") as mock_execute:
             mock_execute.side_effect = [mock_success, mock_conditional]
             result = crew.kickoff()

     # Verify execution behavior
     assert mock_execute.call_count == 2  # Only first and third tasks execute
     assert mock_should_execute.call_count == 2  # Both conditionals checked

-    # Verify task output collection:
-    # There should be three outputs: normal task, skipped conditional task (empty output),
-    # and the conditional task that executed.
-    assert len(result.tasks_output) == 3
-    assert (
-        result.tasks_output[0].raw == "Success output"
-    )  # Normal task executed
-    assert result.tasks_output[1].raw == ""  # Second task skipped
-    assert (
-        result.tasks_output[2].raw == "Conditional task executed"
-    )  # Third task executed
-
     # Verify task output collection
     assert len(result.tasks_output) == 3
-    assert (
-        result.tasks_output[0].raw == "Success output"
-    )  # Normal task executed
-    assert result.tasks_output[1].raw == ""  # Second task skipped
-    assert (
-        result.tasks_output[2].raw == "Conditional task executed"
-    )  # Third task executed
+    assert result.tasks_output[0].raw == "Success output"  # Normal task executed
+    assert result.tasks_output[1].raw == ""  # Second task skipped
+    assert result.tasks_output[2].raw == "Conditional task executed"  # Third task executed


 @pytest.mark.vcr(filter_headers=["authorization"])
 def test_multiple_conditional_tasks():
@@ -2259,20 +2242,20 @@ def test_multiple_conditional_tasks():
         expected_output="Research output",
         agent=researcher,
     )

     def condition1(task_output: TaskOutput) -> bool:
         return "success" in task_output.raw.lower()

     def condition2(task_output: TaskOutput) -> bool:
         return "proceed" in task_output.raw.lower()

     task2 = ConditionalTask(
         description="First conditional task",
         expected_output="Conditional output 1",
         agent=writer,
         condition=condition1,
     )

     task3 = ConditionalTask(
         description="Second conditional task",
         expected_output="Conditional output 2",
@@ -2291,7 +2274,7 @@ def test_multiple_conditional_tasks():
         raw="Success and proceed output",
         agent=researcher.role,
     )

     # Set up mocks for task execution
     with patch.object(Task, "execute_sync", return_value=mock_success) as mock_execute:
         result = crew.kickoff()
@@ -2299,7 +2282,6 @@ def test_multiple_conditional_tasks():
     assert mock_execute.call_count == 3
     assert len(result.tasks_output) == 3


 @pytest.mark.vcr(filter_headers=["authorization"])
 def test_using_contextual_memory():
     from unittest.mock import patch
@@ -3418,9 +3400,9 @@ def test_fetch_inputs():
     expected_placeholders = {"role_detail", "topic", "field"}
     actual_placeholders = crew.fetch_inputs()

-    assert (
-        actual_placeholders == expected_placeholders
-    ), f"Expected {expected_placeholders}, but got {actual_placeholders}"
+    assert actual_placeholders == expected_placeholders, (
+        f"Expected {expected_placeholders}, but got {actual_placeholders}"
+    )


 def test_task_tools_preserve_code_execution_tools():
@@ -3493,20 +3475,20 @@ def test_task_tools_preserve_code_execution_tools():
         used_tools = kwargs["tools"]

         # Verify all expected tools are present
-        assert any(
-            isinstance(tool, TestTool) for tool in used_tools
-        ), "Task's TestTool should be present"
-        assert any(
-            isinstance(tool, CodeInterpreterTool) for tool in used_tools
-        ), "CodeInterpreterTool should be present"
-        assert any(
-            "delegate" in tool.name.lower() for tool in used_tools
-        ), "Delegation tool should be present"
+        assert any(isinstance(tool, TestTool) for tool in used_tools), (
+            "Task's TestTool should be present"
+        )
+        assert any(isinstance(tool, CodeInterpreterTool) for tool in used_tools), (
+            "CodeInterpreterTool should be present"
+        )
+        assert any("delegate" in tool.name.lower() for tool in used_tools), (
+            "Delegation tool should be present"
+        )

         # Verify the total number of tools (TestTool + CodeInterpreter + 2 delegation tools)
-        assert (
-            len(used_tools) == 4
-        ), "Should have TestTool, CodeInterpreter, and 2 delegation tools"
+        assert len(used_tools) == 4, (
+            "Should have TestTool, CodeInterpreter, and 2 delegation tools"
+        )


 @pytest.mark.vcr(filter_headers=["authorization"])
@@ -3550,9 +3532,9 @@ def test_multimodal_flag_adds_multimodal_tools():
         used_tools = kwargs["tools"]

         # Check that the multimodal tool was added
-        assert any(
-            isinstance(tool, AddImageTool) for tool in used_tools
-        ), "AddImageTool should be present when agent is multimodal"
+        assert any(isinstance(tool, AddImageTool) for tool in used_tools), (
+            "AddImageTool should be present when agent is multimodal"
+        )

         # Verify we have exactly one tool (just the AddImageTool)
         assert len(used_tools) == 1, "Should only have the AddImageTool"
@@ -3778,9 +3760,9 @@ def test_crew_guardrail_feedback_in_context():
     assert len(execution_contexts) > 1, "Task should have been executed multiple times"

     # Verify that the second execution included the guardrail feedback
-    assert (
-        "Output must contain the keyword 'IMPORTANT'" in execution_contexts[1]
-    ), "Guardrail feedback should be included in retry context"
+    assert "Output must contain the keyword 'IMPORTANT'" in execution_contexts[1], (
+        "Guardrail feedback should be included in retry context"
+    )

     # Verify final output meets guardrail requirements
     assert "IMPORTANT" in result.raw, "Final output should contain required keyword"
@@ -13,6 +13,7 @@ from crewai import Agent, Crew, Process, Task
 from crewai.tasks.conditional_task import ConditionalTask
 from crewai.tasks.task_output import TaskOutput
 from crewai.utilities.converter import Converter
+from concurrent.futures import TimeoutError as FuturesTimeoutError


 def test_task_tool_reflect_agent_tools():
@@ -1283,3 +1284,139 @@ def test_interpolate_valid_types():
     assert parsed["optional"] is None
     assert parsed["nested"]["flag"] is True
     assert parsed["nested"]["empty"] is None
+
+
+def test_task_completes_within_max_execution_time():
+    """Test task completes successfully within specified timeout"""
+    researcher = Agent(
+        role="Researcher",
+        goal="Test goal",
+        backstory="Test backstory",
+        max_execution_time=100  # Ample time for completion
+    )
+
+    task = Task(
+        description="Test task",
+        expected_output="Test output",
+        agent=researcher
+    )
+
+    with patch.object(Agent, '_execute_task_without_timeout', return_value="Success") as mock_execute:
+        result = task.execute_sync(agent=researcher)
+        assert result.raw == "Success"
+        mock_execute.assert_called_once()
+
+
+def test_task_exceeds_max_execution_time():
+    """Test task raises TimeoutError when exceeding max execution time"""
+    researcher = Agent(
+        role="Researcher",
+        goal="Test goal",
+        backstory="Test backstory",
+        max_execution_time=1  # Very short timeout
+    )
+
+    task = Task(
+        description="Test task",
+        expected_output="Test output",
+        agent=researcher
+    )
+
+    # Mock the OpenAI API call to avoid authentication
+    with patch('litellm.completion', side_effect=TimeoutError("Request timed out")):
+        with pytest.raises(TimeoutError) as excinfo:
+            task.execute_sync(agent=researcher)
+        assert "timed out" in str(excinfo.value)
+
+
+def test_task_no_max_execution_time():
+    """Test task executes normally without timeout setting"""
+    researcher = Agent(
+        role="Researcher",
+        goal="Test goal",
+        backstory="Test backstory",
+        max_execution_time=None  # No timeout
+    )
+
+    task = Task(
+        description="Test task",
+        expected_output="Test output",
+        agent=researcher
+    )
+
+    with patch.object(Agent, 'execute_task', return_value="Success") as mock_execute:
+        result = task.execute_sync(agent=researcher)
+        assert result.raw == "Success"
+        mock_execute.assert_called_once()
+
+
+def test_task_max_execution_time_zero():
+    """Test timeout at the minimum valid max_execution_time (0 itself is rejected)"""
+    researcher = Agent(
+        role="Researcher",
+        goal="Test goal",
+        backstory="Test backstory",
+        max_execution_time=1  # Set to minimum valid value
+    )
+
+    task = Task(
+        description="Test task",
+        expected_output="Test output",
+        agent=researcher
+    )
+
+    # Simulate immediate timeout using FuturesTimeoutError
+    with patch('concurrent.futures.ThreadPoolExecutor') as mock_executor:
+        mock_future = MagicMock()
+        mock_future.result.side_effect = FuturesTimeoutError()
+        mock_executor.return_value.submit.return_value = mock_future
+
+        with pytest.raises(TimeoutError) as excinfo:
+            task.execute_sync(agent=researcher)
+        assert "timed out after 1 seconds" in str(excinfo.value)
+
+
+def test_task_force_final_answer_on_timeout():
+    """Test that force_final_answer is used when task times out"""
+    researcher = Agent(
+        role="Researcher",
+        goal="Test goal",
+        backstory="Test backstory",
+        max_execution_time=1  # Very short timeout
+    )
+
+    task = Task(
+        description="Test task",
+        expected_output="Test output",
+        agent=researcher
+    )
+
+    # Mock the task execution to simulate a partial result before timeout
+    mock_i18n = MagicMock()
+    mock_i18n.errors.return_value = "MUST give your absolute best final answer"
+    researcher.i18n = mock_i18n
+
+    class MockThread:
+        def __init__(self, target, *args, **kwargs):
+            self.target = target
+            self.daemon = kwargs.get('daemon', False)
+            self.args = args
+            self.kwargs = kwargs
+
+        def start(self):
+            # Execute the target function to set the result
+            self.target()
+
+        def join(self, timeout=None):
+            pass
+
+    def mock_thread(*args, **kwargs):
+        return MockThread(*args, **kwargs)
+
+    with patch('threading.Thread', side_effect=mock_thread), \
+         patch('threading.Event.wait', return_value=False), \
+         patch('litellm.completion'), \
+         patch.object(Agent, '_execute_task_without_timeout', return_value="Partial result"):
+        result = task.execute_sync(agent=researcher)
+        assert "MUST give your absolute best final answer" in result.raw
+        assert "Partial result" in result.raw  # Should include partial result