Compare commits

..

2 Commits

Author SHA1 Message Date
Devin AI
bbf012e800 Fix lint issues: remove unused imports
- Remove unused 'os' import from reproduce_issue.py
- Remove unused imports (os, Agent, Crew, Task) from test_mem0_storage.py
- Addresses lint check failure in CI

Co-Authored-By: Jo\u00E3o <joao@crewai.com>
2025-07-14 05:13:00 +00:00
Devin AI
46ad20b9f6 Fix mem0 external memory format issue #3152
- Convert string values to proper message format for mem0 API
- mem0 API expects messages as list of objects with role/content fields
- Add comprehensive tests for external memory type and message formatting
- Add reproduction script to verify the fix works
- Resolves 'Expected a list of items but got type str' error

Co-Authored-By: Jo\u00E3o <joao@crewai.com>
2025-07-14 05:09:59 +00:00
66 changed files with 3629 additions and 6093 deletions

3
.gitignore vendored
View File

@@ -26,5 +26,4 @@ test_flow.html
crewairules.mdc
plan.md
conceptual_plan.md
build_image
chromadb-*.lock
build_image

View File

@@ -9,7 +9,12 @@
},
"favicon": "/images/favicon.svg",
"contextual": {
"options": ["copy", "view", "chatgpt", "claude"]
"options": [
"copy",
"view",
"chatgpt",
"claude"
]
},
"navigation": {
"languages": [
@@ -50,22 +55,32 @@
"groups": [
{
"group": "Get Started",
"pages": ["en/introduction", "en/installation", "en/quickstart"]
"pages": [
"en/introduction",
"en/installation",
"en/quickstart"
]
},
{
"group": "Guides",
"pages": [
{
"group": "Strategy",
"pages": ["en/guides/concepts/evaluating-use-cases"]
"pages": [
"en/guides/concepts/evaluating-use-cases"
]
},
{
"group": "Agents",
"pages": ["en/guides/agents/crafting-effective-agents"]
"pages": [
"en/guides/agents/crafting-effective-agents"
]
},
{
"group": "Crews",
"pages": ["en/guides/crews/first-crew"]
"pages": [
"en/guides/crews/first-crew"
]
},
{
"group": "Flows",
@@ -79,6 +94,7 @@
"pages": [
"en/guides/advanced/customizing-prompts",
"en/guides/advanced/fingerprinting"
]
}
]
@@ -225,7 +241,6 @@
"en/observability/langtrace",
"en/observability/maxim",
"en/observability/mlflow",
"en/observability/neatlogs",
"en/observability/openlit",
"en/observability/opik",
"en/observability/patronus-evaluation",
@@ -259,7 +274,9 @@
},
{
"group": "Telemetry",
"pages": ["en/telemetry"]
"pages": [
"en/telemetry"
]
}
]
},
@@ -268,7 +285,9 @@
"groups": [
{
"group": "Getting Started",
"pages": ["en/enterprise/introduction"]
"pages": [
"en/enterprise/introduction"
]
},
{
"group": "Features",
@@ -323,7 +342,9 @@
},
{
"group": "Resources",
"pages": ["en/enterprise/resources/frequently-asked-questions"]
"pages": [
"en/enterprise/resources/frequently-asked-questions"
]
}
]
},
@@ -332,7 +353,9 @@
"groups": [
{
"group": "Getting Started",
"pages": ["en/api-reference/introduction"]
"pages": [
"en/api-reference/introduction"
]
},
{
"group": "Endpoints",
@@ -342,13 +365,16 @@
},
{
"tab": "Examples",
"groups": [
"groups": [
{
"group": "Examples",
"pages": ["en/examples/example"]
"pages": [
"en/examples/example"
]
}
]
}
]
},
{
@@ -399,15 +425,21 @@
"pages": [
{
"group": "Estratégia",
"pages": ["pt-BR/guides/concepts/evaluating-use-cases"]
"pages": [
"pt-BR/guides/concepts/evaluating-use-cases"
]
},
{
"group": "Agentes",
"pages": ["pt-BR/guides/agents/crafting-effective-agents"]
"pages": [
"pt-BR/guides/agents/crafting-effective-agents"
]
},
{
"group": "Crews",
"pages": ["pt-BR/guides/crews/first-crew"]
"pages": [
"pt-BR/guides/crews/first-crew"
]
},
{
"group": "Flows",
@@ -600,7 +632,9 @@
},
{
"group": "Telemetria",
"pages": ["pt-BR/telemetry"]
"pages": [
"pt-BR/telemetry"
]
}
]
},
@@ -609,7 +643,9 @@
"groups": [
{
"group": "Começando",
"pages": ["pt-BR/enterprise/introduction"]
"pages": [
"pt-BR/enterprise/introduction"
]
},
{
"group": "Funcionalidades",
@@ -674,7 +710,9 @@
"groups": [
{
"group": "Começando",
"pages": ["pt-BR/api-reference/introduction"]
"pages": [
"pt-BR/api-reference/introduction"
]
},
{
"group": "Endpoints",
@@ -684,13 +722,16 @@
},
{
"tab": "Exemplos",
"groups": [
"groups": [
{
"group": "Exemplos",
"pages": ["pt-BR/examples/example"]
"pages": [
"pt-BR/examples/example"
]
}
]
}
]
}
]

View File

@@ -172,60 +172,6 @@ def another_agent(self):
)
```
### Using Multiple MCP Servers with CrewBase
You can configure multiple MCP servers and assign different servers and tools to different agents. Use a dictionary to pass multiple named MCP servers.
```python
@CrewBase
class CrewWithMultipleMCP:
# ... define your agents and tasks config file ...
# MCP servers keyed by server name
mcp_server_params = {
"web_tools": {
"url": "http://localhost:8000/mcp",
"transport": "streamable-http"
},
"data_tools": {
"url":
"http://localhost:8001/sse",
"transport": "sse"
},
"local_tools": StdioServerParameters(
command="python3",
args=["servers/local_server.py"],
env={"UV_PYTHON": "3.12", **os.environ},
)
}
@agent
def web_researcher(self):
# Use tools from specific server
return Agent(
config=self.agents_config["web_researcher"],
tools=self.get_mcp_tools(server="web_tools")
)
@agent
def data_analyst(self):
# Use specific tools from specific server
return Agent(
config=self.agents_config["data_analyst"],
tools=self.get_mcp_tools("analyze_csv", "create_chart", server="data_tools")
)
@agent
def multi_tool_agent(self):
# Use tools from all servers
return Agent(
config=self.agents_config["multi_tool_agent"],
tools=self.get_mcp_tools() # No server specified = all tools
)
# ... rest of your crew setup ...
```
## Explore MCP Integrations
<CardGroup cols={2}>

View File

@@ -1,134 +0,0 @@
---
title: Neatlogs Integration
description: Understand, debug, and share your CrewAI agent runs
icon: magnifying-glass-chart
---
# Introduction
Neatlogs helps you **see what your agent did**, **why**, and **share it**.
It captures every step: thoughts, tool calls, responses, evaluations. No raw logs. Just clear, structured traces. Great for debugging and collaboration.
## Why use Neatlogs?
CrewAI agents use multiple tools and reasoning steps. When something goes wrong, you need context — not just errors.
Neatlogs lets you:
- Follow the full decision path
- Add feedback directly on steps
- Chat with the trace using AI assistant
- Share runs publicly for feedback
- Turn insights into tasks
All in one place.
Manage your traces effortlessly
![Traces](/images/neatlogs-1.png)
![Trace Response](/images/neatlogs-2.png)
The best UX to view a CrewAI trace. Post comments anywhere you want. Use AI to debug.
![Trace Details](/images/neatlogs-3.png)
![Ai Chat Bot With A Trace](/images/neatlogs-4.png)
![Comments Drawer](/images/neatlogs-5.png)
## Core Features
- **Trace Viewer**: Track thoughts, tools, and decisions in sequence
- **Inline Comments**: Tag teammates on any trace step
- **Feedback & Evaluation**: Mark outputs as correct or incorrect
- **Error Highlighting**: Automatic flagging of API/tool failures
- **Task Conversion**: Convert comments into assigned tasks
- **Ask the Trace (AI)**: Chat with your trace using Neatlogs AI bot
- **Public Sharing**: Publish trace links to your community
## Quick Setup with CrewAI
<Steps>
<Step title="Sign Up & Get API Key">
Visit [neatlogs.com](https://neatlogs.com/?utm_source=crewAI-docs), create a project, copy the API key.
</Step>
<Step title="Install SDK">
```bash
pip install neatlogs
```
(Latest version 0.8.0, Python 3.8+; MIT license)
</Step>
<Step title="Initialize Neatlogs">
Before starting Crew agents, add:
```python
import neatlogs
neatlogs.init("YOUR_PROJECT_API_KEY")
```
Agents run as usual. Neatlogs captures everything automatically.
</Step>
</Steps>
## Under the Hood
According to GitHub, Neatlogs:
- Captures thoughts, tool calls, responses, errors, and token stats
- Supports AI-powered task generation and robust evaluation workflows
All with just two lines of code.
## Watch It Work
### 🔍 Full Demo (4min)
<iframe
width="100%"
height="315"
src="https://www.youtube.com/embed/8KDme9T2I7Q?si=b8oHteaBwFNs_Duk"
title="YouTube video player"
frameBorder="0"
allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture"
allowFullScreen
></iframe>
### ⚙️ CrewAI Integration (30s)
<iframe
className="w-full aspect-video rounded-xl"
src="https://www.loom.com/embed/9c78b552af43452bb3e4783cb8d91230?sid=e9d7d370-a91a-49b0-809e-2f375d9e801d"
title="Loom video player"
frameBorder="0"
allowFullScreen
></iframe>
## Links & Support
- 📘 [Neatlogs Docs](https://docs.neatlogs.com/)
- 🔐 [Dashboard & API Key](https://app.neatlogs.com/)
- 🐦 [Follow on Twitter](https://twitter.com/neatlogs)
- 📧 Contact: hello@neatlogs.com
- 🛠 [GitHub SDK](https://github.com/NeatLogs/neatlogs)
## TL;DR
With just:
```bash
pip install neatlogs
import neatlogs
neatlogs.init("YOUR_API_KEY")
You can now capture, understand, share, and act on your CrewAI agent runs in seconds.
No setup overhead. Full trace transparency. Full team collaboration.
```

Binary file not shown.

Before

Width:  |  Height:  |  Size: 222 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 329 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 590 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 216 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 277 KiB

View File

@@ -39,7 +39,6 @@ dependencies = [
"tomli>=2.0.2",
"blinker>=1.9.0",
"json5>=0.10.0",
"portalocker==2.7.0",
]
[project.urls]
@@ -48,7 +47,7 @@ Documentation = "https://docs.crewai.com"
Repository = "https://github.com/crewAIInc/crewAI"
[project.optional-dependencies]
tools = ["crewai-tools~=0.55.0"]
tools = ["crewai-tools~=0.51.0"]
embeddings = [
"tiktoken~=0.8.0"
]

54
reproduce_issue.py Normal file
View File

@@ -0,0 +1,54 @@
"""
Reproduction script for issue #3152 - mem0 external memory format error
Based on the code provided in the GitHub issue
"""
from crewai import Agent, Task, Crew
from crewai.memory.external.external_memory import ExternalMemory
def test_mem0_external_memory():
"""Test that reproduces the mem0 external memory format error"""
embedder_config = {
"provider": "mem0",
"config": {
"user_id": "test_user_123",
}
}
external_memory = ExternalMemory(embedder_config=embedder_config)
agent = Agent(
role="Test Agent",
goal="Test external memory functionality",
backstory="A test agent for reproducing the mem0 issue",
verbose=True
)
task = Task(
description="Test task for external memory",
expected_output="Test output",
agent=agent
)
crew = Crew(
agents=[agent],
tasks=[task],
external_memory=external_memory,
verbose=True
)
print("Testing mem0 external memory integration...")
try:
result = crew.kickoff()
print("SUCCESS: External memory integration worked!")
print(f"Result: {result}")
except Exception as e:
print(f"ERROR: {e}")
if "Expected a list of items but got type" in str(e):
print("CONFIRMED: This is the mem0 format error from issue #3152")
raise
if __name__ == "__main__":
test_mem0_external_memory()

View File

@@ -54,7 +54,7 @@ def _track_install_async():
_track_install_async()
__version__ = "0.148.0"
__version__ = "0.141.0"
__all__ = [
"Agent",
"Crew",

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]>=0.148.0,<1.0.0"
"crewai[tools]>=0.141.0,<1.0.0"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]>=0.148.0,<1.0.0",
"crewai[tools]>=0.141.0,<1.0.0",
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "Power up your crews with {{folder_name}}"
readme = "README.md"
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]>=0.148.0"
"crewai[tools]>=0.141.0"
]
[tool.crewai]

View File

@@ -1313,6 +1313,7 @@ class Crew(FlowTrackable, BaseModel):
n_iterations: int,
eval_llm: Union[str, InstanceOf[BaseLLM]],
inputs: Optional[Dict[str, Any]] = None,
include_agent_eval: Optional[bool] = False
) -> None:
"""Test and evaluate the Crew with the given inputs for n iterations concurrently using concurrent.futures."""
try:
@@ -1332,13 +1333,28 @@ class Crew(FlowTrackable, BaseModel):
)
test_crew = self.copy()
# TODO: Refator to use a single Evaluator Manage class
evaluator = CrewEvaluator(test_crew, llm_instance)
if include_agent_eval:
from crewai.evaluation import create_default_evaluator
agent_evaluator = create_default_evaluator(crew=test_crew)
for i in range(1, n_iterations + 1):
evaluator.set_iteration(i)
if include_agent_eval:
agent_evaluator.set_iteration(i)
test_crew.kickoff(inputs=inputs)
# TODO: Refactor to use ListenerEvents instead of trigger each iteration manually
if include_agent_eval:
agent_evaluator.evaluate_current_iteration()
evaluator.print_crew_evaluation_result()
if include_agent_eval:
agent_evaluator.get_agent_evaluation(include_evaluation_feedback=True)
crewai_event_bus.emit(
self,

View File

@@ -1,35 +1,40 @@
from crewai.experimental.evaluation.base_evaluator import (
from crewai.evaluation.base_evaluator import (
BaseEvaluator,
EvaluationScore,
MetricCategory,
AgentEvaluationResult
)
from crewai.experimental.evaluation.metrics import (
SemanticQualityEvaluator,
GoalAlignmentEvaluator,
ReasoningEfficiencyEvaluator,
from crewai.evaluation.metrics.semantic_quality_metrics import (
SemanticQualityEvaluator
)
from crewai.evaluation.metrics.goal_metrics import (
GoalAlignmentEvaluator
)
from crewai.evaluation.metrics.reasoning_metrics import (
ReasoningEfficiencyEvaluator
)
from crewai.evaluation.metrics.tools_metrics import (
ToolSelectionEvaluator,
ParameterExtractionEvaluator,
ToolInvocationEvaluator
)
from crewai.experimental.evaluation.evaluation_listener import (
from crewai.evaluation.evaluation_listener import (
EvaluationTraceCallback,
create_evaluation_callbacks
)
from crewai.experimental.evaluation.agent_evaluator import (
from crewai.evaluation.agent_evaluator import (
AgentEvaluator,
create_default_evaluator
)
from crewai.experimental.evaluation.experiment import (
ExperimentRunner,
ExperimentResults,
ExperimentResult
)
__all__ = [
"BaseEvaluator",
"EvaluationScore",
@@ -44,8 +49,5 @@ __all__ = [
"EvaluationTraceCallback",
"create_evaluation_callbacks",
"AgentEvaluator",
"create_default_evaluator",
"ExperimentRunner",
"ExperimentResults",
"ExperimentResult"
]
"create_default_evaluator"
]

View File

@@ -0,0 +1,178 @@
from crewai.evaluation.base_evaluator import AgentEvaluationResult, AggregationStrategy
from crewai.agent import Agent
from crewai.task import Task
from crewai.evaluation.evaluation_display import EvaluationDisplayFormatter
from typing import Any, Dict
from collections import defaultdict
from crewai.evaluation import BaseEvaluator, create_evaluation_callbacks
from collections.abc import Sequence
from crewai.crew import Crew
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.utils.console_formatter import ConsoleFormatter
class AgentEvaluator:
def __init__(
self,
evaluators: Sequence[BaseEvaluator] | None = None,
crew: Crew | None = None,
):
self.crew: Crew | None = crew
self.evaluators: Sequence[BaseEvaluator] | None = evaluators
self.agent_evaluators: dict[str, Sequence[BaseEvaluator] | None] = {}
if crew is not None:
assert crew and crew.agents is not None
for agent in crew.agents:
self.agent_evaluators[str(agent.id)] = self.evaluators
self.callback = create_evaluation_callbacks()
self.console_formatter = ConsoleFormatter()
self.display_formatter = EvaluationDisplayFormatter()
self.iteration = 1
self.iterations_results: dict[int, dict[str, list[AgentEvaluationResult]]] = {}
def set_iteration(self, iteration: int) -> None:
self.iteration = iteration
def evaluate_current_iteration(self) -> dict[str, list[AgentEvaluationResult]]:
if not self.crew:
raise ValueError("Cannot evaluate: no crew was provided to the evaluator.")
if not self.callback:
raise ValueError("Cannot evaluate: no callback was set. Use set_callback() method first.")
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn
evaluation_results: defaultdict[str, list[AgentEvaluationResult]] = defaultdict(list)
total_evals = 0
for agent in self.crew.agents:
for task in self.crew.tasks:
if task.agent and task.agent.id == agent.id and self.agent_evaluators.get(str(agent.id)):
total_evals += 1
with Progress(
SpinnerColumn(),
TextColumn("[bold blue]{task.description}[/bold blue]"),
BarColumn(),
TextColumn("{task.percentage:.0f}% completed"),
console=self.console_formatter.console
) as progress:
eval_task = progress.add_task(f"Evaluating agents (iteration {self.iteration})...", total=total_evals)
for agent in self.crew.agents:
evaluator = self.agent_evaluators.get(str(agent.id))
if not evaluator:
continue
for task in self.crew.tasks:
if task.agent and str(task.agent.id) != str(agent.id):
continue
trace = self.callback.get_trace(str(agent.id), str(task.id))
if not trace:
self.console_formatter.print(f"[yellow]Warning: No trace found for agent {agent.role} on task {task.description[:30]}...[/yellow]")
progress.update(eval_task, advance=1)
continue
with crewai_event_bus.scoped_handlers():
result = self.evaluate(
agent=agent,
task=task,
execution_trace=trace,
final_output=task.output
)
evaluation_results[agent.role].append(result)
progress.update(eval_task, advance=1)
self.iterations_results[self.iteration] = evaluation_results
return evaluation_results
def get_evaluation_results(self):
if self.iteration in self.iterations_results:
return self.iterations_results[self.iteration]
return self.evaluate_current_iteration()
def display_results_with_iterations(self):
self.display_formatter.display_summary_results(self.iterations_results)
def get_agent_evaluation(self, strategy: AggregationStrategy = AggregationStrategy.SIMPLE_AVERAGE, include_evaluation_feedback: bool = False):
agent_results = {}
with crewai_event_bus.scoped_handlers():
task_results = self.get_evaluation_results()
for agent_role, results in task_results.items():
if not results:
continue
agent_id = results[0].agent_id
aggregated_result = self.display_formatter._aggregate_agent_results(
agent_id=agent_id,
agent_role=agent_role,
results=results,
strategy=strategy
)
agent_results[agent_role] = aggregated_result
if self.iteration == max(self.iterations_results.keys()):
self.display_results_with_iterations()
if include_evaluation_feedback:
self.display_evaluation_with_feedback()
return agent_results
def display_evaluation_with_feedback(self):
self.display_formatter.display_evaluation_with_feedback(self.iterations_results)
def evaluate(
self,
agent: Agent,
task: Task,
execution_trace: Dict[str, Any],
final_output: Any
) -> AgentEvaluationResult:
result = AgentEvaluationResult(
agent_id=str(agent.id),
task_id=str(task.id)
)
assert self.evaluators is not None
for evaluator in self.evaluators:
try:
score = evaluator.evaluate(
agent=agent,
task=task,
execution_trace=execution_trace,
final_output=final_output
)
result.metrics[evaluator.metric_category] = score
except Exception as e:
self.console_formatter.print(f"Error in {evaluator.metric_category.value} evaluator: {str(e)}")
return result
def create_default_evaluator(crew, llm=None):
from crewai.evaluation import (
GoalAlignmentEvaluator,
SemanticQualityEvaluator,
ToolSelectionEvaluator,
ParameterExtractionEvaluator,
ToolInvocationEvaluator,
ReasoningEfficiencyEvaluator
)
evaluators = [
GoalAlignmentEvaluator(llm=llm),
SemanticQualityEvaluator(llm=llm),
ToolSelectionEvaluator(llm=llm),
ParameterExtractionEvaluator(llm=llm),
ToolInvocationEvaluator(llm=llm),
ReasoningEfficiencyEvaluator(llm=llm),
]
return AgentEvaluator(evaluators=evaluators, crew=crew)

View File

@@ -57,9 +57,9 @@ class BaseEvaluator(abc.ABC):
def evaluate(
self,
agent: Agent,
task: Task,
execution_trace: Dict[str, Any],
final_output: Any,
task: Task | None = None,
) -> EvaluationScore:
pass

View File

@@ -3,8 +3,8 @@ from typing import Dict, Any, List
from rich.table import Table
from rich.box import HEAVY_EDGE, ROUNDED
from collections.abc import Sequence
from crewai.experimental.evaluation.base_evaluator import AgentAggregatedEvaluationResult, AggregationStrategy, AgentEvaluationResult, MetricCategory
from crewai.experimental.evaluation import EvaluationScore
from crewai.evaluation.base_evaluator import AgentAggregatedEvaluationResult, AggregationStrategy, AgentEvaluationResult, MetricCategory
from crewai.evaluation import EvaluationScore
from crewai.utilities.events.utils.console_formatter import ConsoleFormatter
from crewai.utilities.llm_utils import create_llm
@@ -17,6 +17,7 @@ class EvaluationDisplayFormatter:
self.console_formatter.print("[yellow]No evaluation results to display[/yellow]")
return
# Get all agent roles across all iterations
all_agent_roles: set[str] = set()
for iter_results in iterations_results.values():
all_agent_roles.update(iter_results.keys())
@@ -24,6 +25,7 @@ class EvaluationDisplayFormatter:
for agent_role in sorted(all_agent_roles):
self.console_formatter.print(f"\n[bold cyan]Agent: {agent_role}[/bold cyan]")
# Process each iteration
for iter_num, results in sorted(iterations_results.items()):
if agent_role not in results or not results[agent_role]:
continue
@@ -31,19 +33,23 @@ class EvaluationDisplayFormatter:
agent_results = results[agent_role]
agent_id = agent_results[0].agent_id
# Aggregate results for this agent in this iteration
aggregated_result = self._aggregate_agent_results(
agent_id=agent_id,
agent_role=agent_role,
results=agent_results,
)
# Display iteration header
self.console_formatter.print(f"\n[bold]Iteration {iter_num}[/bold]")
# Create table for this iteration
table = Table(box=ROUNDED)
table.add_column("Metric", style="cyan")
table.add_column("Score (1-10)", justify="center")
table.add_column("Feedback", style="green")
# Add metrics to table
if aggregated_result.metrics:
for metric, evaluation_score in aggregated_result.metrics.items():
score = evaluation_score.score
@@ -85,6 +91,7 @@ class EvaluationDisplayFormatter:
"Overall agent evaluation score"
)
# Print the table for this iteration
self.console_formatter.print(table)
def display_summary_results(self, iterations_results: Dict[int, Dict[str, List[AgentAggregatedEvaluationResult]]]):
@@ -241,6 +248,7 @@ class EvaluationDisplayFormatter:
feedback_summary = None
if feedbacks:
if len(feedbacks) > 1:
# Use the summarization method for multiple feedbacks
feedback_summary = self._summarize_feedbacks(
agent_role=agent_role,
metric=category.title(),
@@ -299,7 +307,7 @@ class EvaluationDisplayFormatter:
strategy_guidance = "Focus on the highest-scoring aspects and strengths demonstrated."
elif strategy == AggregationStrategy.WORST_PERFORMANCE:
strategy_guidance = "Focus on areas that need improvement and common issues across tasks."
else:
else: # Default/average strategies
strategy_guidance = "Provide a balanced analysis of strengths and weaknesses across all tasks."
prompt = [

View File

@@ -9,9 +9,7 @@ from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events.crewai_event_bus import CrewAIEventsBus
from crewai.utilities.events.agent_events import (
AgentExecutionStartedEvent,
AgentExecutionCompletedEvent,
LiteAgentExecutionStartedEvent,
LiteAgentExecutionCompletedEvent
AgentExecutionCompletedEvent
)
from crewai.utilities.events.tool_usage_events import (
ToolUsageFinishedEvent,
@@ -54,18 +52,10 @@ class EvaluationTraceCallback(BaseEventListener):
def on_agent_started(source, event: AgentExecutionStartedEvent):
self.on_agent_start(event.agent, event.task)
@event_bus.on(LiteAgentExecutionStartedEvent)
def on_lite_agent_started(source, event: LiteAgentExecutionStartedEvent):
self.on_lite_agent_start(event.agent_info)
@event_bus.on(AgentExecutionCompletedEvent)
def on_agent_completed(source, event: AgentExecutionCompletedEvent):
self.on_agent_finish(event.agent, event.task, event.output)
@event_bus.on(LiteAgentExecutionCompletedEvent)
def on_lite_agent_completed(source, event: LiteAgentExecutionCompletedEvent):
self.on_lite_agent_finish(event.output)
@event_bus.on(ToolUsageFinishedEvent)
def on_tool_completed(source, event: ToolUsageFinishedEvent):
self.on_tool_use(event.tool_name, event.tool_args, event.output, success=True)
@@ -98,38 +88,19 @@ class EvaluationTraceCallback(BaseEventListener):
def on_llm_call_completed(source, event: LLMCallCompletedEvent):
self.on_llm_call_end(event.messages, event.response)
def on_lite_agent_start(self, agent_info: dict[str, Any]):
self.current_agent_id = agent_info['id']
self.current_task_id = "lite_task"
trace_key = f"{self.current_agent_id}_{self.current_task_id}"
self._init_trace(
trace_key=trace_key,
agent_id=self.current_agent_id,
task_id=self.current_task_id,
tool_uses=[],
llm_calls=[],
start_time=datetime.now(),
final_output=None
)
def _init_trace(self, trace_key: str, **kwargs: Any):
self.traces[trace_key] = kwargs
def on_agent_start(self, agent: Agent, task: Task):
self.current_agent_id = agent.id
self.current_task_id = task.id
trace_key = f"{agent.id}_{task.id}"
self._init_trace(
trace_key=trace_key,
agent_id=agent.id,
task_id=task.id,
tool_uses=[],
llm_calls=[],
start_time=datetime.now(),
final_output=None
)
self.traces[trace_key] = {
"agent_id": agent.id,
"task_id": task.id,
"tool_uses": [],
"llm_calls": [],
"start_time": datetime.now(),
"final_output": None
}
def on_agent_finish(self, agent: Agent, task: Task, output: Any):
trace_key = f"{agent.id}_{task.id}"
@@ -137,20 +108,9 @@ class EvaluationTraceCallback(BaseEventListener):
self.traces[trace_key]["final_output"] = output
self.traces[trace_key]["end_time"] = datetime.now()
self._reset_current()
def _reset_current(self):
self.current_agent_id = None
self.current_task_id = None
def on_lite_agent_finish(self, output: Any):
trace_key = f"{self.current_agent_id}_lite_task"
if trace_key in self.traces:
self.traces[trace_key]["final_output"] = output
self.traces[trace_key]["end_time"] = datetime.now()
self._reset_current()
def on_tool_use(self, tool_name: str, tool_args: dict[str, Any] | str, result: Any,
success: bool = True, error_type: str | None = None):
if not self.current_agent_id or not self.current_task_id:
@@ -227,8 +187,4 @@ class EvaluationTraceCallback(BaseEventListener):
def create_evaluation_callbacks() -> EvaluationTraceCallback:
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
callback = EvaluationTraceCallback()
callback.setup_listeners(crewai_event_bus)
return callback
return EvaluationTraceCallback()

View File

@@ -3,8 +3,8 @@ from typing import Any, Dict
from crewai.agent import Agent
from crewai.task import Task
from crewai.experimental.evaluation.base_evaluator import BaseEvaluator, EvaluationScore, MetricCategory
from crewai.experimental.evaluation.json_parser import extract_json_from_llm_response
from crewai.evaluation.base_evaluator import BaseEvaluator, EvaluationScore, MetricCategory
from crewai.evaluation.json_parser import extract_json_from_llm_response
class GoalAlignmentEvaluator(BaseEvaluator):
@property
@@ -14,14 +14,10 @@ class GoalAlignmentEvaluator(BaseEvaluator):
def evaluate(
self,
agent: Agent,
task: Task,
execution_trace: Dict[str, Any],
final_output: Any,
task: Task | None = None,
) -> EvaluationScore:
task_context = ""
if task is not None:
task_context = f"Task description: {task.description}\nExpected output: {task.expected_output}\n"
prompt = [
{"role": "system", "content": """You are an expert evaluator assessing how well an AI agent's output aligns with its assigned task goal.
@@ -41,7 +37,8 @@ Return your evaluation as JSON with fields 'score' (number) and 'feedback' (stri
{"role": "user", "content": f"""
Agent role: {agent.role}
Agent goal: {agent.goal}
{task_context}
Task description: {task.description}
Expected output: {task.expected_output}
Agent's final output:
{final_output}

View File

@@ -16,8 +16,8 @@ from collections.abc import Sequence
from crewai.agent import Agent
from crewai.task import Task
from crewai.experimental.evaluation.base_evaluator import BaseEvaluator, EvaluationScore, MetricCategory
from crewai.experimental.evaluation.json_parser import extract_json_from_llm_response
from crewai.evaluation.base_evaluator import BaseEvaluator, EvaluationScore, MetricCategory
from crewai.evaluation.json_parser import extract_json_from_llm_response
from crewai.tasks.task_output import TaskOutput
class ReasoningPatternType(Enum):
@@ -36,14 +36,10 @@ class ReasoningEfficiencyEvaluator(BaseEvaluator):
def evaluate(
self,
agent: Agent,
task: Task,
execution_trace: Dict[str, Any],
final_output: TaskOutput | str,
task: Task | None = None,
final_output: TaskOutput,
) -> EvaluationScore:
task_context = ""
if task is not None:
task_context = f"Task description: {task.description}\nExpected output: {task.expected_output}\n"
llm_calls = execution_trace.get("llm_calls", [])
if not llm_calls or len(llm_calls) < 2:
@@ -87,8 +83,6 @@ class ReasoningEfficiencyEvaluator(BaseEvaluator):
call_samples = self._get_call_samples(llm_calls)
final_output = final_output.raw if isinstance(final_output, TaskOutput) else final_output
prompt = [
{"role": "system", "content": """You are an expert evaluator assessing the reasoning efficiency of an AI agent's thought process.
@@ -123,7 +117,7 @@ Return your evaluation as JSON with the following structure:
}"""},
{"role": "user", "content": f"""
Agent role: {agent.role}
{task_context}
Task description: {task.description}
Reasoning efficiency metrics:
- Total LLM calls: {efficiency_metrics["total_llm_calls"]}
@@ -136,7 +130,7 @@ Sample of agent reasoning flow (chronological sequence):
{call_samples}
Agent's final output:
{final_output[:500]}... (truncated)
{final_output.raw[:500]}... (truncated)
Evaluate the reasoning efficiency of this agent based on these interaction patterns.
Identify any inefficient reasoning patterns and provide specific suggestions for optimization.

View File

@@ -3,8 +3,8 @@ from typing import Any, Dict
from crewai.agent import Agent
from crewai.task import Task
from crewai.experimental.evaluation.base_evaluator import BaseEvaluator, EvaluationScore, MetricCategory
from crewai.experimental.evaluation.json_parser import extract_json_from_llm_response
from crewai.evaluation.base_evaluator import BaseEvaluator, EvaluationScore, MetricCategory
from crewai.evaluation.json_parser import extract_json_from_llm_response
class SemanticQualityEvaluator(BaseEvaluator):
@property
@@ -14,13 +14,10 @@ class SemanticQualityEvaluator(BaseEvaluator):
def evaluate(
self,
agent: Agent,
task: Task,
execution_trace: Dict[str, Any],
final_output: Any,
task: Task | None = None,
) -> EvaluationScore:
task_context = ""
if task is not None:
task_context = f"Task description: {task.description}"
prompt = [
{"role": "system", "content": """You are an expert evaluator assessing the semantic quality of an AI agent's output.
@@ -40,7 +37,7 @@ Return your evaluation as JSON with fields 'score' (number) and 'feedback' (stri
"""},
{"role": "user", "content": f"""
Agent role: {agent.role}
{task_context}
Task description: {task.description}
Agent's final output:
{final_output}

View File

@@ -1,8 +1,8 @@
import json
from typing import Dict, Any
from crewai.experimental.evaluation.base_evaluator import BaseEvaluator, EvaluationScore, MetricCategory
from crewai.experimental.evaluation.json_parser import extract_json_from_llm_response
from crewai.evaluation.base_evaluator import BaseEvaluator, EvaluationScore, MetricCategory
from crewai.evaluation.json_parser import extract_json_from_llm_response
from crewai.agent import Agent
from crewai.task import Task
@@ -16,14 +16,10 @@ class ToolSelectionEvaluator(BaseEvaluator):
def evaluate(
self,
agent: Agent,
task: Task,
execution_trace: Dict[str, Any],
final_output: str,
task: Task | None = None,
) -> EvaluationScore:
task_context = ""
if task is not None:
task_context = f"Task description: {task.description}"
tool_uses = execution_trace.get("tool_uses", [])
tool_count = len(tool_uses)
unique_tool_types = set([tool.get("tool", "Unknown tool") for tool in tool_uses])
@@ -76,7 +72,7 @@ Return your evaluation as JSON with these fields:
"""},
{"role": "user", "content": f"""
Agent role: {agent.role}
{task_context}
Task description: {task.description}
Available tools for this agent:
{available_tools_info}
@@ -132,13 +128,10 @@ class ParameterExtractionEvaluator(BaseEvaluator):
def evaluate(
self,
agent: Agent,
task: Task,
execution_trace: Dict[str, Any],
final_output: str,
task: Task | None = None,
) -> EvaluationScore:
task_context = ""
if task is not None:
task_context = f"Task description: {task.description}"
tool_uses = execution_trace.get("tool_uses", [])
tool_count = len(tool_uses)
@@ -219,7 +212,7 @@ Return your evaluation as JSON with these fields:
"""},
{"role": "user", "content": f"""
Agent role: {agent.role}
{task_context}
Task description: {task.description}
Parameter extraction examples:
{param_samples_text}
@@ -274,13 +267,10 @@ class ToolInvocationEvaluator(BaseEvaluator):
def evaluate(
self,
agent: Agent,
task: Task,
execution_trace: Dict[str, Any],
final_output: str,
task: Task | None = None,
) -> EvaluationScore:
task_context = ""
if task is not None:
task_context = f"Task description: {task.description}"
tool_uses = execution_trace.get("tool_uses", [])
tool_errors = []
tool_count = len(tool_uses)
@@ -362,7 +352,7 @@ Return your evaluation as JSON with these fields:
"""},
{"role": "user", "content": f"""
Agent role: {agent.role}
{task_context}
Task description: {task.description}
Tool invocation examples:
{invocation_samples_text}

View File

@@ -1,40 +0,0 @@
from crewai.experimental.evaluation import (
BaseEvaluator,
EvaluationScore,
MetricCategory,
AgentEvaluationResult,
SemanticQualityEvaluator,
GoalAlignmentEvaluator,
ReasoningEfficiencyEvaluator,
ToolSelectionEvaluator,
ParameterExtractionEvaluator,
ToolInvocationEvaluator,
EvaluationTraceCallback,
create_evaluation_callbacks,
AgentEvaluator,
create_default_evaluator,
ExperimentRunner,
ExperimentResults,
ExperimentResult,
)
__all__ = [
"BaseEvaluator",
"EvaluationScore",
"MetricCategory",
"AgentEvaluationResult",
"SemanticQualityEvaluator",
"GoalAlignmentEvaluator",
"ReasoningEfficiencyEvaluator",
"ToolSelectionEvaluator",
"ParameterExtractionEvaluator",
"ToolInvocationEvaluator",
"EvaluationTraceCallback",
"create_evaluation_callbacks",
"AgentEvaluator",
"create_default_evaluator",
"ExperimentRunner",
"ExperimentResults",
"ExperimentResult"
]

View File

@@ -1,245 +0,0 @@
import threading
from typing import Any
from crewai.experimental.evaluation.base_evaluator import AgentEvaluationResult, AggregationStrategy
from crewai.agent import Agent
from crewai.task import Task
from crewai.experimental.evaluation.evaluation_display import EvaluationDisplayFormatter
from crewai.utilities.events.agent_events import AgentEvaluationStartedEvent, AgentEvaluationCompletedEvent, AgentEvaluationFailedEvent
from crewai.experimental.evaluation import BaseEvaluator, create_evaluation_callbacks
from collections.abc import Sequence
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.utils.console_formatter import ConsoleFormatter
from crewai.utilities.events.task_events import TaskCompletedEvent
from crewai.utilities.events.agent_events import LiteAgentExecutionCompletedEvent
from crewai.experimental.evaluation.base_evaluator import AgentAggregatedEvaluationResult, EvaluationScore, MetricCategory
class ExecutionState:
def __init__(self):
self.traces = {}
self.current_agent_id: str | None = None
self.current_task_id: str | None = None
self.iteration = 1
self.iterations_results = {}
self.agent_evaluators = {}
class AgentEvaluator:
def __init__(
self,
agents: list[Agent],
evaluators: Sequence[BaseEvaluator] | None = None,
):
self.agents: list[Agent] = agents
self.evaluators: Sequence[BaseEvaluator] | None = evaluators
self.callback = create_evaluation_callbacks()
self.console_formatter = ConsoleFormatter()
self.display_formatter = EvaluationDisplayFormatter()
self._thread_local: threading.local = threading.local()
for agent in self.agents:
self._execution_state.agent_evaluators[str(agent.id)] = self.evaluators
self._subscribe_to_events()
@property
def _execution_state(self) -> ExecutionState:
if not hasattr(self._thread_local, 'execution_state'):
self._thread_local.execution_state = ExecutionState()
return self._thread_local.execution_state
def _subscribe_to_events(self) -> None:
from typing import cast
crewai_event_bus.register_handler(TaskCompletedEvent, cast(Any, self._handle_task_completed))
crewai_event_bus.register_handler(LiteAgentExecutionCompletedEvent, cast(Any, self._handle_lite_agent_completed))
def _handle_task_completed(self, source: Any, event: TaskCompletedEvent) -> None:
assert event.task is not None
agent = event.task.agent
if agent and str(getattr(agent, 'id', 'unknown')) in self._execution_state.agent_evaluators:
self.emit_evaluation_started_event(agent_role=agent.role, agent_id=str(agent.id), task_id=str(event.task.id))
state = ExecutionState()
state.current_agent_id = str(agent.id)
state.current_task_id = str(event.task.id)
assert state.current_agent_id is not None and state.current_task_id is not None
trace = self.callback.get_trace(state.current_agent_id, state.current_task_id)
if not trace:
return
result = self.evaluate(
agent=agent,
task=event.task,
execution_trace=trace,
final_output=event.output,
state=state
)
current_iteration = self._execution_state.iteration
if current_iteration not in self._execution_state.iterations_results:
self._execution_state.iterations_results[current_iteration] = {}
if agent.role not in self._execution_state.iterations_results[current_iteration]:
self._execution_state.iterations_results[current_iteration][agent.role] = []
self._execution_state.iterations_results[current_iteration][agent.role].append(result)
def _handle_lite_agent_completed(self, source: object, event: LiteAgentExecutionCompletedEvent) -> None:
agent_info = event.agent_info
agent_id = str(agent_info["id"])
if agent_id in self._execution_state.agent_evaluators:
state = ExecutionState()
state.current_agent_id = agent_id
state.current_task_id = "lite_task"
target_agent = None
for agent in self.agents:
if str(agent.id) == agent_id:
target_agent = agent
break
if not target_agent:
return
assert state.current_agent_id is not None and state.current_task_id is not None
trace = self.callback.get_trace(state.current_agent_id, state.current_task_id)
if not trace:
return
result = self.evaluate(
agent=target_agent,
execution_trace=trace,
final_output=event.output,
state=state
)
current_iteration = self._execution_state.iteration
if current_iteration not in self._execution_state.iterations_results:
self._execution_state.iterations_results[current_iteration] = {}
agent_role = target_agent.role
if agent_role not in self._execution_state.iterations_results[current_iteration]:
self._execution_state.iterations_results[current_iteration][agent_role] = []
self._execution_state.iterations_results[current_iteration][agent_role].append(result)
def set_iteration(self, iteration: int) -> None:
self._execution_state.iteration = iteration
def reset_iterations_results(self) -> None:
self._execution_state.iterations_results = {}
def get_evaluation_results(self) -> dict[str, list[AgentEvaluationResult]]:
if self._execution_state.iterations_results and self._execution_state.iteration in self._execution_state.iterations_results:
return self._execution_state.iterations_results[self._execution_state.iteration]
return {}
def display_results_with_iterations(self) -> None:
self.display_formatter.display_summary_results(self._execution_state.iterations_results)
def get_agent_evaluation(self, strategy: AggregationStrategy = AggregationStrategy.SIMPLE_AVERAGE, include_evaluation_feedback: bool = True) -> dict[str, AgentAggregatedEvaluationResult]:
agent_results = {}
with crewai_event_bus.scoped_handlers():
task_results = self.get_evaluation_results()
for agent_role, results in task_results.items():
if not results:
continue
agent_id = results[0].agent_id
aggregated_result = self.display_formatter._aggregate_agent_results(
agent_id=agent_id,
agent_role=agent_role,
results=results,
strategy=strategy
)
agent_results[agent_role] = aggregated_result
if self._execution_state.iterations_results and self._execution_state.iteration == max(self._execution_state.iterations_results.keys(), default=0):
self.display_results_with_iterations()
if include_evaluation_feedback:
self.display_evaluation_with_feedback()
return agent_results
def display_evaluation_with_feedback(self) -> None:
self.display_formatter.display_evaluation_with_feedback(self._execution_state.iterations_results)
def evaluate(
self,
agent: Agent,
execution_trace: dict[str, Any],
final_output: Any,
state: ExecutionState,
task: Task | None = None,
) -> AgentEvaluationResult:
result = AgentEvaluationResult(
agent_id=state.current_agent_id or str(agent.id),
task_id=state.current_task_id or (str(task.id) if task else "unknown_task")
)
assert self.evaluators is not None
task_id = str(task.id) if task else None
for evaluator in self.evaluators:
try:
self.emit_evaluation_started_event(agent_role=agent.role, agent_id=str(agent.id), task_id=task_id)
score = evaluator.evaluate(
agent=agent,
task=task,
execution_trace=execution_trace,
final_output=final_output
)
result.metrics[evaluator.metric_category] = score
self.emit_evaluation_completed_event(agent_role=agent.role, agent_id=str(agent.id), task_id=task_id, metric_category=evaluator.metric_category, score=score)
except Exception as e:
self.emit_evaluation_failed_event(agent_role=agent.role, agent_id=str(agent.id), task_id=task_id, error=str(e))
self.console_formatter.print(f"Error in {evaluator.metric_category.value} evaluator: {str(e)}")
return result
def emit_evaluation_started_event(self, agent_role: str, agent_id: str, task_id: str | None = None):
crewai_event_bus.emit(
self,
AgentEvaluationStartedEvent(agent_role=agent_role, agent_id=agent_id, task_id=task_id, iteration=self._execution_state.iteration)
)
def emit_evaluation_completed_event(self, agent_role: str, agent_id: str, task_id: str | None = None, metric_category: MetricCategory | None = None, score: EvaluationScore | None = None):
crewai_event_bus.emit(
self,
AgentEvaluationCompletedEvent(agent_role=agent_role, agent_id=agent_id, task_id=task_id, iteration=self._execution_state.iteration, metric_category=metric_category, score=score)
)
def emit_evaluation_failed_event(self, agent_role: str, agent_id: str, error: str, task_id: str | None = None):
crewai_event_bus.emit(
self,
AgentEvaluationFailedEvent(agent_role=agent_role, agent_id=agent_id, task_id=task_id, iteration=self._execution_state.iteration, error=error)
)
def create_default_evaluator(agents: list[Agent], llm: None = None):
from crewai.experimental.evaluation import (
GoalAlignmentEvaluator,
SemanticQualityEvaluator,
ToolSelectionEvaluator,
ParameterExtractionEvaluator,
ToolInvocationEvaluator,
ReasoningEfficiencyEvaluator
)
evaluators = [
GoalAlignmentEvaluator(llm=llm),
SemanticQualityEvaluator(llm=llm),
ToolSelectionEvaluator(llm=llm),
ParameterExtractionEvaluator(llm=llm),
ToolInvocationEvaluator(llm=llm),
ReasoningEfficiencyEvaluator(llm=llm),
]
return AgentEvaluator(evaluators=evaluators, agents=agents)

View File

@@ -1,8 +0,0 @@
from crewai.experimental.evaluation.experiment.runner import ExperimentRunner
from crewai.experimental.evaluation.experiment.result import ExperimentResults, ExperimentResult
__all__ = [
"ExperimentRunner",
"ExperimentResults",
"ExperimentResult"
]

View File

@@ -1,122 +0,0 @@
import json
import os
from datetime import datetime, timezone
from typing import Any
from pydantic import BaseModel
class ExperimentResult(BaseModel):
identifier: str
inputs: dict[str, Any]
score: int | dict[str, int | float]
expected_score: int | dict[str, int | float]
passed: bool
agent_evaluations: dict[str, Any] | None = None
class ExperimentResults:
def __init__(self, results: list[ExperimentResult], metadata: dict[str, Any] | None = None):
self.results = results
self.metadata = metadata or {}
self.timestamp = datetime.now(timezone.utc)
from crewai.experimental.evaluation.experiment.result_display import ExperimentResultsDisplay
self.display = ExperimentResultsDisplay()
def to_json(self, filepath: str | None = None) -> dict[str, Any]:
data = {
"timestamp": self.timestamp.isoformat(),
"metadata": self.metadata,
"results": [r.model_dump(exclude={"agent_evaluations"}) for r in self.results]
}
if filepath:
with open(filepath, 'w') as f:
json.dump(data, f, indent=2)
self.display.console.print(f"[green]Results saved to {filepath}[/green]")
return data
def compare_with_baseline(self, baseline_filepath: str, save_current: bool = True, print_summary: bool = False) -> dict[str, Any]:
baseline_runs = []
if os.path.exists(baseline_filepath) and os.path.getsize(baseline_filepath) > 0:
try:
with open(baseline_filepath, 'r') as f:
baseline_data = json.load(f)
if isinstance(baseline_data, dict) and "timestamp" in baseline_data:
baseline_runs = [baseline_data]
elif isinstance(baseline_data, list):
baseline_runs = baseline_data
except (json.JSONDecodeError, FileNotFoundError) as e:
self.display.console.print(f"[yellow]Warning: Could not load baseline file: {str(e)}[/yellow]")
if not baseline_runs:
if save_current:
current_data = self.to_json()
with open(baseline_filepath, 'w') as f:
json.dump([current_data], f, indent=2)
self.display.console.print(f"[green]Saved current results as new baseline to {baseline_filepath}[/green]")
return {"is_baseline": True, "changes": {}}
baseline_runs.sort(key=lambda x: x.get("timestamp", ""), reverse=True)
latest_run = baseline_runs[0]
comparison = self._compare_with_run(latest_run)
if print_summary:
self.display.comparison_summary(comparison, latest_run["timestamp"])
if save_current:
current_data = self.to_json()
baseline_runs.append(current_data)
with open(baseline_filepath, 'w') as f:
json.dump(baseline_runs, f, indent=2)
self.display.console.print(f"[green]Added current results to baseline file {baseline_filepath}[/green]")
return comparison
def _compare_with_run(self, baseline_run: dict[str, Any]) -> dict[str, Any]:
baseline_results = baseline_run.get("results", [])
baseline_lookup = {}
for result in baseline_results:
test_identifier = result.get("identifier")
if test_identifier:
baseline_lookup[test_identifier] = result
improved = []
regressed = []
unchanged = []
new_tests = []
for result in self.results:
test_identifier = result.identifier
if not test_identifier or test_identifier not in baseline_lookup:
new_tests.append(test_identifier)
continue
baseline_result = baseline_lookup[test_identifier]
baseline_passed = baseline_result.get("passed", False)
if result.passed and not baseline_passed:
improved.append(test_identifier)
elif not result.passed and baseline_passed:
regressed.append(test_identifier)
else:
unchanged.append(test_identifier)
missing_tests = []
current_test_identifiers = {result.identifier for result in self.results}
for result in baseline_results:
test_identifier = result.get("identifier")
if test_identifier and test_identifier not in current_test_identifiers:
missing_tests.append(test_identifier)
return {
"improved": improved,
"regressed": regressed,
"unchanged": unchanged,
"new_tests": new_tests,
"missing_tests": missing_tests,
"total_compared": len(improved) + len(regressed) + len(unchanged),
"baseline_timestamp": baseline_run.get("timestamp", "unknown")
}

View File

@@ -1,70 +0,0 @@
from typing import Dict, Any
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
from crewai.experimental.evaluation.experiment.result import ExperimentResults
class ExperimentResultsDisplay:
def __init__(self):
self.console = Console()
def summary(self, experiment_results: ExperimentResults):
total = len(experiment_results.results)
passed = sum(1 for r in experiment_results.results if r.passed)
table = Table(title="Experiment Summary")
table.add_column("Metric", style="cyan")
table.add_column("Value", style="green")
table.add_row("Total Test Cases", str(total))
table.add_row("Passed", str(passed))
table.add_row("Failed", str(total - passed))
table.add_row("Success Rate", f"{(passed / total * 100):.1f}%" if total > 0 else "N/A")
self.console.print(table)
def comparison_summary(self, comparison: Dict[str, Any], baseline_timestamp: str):
self.console.print(Panel(f"[bold]Comparison with baseline run from {baseline_timestamp}[/bold]",
expand=False))
table = Table(title="Results Comparison")
table.add_column("Metric", style="cyan")
table.add_column("Count", style="white")
table.add_column("Details", style="dim")
improved = comparison.get("improved", [])
if improved:
details = ", ".join([f"{test_identifier}" for test_identifier in improved[:3]])
if len(improved) > 3:
details += f" and {len(improved) - 3} more"
table.add_row("✅ Improved", str(len(improved)), details)
else:
table.add_row("✅ Improved", "0", "")
regressed = comparison.get("regressed", [])
if regressed:
details = ", ".join([f"{test_identifier}" for test_identifier in regressed[:3]])
if len(regressed) > 3:
details += f" and {len(regressed) - 3} more"
table.add_row("❌ Regressed", str(len(regressed)), details, style="red")
else:
table.add_row("❌ Regressed", "0", "")
unchanged = comparison.get("unchanged", [])
table.add_row("⏺ Unchanged", str(len(unchanged)), "")
new_tests = comparison.get("new_tests", [])
if new_tests:
details = ", ".join(new_tests[:3])
if len(new_tests) > 3:
details += f" and {len(new_tests) - 3} more"
table.add_row(" New Tests", str(len(new_tests)), details)
missing_tests = comparison.get("missing_tests", [])
if missing_tests:
details = ", ".join(missing_tests[:3])
if len(missing_tests) > 3:
details += f" and {len(missing_tests) - 3} more"
table.add_row(" Missing Tests", str(len(missing_tests)), details)
self.console.print(table)

View File

@@ -1,125 +0,0 @@
from collections import defaultdict
from hashlib import md5
from typing import Any
from crewai import Crew, Agent
from crewai.experimental.evaluation import AgentEvaluator, create_default_evaluator
from crewai.experimental.evaluation.experiment.result_display import ExperimentResultsDisplay
from crewai.experimental.evaluation.experiment.result import ExperimentResults, ExperimentResult
from crewai.experimental.evaluation.evaluation_display import AgentAggregatedEvaluationResult
class ExperimentRunner:
def __init__(self, dataset: list[dict[str, Any]]):
self.dataset = dataset or []
self.evaluator: AgentEvaluator | None = None
self.display = ExperimentResultsDisplay()
def run(self, crew: Crew | None = None, agents: list[Agent] | None = None, print_summary: bool = False) -> ExperimentResults:
if crew and not agents:
agents = crew.agents
assert agents is not None
self.evaluator = create_default_evaluator(agents=agents)
results = []
for test_case in self.dataset:
self.evaluator.reset_iterations_results()
result = self._run_test_case(test_case=test_case, crew=crew, agents=agents)
results.append(result)
experiment_results = ExperimentResults(results)
if print_summary:
self.display.summary(experiment_results)
return experiment_results
def _run_test_case(self, test_case: dict[str, Any], agents: list[Agent], crew: Crew | None = None) -> ExperimentResult:
inputs = test_case["inputs"]
expected_score = test_case["expected_score"]
identifier = test_case.get("identifier") or md5(str(test_case).encode(), usedforsecurity=False).hexdigest()
try:
self.display.console.print(f"[dim]Running crew with input: {str(inputs)[:50]}...[/dim]")
self.display.console.print("\n")
if crew:
crew.kickoff(inputs=inputs)
else:
for agent in agents:
agent.kickoff(**inputs)
assert self.evaluator is not None
agent_evaluations = self.evaluator.get_agent_evaluation()
actual_score = self._extract_scores(agent_evaluations)
passed = self._assert_scores(expected_score, actual_score)
return ExperimentResult(
identifier=identifier,
inputs=inputs,
score=actual_score,
expected_score=expected_score,
passed=passed,
agent_evaluations=agent_evaluations
)
except Exception as e:
self.display.console.print(f"[red]Error running test case: {str(e)}[/red]")
return ExperimentResult(
identifier=identifier,
inputs=inputs,
score=0,
expected_score=expected_score,
passed=False
)
def _extract_scores(self, agent_evaluations: dict[str, AgentAggregatedEvaluationResult]) -> float | dict[str, float]:
all_scores: dict[str, list[float]] = defaultdict(list)
for evaluation in agent_evaluations.values():
for metric_name, score in evaluation.metrics.items():
if score.score is not None:
all_scores[metric_name.value].append(score.score)
avg_scores = {m: sum(s)/len(s) for m, s in all_scores.items()}
if len(avg_scores) == 1:
return list(avg_scores.values())[0]
return avg_scores
def _assert_scores(self, expected: float | dict[str, float],
actual: float | dict[str, float]) -> bool:
"""
Compare expected and actual scores, and return whether the test case passed.
The rules for comparison are as follows:
- If both expected and actual scores are single numbers, the actual score must be >= expected.
- If expected is a single number and actual is a dict, compare against the average of actual values.
- If expected is a dict and actual is a single number, actual must be >= all expected values.
- If both are dicts, actual must have matching keys with values >= expected values.
"""
if isinstance(expected, (int, float)) and isinstance(actual, (int, float)):
return actual >= expected
if isinstance(expected, dict) and isinstance(actual, (int, float)):
return all(actual >= exp_score for exp_score in expected.values())
if isinstance(expected, (int, float)) and isinstance(actual, dict):
if not actual:
return False
avg_score = sum(actual.values()) / len(actual)
return avg_score >= expected
if isinstance(expected, dict) and isinstance(actual, dict):
if not expected:
return True
matching_keys = set(expected.keys()) & set(actual.keys())
if not matching_keys:
return False
# All matching keys must have actual >= expected
return all(actual[key] >= expected[key] for key in matching_keys)
return False

View File

@@ -1,26 +0,0 @@
from crewai.experimental.evaluation.metrics.reasoning_metrics import (
ReasoningEfficiencyEvaluator
)
from crewai.experimental.evaluation.metrics.tools_metrics import (
ToolSelectionEvaluator,
ParameterExtractionEvaluator,
ToolInvocationEvaluator
)
from crewai.experimental.evaluation.metrics.goal_metrics import (
GoalAlignmentEvaluator
)
from crewai.experimental.evaluation.metrics.semantic_quality_metrics import (
SemanticQualityEvaluator
)
__all__ = [
"ReasoningEfficiencyEvaluator",
"ToolSelectionEvaluator",
"ParameterExtractionEvaluator",
"ToolInvocationEvaluator",
"GoalAlignmentEvaluator",
"SemanticQualityEvaluator"
]

View File

@@ -1,52 +0,0 @@
import inspect
from typing_extensions import Any
import warnings
from crewai.experimental.evaluation.experiment import ExperimentResults, ExperimentRunner
from crewai import Crew, Agent
def assert_experiment_successfully(experiment_results: ExperimentResults, baseline_filepath: str | None = None) -> None:
failed_tests = [result for result in experiment_results.results if not result.passed]
if failed_tests:
detailed_failures: list[str] = []
for result in failed_tests:
expected = result.expected_score
actual = result.score
detailed_failures.append(f"- {result.identifier}: expected {expected}, got {actual}")
failure_details = "\n".join(detailed_failures)
raise AssertionError(f"The following test cases failed:\n{failure_details}")
baseline_filepath = baseline_filepath or _get_baseline_filepath_fallback()
comparison = experiment_results.compare_with_baseline(baseline_filepath=baseline_filepath)
assert_experiment_no_regression(comparison)
def assert_experiment_no_regression(comparison_result: dict[str, list[str]]) -> None:
regressed = comparison_result.get("regressed", [])
if regressed:
raise AssertionError(f"Regression detected! The following tests that previously passed now fail: {regressed}")
missing_tests = comparison_result.get("missing_tests", [])
if missing_tests:
warnings.warn(
f"Warning: {len(missing_tests)} tests from the baseline are missing in the current run: {missing_tests}",
UserWarning
)
def run_experiment(dataset: list[dict[str, Any]], crew: Crew | None = None, agents: list[Agent] | None = None, verbose: bool = False) -> ExperimentResults:
runner = ExperimentRunner(dataset=dataset)
return runner.run(agents=agents, crew=crew, print_summary=verbose)
def _get_baseline_filepath_fallback() -> str:
test_func_name = "experiment_fallback"
try:
current_frame = inspect.currentframe()
if current_frame is not None:
test_func_name = current_frame.f_back.f_back.f_code.co_name # type: ignore[union-attr]
except Exception:
...
return f"{test_func_name}_results.json"

View File

@@ -18,7 +18,6 @@ from crewai.utilities.chromadb import sanitize_collection_name
from crewai.utilities.constants import KNOWLEDGE_DIRECTORY
from crewai.utilities.logger import Logger
from crewai.utilities.paths import db_storage_path
from crewai.utilities.chromadb import create_persistent_client
@contextlib.contextmanager
@@ -85,11 +84,14 @@ class KnowledgeStorage(BaseKnowledgeStorage):
raise Exception("Collection not initialized")
def initialize_knowledge_storage(self):
self.app = create_persistent_client(
path=os.path.join(db_storage_path(), "knowledge"),
base_path = os.path.join(db_storage_path(), "knowledge")
chroma_client = chromadb.PersistentClient(
path=base_path,
settings=Settings(allow_reset=True),
)
self.app = chroma_client
try:
collection_name = (
f"knowledge_{self.collection_name}"
@@ -109,8 +111,9 @@ class KnowledgeStorage(BaseKnowledgeStorage):
def reset(self):
base_path = os.path.join(db_storage_path(), KNOWLEDGE_DIRECTORY)
if not self.app:
self.app = create_persistent_client(
path=base_path, settings=Settings(allow_reset=True)
self.app = chromadb.PersistentClient(
path=base_path,
settings=Settings(allow_reset=True),
)
self.app.reset()

View File

@@ -305,7 +305,6 @@ class LiteAgent(FlowTrackable, BaseModel):
"""
# Create agent info for event emission
agent_info = {
"id": self.id,
"role": self.role,
"goal": self.goal,
"backstory": self.backstory,

View File

@@ -93,7 +93,13 @@ class Mem0Storage(Storage):
if params:
if isinstance(self.memory, MemoryClient):
params["output_format"] = "v1.1"
self.memory.add(value, **params)
if isinstance(value, str):
messages = [{"role": "assistant", "content": value}]
else:
messages = value
self.memory.add(messages, **params)
def search(
self,

View File

@@ -4,12 +4,12 @@ import logging
import os
import shutil
import uuid
from typing import Any, Dict, List, Optional
from chromadb.api import ClientAPI
from crewai.memory.storage.base_rag_storage import BaseRAGStorage
from crewai.utilities import EmbeddingConfigurator
from crewai.utilities.chromadb import create_persistent_client
from crewai.utilities.constants import MAX_FILE_NAME_LENGTH
from crewai.utilities.paths import db_storage_path
@@ -60,15 +60,17 @@ class RAGStorage(BaseRAGStorage):
self.embedder_config = configurator.configure_embedder(self.embedder_config)
def _initialize_app(self):
import chromadb
from chromadb.config import Settings
self._set_embedder_config()
self.app = create_persistent_client(
chroma_client = chromadb.PersistentClient(
path=self.path if self.path else self.storage_file_name,
settings=Settings(allow_reset=self.allow_reset),
)
self.app = chroma_client
self.collection = self.app.get_or_create_collection(
name=self.type, embedding_function=self.embedder_config
)

View File

@@ -1,7 +1,7 @@
import inspect
import logging
from pathlib import Path
from typing import Any, Callable, Dict, TypeVar, cast, List, Union
from typing import Any, Callable, Dict, TypeVar, cast, List
from crewai.tools import BaseTool
import yaml
@@ -28,8 +28,7 @@ def CrewBase(cls: T) -> T:
)
original_tasks_config_path = getattr(cls, "tasks_config", "config/tasks.yaml")
mcp_server_params: Union[list[str | dict[str, str]], dict[str, str], None] = getattr(cls, "mcp_server_params", None)
_mcp_server_adapter: Union[dict[str, Any], Any, None] = None
mcp_server_params: Any = getattr(cls, "mcp_server_params", None)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@@ -68,57 +67,36 @@ def CrewBase(cls: T) -> T:
self._original_functions, "is_kickoff"
)
# Add close mcp servers method to after kickoff
bound_method = self._create_close_mcp_servers_method()
self._after_kickoff['_close_mcp_servers'] = bound_method
# Add close mcp server method to after kickoff
bound_method = self._create_close_mcp_server_method()
self._after_kickoff['_close_mcp_server'] = bound_method
def _create_close_mcp_servers_method(self):
def _close_mcp_servers(self, instance, outputs):
if self._mcp_server_adapter is None:
return outputs
for adapter in self._mcp_server_adapter.values():
def _create_close_mcp_server_method(self):
def _close_mcp_server(self, instance, outputs):
adapter = getattr(self, '_mcp_server_adapter', None)
if adapter is not None:
try:
adapter.stop()
except Exception as e:
logging.warning(f"Error stopping MCP server: {e}")
return outputs
_close_mcp_servers.is_after_kickoff = True
_close_mcp_server.is_after_kickoff = True
import types
return types.MethodType(_close_mcp_servers, self)
return types.MethodType(_close_mcp_server, self)
def get_mcp_tools(self, *tool_names: list[str], server: str | None = None) -> List[BaseTool]:
def get_mcp_tools(self, *tool_names: list[str]) -> List[BaseTool]:
if not self.mcp_server_params:
return []
from crewai_tools import MCPServerAdapter
if isinstance(self.mcp_server_params, list):
if self._mcp_server_adapter is None:
self._mcp_server_adapter = MCPServerAdapter(self.mcp_server_params)
if server is not None and len(self.mcp_server_params) > 1:
logging.warning("Using list of MCP server parameters. To use server parameter, please use a dictionary of MCP server parameters.")
# Type assertion: when mcp_server_params is a list, _mcp_server_adapter is a single MCPServerAdapter
adapter = cast(Any, self._mcp_server_adapter)
return adapter.tools.filter_by_names(tool_names or None)
adapter = getattr(self, '_mcp_server_adapter', None)
if not adapter:
self._mcp_server_adapter = MCPServerAdapter(self.mcp_server_params)
# Separated MCP adapters for each server.
elif isinstance(self.mcp_server_params, dict):
if self._mcp_server_adapter is None:
self._mcp_server_adapter = {}
aggregated_tools = []
for server_name, params in self.mcp_server_params.items():
if server is not None and server_name != server:
continue
adapter = self._mcp_server_adapter.get(server_name, None)
if not adapter:
self._mcp_server_adapter[server_name] = MCPServerAdapter(params)
aggregated_tools.extend(
self._mcp_server_adapter[server_name].tools.filter_by_names(tool_names or None))
return aggregated_tools
return self._mcp_server_adapter.tools.filter_by_names(tool_names or None)
def load_configurations(self):

View File

@@ -67,7 +67,6 @@ class Task(BaseModel):
description: Descriptive text detailing task's purpose and execution.
expected_output: Clear definition of expected task outcome.
output_file: File path for storing task output.
create_directory: Whether to create the directory for output_file if it doesn't exist.
output_json: Pydantic model for structuring JSON output.
output_pydantic: Pydantic model for task output.
security_config: Security configuration including fingerprinting.
@@ -116,10 +115,6 @@ class Task(BaseModel):
description="A file path to be used to create a file output.",
default=None,
)
create_directory: Optional[bool] = Field(
description="Whether to create the directory for output_file if it doesn't exist.",
default=True,
)
output: Optional[TaskOutput] = Field(
description="Task output, it's final result after being executed", default=None
)
@@ -758,10 +753,8 @@ Follow these guidelines:
resolved_path = Path(self.output_file).expanduser().resolve()
directory = resolved_path.parent
if self.create_directory and not directory.exists():
if not directory.exists():
directory.mkdir(parents=True, exist_ok=True)
elif not self.create_directory and not directory.exists():
raise RuntimeError(f"Directory {directory} does not exist and create_directory is False")
with resolved_path.open("w", encoding="utf-8") as file:
if isinstance(result, dict):

View File

@@ -1,10 +1,6 @@
import re
import portalocker
from chromadb import PersistentClient
from hashlib import md5
from typing import Optional
MIN_COLLECTION_LENGTH = 3
MAX_COLLECTION_LENGTH = 63
DEFAULT_COLLECTION = "default_collection"
@@ -64,16 +60,3 @@ def sanitize_collection_name(name: Optional[str], max_collection_length: int = M
sanitized = sanitized[:-1] + "z"
return sanitized
def create_persistent_client(path: str, **kwargs):
"""
Creates a persistent client for ChromaDB with a lock file to prevent
concurrent creations. Works for both multi-threads and multi-processes
environments.
"""
lockfile = f"chromadb-{md5(path.encode(), usedforsecurity=False).hexdigest()}.lock"
with portalocker.Lock(lockfile):
client = PersistentClient(path=path, **kwargs)
return client

View File

@@ -17,9 +17,6 @@ from .agent_events import (
AgentExecutionStartedEvent,
AgentExecutionCompletedEvent,
AgentExecutionErrorEvent,
AgentEvaluationStartedEvent,
AgentEvaluationCompletedEvent,
AgentEvaluationFailedEvent,
)
from .task_events import (
TaskStartedEvent,
@@ -77,9 +74,6 @@ __all__ = [
"AgentExecutionStartedEvent",
"AgentExecutionCompletedEvent",
"AgentExecutionErrorEvent",
"AgentEvaluationStartedEvent",
"AgentEvaluationCompletedEvent",
"AgentEvaluationFailedEvent",
"TaskStartedEvent",
"TaskCompletedEvent",
"TaskFailedEvent",

View File

@@ -123,28 +123,3 @@ class AgentLogsExecutionEvent(BaseEvent):
type: str = "agent_logs_execution"
model_config = {"arbitrary_types_allowed": True}
# Agent Eval events
class AgentEvaluationStartedEvent(BaseEvent):
agent_id: str
agent_role: str
task_id: str | None = None
iteration: int
type: str = "agent_evaluation_started"
class AgentEvaluationCompletedEvent(BaseEvent):
agent_id: str
agent_role: str
task_id: str | None = None
iteration: int
metric_category: Any
score: Any
type: str = "agent_evaluation_completed"
class AgentEvaluationFailedEvent(BaseEvent):
agent_id: str
agent_role: str
task_id: str | None = None
iteration: int
error: str
type: str = "agent_evaluation_failed"

View File

@@ -4,7 +4,6 @@ from .agent_events import (
AgentExecutionCompletedEvent,
AgentExecutionErrorEvent,
AgentExecutionStartedEvent,
LiteAgentExecutionCompletedEvent,
)
from .crew_events import (
CrewKickoffCompletedEvent,
@@ -81,7 +80,6 @@ EventTypes = Union[
CrewTrainFailedEvent,
AgentExecutionStartedEvent,
AgentExecutionCompletedEvent,
LiteAgentExecutionCompletedEvent,
TaskStartedEvent,
TaskCompletedEvent,
TaskFailedEvent,

View File

@@ -1,237 +0,0 @@
interactions:
- request:
body: '{"messages": [{"role": "system", "content": "You are Test Agent. An agent
created for testing purposes\nYour personal goal is: Complete test tasks successfully\n\nTo
give my best complete final answer to the task respond using the exact following
format:\n\nThought: I now can give a great answer\nFinal Answer: Your final
answer must be the great and the most complete as possible, it must be outcome
described.\n\nI MUST use these formats, my job depends on it!"}, {"role": "user",
"content": "Complete this task successfully"}], "model": "gpt-4o-mini", "stop":
["\nObservation:"]}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate, zstd
connection:
- keep-alive
content-length:
- '583'
content-type:
- application/json
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.93.0
x-stainless-arch:
- arm64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- MacOS
x-stainless-package-version:
- 1.93.0
x-stainless-raw-response:
- 'true'
x-stainless-read-timeout:
- '600.0'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.11.12
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: !!binary |
H4sIAAAAAAAAAwAAAP//jFNNb9swDL3nVxA6J0U+HKTNbd0woMAOw7Bu6LbCUCXa1iqLgkgnzYr8
98FKWqdbB+wiQHx81OMj9TgCUM6qNSjTaDFt9JNL+TZ7N/dfrusPN01NyV6vPk3f/mrl5vLrXI17
Bt39RCNPrDNDbfQojsIBNgm1YF91tlrOl+fzxXKWgZYs+p5WR5kUNGldcJP5dF5MpqvJ7PzIbsgZ
ZLWG7yMAgMd89jqDxQe1hun4KdIis65RrZ+TAFQi30eUZnYsOogaD6ChIBiy9M8NdXUja7iCQFsw
OkDtNgga6l4/6MBbTAA/wnsXtIc3+b6Gjx41I8REG2cRWoStkwakQeCIxlXOgEXRzjNQgvzigwBV
OUU038OOOgiIFhr0MdPHoIOFK9g67wEDdwlBCI7OIjgB7oxB5qrzfpeznxRokIZS3wwk5EiB8ey0
54RVx7r3PXTenwA6BBLdzy27fXtE9s/+eqpjojv+g6oqFxw3ZULNFHovWSiqjO5HALd5jt2L0aiY
qI1SCt1jfu7i4lBODdszgEVxBIVE+yE+KxbjV8qVR79PFkEZbRq0A3XYGt1ZRyfA6KTpv9W8VvvQ
uAv1/5QfAGMwCtoyJrTOvOx4SEvYf65/pT2bnAUrxrRxBktxmPpBWKx05w8rr3jHgm1ZuVBjiskd
9r6K5aLQy0LjxcKo0X70GwAA//8DAMz2wVUFBAAA
headers:
CF-RAY:
- 95f93ea9af627e0b-GRU
Connection:
- keep-alive
Content-Encoding:
- gzip
Content-Type:
- application/json
Date:
- Tue, 15 Jul 2025 12:25:54 GMT
Server:
- cloudflare
Set-Cookie:
- __cf_bm=GRZmZLrjW5ZRHNmUJa4ccrMcy20D1rmeqK6Ptlv0mRY-1752582354-1.0.1.1-xKd_yga48Eedech5TRlThlEpDgsB2whxkWHlCyAGOVMqMcvH1Ju9FdXYbuQ9NdUQcVxPLgiGM35lYhqSLVQiXDyK01dnyp2Gvm560FBN9DY;
path=/; expires=Tue, 15-Jul-25 12:55:54 GMT; domain=.api.openai.com; HttpOnly;
Secure; SameSite=None
- _cfuvid=MYqswpSR7sqr4kGp6qZVkaL7HDYwMiww49PeN9QBP.A-1752582354973-0.0.1.1-604800000;
path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- nosniff
access-control-expose-headers:
- X-Request-ID
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- crewai-iuxna1
openai-processing-ms:
- '4047'
openai-version:
- '2020-10-01'
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
x-envoy-upstream-service-time:
- '4440'
x-ratelimit-limit-requests:
- '30000'
x-ratelimit-limit-tokens:
- '150000000'
x-ratelimit-remaining-requests:
- '29999'
x-ratelimit-remaining-tokens:
- '149999885'
x-ratelimit-reset-requests:
- 2ms
x-ratelimit-reset-tokens:
- 0s
x-request-id:
- req_5704c0f206a927ddc12aa1a19b612a75
status:
code: 200
message: OK
- request:
body: '{"messages": [{"role": "system", "content": "You are an expert evaluator
assessing how well an AI agent''s output aligns with its assigned task goal.\n\nScore
the agent''s goal alignment on a scale from 0-10 where:\n- 0: Complete misalignment,
agent did not understand or attempt the task goal\n- 5: Partial alignment, agent
attempted the task but missed key requirements\n- 10: Perfect alignment, agent
fully satisfied all task requirements\n\nConsider:\n1. Did the agent correctly
interpret the task goal?\n2. Did the final output directly address the requirements?\n3.
Did the agent focus on relevant aspects of the task?\n4. Did the agent provide
all requested information or deliverables?\n\nReturn your evaluation as JSON
with fields ''score'' (number) and ''feedback'' (string).\n"}, {"role": "user",
"content": "\nAgent role: Test Agent\nAgent goal: Complete test tasks successfully\n\n\nAgent''s
final output:\nPlease provide me with the specific details or context of the
task you need help with, and I will ensure to complete it successfully and provide
a thorough response.\n\nEvaluate how well the agent''s output aligns with the
assigned task goal.\n"}], "model": "gpt-4o-mini", "stop": []}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate, zstd
connection:
- keep-alive
content-length:
- '1196'
content-type:
- application/json
cookie:
- __cf_bm=GRZmZLrjW5ZRHNmUJa4ccrMcy20D1rmeqK6Ptlv0mRY-1752582354-1.0.1.1-xKd_yga48Eedech5TRlThlEpDgsB2whxkWHlCyAGOVMqMcvH1Ju9FdXYbuQ9NdUQcVxPLgiGM35lYhqSLVQiXDyK01dnyp2Gvm560FBN9DY;
_cfuvid=MYqswpSR7sqr4kGp6qZVkaL7HDYwMiww49PeN9QBP.A-1752582354973-0.0.1.1-604800000
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.93.0
x-stainless-arch:
- arm64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- MacOS
x-stainless-package-version:
- 1.93.0
x-stainless-raw-response:
- 'true'
x-stainless-read-timeout:
- '600.0'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.11.12
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: !!binary |
H4sIAAAAAAAAA4xUy27bQAy8+yuIPdtGbMdN4FvbSxM0QIsEKNA6MJhdSmK82hWWVFwj8L8XKz/k
9AH0ogOHnOFjVq8DAMPOLMDYCtXWjR990O+TT7dfZs/v5OtFy/ef7++mxfu7j83t/cONGeaK+PRM
Vo9VYxvrxpNyDHvYJkKlzDq5mk/n19PZfN4BdXTkc1nZ6OgyjmoOPJpeTC9HF1ejyfWhuopsScwC
fgwAAF67b+4zOPppFnAxPEZqEsGSzOKUBGBS9DliUIRFMagZ9qCNQSl0rb8uA8DSiI2JlmYB0+E+
UBC5J7TrHFuah4oASwoKjh2EqOCojkE0oRIgWE+YoA2OUhZzHEqIBWhFoChrKCP6IWwqthWwgEY4
bItASbRLEpDWWhIpWu+3Y7gJooRuCKyAsiYHRUxQx0TgSJG9DIGDY4ua5RA82nVW5cDKqPxCWYhC
iSXBhrU69TOGbxV7ysxSxY0Awoa951AGkq69/do67QLZk8vBJsUXdgQYtoBWW/SQSJoYpFPq2Ptp
MLjTttC51DFXVIPjRFb9drw0y7A7v0uiohXM3git92cAhhAVs7c6RzwekN3JAz6WTYpP8lupKTiw
VKtEKDHke4vGxnTobgDw2HmtfWMf06RYN7rSuKZObjo7eM30Fu/R6yOoUdH38dnkCLzhWx1ud+ZW
Y9FW5PrS3trYOo5nwOBs6j+7+Rv3fnIO5f/Q94C11Ci5VZPIsX07cZ+WKP8B/pV22nLXsBFKL2xp
pUwpX8JRga3fv0sjW1GqVwWHklKTuHuc+ZKD3eAXAAAA//8DADksFsafBAAA
headers:
CF-RAY:
- 95f93ec73a1c7e0b-GRU
Connection:
- keep-alive
Content-Encoding:
- gzip
Content-Type:
- application/json
Date:
- Tue, 15 Jul 2025 12:25:57 GMT
Server:
- cloudflare
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- nosniff
access-control-expose-headers:
- X-Request-ID
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- crewai-iuxna1
openai-processing-ms:
- '1544'
openai-version:
- '2020-10-01'
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
x-envoy-upstream-service-time:
- '1546'
x-ratelimit-limit-requests:
- '30000'
x-ratelimit-limit-tokens:
- '150000000'
x-ratelimit-remaining-requests:
- '29999'
x-ratelimit-remaining-tokens:
- '149999732'
x-ratelimit-reset-requests:
- 2ms
x-ratelimit-reset-tokens:
- 0s
x-request-id:
- req_44930ba12ad8d1e3f0beed1d5e3d8b0c
status:
code: 200
message: OK
version: 1

File diff suppressed because one or more lines are too long

View File

@@ -427,140 +427,4 @@ interactions:
status:
code: 200
message: OK
- request:
body: '{"messages": [{"role": "system", "content": "You are an expert evaluator
assessing how well an AI agent''s output aligns with its assigned task goal.\n\nScore
the agent''s goal alignment on a scale from 0-10 where:\n- 0: Complete misalignment,
agent did not understand or attempt the task goal\n- 5: Partial alignment, agent
attempted the task but missed key requirements\n- 10: Perfect alignment, agent
fully satisfied all task requirements\n\nConsider:\n1. Did the agent correctly
interpret the task goal?\n2. Did the final output directly address the requirements?\n3.
Did the agent focus on relevant aspects of the task?\n4. Did the agent provide
all requested information or deliverables?\n\nReturn your evaluation as JSON
with fields ''score'' (number) and ''feedback'' (string).\n"}, {"role": "user",
"content": "\nAgent role: Test Agent\nAgent goal: Complete test tasks successfully\nTask
description: Test task description\nExpected output: Expected test output\n\nAgent''s
final output:\nThe expected test output is a comprehensive document that outlines
the specific parameters and criteria that define success for the task at hand.
It should include detailed descriptions of the tasks, the goals that need to
be achieved, and any specific formatting or structural requirements necessary
for the output. Each component of the task must be analyzed and addressed, providing
context as well as examples where applicable. Additionally, any tools or methodologies
that are relevant to executing the tasks successfully should be outlined, including
any potential risks or challenges that may arise during the process. This document
serves as a guiding framework to ensure that all aspects of the task are thoroughly
considered and executed to meet the high standards expected.\n\nEvaluate how
well the agent''s output aligns with the assigned task goal.\n"}], "model":
"gpt-4o-mini", "stop": []}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate, zstd
connection:
- keep-alive
content-length:
- '1893'
content-type:
- application/json
cookie:
- _cfuvid=XwsgBfgvDGlKFQ4LiGYGIARIoSNTiwidqoo9UZcc.XY-1752087999227-0.0.1.1-604800000
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.93.0
x-stainless-arch:
- arm64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- MacOS
x-stainless-package-version:
- 1.93.0
x-stainless-raw-response:
- 'true'
x-stainless-read-timeout:
- '600.0'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.11.12
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: !!binary |
H4sIAAAAAAAAAwAAAP//jFRNbxs5DL37VxA6jwPHddrUxxwWi2BRtEAPRevCYCSOh41GUkWOnTTI
fy8kf4zT5rCXOfCRT4+P5DxNAAw7swRjO1TbJz+90dvFxy//vX0za7dfr29+3eo/n75++Mh0O/za
maZUxLsfZPVYdWFjnzwpx7CHbSZUKqyX767mV/PL2eKqAn105EvZJul0Eac9B57OZ/PFdPZuenl9
qO4iWxKzhG8TAICn+i06g6MHs4RZc4z0JIIbMstTEoDJ0ZeIQREWxaCmGUEbg1Ko0p9WAWBlxMZM
K7OEq2YfaIncHdr7EluZzx0BbigopBy37MgBgiNF9uTAkdjMqbQOsYVdhwraEdBDIqvkIA6aBgXp
4uAdcLB+cNTArmPbAQfHFpUEJPYEQ3CUi2LHYVPoCpOi3EOmnwNn6imoXMC/cUdbyk3FWw7oj8+4
SAIhKkgiyy1b9P4RHHneUn4pTEn0WIYC6YDX5866aqDH+yKHFRJm5cqInjeB3AWM7vQsUgzhTFb9
48GtUlloSwMkZ4bEDMetOaSg1QH9XldVwSrk2wY4iBLWSs/hmG47zGiVMouylZP7WHkzdRSEtwQu
2qH4dhyBjcWKHWsXhzJTEgpVAwagByySirgzRSfLDrtzsTKr8Hy+VJnaQbAsdhi8PwMwhKhYfKzr
/P2APJ8W2MdNyvFO/ig1LQeWbp0JJYayrKIxmYo+TwC+10MZXuy+STn2Sdca76k+92ax2POZ8T5H
9P31AdSo6Mf4YjFvXuFb71dezk7NWLQdubF0vEscHMczYHLW9d9qXuPed85h83/oR8BaSkpunTI5
ti87HtMy/agTfT3t5HIVbITyli2tlSmXSThqcfD7n4qRR1Hq1y2HDeWUuf5ZyiQnz5PfAAAA//8D
AEfUP8BcBQAA
headers:
CF-RAY:
- 95f365f1bfc87ded-GRU
Connection:
- keep-alive
Content-Encoding:
- gzip
Content-Type:
- application/json
Date:
- Mon, 14 Jul 2025 19:24:07 GMT
Server:
- cloudflare
Set-Cookie:
- __cf_bm=PcC3_3T8.MK_WpZlQLdZfwpNv9Pe45AIYmrXOSgJ65E-1752521047-1.0.1.1-eyqwSWfQC7ZV6.JwTsTihK1ZWCrEmxd52CtNcfe.fw1UjjBN9rdTU4G7hRZiNqHQYo4sVZMmgRgqM9k7HRSzN2zln0bKmMiOuSQTZh6xF_I;
path=/; expires=Mon, 14-Jul-25 19:54:07 GMT; domain=.api.openai.com; HttpOnly;
Secure; SameSite=None
- _cfuvid=JvQ1c4qYZefNwOPoVNgAtX8ET7ObU.JKDvGc43LOR6g-1752521047741-0.0.1.1-604800000;
path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- nosniff
access-control-expose-headers:
- X-Request-ID
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- crewai-iuxna1
openai-processing-ms:
- '2729'
openai-version:
- '2020-10-01'
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
x-envoy-upstream-service-time:
- '2789'
x-ratelimit-limit-requests:
- '30000'
x-ratelimit-limit-tokens:
- '150000000'
x-ratelimit-remaining-requests:
- '29999'
x-ratelimit-remaining-tokens:
- '149999559'
x-ratelimit-reset-requests:
- 2ms
x-ratelimit-reset-tokens:
- 0s
x-request-id:
- req_74f6e8ff49db25dbea3d3525cc149e8e
status:
code: 200
message: OK
version: 1

View File

@@ -1,123 +0,0 @@
interactions:
- request:
body: '{"messages": [{"role": "system", "content": "You are Test Agent. An agent
created for testing purposes\nYour personal goal is: Complete test tasks successfully\nTo
give my best complete final answer to the task respond using the exact following
format:\n\nThought: I now can give a great answer\nFinal Answer: Your final
answer must be the great and the most complete as possible, it must be outcome
described.\n\nI MUST use these formats, my job depends on it!"}, {"role": "user",
"content": "\nCurrent Task: Test task description\n\nThis is the expected criteria
for your final answer: Expected test output\nyou MUST return the actual complete
content as the final answer, not a summary.\n\nBegin! This is VERY important
to you, use the tools available and give your best Final Answer, your job depends
on it!\n\nThought:"}], "model": "gpt-4o-mini", "stop": ["\nObservation:"]}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate, zstd
connection:
- keep-alive
content-length:
- '879'
content-type:
- application/json
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.93.0
x-stainless-arch:
- arm64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- MacOS
x-stainless-package-version:
- 1.93.0
x-stainless-raw-response:
- 'true'
x-stainless-read-timeout:
- '600.0'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.11.12
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: !!binary |
H4sIAAAAAAAAAwAAAP//jFTBbhtHDL3rK4g5rwRbtaNYt9RoEaNoUaBODm0DgZnh7jKe5WyHXDmO
4X8vZiRLcupDLwvsPPLxPQ45jzMAx8GtwfkezQ9jnP9oeLv98N5+vfl9+4v89Mf76+XV7XDz8Yc/
r39T15SM9PkLeXvOWvg0jJGMk+xgnwmNCuv56nJ5+XZ1tbqswJACxZLWjTa/SPOBhefLs+XF/Gw1
P3+7z+4Te1K3hr9mAACP9Vt0SqCvbg1nzfPJQKrYkVsfggBcTrGcOFRlNRRzzRH0SYykSr8BSffg
UaDjLQFCV2QDit5TBvhbfmbBCO/q/xpue1ZgBesJ6OtI3iiAkRqkycbJGrjv2ffgk5S6CqkFhECG
HClAIPWZx9Kkgtz3aJVq37vChXoH2qcpBogp3UHkO1rAbU/QViW7Os8hLD5OgQBjBCFfOpEfgKVN
ecBSpoFAQxK1jMbSgY+Y2R6aWjJTT6K8JSHVBlACYOgpk3gCS4DyADqS55YpQDdxoMhCuoCbgwKf
tpSB0PeAJdaKseKpOsn0z8SZBhJrgESnXERY8S0JRsxWulkoilkKkDJ0JJQx8jcKi13DX3pWyuWm
FPDQN8jU7mW3KRfdSaj2r5ZLMEmgXOYg7K5OlcQYI1Cs4vSFavSVmLWnsDgdnEztpFiGV6YYTwAU
SVYbXkf20x55OgxpTN2Y02f9LtW1LKz9JhNqkjKQaml0FX2aAXyqyzC9mG835jSMtrF0R7Xc+Zvz
HZ877uARvXqzBy0ZxuP58nLVvMK32Q2rnqyT8+h7CsfU4+7hFDidALMT1/9V8xr3zjlL93/oj4D3
NBqFzZgpsH/p+BiW6Utd0dfDDl2ugl2ZK/a0MaZcbiJQi1PcPRxOH9Ro2LQsHeUxc309yk3Onmb/
AgAA//8DAAbYfvVABQAA
headers:
CF-RAY:
- 95f9c7ffa8331b11-GRU
Connection:
- keep-alive
Content-Encoding:
- gzip
Content-Type:
- application/json
Date:
- Tue, 15 Jul 2025 13:59:38 GMT
Server:
- cloudflare
Set-Cookie:
- __cf_bm=J_xe1AP.B5P6D2GVMCesyioeS5E9DnYT34rbwQUefFc-1752587978-1.0.1.1-5Dflk5cAj6YCsOSVbCFWWSpXpw_mXsczIdzWzs2h2OwDL01HQbduE5LAToy67sfjFjHeeO4xRrqPLUQpySy2QqyHXbI_fzX4UAt3.UdwHxU;
path=/; expires=Tue, 15-Jul-25 14:29:38 GMT; domain=.api.openai.com; HttpOnly;
Secure; SameSite=None
- _cfuvid=0rTD8RMpxBQQy42jzmum16_eoRtWNfaZMG_TJkhGS7I-1752587978437-0.0.1.1-604800000;
path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- nosniff
access-control-expose-headers:
- X-Request-ID
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- crewai-iuxna1
openai-processing-ms:
- '2623'
openai-version:
- '2020-10-01'
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
x-envoy-upstream-service-time:
- '2626'
x-ratelimit-limit-requests:
- '30000'
x-ratelimit-limit-tokens:
- '150000000'
x-ratelimit-remaining-requests:
- '29999'
x-ratelimit-remaining-tokens:
- '149999813'
x-ratelimit-reset-requests:
- 2ms
x-ratelimit-reset-tokens:
- 0s
x-request-id:
- req_ccc347e91010713379c920aa0efd1f4f
status:
code: 200
message: OK
version: 1

View File

View File

@@ -1,8 +1,8 @@
from unittest.mock import patch, MagicMock
from tests.experimental.evaluation.metrics.base_evaluation_metrics_test import BaseEvaluationMetricsTest
from tests.evaluation.metrics.base_evaluation_metrics_test import BaseEvaluationMetricsTest
from crewai.experimental.evaluation.base_evaluator import EvaluationScore
from crewai.experimental.evaluation.metrics.goal_metrics import GoalAlignmentEvaluator
from crewai.evaluation.base_evaluator import EvaluationScore
from crewai.evaluation.metrics.goal_metrics import GoalAlignmentEvaluator
from crewai.utilities.llm_utils import LLM

View File

@@ -3,12 +3,12 @@ from unittest.mock import patch, MagicMock
from typing import List, Dict, Any
from crewai.tasks.task_output import TaskOutput
from crewai.experimental.evaluation.metrics.reasoning_metrics import (
from crewai.evaluation.metrics.reasoning_metrics import (
ReasoningEfficiencyEvaluator,
)
from tests.experimental.evaluation.metrics.base_evaluation_metrics_test import BaseEvaluationMetricsTest
from tests.evaluation.metrics.base_evaluation_metrics_test import BaseEvaluationMetricsTest
from crewai.utilities.llm_utils import LLM
from crewai.experimental.evaluation.base_evaluator import EvaluationScore
from crewai.evaluation.base_evaluator import EvaluationScore
class TestReasoningEfficiencyEvaluator(BaseEvaluationMetricsTest):
@pytest.fixture

View File

@@ -1,8 +1,8 @@
from unittest.mock import patch, MagicMock
from crewai.experimental.evaluation.base_evaluator import EvaluationScore
from crewai.experimental.evaluation.metrics.semantic_quality_metrics import SemanticQualityEvaluator
from tests.experimental.evaluation.metrics.base_evaluation_metrics_test import BaseEvaluationMetricsTest
from crewai.evaluation.base_evaluator import EvaluationScore
from crewai.evaluation.metrics.semantic_quality_metrics import SemanticQualityEvaluator
from tests.evaluation.metrics.base_evaluation_metrics_test import BaseEvaluationMetricsTest
from crewai.utilities.llm_utils import LLM
class TestSemanticQualityEvaluator(BaseEvaluationMetricsTest):

View File

@@ -1,12 +1,12 @@
from unittest.mock import patch, MagicMock
from crewai.experimental.evaluation.metrics.tools_metrics import (
from crewai.evaluation.metrics.tools_metrics import (
ToolSelectionEvaluator,
ParameterExtractionEvaluator,
ToolInvocationEvaluator
)
from crewai.utilities.llm_utils import LLM
from tests.experimental.evaluation.metrics.base_evaluation_metrics_test import BaseEvaluationMetricsTest
from tests.evaluation.metrics.base_evaluation_metrics_test import BaseEvaluationMetricsTest
class TestToolSelectionEvaluator(BaseEvaluationMetricsTest):
def test_no_tools_available(self, mock_task, mock_agent):

View File

@@ -0,0 +1,95 @@
import pytest
from crewai.agent import Agent
from crewai.task import Task
from crewai.crew import Crew
from crewai.evaluation.agent_evaluator import AgentEvaluator
from crewai.evaluation.base_evaluator import AgentEvaluationResult
from crewai.evaluation import (
GoalAlignmentEvaluator,
SemanticQualityEvaluator,
ToolSelectionEvaluator,
ParameterExtractionEvaluator,
ToolInvocationEvaluator,
ReasoningEfficiencyEvaluator
)
from crewai.evaluation import create_default_evaluator
class TestAgentEvaluator:
@pytest.fixture
def mock_crew(self):
agent = Agent(
role="Test Agent",
goal="Complete test tasks successfully",
backstory="An agent created for testing purposes",
allow_delegation=False,
verbose=False
)
task = Task(
description="Test task description",
agent=agent,
expected_output="Expected test output"
)
crew = Crew(
agents=[agent],
tasks=[task]
)
return crew
def test_set_iteration(self):
agent_evaluator = AgentEvaluator()
agent_evaluator.set_iteration(3)
assert agent_evaluator.iteration == 3
@pytest.mark.vcr(filter_headers=["authorization"])
def test_evaluate_current_iteration(self, mock_crew):
agent_evaluator = AgentEvaluator(crew=mock_crew, evaluators=[GoalAlignmentEvaluator()])
mock_crew.kickoff()
results = agent_evaluator.evaluate_current_iteration()
assert isinstance(results, dict)
agent, = mock_crew.agents
task, = mock_crew.tasks
assert len(mock_crew.agents) == 1
assert agent.role in results
assert len(results[agent.role]) == 1
result, = results[agent.role]
assert isinstance(result, AgentEvaluationResult)
assert result.agent_id == str(agent.id)
assert result.task_id == str(task.id)
goal_alignment, = result.metrics.values()
assert goal_alignment.score == 5.0
expected_feedback = "The agent's output demonstrates an understanding of the need for a comprehensive document"
assert expected_feedback in goal_alignment.feedback
assert goal_alignment.raw_response is not None
assert '"score": 5' in goal_alignment.raw_response
def test_create_default_evaluator(self, mock_crew):
agent_evaluator = create_default_evaluator(crew=mock_crew)
assert isinstance(agent_evaluator, AgentEvaluator)
assert agent_evaluator.crew == mock_crew
expected_types = [
GoalAlignmentEvaluator,
SemanticQualityEvaluator,
ToolSelectionEvaluator,
ParameterExtractionEvaluator,
ToolInvocationEvaluator,
ReasoningEfficiencyEvaluator
]
assert len(agent_evaluator.evaluators) == len(expected_types)
for evaluator, expected_type in zip(agent_evaluator.evaluators, expected_types):
assert isinstance(evaluator, expected_type)

View File

@@ -1,278 +0,0 @@
import pytest
from crewai.agent import Agent
from crewai.task import Task
from crewai.crew import Crew
from crewai.experimental.evaluation.agent_evaluator import AgentEvaluator
from crewai.experimental.evaluation.base_evaluator import AgentEvaluationResult
from crewai.experimental.evaluation import (
GoalAlignmentEvaluator,
SemanticQualityEvaluator,
ToolSelectionEvaluator,
ParameterExtractionEvaluator,
ToolInvocationEvaluator,
ReasoningEfficiencyEvaluator,
MetricCategory,
EvaluationScore
)
from crewai.utilities.events.agent_events import AgentEvaluationStartedEvent, AgentEvaluationCompletedEvent, AgentEvaluationFailedEvent
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.experimental.evaluation import create_default_evaluator
class TestAgentEvaluator:
@pytest.fixture
def mock_crew(self):
agent = Agent(
role="Test Agent",
goal="Complete test tasks successfully",
backstory="An agent created for testing purposes",
allow_delegation=False,
verbose=False
)
task = Task(
description="Test task description",
agent=agent,
expected_output="Expected test output"
)
crew = Crew(
agents=[agent],
tasks=[task]
)
return crew
def test_set_iteration(self):
agent_evaluator = AgentEvaluator(agents=[])
agent_evaluator.set_iteration(3)
assert agent_evaluator._execution_state.iteration == 3
@pytest.mark.vcr(filter_headers=["authorization"])
def test_evaluate_current_iteration(self, mock_crew):
agent_evaluator = AgentEvaluator(agents=mock_crew.agents, evaluators=[GoalAlignmentEvaluator()])
mock_crew.kickoff()
results = agent_evaluator.get_evaluation_results()
assert isinstance(results, dict)
agent, = mock_crew.agents
task, = mock_crew.tasks
assert len(mock_crew.agents) == 1
assert agent.role in results
assert len(results[agent.role]) == 1
result, = results[agent.role]
assert isinstance(result, AgentEvaluationResult)
assert result.agent_id == str(agent.id)
assert result.task_id == str(task.id)
goal_alignment, = result.metrics.values()
assert goal_alignment.score == 5.0
expected_feedback = "The agent's output demonstrates an understanding of the need for a comprehensive document outlining task"
assert expected_feedback in goal_alignment.feedback
assert goal_alignment.raw_response is not None
assert '"score": 5' in goal_alignment.raw_response
def test_create_default_evaluator(self, mock_crew):
agent_evaluator = create_default_evaluator(agents=mock_crew.agents)
assert isinstance(agent_evaluator, AgentEvaluator)
assert agent_evaluator.agents == mock_crew.agents
expected_types = [
GoalAlignmentEvaluator,
SemanticQualityEvaluator,
ToolSelectionEvaluator,
ParameterExtractionEvaluator,
ToolInvocationEvaluator,
ReasoningEfficiencyEvaluator
]
assert len(agent_evaluator.evaluators) == len(expected_types)
for evaluator, expected_type in zip(agent_evaluator.evaluators, expected_types):
assert isinstance(evaluator, expected_type)
@pytest.mark.vcr(filter_headers=["authorization"])
def test_eval_lite_agent(self):
agent = Agent(
role="Test Agent",
goal="Complete test tasks successfully",
backstory="An agent created for testing purposes",
)
with crewai_event_bus.scoped_handlers():
events = {}
@crewai_event_bus.on(AgentEvaluationStartedEvent)
def capture_started(source, event):
events["started"] = event
@crewai_event_bus.on(AgentEvaluationCompletedEvent)
def capture_completed(source, event):
events["completed"] = event
@crewai_event_bus.on(AgentEvaluationFailedEvent)
def capture_failed(source, event):
events["failed"] = event
agent_evaluator = AgentEvaluator(agents=[agent], evaluators=[GoalAlignmentEvaluator()])
agent.kickoff(messages="Complete this task successfully")
assert events.keys() == {"started", "completed"}
assert events["started"].agent_id == str(agent.id)
assert events["started"].agent_role == agent.role
assert events["started"].task_id is None
assert events["started"].iteration == 1
assert events["completed"].agent_id == str(agent.id)
assert events["completed"].agent_role == agent.role
assert events["completed"].task_id is None
assert events["completed"].iteration == 1
assert events["completed"].metric_category == MetricCategory.GOAL_ALIGNMENT
assert isinstance(events["completed"].score, EvaluationScore)
assert events["completed"].score.score == 2.0
results = agent_evaluator.get_evaluation_results()
assert isinstance(results, dict)
result, = results[agent.role]
assert isinstance(result, AgentEvaluationResult)
assert result.agent_id == str(agent.id)
assert result.task_id == "lite_task"
goal_alignment, = result.metrics.values()
assert goal_alignment.score == 2.0
expected_feedback = "The agent did not demonstrate a clear understanding of the task goal, which is to complete test tasks successfully"
assert expected_feedback in goal_alignment.feedback
assert goal_alignment.raw_response is not None
assert '"score": 2' in goal_alignment.raw_response
@pytest.mark.vcr(filter_headers=["authorization"])
def test_eval_specific_agents_from_crew(self, mock_crew):
agent = Agent(
role="Test Agent Eval",
goal="Complete test tasks successfully",
backstory="An agent created for testing purposes",
)
task = Task(
description="Test task description",
agent=agent,
expected_output="Expected test output"
)
mock_crew.agents.append(agent)
mock_crew.tasks.append(task)
with crewai_event_bus.scoped_handlers():
events = {}
@crewai_event_bus.on(AgentEvaluationStartedEvent)
def capture_started(source, event):
events["started"] = event
@crewai_event_bus.on(AgentEvaluationCompletedEvent)
def capture_completed(source, event):
events["completed"] = event
@crewai_event_bus.on(AgentEvaluationFailedEvent)
def capture_failed(source, event):
events["failed"] = event
agent_evaluator = AgentEvaluator(agents=[agent], evaluators=[GoalAlignmentEvaluator()])
mock_crew.kickoff()
assert events.keys() == {"started", "completed"}
assert events["started"].agent_id == str(agent.id)
assert events["started"].agent_role == agent.role
assert events["started"].task_id == str(task.id)
assert events["started"].iteration == 1
assert events["completed"].agent_id == str(agent.id)
assert events["completed"].agent_role == agent.role
assert events["completed"].task_id == str(task.id)
assert events["completed"].iteration == 1
assert events["completed"].metric_category == MetricCategory.GOAL_ALIGNMENT
assert isinstance(events["completed"].score, EvaluationScore)
assert events["completed"].score.score == 5.0
results = agent_evaluator.get_evaluation_results()
assert isinstance(results, dict)
assert len(results.keys()) == 1
result, = results[agent.role]
assert isinstance(result, AgentEvaluationResult)
assert result.agent_id == str(agent.id)
assert result.task_id == str(task.id)
goal_alignment, = result.metrics.values()
assert goal_alignment.score == 5.0
expected_feedback = "The agent provided a thorough guide on how to conduct a test task but failed to produce specific expected output"
assert expected_feedback in goal_alignment.feedback
assert goal_alignment.raw_response is not None
assert '"score": 5' in goal_alignment.raw_response
@pytest.mark.vcr(filter_headers=["authorization"])
def test_failed_evaluation(self, mock_crew):
agent, = mock_crew.agents
task, = mock_crew.tasks
with crewai_event_bus.scoped_handlers():
events = {}
@crewai_event_bus.on(AgentEvaluationStartedEvent)
def capture_started(source, event):
events["started"] = event
@crewai_event_bus.on(AgentEvaluationCompletedEvent)
def capture_completed(source, event):
events["completed"] = event
@crewai_event_bus.on(AgentEvaluationFailedEvent)
def capture_failed(source, event):
events["failed"] = event
# Create a mock evaluator that will raise an exception
from crewai.experimental.evaluation.base_evaluator import BaseEvaluator
from crewai.experimental.evaluation import MetricCategory
class FailingEvaluator(BaseEvaluator):
metric_category = MetricCategory.GOAL_ALIGNMENT
def evaluate(self, agent, task, execution_trace, final_output):
raise ValueError("Forced evaluation failure")
agent_evaluator = AgentEvaluator(agents=[agent], evaluators=[FailingEvaluator()])
mock_crew.kickoff()
assert events.keys() == {"started", "failed"}
assert events["started"].agent_id == str(agent.id)
assert events["started"].agent_role == agent.role
assert events["started"].task_id == str(task.id)
assert events["started"].iteration == 1
assert events["failed"].agent_id == str(agent.id)
assert events["failed"].agent_role == agent.role
assert events["failed"].task_id == str(task.id)
assert events["failed"].iteration == 1
assert events["failed"].error == "Forced evaluation failure"
results = agent_evaluator.get_evaluation_results()
result, = results[agent.role]
assert isinstance(result, AgentEvaluationResult)
assert result.agent_id == str(agent.id)
assert result.task_id == str(task.id)
assert result.metrics == {}

View File

@@ -1,111 +0,0 @@
import pytest
from unittest.mock import MagicMock, patch
from crewai.experimental.evaluation.experiment.result import ExperimentResult, ExperimentResults
class TestExperimentResult:
@pytest.fixture
def mock_results(self):
return [
ExperimentResult(
identifier="test-1",
inputs={"query": "What is the capital of France?"},
score=10,
expected_score=7,
passed=True
),
ExperimentResult(
identifier="test-2",
inputs={"query": "Who wrote Hamlet?"},
score={"relevance": 9, "factuality": 8},
expected_score={"relevance": 7, "factuality": 7},
passed=True,
agent_evaluations={"agent1": {"metrics": {"goal_alignment": {"score": 9}}}}
),
ExperimentResult(
identifier="test-3",
inputs={"query": "Any query"},
score={"relevance": 9, "factuality": 8},
expected_score={"relevance": 7, "factuality": 7},
passed=False,
agent_evaluations={"agent1": {"metrics": {"goal_alignment": {"score": 9}}}}
),
ExperimentResult(
identifier="test-4",
inputs={"query": "Another query"},
score={"relevance": 9, "factuality": 8},
expected_score={"relevance": 7, "factuality": 7},
passed=True,
agent_evaluations={"agent1": {"metrics": {"goal_alignment": {"score": 9}}}}
),
ExperimentResult(
identifier="test-6",
inputs={"query": "Yet another query"},
score={"relevance": 9, "factuality": 8},
expected_score={"relevance": 7, "factuality": 7},
passed=True,
agent_evaluations={"agent1": {"metrics": {"goal_alignment": {"score": 9}}}}
)
]
@patch('os.path.exists', return_value=True)
@patch('os.path.getsize', return_value=1)
@patch('json.load')
@patch('builtins.open', new_callable=MagicMock)
def test_experiment_results_compare_with_baseline(self, mock_open, mock_json_load, mock_path_getsize, mock_path_exists, mock_results):
baseline_data = {
"timestamp": "2023-01-01T00:00:00+00:00",
"results": [
{
"identifier": "test-1",
"inputs": {"query": "What is the capital of France?"},
"score": 7,
"expected_score": 7,
"passed": False
},
{
"identifier": "test-2",
"inputs": {"query": "Who wrote Hamlet?"},
"score": {"relevance": 8, "factuality": 7},
"expected_score": {"relevance": 7, "factuality": 7},
"passed": True
},
{
"identifier": "test-3",
"inputs": {"query": "Any query"},
"score": {"relevance": 8, "factuality": 7},
"expected_score": {"relevance": 7, "factuality": 7},
"passed": True
},
{
"identifier": "test-4",
"inputs": {"query": "Another query"},
"score": {"relevance": 8, "factuality": 7},
"expected_score": {"relevance": 7, "factuality": 7},
"passed": True
},
{
"identifier": "test-5",
"inputs": {"query": "Another query"},
"score": {"relevance": 8, "factuality": 7},
"expected_score": {"relevance": 7, "factuality": 7},
"passed": True
}
]
}
mock_json_load.return_value = baseline_data
results = ExperimentResults(results=mock_results)
results.display = MagicMock()
comparison = results.compare_with_baseline(baseline_filepath="baseline.json")
assert "baseline_timestamp" in comparison
assert comparison["baseline_timestamp"] == "2023-01-01T00:00:00+00:00"
assert comparison["improved"] == ["test-1"]
assert comparison["regressed"] == ["test-3"]
assert comparison["unchanged"] == ["test-2", "test-4"]
assert comparison["new_tests"] == ["test-6"]
assert comparison["missing_tests"] == ["test-5"]

View File

@@ -1,197 +0,0 @@
import pytest
from unittest.mock import MagicMock, patch
from crewai.crew import Crew
from crewai.experimental.evaluation.experiment.runner import ExperimentRunner
from crewai.experimental.evaluation.experiment.result import ExperimentResults
from crewai.experimental.evaluation.evaluation_display import AgentAggregatedEvaluationResult
from crewai.experimental.evaluation.base_evaluator import MetricCategory, EvaluationScore
class TestExperimentRunner:
@pytest.fixture
def mock_crew(self):
return MagicMock(llm=Crew)
@pytest.fixture
def mock_evaluator_results(self):
agent_evaluation = AgentAggregatedEvaluationResult(
agent_id="Test Agent",
agent_role="Test Agent Role",
metrics={
MetricCategory.GOAL_ALIGNMENT: EvaluationScore(
score=9,
feedback="Test feedback for goal alignment",
raw_response="Test raw response for goal alignment"
),
MetricCategory.REASONING_EFFICIENCY: EvaluationScore(
score=None,
feedback="Reasoning efficiency not applicable",
raw_response="Reasoning efficiency not applicable"
),
MetricCategory.PARAMETER_EXTRACTION: EvaluationScore(
score=7,
feedback="Test parameter extraction explanation",
raw_response="Test raw output"
),
MetricCategory.TOOL_SELECTION: EvaluationScore(
score=8,
feedback="Test tool selection explanation",
raw_response="Test raw output"
)
}
)
return {"Test Agent": agent_evaluation}
@patch('crewai.experimental.evaluation.experiment.runner.create_default_evaluator')
def test_run_success(self, mock_create_evaluator, mock_crew, mock_evaluator_results):
dataset = [
{
"identifier": "test-case-1",
"inputs": {"query": "Test query 1"},
"expected_score": 8
},
{
"identifier": "test-case-2",
"inputs": {"query": "Test query 2"},
"expected_score": {"goal_alignment": 7}
},
{
"inputs": {"query": "Test query 3"},
"expected_score": {"tool_selection": 9}
}
]
mock_evaluator = MagicMock()
mock_evaluator.get_agent_evaluation.return_value = mock_evaluator_results
mock_evaluator.reset_iterations_results = MagicMock()
mock_create_evaluator.return_value = mock_evaluator
runner = ExperimentRunner(dataset=dataset)
results = runner.run(crew=mock_crew)
assert isinstance(results, ExperimentResults)
result_1, result_2, result_3 = results.results
assert len(results.results) == 3
assert result_1.identifier == "test-case-1"
assert result_1.inputs == {"query": "Test query 1"}
assert result_1.expected_score == 8
assert result_1.passed is True
assert result_2.identifier == "test-case-2"
assert result_2.inputs == {"query": "Test query 2"}
assert isinstance(result_2.expected_score, dict)
assert "goal_alignment" in result_2.expected_score
assert result_2.passed is True
assert result_3.identifier == "c2ed49e63aa9a83af3ca382794134fd5"
assert result_3.inputs == {"query": "Test query 3"}
assert isinstance(result_3.expected_score, dict)
assert "tool_selection" in result_3.expected_score
assert result_3.passed is False
assert mock_crew.kickoff.call_count == 3
mock_crew.kickoff.assert_any_call(inputs={"query": "Test query 1"})
mock_crew.kickoff.assert_any_call(inputs={"query": "Test query 2"})
mock_crew.kickoff.assert_any_call(inputs={"query": "Test query 3"})
assert mock_evaluator.reset_iterations_results.call_count == 3
assert mock_evaluator.get_agent_evaluation.call_count == 3
@patch('crewai.experimental.evaluation.experiment.runner.create_default_evaluator')
def test_run_success_with_unknown_metric(self, mock_create_evaluator, mock_crew, mock_evaluator_results):
dataset = [
{
"identifier": "test-case-2",
"inputs": {"query": "Test query 2"},
"expected_score": {"goal_alignment": 7, "unknown_metric": 8}
}
]
mock_evaluator = MagicMock()
mock_evaluator.get_agent_evaluation.return_value = mock_evaluator_results
mock_evaluator.reset_iterations_results = MagicMock()
mock_create_evaluator.return_value = mock_evaluator
runner = ExperimentRunner(dataset=dataset)
results = runner.run(crew=mock_crew)
result, = results.results
assert result.identifier == "test-case-2"
assert result.inputs == {"query": "Test query 2"}
assert isinstance(result.expected_score, dict)
assert "goal_alignment" in result.expected_score.keys()
assert "unknown_metric" in result.expected_score.keys()
assert result.passed is True
@patch('crewai.experimental.evaluation.experiment.runner.create_default_evaluator')
def test_run_success_with_single_metric_evaluator_and_expected_specific_metric(self, mock_create_evaluator, mock_crew, mock_evaluator_results):
dataset = [
{
"identifier": "test-case-2",
"inputs": {"query": "Test query 2"},
"expected_score": {"goal_alignment": 7}
}
]
mock_evaluator = MagicMock()
mock_create_evaluator["Test Agent"].metrics = {
MetricCategory.GOAL_ALIGNMENT: EvaluationScore(
score=9,
feedback="Test feedback for goal alignment",
raw_response="Test raw response for goal alignment"
)
}
mock_evaluator.get_agent_evaluation.return_value = mock_evaluator_results
mock_evaluator.reset_iterations_results = MagicMock()
mock_create_evaluator.return_value = mock_evaluator
runner = ExperimentRunner(dataset=dataset)
results = runner.run(crew=mock_crew)
result, = results.results
assert result.identifier == "test-case-2"
assert result.inputs == {"query": "Test query 2"}
assert isinstance(result.expected_score, dict)
assert "goal_alignment" in result.expected_score.keys()
assert result.passed is True
@patch('crewai.experimental.evaluation.experiment.runner.create_default_evaluator')
def test_run_success_when_expected_metric_is_not_available(self, mock_create_evaluator, mock_crew, mock_evaluator_results):
dataset = [
{
"identifier": "test-case-2",
"inputs": {"query": "Test query 2"},
"expected_score": {"unknown_metric": 7}
}
]
mock_evaluator = MagicMock()
mock_create_evaluator["Test Agent"].metrics = {
MetricCategory.GOAL_ALIGNMENT: EvaluationScore(
score=5,
feedback="Test feedback for goal alignment",
raw_response="Test raw response for goal alignment"
)
}
mock_evaluator.get_agent_evaluation.return_value = mock_evaluator_results
mock_evaluator.reset_iterations_results = MagicMock()
mock_create_evaluator.return_value = mock_evaluator
runner = ExperimentRunner(dataset=dataset)
results = runner.run(crew=mock_crew)
result, = results.results
assert result.identifier == "test-case-2"
assert result.inputs == {"query": "Test query 2"}
assert isinstance(result.expected_score, dict)
assert "unknown_metric" in result.expected_score.keys()
assert result.passed is False

View File

@@ -329,3 +329,29 @@ def test_external_memory_save_events(custom_storage, external_memory_with_mocked
'agent_role': "test_agent",
'save_time_ms': ANY
}
def test_external_memory_with_mem0_storage_integration():
"""Test external memory integration with mem0 storage specifically"""
from crewai.memory.storage.mem0_storage import Mem0Storage
with patch('crewai.memory.external.external_memory.ExternalMemory._configure_mem0') as mock_configure:
mock_storage = MagicMock(spec=Mem0Storage)
mock_configure.return_value = mock_storage
embedder_config = {"provider": "mem0", "config": {"user_id": "test_user"}}
external_memory = ExternalMemory(embedder_config=embedder_config)
mock_crew = MagicMock()
external_memory.set_crew(mock_crew)
test_value = "Test external memory content"
test_metadata = {"task": "test_task"}
test_agent = "test_agent"
external_memory.save(value=test_value, metadata=test_metadata, agent=test_agent)
mock_storage.save.assert_called_once_with(
test_value,
{'task': 'test_task', 'agent': 'test_agent'}
)

View File

@@ -87,7 +87,7 @@ class InternalCrew:
@CrewBase
class InternalCrewWithMCP(InternalCrew):
mcp_server_params = [{"url": "localhost", "port": 8000}]
mcp_server_params = {"host": "localhost", "port": 8000}
@agent
def reporting_analyst(self):
@@ -97,19 +97,6 @@ class InternalCrewWithMCP(InternalCrew):
def researcher(self):
return Agent(config=self.agents_config["researcher"], tools=self.get_mcp_tools("simple_tool")) # type: ignore[index]
@CrewBase
class InternalCrewWithMultipleMCP(InternalCrew):
mcp_server_params = {"mcp1": {"url": "localhost", "port": 8000}, "mcp2": {"url": "localhost", "port": 8001}}
@agent
def reporting_analyst(self):
return Agent(config=self.agents_config["reporting_analyst"], tools=self.get_mcp_tools(server="mcp1")) # type: ignore[index]
@agent
def researcher(self):
return Agent(config=self.agents_config["researcher"], tools=self.get_mcp_tools("simple_tool", server="mcp2")) # type: ignore[index]
def test_agent_memoization():
crew = SimpleCrew()
first_call_result = crew.simple_agent()
@@ -283,21 +270,4 @@ def test_internal_crew_with_mcp():
assert crew.reporting_analyst().tools == [simple_tool, another_simple_tool]
assert crew.researcher().tools == [simple_tool]
adapter_mock.assert_called_once_with([{"url": "localhost", "port": 8000}])
def test_internal_crew_with_multiple_mcp():
from crewai_tools import MCPServerAdapter
from crewai_tools.adapters.mcp_adapter import ToolCollection
from unittest.mock import call
mock = Mock(spec=MCPServerAdapter)
mock.tools = ToolCollection([simple_tool, another_simple_tool])
with patch("crewai_tools.MCPServerAdapter", return_value=mock) as adapter_mock:
crew = InternalCrewWithMultipleMCP()
assert crew.reporting_analyst().tools == [simple_tool, another_simple_tool]
assert crew.researcher().tools == [simple_tool]
adapter_mock.assert_has_calls([
call({"url": "localhost", "port": 8000}),
call({"url": "localhost", "port": 8001})
], any_order=True)
adapter_mock.assert_called_once_with({"host": "localhost", "port": 8000})

View File

@@ -1,14 +1,10 @@
import os
from unittest.mock import MagicMock, patch
import pytest
from mem0.client.main import MemoryClient
from mem0.memory.main import Memory
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.memory.storage.mem0_storage import Mem0Storage
from crewai.task import Task
# Define the class (if not already defined)
@@ -171,8 +167,9 @@ def test_save_method_with_memory_oss(mem0_storage_with_mocked_config):
mem0_storage.save(test_value, test_metadata)
expected_messages = [{"role": "assistant", "content": test_value}]
mem0_storage.memory.add.assert_called_once_with(
test_value,
expected_messages,
agent_id="Test_Agent",
infer=False,
metadata={"type": "short_term", "key": "value"},
@@ -190,8 +187,9 @@ def test_save_method_with_memory_client(mem0_storage_with_memory_client_using_co
mem0_storage.save(test_value, test_metadata)
expected_messages = [{"role": "assistant", "content": test_value}]
mem0_storage.memory.add.assert_called_once_with(
test_value,
expected_messages,
agent_id="Test_Agent",
infer=False,
metadata={"type": "short_term", "key": "value"},
@@ -218,6 +216,67 @@ def test_search_method_with_memory_oss(mem0_storage_with_mocked_config):
assert results[0]["content"] == "Result 1"
def test_save_method_external_memory_type():
"""Test save method specifically for external memory type"""
crew = MockCrew(
memory_config={
"provider": "mem0",
"config": {"user_id": "test_user", "api_key": "test-key"},
}
)
with patch.object(MemoryClient, "__new__") as mock_client:
mock_memory_instance = MagicMock(spec=MemoryClient)
mock_client.return_value = mock_memory_instance
mem0_storage = Mem0Storage(type="external", crew=crew)
mem0_storage.memory.add = MagicMock()
test_value = "External memory test content"
test_metadata = {"task": "test_task", "agent": "test_agent"}
mem0_storage.save(test_value, test_metadata)
expected_messages = [{"role": "assistant", "content": test_value}]
mem0_storage.memory.add.assert_called_once_with(
expected_messages,
user_id="test_user",
agent_id="Test_Agent",
metadata={"type": "external", "task": "test_task", "agent": "test_agent"},
output_format="v1.1"
)
def test_save_method_with_non_string_value():
"""Test save method when value is already in message format"""
crew = MockCrew(
memory_config={
"provider": "mem0",
"config": {"user_id": "test_user", "api_key": "test-key"},
}
)
with patch.object(MemoryClient, "__new__") as mock_client:
mock_memory_instance = MagicMock(spec=MemoryClient)
mock_client.return_value = mock_memory_instance
mem0_storage = Mem0Storage(type="external", crew=crew)
mem0_storage.memory.add = MagicMock()
test_messages = [{"role": "user", "content": "Test message"}]
test_metadata = {"task": "test_task"}
mem0_storage.save(test_messages, test_metadata)
mem0_storage.memory.add.assert_called_once_with(
test_messages,
user_id="test_user",
agent_id="Test_Agent",
metadata={"type": "external", "task": "test_task"},
output_format="v1.1"
)
def test_search_method_with_memory_client(mem0_storage_with_memory_client_using_config_from_crew):
"""Test search method for different memory types"""
mem0_storage = mem0_storage_with_memory_client_using_config_from_crew

View File

@@ -1133,119 +1133,6 @@ def test_output_file_validation():
)
def test_create_directory_true():
"""Test that directories are created when create_directory=True."""
from pathlib import Path
output_path = "test_create_dir/output.txt"
task = Task(
description="Test task",
expected_output="Test output",
output_file=output_path,
create_directory=True,
)
resolved_path = Path(output_path).expanduser().resolve()
resolved_dir = resolved_path.parent
if resolved_path.exists():
resolved_path.unlink()
if resolved_dir.exists():
import shutil
shutil.rmtree(resolved_dir)
assert not resolved_dir.exists()
task._save_file("test content")
assert resolved_dir.exists()
assert resolved_path.exists()
if resolved_path.exists():
resolved_path.unlink()
if resolved_dir.exists():
import shutil
shutil.rmtree(resolved_dir)
def test_create_directory_false():
"""Test that directories are not created when create_directory=False."""
from pathlib import Path
output_path = "nonexistent_test_dir/output.txt"
task = Task(
description="Test task",
expected_output="Test output",
output_file=output_path,
create_directory=False,
)
resolved_path = Path(output_path).expanduser().resolve()
resolved_dir = resolved_path.parent
if resolved_dir.exists():
import shutil
shutil.rmtree(resolved_dir)
assert not resolved_dir.exists()
with pytest.raises(RuntimeError, match="Directory .* does not exist and create_directory is False"):
task._save_file("test content")
def test_create_directory_default():
"""Test that create_directory defaults to True for backward compatibility."""
task = Task(
description="Test task",
expected_output="Test output",
output_file="output.txt",
)
assert task.create_directory is True
def test_create_directory_with_existing_directory():
"""Test that create_directory=False works when directory already exists."""
from pathlib import Path
output_path = "existing_test_dir/output.txt"
resolved_path = Path(output_path).expanduser().resolve()
resolved_dir = resolved_path.parent
resolved_dir.mkdir(parents=True, exist_ok=True)
task = Task(
description="Test task",
expected_output="Test output",
output_file=output_path,
create_directory=False,
)
task._save_file("test content")
assert resolved_path.exists()
if resolved_path.exists():
resolved_path.unlink()
if resolved_dir.exists():
import shutil
shutil.rmtree(resolved_dir)
def test_github_issue_3149_reproduction():
"""Test that reproduces the exact issue from GitHub issue #3149."""
task = Task(
description="Test task for issue reproduction",
expected_output="Test output",
output_file="test_output.txt",
create_directory=True,
)
assert task.create_directory is True
assert task.output_file == "test_output.txt"
@pytest.mark.vcr(filter_headers=["authorization"])
def test_task_execution_times():
researcher = Agent(

View File

@@ -1,27 +1,16 @@
import multiprocessing
import tempfile
import unittest
from typing import Any, Dict, List, Union
from chromadb.config import Settings
from unittest.mock import patch, MagicMock
import pytest
from crewai.utilities.chromadb import (
MAX_COLLECTION_LENGTH,
MIN_COLLECTION_LENGTH,
is_ipv4_pattern,
sanitize_collection_name,
create_persistent_client,
)
def persistent_client_worker(path, queue):
try:
create_persistent_client(path=path)
queue.put(None)
except Exception as e:
queue.put(e)
class TestChromadbUtils(unittest.TestCase):
def test_sanitize_collection_name_long_name(self):
"""Test sanitizing a very long collection name."""
@@ -90,34 +79,3 @@ class TestChromadbUtils(unittest.TestCase):
self.assertLessEqual(len(sanitized), MAX_COLLECTION_LENGTH)
self.assertTrue(sanitized[0].isalnum())
self.assertTrue(sanitized[-1].isalnum())
def test_create_persistent_client_passes_args(self):
with patch(
"crewai.utilities.chromadb.PersistentClient"
) as mock_persistent_client, tempfile.TemporaryDirectory() as tmpdir:
mock_instance = MagicMock()
mock_persistent_client.return_value = mock_instance
settings = Settings(allow_reset=True)
client = create_persistent_client(path=tmpdir, settings=settings)
mock_persistent_client.assert_called_once_with(
path=tmpdir, settings=settings
)
self.assertIs(client, mock_instance)
def test_create_persistent_client_process_safe(self):
with tempfile.TemporaryDirectory() as tmpdir:
queue = multiprocessing.Queue()
processes = [
multiprocessing.Process(
target=persistent_client_worker, args=(tmpdir, queue)
)
for _ in range(5)
]
[p.start() for p in processes]
[p.join() for p in processes]
errors = [queue.get(timeout=5) for _ in processes]
self.assertTrue(all(err is None for err in errors))

6022
uv.lock generated

File diff suppressed because it is too large Load Diff