Compare commits

..

2 Commits

Author: Devin AI
SHA1: 88641a49f7
Date: 2025-02-09 23:26:34 +00:00

fix: sort imports in crew_test.py

Co-Authored-By: Joe Moura <joao@crewai.com>

Author: Devin AI
SHA1: f838909220
Date: 2025-02-09 23:25:02 +00:00

feat: enable custom LLM support for Crew.test()

- Added new llm parameter to Crew.test() that accepts string or LLM instance
- Maintained backward compatibility with openai_model_name parameter
- Updated CrewEvaluator to handle any LLM implementation
- Added comprehensive test coverage

Fixes #2081

Co-Authored-By: Joe Moura <joao@crewai.com>
8 changed files with 207 additions and 458 deletions
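For orientation, a minimal usage sketch of the call patterns described in the commit message above. The toy crew and the model names are illustrative, not part of this diff.

```python
from crewai import Agent, Crew, LLM, Task

agent = Agent(role="Researcher", goal="Answer questions", backstory="Expert researcher")
task = Task(description="Answer the question", expected_output="A short answer", agent=agent)
crew = Crew(agents=[agent], tasks=[task])

# New in this change set: evaluate with a model name string...
crew.test(n_iterations=2, llm="gpt-4o-mini")

# ...or with a fully configured LLM instance.
crew.test(n_iterations=2, llm=LLM(model="gpt-4o-mini", temperature=0))

# Deprecated, but still accepted for backward compatibility.
crew.test(n_iterations=2, openai_model_name="gpt-4o-mini")
```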

View File

@@ -91,7 +91,7 @@ result = crew.kickoff(inputs={"question": "What city does John live in and how o
```
Here's another example with the `CrewDoclingSource`. The CrewDoclingSource is actually quite versatile and can handle multiple file formats including MD, PDF, DOCX, HTML, and more.
Here's another example with the `CrewDoclingSource`. The CrewDoclingSource is actually quite versatile and can handle multiple file formats including TXT, PDF, DOCX, HTML, and more.
<Note>
You need to install `docling` for the following example to work: `uv add docling`
@@ -152,10 +152,10 @@ Here are examples of how to use different types of knowledge sources:
### Text File Knowledge Source
```python
from crewai.knowledge.source.text_file_knowledge_source import TextFileKnowledgeSource
from crewai.knowledge.source.crew_docling_source import CrewDoclingSource
# Create a text file knowledge source
text_source = TextFileKnowledgeSource(
text_source = CrewDoclingSource(
file_paths=["document.txt", "another.txt"]
)
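For context, a hedged sketch of how the updated snippet is typically wired into a crew; the agent, task, and file names are illustrative and not part of this diff.

```python
from crewai import Agent, Crew, Process, Task
from crewai.knowledge.source.crew_docling_source import CrewDoclingSource

# CrewDoclingSource handles plain-text files alongside PDF, DOCX, HTML, and more.
text_source = CrewDoclingSource(
    file_paths=["document.txt", "another.txt"],  # resolved relative to the knowledge directory
)

agent = Agent(role="Analyst", goal="Answer questions from the documents", backstory="Careful reader")
task = Task(description="Summarize the documents", expected_output="A short summary", agent=agent)

crew = Crew(
    agents=[agent],
    tasks=[task],
    process=Process.sequential,
    knowledge_sources=[text_source],
)
```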

View File

@@ -282,19 +282,6 @@ my_crew = Crew(
### Using Google AI embeddings
#### Prerequisites
Before using Google AI embeddings, ensure you have:
- Access to the Gemini API
- The necessary API keys and permissions
You will need to update your *pyproject.toml* dependencies:
```YAML
dependencies = [
"google-generativeai>=0.8.4", #main version in January/2025 - crewai v.0.100.0 and crewai-tools 0.33.0
"crewai[tools]>=0.100.0,<1.0.0"
]
```
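For orientation, a hedged sketch of the kind of embedder configuration these prerequisites lead up to. The provider/config keys follow the pattern used elsewhere in the CrewAI memory docs; the environment variable and embedding model names are placeholders.

```python
import os

from crewai import Crew, Process

# Illustrative memory configuration using Google (Gemini) embeddings.
my_crew = Crew(
    agents=[...],  # your agents
    tasks=[...],   # your tasks
    process=Process.sequential,
    memory=True,
    embedder={
        "provider": "google",
        "config": {
            "api_key": os.environ.get("GEMINI_API_KEY"),  # placeholder env var name
            "model": "models/text-embedding-004",          # assumed embedding model
        },
    },
    verbose=True,
)
```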
```python Code
from crewai import Crew, Agent, Task, Process
@@ -447,38 +434,6 @@ my_crew = Crew(
)
```
### Using Amazon Bedrock embeddings
```python Code
# Note: Ensure you have installed `boto3` for Bedrock embeddings to work.
import os
import boto3
from crewai import Crew, Agent, Task, Process
boto3_session = boto3.Session(
region_name=os.environ.get("AWS_REGION_NAME"),
aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID"),
aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY")
)
my_crew = Crew(
agents=[...],
tasks=[...],
process=Process.sequential,
memory=True,
embedder={
"provider": "bedrock",
"config":{
"session": boto3_session,
"model": "amazon.titan-embed-text-v2:0",
"vector_dimension": 1024
}
},
verbose=True
)
```
### Adding Custom Embedding Function
```python Code

View File

@@ -268,7 +268,7 @@ analysis_task = Task(
Task guardrails provide a way to validate and transform task outputs before they
are passed to the next task. This feature helps ensure data quality and provides
feedback to agents when their output doesn't meet specific criteria.
### Using Task Guardrails
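As a hedged illustration (the section's own example sits outside this hunk), a guardrail is typically a callable attached to a Task that returns a (success, data-or-feedback) tuple. The `guardrail` keyword and the return shape here are assumptions based on the surrounding docs and the guardrail test later in this diff.

```python
from typing import Any, Tuple

from crewai import Task
from crewai.tasks.task_output import TaskOutput

def require_keyword(output: TaskOutput) -> Tuple[bool, Any]:
    """Accept the output only if it contains the required keyword."""
    if "IMPORTANT" in output.raw:
        return (True, output.raw)  # validated output is passed on to the next task
    return (False, "Output must contain the keyword 'IMPORTANT'")  # feedback returned to the agent

analysis_task = Task(
    description="Summarize the findings and flag anything IMPORTANT.",
    expected_output="A summary containing the keyword IMPORTANT.",
    guardrail=require_keyword,  # assumed keyword argument
)
```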

View File

@@ -1,3 +0,0 @@
[pytest]
markers =
agentops: Tests for AgentOps integration

View File

@@ -1,6 +1,5 @@
import asyncio
import json
import os
import re
import uuid
import warnings
@@ -55,11 +54,8 @@ from crewai.utilities.training_handler import CrewTrainingHandler
try:
import agentops # type: ignore
from agentops.exceptions import AgentOpsError, AuthenticationError # type: ignore
except ImportError:
agentops = None
AgentOpsError = None
AuthenticationError = None
warnings.filterwarnings("ignore", category=SyntaxWarning, module="pysbd")
@@ -94,8 +90,6 @@ class Crew(BaseModel):
__hash__ = object.__hash__ # type: ignore
_execution_span: Any = PrivateAttr()
_rpm_controller: RPMController = PrivateAttr()
_agentops: Optional['agentops.AgentOps'] = PrivateAttr(default=None)
_telemetry: Optional[Telemetry] = PrivateAttr(default=None)
_logger: Logger = PrivateAttr()
_file_handler: FileHandler = PrivateAttr()
_cache_handler: InstanceOf[CacheHandler] = PrivateAttr(default=CacheHandler())
@@ -246,72 +240,19 @@ class Crew(BaseModel):
# TODO: Improve typing
return json.loads(v) if isinstance(v, Json) else v # type: ignore
def _validate_api_key(self, api_key: Optional[str]) -> bool:
"""Validate the AgentOps API key.
Args:
api_key: The API key to validate
Returns:
bool: True if the API key is valid, False otherwise
"""
if not api_key:
return False
stripped_key = api_key.strip()
return bool(stripped_key and len(stripped_key) > 10)
@model_validator(mode="after")
def set_private_attrs(self) -> "Crew":
"""Initialize private attributes including AgentOps integration.
This method sets up:
- Logger and file handler for output logging
- RPM controller for rate limiting
- AgentOps integration for monitoring (if available and configured)
"""
"""Set private attributes."""
self._cache_handler = CacheHandler()
self._logger = Logger(verbose=self.verbose)
if self.output_log_file:
self._file_handler = FileHandler(self.output_log_file)
self._rpm_controller = RPMController(max_rpm=self.max_rpm, logger=self._logger)
self._telemetry = Telemetry()
self._telemetry.set_tracer()
# Initialize agentops if available and API key is present
if agentops:
api_key = os.getenv("AGENTOPS_API_KEY")
if self._validate_api_key(api_key):
try:
agentops.init(api_key)
self._agentops = agentops
self._logger.log(
"info",
"Successfully initialized agentops",
color="green"
)
except (ConnectionError, AuthenticationError) as e:
self._logger.log(
"warning",
f"Failed to connect to agentops: {e}",
color="yellow"
)
self._agentops = None
except (ValueError, AgentOpsError) as e:
self._logger.log(
"warning",
f"Invalid agentops configuration: {e}",
color="yellow"
)
self._agentops = None
else:
self._logger.log(
"warning",
"Invalid AGENTOPS_API_KEY provided",
color="yellow"
)
self._agentops = None
if self.function_calling_llm and not isinstance(self.function_calling_llm, LLM):
self.function_calling_llm = create_llm(self.function_calling_llm)
self._telemetry = Telemetry()
self._telemetry.set_tracer()
return self
@model_validator(mode="after")
@@ -602,8 +543,7 @@ class Crew(BaseModel):
inputs = before_callback(inputs)
"""Starts the crew to work on its assigned tasks."""
if self._telemetry:
self._execution_span = self._telemetry.crew_execution_span(self, inputs)
self._execution_span = self._telemetry.crew_execution_span(self, inputs)
self._task_output_handler.reset()
self._logging_color = "bold_purple"
@@ -1181,22 +1121,16 @@ class Crew(BaseModel):
for agent in self.agents:
agent.interpolate_inputs(inputs)
def _finish_execution(self, final_output: Union[str, CrewOutput]) -> None:
"""Finish execution and cleanup.
Args:
final_output: The final output from crew execution, either as string or CrewOutput
"""
def _finish_execution(self, final_string_output: str) -> None:
if self.max_rpm:
self._rpm_controller.stop_rpm_counter()
if self._telemetry:
self._telemetry.end_crew(self, final_output)
if self._agentops:
self._agentops.end_session(
if agentops:
agentops.end_session(
end_state="Success",
end_state_reason="Finished Execution",
is_auto_end=True,
)
self._telemetry.end_crew(self, final_string_output)
def calculate_usage_metrics(self) -> UsageMetrics:
"""Calculates and returns the usage metrics."""
@@ -1214,19 +1148,31 @@ class Crew(BaseModel):
def test(
self,
n_iterations: int,
llm: Optional[Union[str, LLM]] = None,
openai_model_name: Optional[str] = None,
inputs: Optional[Dict[str, Any]] = None,
) -> None:
"""Test and evaluate the Crew with the given inputs for n iterations concurrently using concurrent.futures."""
"""Test and evaluate the Crew with the given inputs for n iterations concurrently.
Args:
n_iterations: Number of test iterations to run
llm: LLM instance or model name string to use for evaluation
openai_model_name: (Deprecated) OpenAI model name string (kept for backward compatibility)
inputs: Optional dictionary of inputs to pass to each test iteration
"""
test_crew = self.copy()
model = llm or openai_model_name
if model is None:
raise ValueError("Either llm or openai_model_name must be provided")
self._test_execution_span = test_crew._telemetry.test_execution_span(
test_crew,
n_iterations,
inputs,
openai_model_name, # type: ignore[arg-type]
) # type: ignore[arg-type]
evaluator = CrewEvaluator(test_crew, openai_model_name) # type: ignore[arg-type]
str(model) if isinstance(model, LLM) else model,
)
evaluator = CrewEvaluator(test_crew, model)
for i in range(1, n_iterations + 1):
evaluator.set_iteration(i)
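To spell out the backward-compatibility rule this hunk implements, a simplified, hypothetical helper (not the verbatim implementation):

```python
from typing import Optional, Union

from crewai.llm import LLM

def _resolve_evaluation_model(
    llm: Optional[Union[str, LLM]],
    openai_model_name: Optional[str],
) -> Union[str, LLM]:
    """The new `llm` argument wins; the deprecated `openai_model_name` is the fallback."""
    model = llm or openai_model_name
    if model is None:
        raise ValueError("Either llm or openai_model_name must be provided")
    return model

# Telemetry still expects a plain string, so an LLM instance is stringified for the span:
# str(model) if isinstance(model, LLM) else model
```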

View File

@@ -1,138 +1,28 @@
from pathlib import Path
from typing import Dict, Iterator, List, Optional, Union
from urllib.parse import urlparse
from typing import Dict, List
from pydantic import Field, field_validator
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.utilities.constants import KNOWLEDGE_DIRECTORY
from crewai.utilities.logger import Logger
from crewai.knowledge.source.base_file_knowledge_source import BaseFileKnowledgeSource
class ExcelKnowledgeSource(BaseKnowledgeSource):
class ExcelKnowledgeSource(BaseFileKnowledgeSource):
"""A knowledge source that stores and queries Excel file content using embeddings."""
# override content to be a dict of file paths to sheet names to csv content
_logger: Logger = Logger(verbose=True)
file_path: Optional[Union[Path, List[Path], str, List[str]]] = Field(
default=None,
description="[Deprecated] The path to the file. Use file_paths instead.",
)
file_paths: Optional[Union[Path, List[Path], str, List[str]]] = Field(
default_factory=list, description="The path to the file"
)
chunks: List[str] = Field(default_factory=list)
content: Dict[Path, Dict[str, str]] = Field(default_factory=dict)
safe_file_paths: List[Path] = Field(default_factory=list)
@field_validator("file_path", "file_paths", mode="before")
def validate_file_path(cls, v, info):
"""Validate that at least one of file_path or file_paths is provided."""
# Single check if both are None, O(1) instead of nested conditions
if (
v is None
and info.data.get(
"file_path" if info.field_name == "file_paths" else "file_paths"
)
is None
):
raise ValueError("Either file_path or file_paths must be provided")
return v
def _process_file_paths(self) -> List[Path]:
"""Convert file_path to a list of Path objects."""
if hasattr(self, "file_path") and self.file_path is not None:
self._logger.log(
"warning",
"The 'file_path' attribute is deprecated and will be removed in a future version. Please use 'file_paths' instead.",
color="yellow",
)
self.file_paths = self.file_path
if self.file_paths is None:
raise ValueError("Your source must be provided with a file_paths: []")
# Convert single path to list
path_list: List[Union[Path, str]] = (
[self.file_paths]
if isinstance(self.file_paths, (str, Path))
else list(self.file_paths)
if isinstance(self.file_paths, list)
else []
)
if not path_list:
raise ValueError(
"file_path/file_paths must be a Path, str, or a list of these types"
)
return [self.convert_to_path(path) for path in path_list]
def validate_content(self):
"""Validate the paths."""
for path in self.safe_file_paths:
if not path.exists():
self._logger.log(
"error",
f"File not found: {path}. Try adding sources to the knowledge directory. If it's inside the knowledge directory, use the relative path.",
color="red",
)
raise FileNotFoundError(f"File not found: {path}")
if not path.is_file():
self._logger.log(
"error",
f"Path is not a file: {path}",
color="red",
)
def model_post_init(self, _) -> None:
if self.file_path:
self._logger.log(
"warning",
"The 'file_path' attribute is deprecated and will be removed in a future version. Please use 'file_paths' instead.",
color="yellow",
)
self.file_paths = self.file_path
self.safe_file_paths = self._process_file_paths()
self.validate_content()
self.content = self._load_content()
def _load_content(self) -> Dict[Path, Dict[str, str]]:
"""Load and preprocess Excel file content from multiple sheets.
Each sheet's content is converted to CSV format and stored.
Returns:
Dict[Path, Dict[str, str]]: A mapping of file paths to their respective sheet contents.
Raises:
ImportError: If required dependencies are missing.
FileNotFoundError: If the specified Excel file cannot be opened.
"""
def load_content(self) -> Dict[Path, str]:
"""Load and preprocess Excel file content."""
pd = self._import_dependencies()
content_dict = {}
for file_path in self.safe_file_paths:
file_path = self.convert_to_path(file_path)
with pd.ExcelFile(file_path) as xl:
sheet_dict = {
str(sheet_name): str(
pd.read_excel(xl, sheet_name).to_csv(index=False)
)
for sheet_name in xl.sheet_names
}
content_dict[file_path] = sheet_dict
df = pd.read_excel(file_path)
content = df.to_csv(index=False)
content_dict[file_path] = content
return content_dict
def convert_to_path(self, path: Union[Path, str]) -> Path:
"""Convert a path to a Path object."""
return Path(KNOWLEDGE_DIRECTORY + "/" + path) if isinstance(path, str) else path
def _import_dependencies(self):
"""Dynamically import dependencies."""
try:
import openpyxl # noqa
import pandas as pd
return pd
@@ -148,14 +38,10 @@ class ExcelKnowledgeSource(BaseKnowledgeSource):
and save the embeddings.
"""
# Convert dictionary values to a single string if content is a dictionary
# Updated to account for .xlsx workbooks with multiple tabs/sheets
content_str = ""
for value in self.content.values():
if isinstance(value, dict):
for sheet_value in value.values():
content_str += str(sheet_value) + "\n"
else:
content_str += str(value) + "\n"
if isinstance(self.content, dict):
content_str = "\n".join(str(value) for value in self.content.values())
else:
content_str = str(self.content)
new_chunks = self._chunk_text(content_str)
self.chunks.extend(new_chunks)
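A brief hedged usage sketch of the simplified source; the workbook name is illustrative and is expected to live in the knowledge directory.

```python
from crewai import Crew
from crewai.knowledge.source.excel_knowledge_source import ExcelKnowledgeSource

# Each workbook is read with pandas/openpyxl and flattened to CSV text before chunking.
excel_source = ExcelKnowledgeSource(
    file_paths=["sales_report.xlsx"],  # resolved relative to the knowledge directory
)

crew = Crew(
    agents=[...],  # your agents
    tasks=[...],   # your tasks
    knowledge_sources=[excel_source],
)
```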

View File

@@ -1,4 +1,5 @@
from collections import defaultdict
from typing import Union
from pydantic import BaseModel, Field
from rich.box import HEAVY_EDGE
@@ -6,6 +7,7 @@ from rich.console import Console
from rich.table import Table
from crewai.agent import Agent
from crewai.llm import LLM
from crewai.task import Task
from crewai.tasks.task_output import TaskOutput
from crewai.telemetry import Telemetry
@@ -32,9 +34,15 @@ class CrewEvaluator:
run_execution_times: defaultdict = defaultdict(list)
iteration: int = 0
def __init__(self, crew, openai_model_name: str):
def __init__(self, crew, llm: Union[str, LLM]):
"""Initialize the CrewEvaluator.
Args:
crew: The crew to evaluate
llm: LLM instance or model name string to use for evaluation
"""
self.crew = crew
self.openai_model_name = openai_model_name
self.llm = llm if isinstance(llm, LLM) else LLM(model=llm)
self._telemetry = Telemetry()
self._setup_for_evaluating()
@@ -51,7 +59,7 @@ class CrewEvaluator:
),
backstory="Evaluator agent for crew evaluation with precise capabilities to evaluate the performance of the agents in the crew based on the tasks they have performed",
verbose=False,
llm=self.openai_model_name,
llm=self.llm,
)
def _evaluation_task(
@@ -95,9 +103,20 @@ class CrewEvaluator:
│ Execution Time (s) │ 42 │ 79 │ 52 │ 57 │ │
└────────────────────┴───────┴───────┴───────┴────────────┴──────────────────────────────┘
"""
# Handle empty task scores
if not self.tasks_scores:
return
task_scores_list = list(zip(*self.tasks_scores.values()))
if not task_scores_list:
return
task_averages = [
sum(scores) / len(scores) for scores in zip(*self.tasks_scores.values())
sum(scores) / len(scores) for scores in task_scores_list
]
if not task_averages:
return
crew_average = sum(task_averages) / len(task_averages)
table = Table(title="Tasks Scores \n (1-10 Higher is better)", box=HEAVY_EDGE)
@@ -177,11 +196,12 @@ class CrewEvaluator:
evaluation_result = evaluation_task.execute_sync()
if isinstance(evaluation_result.pydantic, TaskEvaluationPydanticOutput):
model_name = str(self.llm) if isinstance(self.llm, LLM) else self.llm
self._test_result_span = self._telemetry.individual_test_result_span(
self.crew,
evaluation_result.pydantic.quality,
current_task.execution_duration,
self.openai_model_name,
model_name,
)
self.tasks_scores[self.iteration].append(evaluation_result.pydantic.quality)
self.run_execution_times[self.iteration].append(
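Both call forms normalize to an LLM instance inside the evaluator; a hedged sketch of the equivalence, with an illustrative model name and `crew` assumed to be an existing Crew instance.

```python
from crewai.llm import LLM
from crewai.utilities.evaluators.crew_evaluator_handler import CrewEvaluator

evaluator_from_name = CrewEvaluator(crew, "gpt-4o-mini")            # string is wrapped via LLM(model=...)
evaluator_from_llm = CrewEvaluator(crew, LLM(model="gpt-4o-mini"))  # instance is used as-is
```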

View File

@@ -2,11 +2,11 @@
import hashlib
import json
from collections import defaultdict
from concurrent.futures import Future
from unittest import mock
from unittest.mock import MagicMock, patch
import agentops
import instructor
import pydantic_core
import pytest
@@ -26,102 +26,15 @@ from crewai.tasks.output_format import OutputFormat
from crewai.tasks.task_output import TaskOutput
from crewai.types.usage_metrics import UsageMetrics
from crewai.utilities import Logger
from crewai.utilities.evaluators.crew_evaluator_handler import CrewEvaluator
from crewai.utilities.rpm_controller import RPMController
from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler
@pytest.fixture
def researcher():
"""Fixture to create a researcher agent."""
return Agent(
role="Researcher",
goal="Make the best research and analysis on content about AI and AI agents",
backstory="You're an expert researcher, specialized in technology, software engineering, AI and startups.",
allow_delegation=False,
)
@pytest.fixture
def mock_agentops():
"""Fixture to mock agentops for testing."""
mock_agentops = MagicMock()
mock_agentops.init = MagicMock()
return mock_agentops
@pytest.mark.agentops
class TestAgentOpsIntegration:
"""Tests for AgentOps integration."""
def test_initialization_with_api_key(self, mock_agentops, monkeypatch):
"""Test that agentops is properly initialized when API key is present."""
monkeypatch.setattr("crewai.crew.agentops", mock_agentops)
monkeypatch.setenv("AGENTOPS_API_KEY", "test-key-12345")
crew = Crew(agents=[researcher], tasks=[Task(
description="Test task",
expected_output="Test output",
agent=researcher,
)])
crew.set_private_attrs()
mock_agentops.init.assert_called_once_with("test-key-12345")
def test_initialization_without_api_key(self, mock_agentops):
"""Test that agentops is not initialized when API key is not present."""
crew = Crew(agents=[researcher], tasks=[Task(
description="Test task",
expected_output="Test output",
agent=researcher,
)])
mock_agentops.assert_not_called()
def test_initialization_with_invalid_api_key(self, mock_agentops, monkeypatch):
"""Test that agentops is not initialized when API key is invalid."""
monkeypatch.setenv("AGENTOPS_API_KEY", " ")
crew = Crew(agents=[researcher], tasks=[Task(
description="Test task",
expected_output="Test output",
agent=researcher,
)])
mock_agentops.assert_not_called()
def test_gemini_llm_integration(self, mock_agentops, monkeypatch):
"""Test that Gemini LLM works correctly with agentops."""
# Mock agentops
monkeypatch.setattr("crewai.crew.agentops", mock_agentops)
# Set API keys
monkeypatch.setenv("AGENTOPS_API_KEY", "test-key-12345")
monkeypatch.setenv("GOOGLE_API_KEY", "test-key")
# Create crew with Gemini LLM
llm = LLM(model="gemini-pro")
agent = Agent(
role="test",
goal="test",
backstory="test",
llm=llm
)
task = Task(
description="test task",
expected_output="test output",
agent=agent
)
crew = Crew(agents=[agent], tasks=[task])
crew.set_private_attrs()
# Mock the agent execution to avoid actual API calls
with patch.object(Task, 'execute_sync', return_value=TaskOutput(
description="test",
raw="test output",
agent=agent.role
)):
# Run crew
crew.kickoff()
# Verify agentops.end_session was called correctly
mock_agentops.end_session.assert_called_once_with(
end_state="Success",
end_state_reason="Finished Execution",
is_auto_end=True
)
def crew_evaluator():
evaluator = mock.MagicMock(spec=CrewEvaluator)
evaluator.print_crew_evaluation_result = mock.MagicMock()
return evaluator
ceo = Agent(
role="CEO",
@@ -147,7 +60,6 @@ writer = Agent(
def test_crew_with_only_conditional_tasks_raises_error():
"""Test that creating a crew with only conditional tasks raises an error."""
def condition_func(task_output: TaskOutput) -> bool:
return True
@@ -179,7 +91,6 @@ def test_crew_with_only_conditional_tasks_raises_error():
tasks=[conditional1, conditional2, conditional3],
)
def test_crew_config_conditional_requirement():
with pytest.raises(ValueError):
Crew(process=Process.sequential)
@@ -687,12 +598,12 @@ def test_crew_with_delegating_agents_should_not_override_task_tools():
_, kwargs = mock_execute_sync.call_args
tools = kwargs["tools"]
assert any(
isinstance(tool, TestTool) for tool in tools
), "TestTool should be present"
assert any(
"delegate" in tool.name.lower() for tool in tools
), "Delegation tool should be present"
assert any(isinstance(tool, TestTool) for tool in tools), (
"TestTool should be present"
)
assert any("delegate" in tool.name.lower() for tool in tools), (
"Delegation tool should be present"
)
@pytest.mark.vcr(filter_headers=["authorization"])
@@ -751,12 +662,12 @@ def test_crew_with_delegating_agents_should_not_override_agent_tools():
_, kwargs = mock_execute_sync.call_args
tools = kwargs["tools"]
assert any(
isinstance(tool, TestTool) for tool in new_ceo.tools
), "TestTool should be present"
assert any(
"delegate" in tool.name.lower() for tool in tools
), "Delegation tool should be present"
assert any(isinstance(tool, TestTool) for tool in new_ceo.tools), (
"TestTool should be present"
)
assert any("delegate" in tool.name.lower() for tool in tools), (
"Delegation tool should be present"
)
@pytest.mark.vcr(filter_headers=["authorization"])
@@ -880,17 +791,17 @@ def test_task_tools_override_agent_tools_with_allow_delegation():
used_tools = kwargs["tools"]
# Confirm AnotherTestTool is present but TestTool is not
assert any(
isinstance(tool, AnotherTestTool) for tool in used_tools
), "AnotherTestTool should be present"
assert not any(
isinstance(tool, TestTool) for tool in used_tools
), "TestTool should not be present among used tools"
assert any(isinstance(tool, AnotherTestTool) for tool in used_tools), (
"AnotherTestTool should be present"
)
assert not any(isinstance(tool, TestTool) for tool in used_tools), (
"TestTool should not be present among used tools"
)
# Confirm delegation tool(s) are present
assert any(
"delegate" in tool.name.lower() for tool in used_tools
), "Delegation tool should be present"
assert any("delegate" in tool.name.lower() for tool in used_tools), (
"Delegation tool should be present"
)
# Finally, make sure the agent's original tools remain unchanged
assert len(researcher_with_delegation.tools) == 1
@@ -1691,9 +1602,9 @@ def test_code_execution_flag_adds_code_tool_upon_kickoff():
# Verify that exactly one tool was used and it was a CodeInterpreterTool
assert len(used_tools) == 1, "Should have exactly one tool"
assert isinstance(
used_tools[0], CodeInterpreterTool
), "Tool should be CodeInterpreterTool"
assert isinstance(used_tools[0], CodeInterpreterTool), (
"Tool should be CodeInterpreterTool"
)
@pytest.mark.vcr(filter_headers=["authorization"])
@@ -2050,7 +1961,6 @@ def test_task_callback_on_crew():
def test_task_callback_both_on_task_and_crew():
from unittest.mock import MagicMock, patch
mock_callback_on_task = MagicMock()
mock_callback_on_crew = MagicMock()
@@ -2200,22 +2110,21 @@ def test_conditional_task_uses_last_output():
expected_output="First output",
agent=researcher,
)
def condition_fails(task_output: TaskOutput) -> bool:
# This condition will never be met
return "never matches" in task_output.raw.lower()
def condition_succeeds(task_output: TaskOutput) -> bool:
# This condition will match first task's output
return "first success" in task_output.raw.lower()
conditional_task1 = ConditionalTask(
description="Second task - conditional that fails condition",
expected_output="Second output",
agent=researcher,
condition=condition_fails,
)
conditional_task2 = ConditionalTask(
description="Third task - conditional that succeeds using first task output",
expected_output="Third output",
@@ -2234,37 +2143,35 @@ def test_conditional_task_uses_last_output():
raw="First success output", # Will be used by third task's condition
agent=researcher.role,
)
mock_skipped = TaskOutput(
description="Second task output",
raw="", # Empty output since condition fails
agent=researcher.role,
)
mock_third = TaskOutput(
description="Third task output",
raw="Third task executed", # Output when condition succeeds using first task output
agent=writer.role,
)
# Set up mocks for task execution and conditional logic
with patch.object(ConditionalTask, "should_execute") as mock_should_execute:
# First conditional fails, second succeeds
mock_should_execute.side_effect = [False, True]
with patch.object(Task, "execute_sync") as mock_execute:
mock_execute.side_effect = [mock_first, mock_third]
result = crew.kickoff()
# Verify execution behavior
assert mock_execute.call_count == 2 # Only first and third tasks execute
assert mock_should_execute.call_count == 2 # Both conditionals checked
# Verify outputs collection:
# First executed task output, followed by an automatically generated (skipped) output, then the conditional execution
# Verify outputs collection
assert len(result.tasks_output) == 3
assert (
result.tasks_output[0].raw == "First success output"
) # First task succeeded
assert (
result.tasks_output[1].raw == ""
) # Second task skipped (condition failed)
assert (
result.tasks_output[2].raw == "Third task executed"
) # Third task used first task's output
assert result.tasks_output[0].raw == "First success output" # First task succeeded
assert result.tasks_output[1].raw == "" # Second task skipped (condition failed)
assert result.tasks_output[2].raw == "Third task executed" # Third task used first task's output
@pytest.mark.vcr(filter_headers=["authorization"])
def test_conditional_tasks_result_collection():
@@ -2274,20 +2181,20 @@ def test_conditional_tasks_result_collection():
expected_output="First output",
agent=researcher,
)
def condition_never_met(task_output: TaskOutput) -> bool:
return "never matches" in task_output.raw.lower()
def condition_always_met(task_output: TaskOutput) -> bool:
return "success" in task_output.raw.lower()
task2 = ConditionalTask(
description="Conditional task that never executes",
expected_output="Second output",
agent=researcher,
condition=condition_never_met,
)
task3 = ConditionalTask(
description="Conditional task that always executes",
expected_output="Third output",
@@ -2306,46 +2213,35 @@ def test_conditional_tasks_result_collection():
raw="Success output", # Triggers third task's condition
agent=researcher.role,
)
mock_skipped = TaskOutput(
description="Skipped output",
raw="", # Empty output for skipped task
agent=researcher.role,
)
mock_conditional = TaskOutput(
description="Conditional output",
raw="Conditional task executed",
agent=writer.role,
)
# Set up mocks for task execution and conditional logic
with patch.object(ConditionalTask, "should_execute") as mock_should_execute:
# First conditional fails, second succeeds
mock_should_execute.side_effect = [False, True]
with patch.object(Task, "execute_sync") as mock_execute:
mock_execute.side_effect = [mock_success, mock_conditional]
result = crew.kickoff()
# Verify execution behavior
assert mock_execute.call_count == 2 # Only first and third tasks execute
assert mock_should_execute.call_count == 2 # Both conditionals checked
# Verify task output collection:
# There should be three outputs: normal task, skipped conditional task (empty output),
# and the conditional task that executed.
assert len(result.tasks_output) == 3
assert (
result.tasks_output[0].raw == "Success output"
) # Normal task executed
assert result.tasks_output[1].raw == "" # Second task skipped
assert (
result.tasks_output[2].raw == "Conditional task executed"
) # Third task executed
# Verify task output collection
assert len(result.tasks_output) == 3
assert (
result.tasks_output[0].raw == "Success output"
) # Normal task executed
assert result.tasks_output[1].raw == "" # Second task skipped
assert (
result.tasks_output[2].raw == "Conditional task executed"
) # Third task executed
assert result.tasks_output[0].raw == "Success output" # Normal task executed
assert result.tasks_output[1].raw == "" # Second task skipped
assert result.tasks_output[2].raw == "Conditional task executed" # Third task executed
@pytest.mark.vcr(filter_headers=["authorization"])
def test_multiple_conditional_tasks():
@@ -2355,20 +2251,20 @@ def test_multiple_conditional_tasks():
expected_output="Research output",
agent=researcher,
)
def condition1(task_output: TaskOutput) -> bool:
return "success" in task_output.raw.lower()
def condition2(task_output: TaskOutput) -> bool:
return "proceed" in task_output.raw.lower()
task2 = ConditionalTask(
description="First conditional task",
expected_output="Conditional output 1",
agent=writer,
condition=condition1,
)
task3 = ConditionalTask(
description="Second conditional task",
expected_output="Conditional output 2",
@@ -2387,7 +2283,7 @@ def test_multiple_conditional_tasks():
raw="Success and proceed output",
agent=researcher.role,
)
# Set up mocks for task execution
with patch.object(Task, "execute_sync", return_value=mock_success) as mock_execute:
result = crew.kickoff()
@@ -2395,7 +2291,6 @@ def test_multiple_conditional_tasks():
assert mock_execute.call_count == 3
assert len(result.tasks_output) == 3
@pytest.mark.vcr(filter_headers=["authorization"])
def test_using_contextual_memory():
from unittest.mock import patch
@@ -3453,6 +3348,56 @@ def test_crew_testing_function(kickoff_mock, copy_mock, crew_evaluator):
]
)
@mock.patch("crewai.crew.CrewEvaluator")
@mock.patch("crewai.crew.Crew.copy")
@mock.patch("crewai.crew.Crew.kickoff")
def test_crew_testing_with_llm_instance(kickoff_mock, copy_mock, evaluator_mock):
task = Task(
description="Test task",
expected_output="Test output",
agent=researcher,
)
crew = Crew(agents=[researcher], tasks=[task])
llm = LLM(model="gpt-4")
# Create a mock for the copied crew
copy_mock.return_value = crew
# Create a mock evaluator instance with required methods
mock_evaluator = mock.MagicMock()
mock_evaluator.set_iteration = mock.MagicMock()
mock_evaluator.evaluate = mock.MagicMock()
mock_evaluator.print_crew_evaluation_result = mock.MagicMock()
# Set up the mock class to track constructor calls and return our mock instance
evaluator_mock.side_effect = lambda crew_arg, model_arg: mock_evaluator
# Run the test
crew.test(n_iterations=2, llm=llm)
# Verify the evaluator was used correctly
kickoff_mock.assert_has_calls([
mock.call(inputs=None),
mock.call(inputs=None)
])
# Verify CrewEvaluator was instantiated with the LLM instance
evaluator_mock.assert_called_once_with(crew, llm)
# Verify print_crew_evaluation_result was called
mock_evaluator.print_crew_evaluation_result.assert_called_once()
def test_crew_testing_with_missing_model():
crew = Crew(agents=[researcher], tasks=[Task(
description="Test task",
expected_output="Test output",
agent=researcher,
)])
with pytest.raises(ValueError, match="Either llm or openai_model_name must be provided"):
crew.test(n_iterations=2)
@pytest.mark.vcr(filter_headers=["authorization"])
def test_hierarchical_verbose_manager_agent():
@@ -3514,9 +3459,9 @@ def test_fetch_inputs():
expected_placeholders = {"role_detail", "topic", "field"}
actual_placeholders = crew.fetch_inputs()
assert (
actual_placeholders == expected_placeholders
), f"Expected {expected_placeholders}, but got {actual_placeholders}"
assert actual_placeholders == expected_placeholders, (
f"Expected {expected_placeholders}, but got {actual_placeholders}"
)
def test_task_tools_preserve_code_execution_tools():
@@ -3589,20 +3534,20 @@ def test_task_tools_preserve_code_execution_tools():
used_tools = kwargs["tools"]
# Verify all expected tools are present
assert any(
isinstance(tool, TestTool) for tool in used_tools
), "Task's TestTool should be present"
assert any(
isinstance(tool, CodeInterpreterTool) for tool in used_tools
), "CodeInterpreterTool should be present"
assert any(
"delegate" in tool.name.lower() for tool in used_tools
), "Delegation tool should be present"
assert any(isinstance(tool, TestTool) for tool in used_tools), (
"Task's TestTool should be present"
)
assert any(isinstance(tool, CodeInterpreterTool) for tool in used_tools), (
"CodeInterpreterTool should be present"
)
assert any("delegate" in tool.name.lower() for tool in used_tools), (
"Delegation tool should be present"
)
# Verify the total number of tools (TestTool + CodeInterpreter + 2 delegation tools)
assert (
len(used_tools) == 4
), "Should have TestTool, CodeInterpreter, and 2 delegation tools"
assert len(used_tools) == 4, (
"Should have TestTool, CodeInterpreter, and 2 delegation tools"
)
@pytest.mark.vcr(filter_headers=["authorization"])
@@ -3646,9 +3591,9 @@ def test_multimodal_flag_adds_multimodal_tools():
used_tools = kwargs["tools"]
# Check that the multimodal tool was added
assert any(
isinstance(tool, AddImageTool) for tool in used_tools
), "AddImageTool should be present when agent is multimodal"
assert any(isinstance(tool, AddImageTool) for tool in used_tools), (
"AddImageTool should be present when agent is multimodal"
)
# Verify we have exactly one tool (just the AddImageTool)
assert len(used_tools) == 1, "Should only have the AddImageTool"
@@ -3874,9 +3819,9 @@ def test_crew_guardrail_feedback_in_context():
assert len(execution_contexts) > 1, "Task should have been executed multiple times"
# Verify that the second execution included the guardrail feedback
assert (
"Output must contain the keyword 'IMPORTANT'" in execution_contexts[1]
), "Guardrail feedback should be included in retry context"
assert "Output must contain the keyword 'IMPORTANT'" in execution_contexts[1], (
"Guardrail feedback should be included in retry context"
)
# Verify final output meets guardrail requirements
assert "IMPORTANT" in result.raw, "Final output should contain required keyword"