Compare commits

..

2 Commits

Author SHA1 Message Date
Lucas Gomide
eaf550521c global handlers(?) WIP 2025-07-15 13:47:53 -03:00
Lucas Gomide
7b1ee07b18 feat: implement thread-safe event bus
This PR also added tests to ensure the singleton pattern
2025-07-14 11:42:31 -03:00
48 changed files with 5923 additions and 5014 deletions

View File

@@ -9,7 +9,12 @@
},
"favicon": "/images/favicon.svg",
"contextual": {
"options": ["copy", "view", "chatgpt", "claude"]
"options": [
"copy",
"view",
"chatgpt",
"claude"
]
},
"navigation": {
"languages": [
@@ -50,22 +55,32 @@
"groups": [
{
"group": "Get Started",
"pages": ["en/introduction", "en/installation", "en/quickstart"]
"pages": [
"en/introduction",
"en/installation",
"en/quickstart"
]
},
{
"group": "Guides",
"pages": [
{
"group": "Strategy",
"pages": ["en/guides/concepts/evaluating-use-cases"]
"pages": [
"en/guides/concepts/evaluating-use-cases"
]
},
{
"group": "Agents",
"pages": ["en/guides/agents/crafting-effective-agents"]
"pages": [
"en/guides/agents/crafting-effective-agents"
]
},
{
"group": "Crews",
"pages": ["en/guides/crews/first-crew"]
"pages": [
"en/guides/crews/first-crew"
]
},
{
"group": "Flows",
@@ -79,6 +94,7 @@
"pages": [
"en/guides/advanced/customizing-prompts",
"en/guides/advanced/fingerprinting"
]
}
]
@@ -225,7 +241,6 @@
"en/observability/langtrace",
"en/observability/maxim",
"en/observability/mlflow",
"en/observability/neatlogs",
"en/observability/openlit",
"en/observability/opik",
"en/observability/patronus-evaluation",
@@ -259,7 +274,9 @@
},
{
"group": "Telemetry",
"pages": ["en/telemetry"]
"pages": [
"en/telemetry"
]
}
]
},
@@ -268,7 +285,9 @@
"groups": [
{
"group": "Getting Started",
"pages": ["en/enterprise/introduction"]
"pages": [
"en/enterprise/introduction"
]
},
{
"group": "Features",
@@ -323,7 +342,9 @@
},
{
"group": "Resources",
"pages": ["en/enterprise/resources/frequently-asked-questions"]
"pages": [
"en/enterprise/resources/frequently-asked-questions"
]
}
]
},
@@ -332,7 +353,9 @@
"groups": [
{
"group": "Getting Started",
"pages": ["en/api-reference/introduction"]
"pages": [
"en/api-reference/introduction"
]
},
{
"group": "Endpoints",
@@ -342,13 +365,16 @@
},
{
"tab": "Examples",
"groups": [
"groups": [
{
"group": "Examples",
"pages": ["en/examples/example"]
"pages": [
"en/examples/example"
]
}
]
}
]
},
{
@@ -399,15 +425,21 @@
"pages": [
{
"group": "Estratégia",
"pages": ["pt-BR/guides/concepts/evaluating-use-cases"]
"pages": [
"pt-BR/guides/concepts/evaluating-use-cases"
]
},
{
"group": "Agentes",
"pages": ["pt-BR/guides/agents/crafting-effective-agents"]
"pages": [
"pt-BR/guides/agents/crafting-effective-agents"
]
},
{
"group": "Crews",
"pages": ["pt-BR/guides/crews/first-crew"]
"pages": [
"pt-BR/guides/crews/first-crew"
]
},
{
"group": "Flows",
@@ -600,7 +632,9 @@
},
{
"group": "Telemetria",
"pages": ["pt-BR/telemetry"]
"pages": [
"pt-BR/telemetry"
]
}
]
},
@@ -609,7 +643,9 @@
"groups": [
{
"group": "Começando",
"pages": ["pt-BR/enterprise/introduction"]
"pages": [
"pt-BR/enterprise/introduction"
]
},
{
"group": "Funcionalidades",
@@ -674,7 +710,9 @@
"groups": [
{
"group": "Começando",
"pages": ["pt-BR/api-reference/introduction"]
"pages": [
"pt-BR/api-reference/introduction"
]
},
{
"group": "Endpoints",
@@ -684,13 +722,16 @@
},
{
"tab": "Exemplos",
"groups": [
"groups": [
{
"group": "Exemplos",
"pages": ["pt-BR/examples/example"]
"pages": [
"pt-BR/examples/example"
]
}
]
}
]
}
]

View File

@@ -1,134 +0,0 @@
---
title: Neatlogs Integration
description: Understand, debug, and share your CrewAI agent runs
icon: magnifying-glass-chart
---
# Introduction
Neatlogs helps you **see what your agent did**, **why**, and **share it**.
It captures every step: thoughts, tool calls, responses, evaluations. No raw logs. Just clear, structured traces. Great for debugging and collaboration.
## Why use Neatlogs?
CrewAI agents use multiple tools and reasoning steps. When something goes wrong, you need context — not just errors.
Neatlogs lets you:
- Follow the full decision path
- Add feedback directly on steps
- Chat with the trace using an AI assistant
- Share runs publicly for feedback
- Turn insights into tasks
All in one place.
Manage your traces effortlessly
![Traces](/images/neatlogs-1.png)
![Trace Response](/images/neatlogs-2.png)
The best UX to view a CrewAI trace. Post comments anywhere you want. Use AI to debug.
![Trace Details](/images/neatlogs-3.png)
![Ai Chat Bot With A Trace](/images/neatlogs-4.png)
![Comments Drawer](/images/neatlogs-5.png)
## Core Features
- **Trace Viewer**: Track thoughts, tools, and decisions in sequence
- **Inline Comments**: Tag teammates on any trace step
- **Feedback & Evaluation**: Mark outputs as correct or incorrect
- **Error Highlighting**: Automatic flagging of API/tool failures
- **Task Conversion**: Convert comments into assigned tasks
- **Ask the Trace (AI)**: Chat with your trace using Neatlogs AI bot
- **Public Sharing**: Publish trace links to your community
## Quick Setup with CrewAI
<Steps>
<Step title="Sign Up & Get API Key">
Visit [neatlogs.com](https://neatlogs.com/?utm_source=crewAI-docs), create a project, copy the API key.
</Step>
<Step title="Install SDK">
```bash
pip install neatlogs
```
(Latest version 0.8.0, Python 3.8+; MIT license)
</Step>
<Step title="Initialize Neatlogs">
Before starting Crew agents, add:
```python
import neatlogs
neatlogs.init("YOUR_PROJECT_API_KEY")
```
Agents run as usual. Neatlogs captures everything automatically.
</Step>
</Steps>
## Under the Hood
According to GitHub, Neatlogs:
- Captures thoughts, tool calls, responses, errors, and token stats
- Supports AI-powered task generation and robust evaluation workflows
All with just two lines of code.
## Watch It Work
### 🔍 Full Demo (4min)
<iframe
width="100%"
height="315"
src="https://www.youtube.com/embed/8KDme9T2I7Q?si=b8oHteaBwFNs_Duk"
title="YouTube video player"
frameBorder="0"
allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture"
allowFullScreen
></iframe>
### ⚙️ CrewAI Integration (30s)
<iframe
className="w-full aspect-video rounded-xl"
src="https://www.loom.com/embed/9c78b552af43452bb3e4783cb8d91230?sid=e9d7d370-a91a-49b0-809e-2f375d9e801d"
title="Loom video player"
frameBorder="0"
allowFullScreen
></iframe>
## Links & Support
- 📘 [Neatlogs Docs](https://docs.neatlogs.com/)
- 🔐 [Dashboard & API Key](https://app.neatlogs.com/)
- 🐦 [Follow on Twitter](https://twitter.com/neatlogs)
- 📧 Contact: hello@neatlogs.com
- 🛠 [GitHub SDK](https://github.com/NeatLogs/neatlogs)
## TL;DR
With just:
```bash
pip install neatlogs
```
```python
import neatlogs
neatlogs.init("YOUR_API_KEY")
```
You can now capture, understand, share, and act on your CrewAI agent runs in seconds.
No setup overhead. Full trace transparency. Full team collaboration.

Binary image files not shown (five images removed: 222 KiB, 329 KiB, 590 KiB, 216 KiB, 277 KiB).

View File

@@ -47,7 +47,7 @@ Documentation = "https://docs.crewai.com"
Repository = "https://github.com/crewAIInc/crewAI"
[project.optional-dependencies]
tools = ["crewai-tools~=0.55.0"]
tools = ["crewai-tools~=0.51.0"]
embeddings = [
"tiktoken~=0.8.0"
]

View File

@@ -54,7 +54,7 @@ def _track_install_async():
_track_install_async()
__version__ = "0.148.0"
__version__ = "0.141.0"
__all__ = [
"Agent",
"Crew",

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]>=0.148.0,<1.0.0"
"crewai[tools]>=0.141.0,<1.0.0"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]>=0.148.0,<1.0.0",
"crewai[tools]>=0.141.0,<1.0.0",
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "Power up your crews with {{folder_name}}"
readme = "README.md"
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]>=0.148.0"
"crewai[tools]>=0.141.0"
]
[tool.crewai]

View File

@@ -1313,6 +1313,7 @@ class Crew(FlowTrackable, BaseModel):
n_iterations: int,
eval_llm: Union[str, InstanceOf[BaseLLM]],
inputs: Optional[Dict[str, Any]] = None,
include_agent_eval: Optional[bool] = False
) -> None:
"""Test and evaluate the Crew with the given inputs for n iterations concurrently using concurrent.futures."""
try:
@@ -1332,13 +1333,28 @@ class Crew(FlowTrackable, BaseModel):
)
test_crew = self.copy()
# TODO: Refactor to use a single Evaluator Manager class
evaluator = CrewEvaluator(test_crew, llm_instance)
if include_agent_eval:
from crewai.experimental.evaluation import create_default_evaluator
agent_evaluator = create_default_evaluator(crew=test_crew)
for i in range(1, n_iterations + 1):
evaluator.set_iteration(i)
if include_agent_eval:
agent_evaluator.set_iteration(i)
test_crew.kickoff(inputs=inputs)
# TODO: Refactor to use ListenerEvents instead of trigger each iteration manually
if include_agent_eval:
agent_evaluator.evaluate_current_iteration()
evaluator.print_crew_evaluation_result()
if include_agent_eval:
agent_evaluator.get_agent_evaluation(include_evaluation_feedback=True)
crewai_event_bus.emit(
self,
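
The new `include_agent_eval` flag layers per-agent evaluation on top of the existing crew-level evaluation. A hedged usage sketch follows; the method name is not visible in this hunk (the docstring suggests it is `Crew.test`), and the agents, model name, and inputs below are illustrative only.

```python
from crewai import Agent, Crew, Task

# Illustrative crew; any existing crew works the same way.
researcher = Agent(role="Researcher", goal="Summarize a topic", backstory="Experienced analyst")
summary = Task(description="Summarize recent AI news", expected_output="A short summary", agent=researcher)
crew = Crew(agents=[researcher], tasks=[summary])

# With include_agent_eval=True, each test iteration also runs the per-agent
# evaluators and prints their aggregated results after the crew-level report.
crew.test(n_iterations=2, eval_llm="gpt-4o-mini", include_agent_eval=True)
```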

View File

@@ -0,0 +1,53 @@
from crewai.evaluation.base_evaluator import (
BaseEvaluator,
EvaluationScore,
MetricCategory,
AgentEvaluationResult
)
from crewai.evaluation.metrics.semantic_quality_metrics import (
SemanticQualityEvaluator
)
from crewai.evaluation.metrics.goal_metrics import (
GoalAlignmentEvaluator
)
from crewai.evaluation.metrics.reasoning_metrics import (
ReasoningEfficiencyEvaluator
)
from crewai.evaluation.metrics.tools_metrics import (
ToolSelectionEvaluator,
ParameterExtractionEvaluator,
ToolInvocationEvaluator
)
from crewai.evaluation.evaluation_listener import (
EvaluationTraceCallback,
create_evaluation_callbacks
)
from crewai.evaluation.agent_evaluator import (
AgentEvaluator,
create_default_evaluator
)
__all__ = [
"BaseEvaluator",
"EvaluationScore",
"MetricCategory",
"AgentEvaluationResult",
"SemanticQualityEvaluator",
"GoalAlignmentEvaluator",
"ReasoningEfficiencyEvaluator",
"ToolSelectionEvaluator",
"ParameterExtractionEvaluator",
"ToolInvocationEvaluator",
"EvaluationTraceCallback",
"create_evaluation_callbacks",
"AgentEvaluator",
"create_default_evaluator"
]

View File

@@ -0,0 +1,178 @@
from crewai.evaluation.base_evaluator import AgentEvaluationResult, AggregationStrategy
from crewai.agent import Agent
from crewai.task import Task
from crewai.evaluation.evaluation_display import EvaluationDisplayFormatter
from typing import Any, Dict
from collections import defaultdict
from crewai.evaluation import BaseEvaluator, create_evaluation_callbacks
from collections.abc import Sequence
from crewai.crew import Crew
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.utils.console_formatter import ConsoleFormatter
class AgentEvaluator:
def __init__(
self,
evaluators: Sequence[BaseEvaluator] | None = None,
crew: Crew | None = None,
):
self.crew: Crew | None = crew
self.evaluators: Sequence[BaseEvaluator] | None = evaluators
self.agent_evaluators: dict[str, Sequence[BaseEvaluator] | None] = {}
if crew is not None:
assert crew and crew.agents is not None
for agent in crew.agents:
self.agent_evaluators[str(agent.id)] = self.evaluators
self.callback = create_evaluation_callbacks()
self.console_formatter = ConsoleFormatter()
self.display_formatter = EvaluationDisplayFormatter()
self.iteration = 1
self.iterations_results: dict[int, dict[str, list[AgentEvaluationResult]]] = {}
def set_iteration(self, iteration: int) -> None:
self.iteration = iteration
def evaluate_current_iteration(self) -> dict[str, list[AgentEvaluationResult]]:
if not self.crew:
raise ValueError("Cannot evaluate: no crew was provided to the evaluator.")
if not self.callback:
raise ValueError("Cannot evaluate: no callback was set. Use set_callback() method first.")
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn
evaluation_results: defaultdict[str, list[AgentEvaluationResult]] = defaultdict(list)
total_evals = 0
for agent in self.crew.agents:
for task in self.crew.tasks:
if task.agent and task.agent.id == agent.id and self.agent_evaluators.get(str(agent.id)):
total_evals += 1
with Progress(
SpinnerColumn(),
TextColumn("[bold blue]{task.description}[/bold blue]"),
BarColumn(),
TextColumn("{task.percentage:.0f}% completed"),
console=self.console_formatter.console
) as progress:
eval_task = progress.add_task(f"Evaluating agents (iteration {self.iteration})...", total=total_evals)
for agent in self.crew.agents:
evaluator = self.agent_evaluators.get(str(agent.id))
if not evaluator:
continue
for task in self.crew.tasks:
if task.agent and str(task.agent.id) != str(agent.id):
continue
trace = self.callback.get_trace(str(agent.id), str(task.id))
if not trace:
self.console_formatter.print(f"[yellow]Warning: No trace found for agent {agent.role} on task {task.description[:30]}...[/yellow]")
progress.update(eval_task, advance=1)
continue
with crewai_event_bus.scoped_handlers():
result = self.evaluate(
agent=agent,
task=task,
execution_trace=trace,
final_output=task.output
)
evaluation_results[agent.role].append(result)
progress.update(eval_task, advance=1)
self.iterations_results[self.iteration] = evaluation_results
return evaluation_results
def get_evaluation_results(self):
if self.iteration in self.iterations_results:
return self.iterations_results[self.iteration]
return self.evaluate_current_iteration()
def display_results_with_iterations(self):
self.display_formatter.display_summary_results(self.iterations_results)
def get_agent_evaluation(self, strategy: AggregationStrategy = AggregationStrategy.SIMPLE_AVERAGE, include_evaluation_feedback: bool = False):
agent_results = {}
with crewai_event_bus.scoped_handlers():
task_results = self.get_evaluation_results()
for agent_role, results in task_results.items():
if not results:
continue
agent_id = results[0].agent_id
aggregated_result = self.display_formatter._aggregate_agent_results(
agent_id=agent_id,
agent_role=agent_role,
results=results,
strategy=strategy
)
agent_results[agent_role] = aggregated_result
if self.iteration == max(self.iterations_results.keys()):
self.display_results_with_iterations()
if include_evaluation_feedback:
self.display_evaluation_with_feedback()
return agent_results
def display_evaluation_with_feedback(self):
self.display_formatter.display_evaluation_with_feedback(self.iterations_results)
def evaluate(
self,
agent: Agent,
task: Task,
execution_trace: Dict[str, Any],
final_output: Any
) -> AgentEvaluationResult:
result = AgentEvaluationResult(
agent_id=str(agent.id),
task_id=str(task.id)
)
assert self.evaluators is not None
for evaluator in self.evaluators:
try:
score = evaluator.evaluate(
agent=agent,
task=task,
execution_trace=execution_trace,
final_output=final_output
)
result.metrics[evaluator.metric_category] = score
except Exception as e:
self.console_formatter.print(f"Error in {evaluator.metric_category.value} evaluator: {str(e)}")
return result
def create_default_evaluator(crew, llm=None):
from crewai.evaluation import (
GoalAlignmentEvaluator,
SemanticQualityEvaluator,
ToolSelectionEvaluator,
ParameterExtractionEvaluator,
ToolInvocationEvaluator,
ReasoningEfficiencyEvaluator
)
evaluators = [
GoalAlignmentEvaluator(llm=llm),
SemanticQualityEvaluator(llm=llm),
ToolSelectionEvaluator(llm=llm),
ParameterExtractionEvaluator(llm=llm),
ToolInvocationEvaluator(llm=llm),
ReasoningEfficiencyEvaluator(llm=llm),
]
return AgentEvaluator(evaluators=evaluators, crew=crew)
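
For use outside `Crew.test`, a minimal sketch of driving the evaluator manually. Note the import-path discrepancy in this PR: the `Crew.test` hunk imports `create_default_evaluator` from `crewai.experimental.evaluation`, while this module's own imports use `crewai.evaluation`; the sketch assumes the latter, and the crew construction is illustrative.

```python
from crewai import Agent, Crew, Task
from crewai.evaluation.agent_evaluator import create_default_evaluator

writer = Agent(role="Writer", goal="Draft short copy", backstory="Copywriter")
draft = Task(description="Write a tagline for a coffee shop", expected_output="One sentence", agent=writer)
crew = Crew(agents=[writer], tasks=[draft])

evaluator = create_default_evaluator(crew=crew)
evaluator.set_iteration(1)
crew.kickoff()

# {agent_role: [AgentEvaluationResult, ...]} for the current iteration.
results = evaluator.evaluate_current_iteration()

# Prints the summary tables; include_evaluation_feedback=True also prints per-metric feedback.
evaluator.get_agent_evaluation(include_evaluation_feedback=True)
```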

View File

@@ -0,0 +1,125 @@
import abc
import enum
from enum import Enum
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
from crewai.agent import Agent
from crewai.task import Task
from crewai.llm import BaseLLM
from crewai.utilities.llm_utils import create_llm
class MetricCategory(enum.Enum):
GOAL_ALIGNMENT = "goal_alignment"
SEMANTIC_QUALITY = "semantic_quality"
REASONING_EFFICIENCY = "reasoning_efficiency"
TOOL_SELECTION = "tool_selection"
PARAMETER_EXTRACTION = "parameter_extraction"
TOOL_INVOCATION = "tool_invocation"
def title(self):
return self.value.replace('_', ' ').title()
class EvaluationScore(BaseModel):
score: float | None = Field(
default=5.0,
description="Numeric score from 0-10 where 0 is worst and 10 is best, None if not applicable",
ge=0.0,
le=10.0
)
feedback: str = Field(
default="",
description="Detailed feedback explaining the evaluation score"
)
raw_response: str | None = Field(
default=None,
description="Raw response from the evaluator (e.g., LLM)"
)
def __str__(self) -> str:
if self.score is None:
return f"Score: N/A - {self.feedback}"
return f"Score: {self.score:.1f}/10 - {self.feedback}"
class BaseEvaluator(abc.ABC):
def __init__(self, llm: BaseLLM | None = None):
self.llm: BaseLLM | None = create_llm(llm)
@property
@abc.abstractmethod
def metric_category(self) -> MetricCategory:
pass
@abc.abstractmethod
def evaluate(
self,
agent: Agent,
task: Task,
execution_trace: Dict[str, Any],
final_output: Any,
) -> EvaluationScore:
pass
class AgentEvaluationResult(BaseModel):
agent_id: str = Field(description="ID of the evaluated agent")
task_id: str = Field(description="ID of the task that was executed")
metrics: Dict[MetricCategory, EvaluationScore] = Field(
default_factory=dict,
description="Evaluation scores for each metric category"
)
class AggregationStrategy(Enum):
SIMPLE_AVERAGE = "simple_average" # Equal weight to all tasks
WEIGHTED_BY_COMPLEXITY = "weighted_by_complexity" # Weight by task complexity
BEST_PERFORMANCE = "best_performance" # Use best scores across tasks
WORST_PERFORMANCE = "worst_performance" # Use worst scores across tasks
class AgentAggregatedEvaluationResult(BaseModel):
agent_id: str = Field(
default="",
description="ID of the agent"
)
agent_role: str = Field(
default="",
description="Role of the agent"
)
task_count: int = Field(
default=0,
description="Number of tasks included in this aggregation"
)
aggregation_strategy: AggregationStrategy = Field(
default=AggregationStrategy.SIMPLE_AVERAGE,
description="Strategy used for aggregation"
)
metrics: Dict[MetricCategory, EvaluationScore] = Field(
default_factory=dict,
description="Aggregated metrics across all tasks"
)
task_results: List[str] = Field(
default_factory=list,
description="IDs of tasks included in this aggregation"
)
overall_score: Optional[float] = Field(
default=None,
description="Overall score for this agent"
)
def __str__(self) -> str:
result = f"Agent Evaluation: {self.agent_role}\n"
result += f"Strategy: {self.aggregation_strategy.value}\n"
result += f"Tasks evaluated: {self.task_count}\n"
for category, score in self.metrics.items():
result += f"\n\n- {category.value.upper()}: {score.score}/10\n"
if score.feedback:
detailed_feedback = "\n ".join(score.feedback.split('\n'))
result += f" {detailed_feedback}\n"
return result
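
The abstract base above fixes the contract every metric implements: a `metric_category` property and an `evaluate` method returning an `EvaluationScore`. A purely illustrative subclass (the length heuristic and the reuse of an existing category are hypothetical, not part of this PR):

```python
from typing import Any, Dict

from crewai.agent import Agent
from crewai.task import Task
from crewai.evaluation.base_evaluator import BaseEvaluator, EvaluationScore, MetricCategory


class OutputLengthEvaluator(BaseEvaluator):
    """Toy evaluator: scores outputs by length, reusing an existing metric category."""

    @property
    def metric_category(self) -> MetricCategory:
        return MetricCategory.SEMANTIC_QUALITY

    def evaluate(
        self,
        agent: Agent,
        task: Task,
        execution_trace: Dict[str, Any],
        final_output: Any,
    ) -> EvaluationScore:
        length = len(str(final_output))
        score = min(10.0, length / 100)  # 1,000+ characters maps to the top score
        return EvaluationScore(score=score, feedback=f"Output length: {length} characters")
```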

View File

@@ -0,0 +1,341 @@
from collections import defaultdict
from typing import Dict, Any, List
from rich.table import Table
from rich.box import HEAVY_EDGE, ROUNDED
from collections.abc import Sequence
from crewai.evaluation.base_evaluator import AgentAggregatedEvaluationResult, AggregationStrategy, AgentEvaluationResult, MetricCategory
from crewai.evaluation import EvaluationScore
from crewai.utilities.events.utils.console_formatter import ConsoleFormatter
from crewai.utilities.llm_utils import create_llm
class EvaluationDisplayFormatter:
def __init__(self):
self.console_formatter = ConsoleFormatter()
def display_evaluation_with_feedback(self, iterations_results: Dict[int, Dict[str, List[Any]]]):
if not iterations_results:
self.console_formatter.print("[yellow]No evaluation results to display[/yellow]")
return
# Get all agent roles across all iterations
all_agent_roles: set[str] = set()
for iter_results in iterations_results.values():
all_agent_roles.update(iter_results.keys())
for agent_role in sorted(all_agent_roles):
self.console_formatter.print(f"\n[bold cyan]Agent: {agent_role}[/bold cyan]")
# Process each iteration
for iter_num, results in sorted(iterations_results.items()):
if agent_role not in results or not results[agent_role]:
continue
agent_results = results[agent_role]
agent_id = agent_results[0].agent_id
# Aggregate results for this agent in this iteration
aggregated_result = self._aggregate_agent_results(
agent_id=agent_id,
agent_role=agent_role,
results=agent_results,
)
# Display iteration header
self.console_formatter.print(f"\n[bold]Iteration {iter_num}[/bold]")
# Create table for this iteration
table = Table(box=ROUNDED)
table.add_column("Metric", style="cyan")
table.add_column("Score (1-10)", justify="center")
table.add_column("Feedback", style="green")
# Add metrics to table
if aggregated_result.metrics:
for metric, evaluation_score in aggregated_result.metrics.items():
score = evaluation_score.score
if isinstance(score, (int, float)):
if score >= 8.0:
score_text = f"[green]{score:.1f}[/green]"
elif score >= 6.0:
score_text = f"[cyan]{score:.1f}[/cyan]"
elif score >= 4.0:
score_text = f"[yellow]{score:.1f}[/yellow]"
else:
score_text = f"[red]{score:.1f}[/red]"
else:
score_text = "[dim]N/A[/dim]"
table.add_section()
table.add_row(
metric.title(),
score_text,
evaluation_score.feedback or ""
)
if aggregated_result.overall_score is not None:
overall_score = aggregated_result.overall_score
if overall_score >= 8.0:
overall_color = "green"
elif overall_score >= 6.0:
overall_color = "cyan"
elif overall_score >= 4.0:
overall_color = "yellow"
else:
overall_color = "red"
table.add_section()
table.add_row(
"Overall Score",
f"[{overall_color}]{overall_score:.1f}[/]",
"Overall agent evaluation score"
)
# Print the table for this iteration
self.console_formatter.print(table)
def display_summary_results(self, iterations_results: Dict[int, Dict[str, List[AgentAggregatedEvaluationResult]]]):
if not iterations_results:
self.console_formatter.print("[yellow]No evaluation results to display[/yellow]")
return
self.console_formatter.print("\n")
table = Table(title="Agent Performance Scores \n (1-10 Higher is better)", box=HEAVY_EDGE)
table.add_column("Agent/Metric", style="cyan")
for iter_num in sorted(iterations_results.keys()):
run_label = f"Run {iter_num}"
table.add_column(run_label, justify="center")
table.add_column("Avg. Total", justify="center")
all_agent_roles: set[str] = set()
for results in iterations_results.values():
all_agent_roles.update(results.keys())
for agent_role in sorted(all_agent_roles):
agent_scores_by_iteration = {}
agent_metrics_by_iteration = {}
for iter_num, results in sorted(iterations_results.items()):
if agent_role not in results or not results[agent_role]:
continue
agent_results = results[agent_role]
agent_id = agent_results[0].agent_id
aggregated_result = self._aggregate_agent_results(
agent_id=agent_id,
agent_role=agent_role,
results=agent_results,
strategy=AggregationStrategy.SIMPLE_AVERAGE
)
valid_scores = [score.score for score in aggregated_result.metrics.values()
if score.score is not None]
if valid_scores:
avg_score = sum(valid_scores) / len(valid_scores)
agent_scores_by_iteration[iter_num] = avg_score
agent_metrics_by_iteration[iter_num] = aggregated_result.metrics
if not agent_scores_by_iteration:
continue
avg_across_iterations = sum(agent_scores_by_iteration.values()) / len(agent_scores_by_iteration)
row = [f"[bold]{agent_role}[/bold]"]
for iter_num in sorted(iterations_results.keys()):
if iter_num in agent_scores_by_iteration:
score = agent_scores_by_iteration[iter_num]
if score >= 8.0:
color = "green"
elif score >= 6.0:
color = "cyan"
elif score >= 4.0:
color = "yellow"
else:
color = "red"
row.append(f"[bold {color}]{score:.1f}[/]")
else:
row.append("-")
if avg_across_iterations >= 8.0:
color = "green"
elif avg_across_iterations >= 6.0:
color = "cyan"
elif avg_across_iterations >= 4.0:
color = "yellow"
else:
color = "red"
row.append(f"[bold {color}]{avg_across_iterations:.1f}[/]")
table.add_row(*row)
all_metrics: set[Any] = set()
for metrics in agent_metrics_by_iteration.values():
all_metrics.update(metrics.keys())
for metric in sorted(all_metrics, key=lambda x: x.value):
metric_scores = []
row = [f" - {metric.title()}"]
for iter_num in sorted(iterations_results.keys()):
if (iter_num in agent_metrics_by_iteration and
metric in agent_metrics_by_iteration[iter_num]):
metric_score = agent_metrics_by_iteration[iter_num][metric].score
if metric_score is not None:
metric_scores.append(metric_score)
if metric_score >= 8.0:
color = "green"
elif metric_score >= 6.0:
color = "cyan"
elif metric_score >= 4.0:
color = "yellow"
else:
color = "red"
row.append(f"[{color}]{metric_score:.1f}[/]")
else:
row.append("[dim]N/A[/dim]")
else:
row.append("-")
if metric_scores:
avg = sum(metric_scores) / len(metric_scores)
if avg >= 8.0:
color = "green"
elif avg >= 6.0:
color = "cyan"
elif avg >= 4.0:
color = "yellow"
else:
color = "red"
row.append(f"[{color}]{avg:.1f}[/]")
else:
row.append("-")
table.add_row(*row)
table.add_row(*[""] * (len(sorted(iterations_results.keys())) + 2))
self.console_formatter.print(table)
self.console_formatter.print("\n")
def _aggregate_agent_results(
self,
agent_id: str,
agent_role: str,
results: Sequence[AgentEvaluationResult],
strategy: AggregationStrategy = AggregationStrategy.SIMPLE_AVERAGE,
) -> AgentAggregatedEvaluationResult:
metrics_by_category: dict[MetricCategory, list[EvaluationScore]] = defaultdict(list)
for result in results:
for metric_name, evaluation_score in result.metrics.items():
metrics_by_category[metric_name].append(evaluation_score)
aggregated_metrics: dict[MetricCategory, EvaluationScore] = {}
for category, scores in metrics_by_category.items():
valid_scores = [s.score for s in scores if s.score is not None]
avg_score = sum(valid_scores) / len(valid_scores) if valid_scores else None
feedbacks = [s.feedback for s in scores if s.feedback]
feedback_summary = None
if feedbacks:
if len(feedbacks) > 1:
# Use the summarization method for multiple feedbacks
feedback_summary = self._summarize_feedbacks(
agent_role=agent_role,
metric=category.title(),
feedbacks=feedbacks,
scores=[s.score for s in scores],
strategy=strategy
)
else:
feedback_summary = feedbacks[0]
aggregated_metrics[category] = EvaluationScore(
score=avg_score,
feedback=feedback_summary
)
overall_score = None
if aggregated_metrics:
valid_scores = [m.score for m in aggregated_metrics.values() if m.score is not None]
if valid_scores:
overall_score = sum(valid_scores) / len(valid_scores)
return AgentAggregatedEvaluationResult(
agent_id=agent_id,
agent_role=agent_role,
metrics=aggregated_metrics,
overall_score=overall_score,
task_count=len(results),
aggregation_strategy=strategy
)
def _summarize_feedbacks(
self,
agent_role: str,
metric: str,
feedbacks: List[str],
scores: List[float | None],
strategy: AggregationStrategy
) -> str:
if len(feedbacks) <= 2 and all(len(fb) < 200 for fb in feedbacks):
return "\n\n".join([f"Feedback {i+1}: {fb}" for i, fb in enumerate(feedbacks)])
try:
llm = create_llm()
formatted_feedbacks = []
for i, (feedback, score) in enumerate(zip(feedbacks, scores)):
if len(feedback) > 500:
feedback = feedback[:500] + "..."
score_text = f"{score:.1f}" if score is not None else "N/A"
formatted_feedbacks.append(f"Feedback #{i+1} (Score: {score_text}):\n{feedback}")
all_feedbacks = "\n\n" + "\n\n---\n\n".join(formatted_feedbacks)
strategy_guidance = ""
if strategy == AggregationStrategy.BEST_PERFORMANCE:
strategy_guidance = "Focus on the highest-scoring aspects and strengths demonstrated."
elif strategy == AggregationStrategy.WORST_PERFORMANCE:
strategy_guidance = "Focus on areas that need improvement and common issues across tasks."
else: # Default/average strategies
strategy_guidance = "Provide a balanced analysis of strengths and weaknesses across all tasks."
prompt = [
{"role": "system", "content": f"""You are an expert evaluator creating a comprehensive summary of agent performance feedback.
Your job is to synthesize multiple feedback points about the same metric across different tasks.
Create a concise, insightful summary that captures the key patterns and themes from all feedback.
{strategy_guidance}
Your summary should be:
1. Specific and concrete (not vague or general)
2. Focused on actionable insights
3. Highlighting patterns across tasks
4. 150-250 words in length
The summary should be directly usable as final feedback for the agent's performance on this metric."""},
{"role": "user", "content": f"""I need a synthesized summary of the following feedback for:
Agent Role: {agent_role}
Metric: {metric.title()}
{all_feedbacks}
"""}
]
assert llm is not None
response = llm.call(prompt)
return response
except Exception:
return "Synthesized from multiple tasks: " + "\n\n".join([f"- {fb[:500]}..." for fb in feedbacks])

View File

@@ -0,0 +1,190 @@
from datetime import datetime
from typing import Any, Dict, Optional
from collections.abc import Sequence
from crewai.agent import Agent
from crewai.task import Task
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events.crewai_event_bus import CrewAIEventsBus
from crewai.utilities.events.agent_events import (
AgentExecutionStartedEvent,
AgentExecutionCompletedEvent
)
from crewai.utilities.events.tool_usage_events import (
ToolUsageFinishedEvent,
ToolUsageErrorEvent,
ToolExecutionErrorEvent,
ToolSelectionErrorEvent,
ToolValidateInputErrorEvent
)
from crewai.utilities.events.llm_events import (
LLMCallStartedEvent,
LLMCallCompletedEvent
)
class EvaluationTraceCallback(BaseEventListener):
"""Event listener for collecting execution traces for evaluation.
This listener attaches to the event bus to collect detailed information
about the execution process, including agent steps, tool uses, knowledge
retrievals, and final output - all for use in agent evaluation.
"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if not hasattr(self, "_initialized") or not self._initialized:
super().__init__()
self.traces = {}
self.current_agent_id = None
self.current_task_id = None
self._initialized = True
def setup_listeners(self, event_bus: CrewAIEventsBus):
@event_bus.on(AgentExecutionStartedEvent)
def on_agent_started(source, event: AgentExecutionStartedEvent):
self.on_agent_start(event.agent, event.task)
@event_bus.on(AgentExecutionCompletedEvent)
def on_agent_completed(source, event: AgentExecutionCompletedEvent):
self.on_agent_finish(event.agent, event.task, event.output)
@event_bus.on(ToolUsageFinishedEvent)
def on_tool_completed(source, event: ToolUsageFinishedEvent):
self.on_tool_use(event.tool_name, event.tool_args, event.output, success=True)
@event_bus.on(ToolUsageErrorEvent)
def on_tool_usage_error(source, event: ToolUsageErrorEvent):
self.on_tool_use(event.tool_name, event.tool_args, event.error,
success=False, error_type="usage_error")
@event_bus.on(ToolExecutionErrorEvent)
def on_tool_execution_error(source, event: ToolExecutionErrorEvent):
self.on_tool_use(event.tool_name, event.tool_args, event.error,
success=False, error_type="execution_error")
@event_bus.on(ToolSelectionErrorEvent)
def on_tool_selection_error(source, event: ToolSelectionErrorEvent):
self.on_tool_use(event.tool_name, event.tool_args, event.error,
success=False, error_type="selection_error")
@event_bus.on(ToolValidateInputErrorEvent)
def on_tool_validate_input_error(source, event: ToolValidateInputErrorEvent):
self.on_tool_use(event.tool_name, event.tool_args, event.error,
success=False, error_type="validation_error")
@event_bus.on(LLMCallStartedEvent)
def on_llm_call_started(source, event: LLMCallStartedEvent):
self.on_llm_call_start(event.messages, event.tools)
@event_bus.on(LLMCallCompletedEvent)
def on_llm_call_completed(source, event: LLMCallCompletedEvent):
self.on_llm_call_end(event.messages, event.response)
def on_agent_start(self, agent: Agent, task: Task):
self.current_agent_id = agent.id
self.current_task_id = task.id
trace_key = f"{agent.id}_{task.id}"
self.traces[trace_key] = {
"agent_id": agent.id,
"task_id": task.id,
"tool_uses": [],
"llm_calls": [],
"start_time": datetime.now(),
"final_output": None
}
def on_agent_finish(self, agent: Agent, task: Task, output: Any):
trace_key = f"{agent.id}_{task.id}"
if trace_key in self.traces:
self.traces[trace_key]["final_output"] = output
self.traces[trace_key]["end_time"] = datetime.now()
self.current_agent_id = None
self.current_task_id = None
def on_tool_use(self, tool_name: str, tool_args: dict[str, Any] | str, result: Any,
success: bool = True, error_type: str | None = None):
if not self.current_agent_id or not self.current_task_id:
return
trace_key = f"{self.current_agent_id}_{self.current_task_id}"
if trace_key in self.traces:
tool_use = {
"tool": tool_name,
"args": tool_args,
"result": result,
"success": success,
"timestamp": datetime.now()
}
# Add error information if applicable
if not success and error_type:
tool_use["error"] = True
tool_use["error_type"] = error_type
self.traces[trace_key]["tool_uses"].append(tool_use)
def on_llm_call_start(self, messages: str | Sequence[dict[str, Any]] | None, tools: Sequence[dict[str, Any]] | None = None):
if not self.current_agent_id or not self.current_task_id:
return
trace_key = f"{self.current_agent_id}_{self.current_task_id}"
if trace_key not in self.traces:
return
self.current_llm_call = {
"messages": messages,
"tools": tools,
"start_time": datetime.now(),
"response": None,
"end_time": None
}
def on_llm_call_end(self, messages: str | list[dict[str, Any]] | None, response: Any):
if not self.current_agent_id or not self.current_task_id:
return
trace_key = f"{self.current_agent_id}_{self.current_task_id}"
if trace_key not in self.traces:
return
total_tokens = 0
if hasattr(response, "usage") and hasattr(response.usage, "total_tokens"):
total_tokens = response.usage.total_tokens
current_time = datetime.now()
start_time = None
if hasattr(self, "current_llm_call") and self.current_llm_call:
start_time = self.current_llm_call.get("start_time")
if not start_time:
start_time = current_time
llm_call = {
"messages": messages,
"response": response,
"start_time": start_time,
"end_time": current_time,
"total_tokens": total_tokens
}
self.traces[trace_key]["llm_calls"].append(llm_call)
if hasattr(self, "current_llm_call"):
self.current_llm_call = {}
def get_trace(self, agent_id: str, task_id: str) -> Optional[Dict[str, Any]]:
trace_key = f"{agent_id}_{task_id}"
return self.traces.get(trace_key)
def create_evaluation_callbacks() -> EvaluationTraceCallback:
return EvaluationTraceCallback()
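
Because `EvaluationTraceCallback` is a singleton listener, creating it once before a run is enough to start collecting traces keyed by `"{agent_id}_{task_id}"`. A hedged sketch of reading a trace back afterwards (the crew is assumed to exist already):

```python
from crewai.evaluation.evaluation_listener import create_evaluation_callbacks

callback = create_evaluation_callbacks()  # singleton; its event-bus handlers are registered once

crew.kickoff()  # `crew` is an assumed, already-configured Crew instance

agent, task = crew.agents[0], crew.tasks[0]
trace = callback.get_trace(str(agent.id), str(task.id))
if trace:
    print(len(trace["tool_uses"]), "tool calls,", len(trace["llm_calls"]), "LLM calls")
```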

View File

@@ -0,0 +1,49 @@
import warnings
from crewai.experimental.evaluation import ExperimentResults
def assert_experiment_successfully(experiment_results: ExperimentResults) -> None:
"""
Assert that all experiment results passed successfully.
Args:
experiment_results: The experiment results to check
Raises:
AssertionError: If any test case failed
"""
failed_tests = [result for result in experiment_results.results if not result.passed]
if failed_tests:
detailed_failures: list[str] = []
for result in failed_tests:
expected = result.expected_score
actual = result.score
detailed_failures.append(f"- {result.identifier}: expected {expected}, got {actual}")
failure_details = "\n".join(detailed_failures)
raise AssertionError(f"The following test cases failed:\n{failure_details}")
def assert_experiment_no_regression(comparison_result: dict[str, list[str]]) -> None:
"""
Assert that there are no regressions in the experiment results compared to baseline.
Also warns if there are missing tests.
Args:
comparison_result: The result from compare_with_baseline()
Raises:
AssertionError: If there are regressions
"""
# Check for regressions
regressed = comparison_result.get("regressed", [])
if regressed:
raise AssertionError(f"Regression detected! The following tests that previously passed now fail: {regressed}")
# Check for missing tests and warn
missing_tests = comparison_result.get("missing_tests", [])
if missing_tests:
warnings.warn(
f"Warning: {len(missing_tests)} tests from the baseline are missing in the current run: {missing_tests}",
UserWarning
)
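
A sketch of how these helpers might sit in a pytest-style test. The `experiment_results` and `comparison_result` values are assumed to come from an experiment runner and a `compare_with_baseline()` call that are not shown in this diff, and imports are omitted because the module path is not visible in this view.

```python
def test_agent_experiment(experiment_results, comparison_result) -> None:
    # Raises AssertionError listing each failed case (identifier, expected vs. actual score).
    assert_experiment_successfully(experiment_results)

    # Raises on "regressed" entries; emits a UserWarning for "missing_tests".
    assert_experiment_no_regression(comparison_result)
```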

View File

@@ -0,0 +1,30 @@
"""Robust JSON parsing utilities for evaluation responses."""
import json
import re
from typing import Any
def extract_json_from_llm_response(text: str) -> dict[str, Any]:
try:
return json.loads(text)
except json.JSONDecodeError:
pass
json_patterns = [
# Standard markdown code blocks with json
r'```json\s*([\s\S]*?)\s*```',
# Code blocks without language specifier
r'```\s*([\s\S]*?)\s*```',
# Inline code with JSON
r'`([{\\[].*[}\]])`',
]
for pattern in json_patterns:
matches = re.findall(pattern, text, re.IGNORECASE | re.DOTALL)
for match in matches:
try:
return json.loads(match.strip())
except json.JSONDecodeError:
continue
raise ValueError("No valid JSON found in the response")
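
A quick illustration of the fallback behaviour above, using a hypothetical LLM reply that wraps its JSON in a markdown code block (the import path matches the one used by the evaluators elsewhere in this PR):

```python
from crewai.evaluation.json_parser import extract_json_from_llm_response

raw = 'Here is my evaluation:\n```json\n{"score": 8.5, "feedback": "Clear and complete."}\n```'
data = extract_json_from_llm_response(raw)
print(data["score"])     # 8.5
print(data["feedback"])  # Clear and complete.
```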

View File

@@ -0,0 +1,66 @@
from typing import Any, Dict
from crewai.agent import Agent
from crewai.task import Task
from crewai.evaluation.base_evaluator import BaseEvaluator, EvaluationScore, MetricCategory
from crewai.evaluation.json_parser import extract_json_from_llm_response
class GoalAlignmentEvaluator(BaseEvaluator):
@property
def metric_category(self) -> MetricCategory:
return MetricCategory.GOAL_ALIGNMENT
def evaluate(
self,
agent: Agent,
task: Task,
execution_trace: Dict[str, Any],
final_output: Any,
) -> EvaluationScore:
prompt = [
{"role": "system", "content": """You are an expert evaluator assessing how well an AI agent's output aligns with its assigned task goal.
Score the agent's goal alignment on a scale from 0-10 where:
- 0: Complete misalignment, agent did not understand or attempt the task goal
- 5: Partial alignment, agent attempted the task but missed key requirements
- 10: Perfect alignment, agent fully satisfied all task requirements
Consider:
1. Did the agent correctly interpret the task goal?
2. Did the final output directly address the requirements?
3. Did the agent focus on relevant aspects of the task?
4. Did the agent provide all requested information or deliverables?
Return your evaluation as JSON with fields 'score' (number) and 'feedback' (string).
"""},
{"role": "user", "content": f"""
Agent role: {agent.role}
Agent goal: {agent.goal}
Task description: {task.description}
Expected output: {task.expected_output}
Agent's final output:
{final_output}
Evaluate how well the agent's output aligns with the assigned task goal.
"""}
]
assert self.llm is not None
response = self.llm.call(prompt)
try:
evaluation_data: dict[str, Any] = extract_json_from_llm_response(response)
assert evaluation_data is not None
return EvaluationScore(
score=evaluation_data.get("score", 0),
feedback=evaluation_data.get("feedback", response),
raw_response=response
)
except Exception:
return EvaluationScore(
score=None,
feedback=f"Failed to parse evaluation. Raw response: {response}",
raw_response=response
)

View File

@@ -0,0 +1,355 @@
"""Agent reasoning efficiency evaluators.
This module provides evaluator implementations for:
- Reasoning efficiency
- Loop detection
- Thinking-to-action ratio
"""
import logging
import re
from enum import Enum
from typing import Any, Dict, List, Tuple
import numpy as np
from collections.abc import Sequence
from crewai.agent import Agent
from crewai.task import Task
from crewai.evaluation.base_evaluator import BaseEvaluator, EvaluationScore, MetricCategory
from crewai.evaluation.json_parser import extract_json_from_llm_response
from crewai.tasks.task_output import TaskOutput
class ReasoningPatternType(Enum):
EFFICIENT = "efficient" # Good reasoning flow
LOOP = "loop" # Agent is stuck in a loop
VERBOSE = "verbose" # Agent is unnecessarily verbose
INDECISIVE = "indecisive" # Agent struggles to make decisions
SCATTERED = "scattered" # Agent jumps between topics without focus
class ReasoningEfficiencyEvaluator(BaseEvaluator):
@property
def metric_category(self) -> MetricCategory:
return MetricCategory.REASONING_EFFICIENCY
def evaluate(
self,
agent: Agent,
task: Task,
execution_trace: Dict[str, Any],
final_output: TaskOutput,
) -> EvaluationScore:
llm_calls = execution_trace.get("llm_calls", [])
if not llm_calls or len(llm_calls) < 2:
return EvaluationScore(
score=None,
feedback="Insufficient LLM calls to evaluate reasoning efficiency."
)
total_calls = len(llm_calls)
total_tokens = sum(call.get("total_tokens", 0) for call in llm_calls)
avg_tokens_per_call = total_tokens / total_calls if total_calls > 0 else 0
time_intervals = []
has_reliable_timing = True
for i in range(1, len(llm_calls)):
start_time = llm_calls[i-1].get("end_time")
end_time = llm_calls[i].get("start_time")
if start_time and end_time and start_time != end_time:
try:
interval = end_time - start_time
time_intervals.append(interval.total_seconds() if hasattr(interval, 'total_seconds') else 0)
except Exception:
has_reliable_timing = False
else:
has_reliable_timing = False
loop_detected, loop_details = self._detect_loops(llm_calls)
pattern_analysis = self._analyze_reasoning_patterns(llm_calls)
efficiency_metrics = {
"total_llm_calls": total_calls,
"total_tokens": total_tokens,
"avg_tokens_per_call": avg_tokens_per_call,
"reasoning_pattern": pattern_analysis["primary_pattern"].value,
"loops_detected": loop_detected,
}
if has_reliable_timing and time_intervals:
efficiency_metrics["avg_time_between_calls"] = np.mean(time_intervals)
loop_info = f"Detected {len(loop_details)} potential reasoning loops." if loop_detected else "No significant reasoning loops detected."
call_samples = self._get_call_samples(llm_calls)
prompt = [
{"role": "system", "content": """You are an expert evaluator assessing the reasoning efficiency of an AI agent's thought process.
Evaluate the agent's reasoning efficiency across these five key subcategories:
1. Focus (0-10): How well the agent stays on topic and avoids unnecessary tangents
2. Progression (0-10): How effectively the agent builds on previous thoughts rather than repeating or circling
3. Decision Quality (0-10): How decisively and appropriately the agent makes decisions
4. Conciseness (0-10): How efficiently the agent communicates without unnecessary verbosity
5. Loop Avoidance (0-10): How well the agent avoids getting stuck in repetitive thinking patterns
For each subcategory, provide a score from 0-10 where:
- 0: Completely inefficient
- 5: Moderately efficient
- 10: Highly efficient
The overall score should be a weighted average of these subcategories.
Return your evaluation as JSON with the following structure:
{
"overall_score": float,
"scores": {
"focus": float,
"progression": float,
"decision_quality": float,
"conciseness": float,
"loop_avoidance": float
},
"feedback": string (general feedback about overall reasoning efficiency),
"optimization_suggestions": string (concrete suggestions for improving reasoning efficiency),
"detected_patterns": string (describe any inefficient reasoning patterns you observe)
}"""},
{"role": "user", "content": f"""
Agent role: {agent.role}
Task description: {task.description}
Reasoning efficiency metrics:
- Total LLM calls: {efficiency_metrics["total_llm_calls"]}
- Average tokens per call: {efficiency_metrics["avg_tokens_per_call"]:.1f}
- Primary reasoning pattern: {efficiency_metrics["reasoning_pattern"]}
- {loop_info}
{"- Average time between calls: {:.2f} seconds".format(efficiency_metrics.get("avg_time_between_calls", 0)) if "avg_time_between_calls" in efficiency_metrics else ""}
Sample of agent reasoning flow (chronological sequence):
{call_samples}
Agent's final output:
{final_output.raw[:500]}... (truncated)
Evaluate the reasoning efficiency of this agent based on these interaction patterns.
Identify any inefficient reasoning patterns and provide specific suggestions for optimization.
"""}
]
assert self.llm is not None
response = self.llm.call(prompt)
try:
evaluation_data = extract_json_from_llm_response(response)
scores = evaluation_data.get("scores", {})
focus = scores.get("focus", 5.0)
progression = scores.get("progression", 5.0)
decision_quality = scores.get("decision_quality", 5.0)
conciseness = scores.get("conciseness", 5.0)
loop_avoidance = scores.get("loop_avoidance", 5.0)
overall_score = evaluation_data.get("overall_score", evaluation_data.get("score", 5.0))
feedback = evaluation_data.get("feedback", "No detailed feedback provided.")
optimization_suggestions = evaluation_data.get("optimization_suggestions", "No specific suggestions provided.")
detailed_feedback = "Reasoning Efficiency Evaluation:\n"
detailed_feedback += f"• Focus: {focus}/10 - Staying on topic without tangents\n"
detailed_feedback += f"• Progression: {progression}/10 - Building on previous thinking\n"
detailed_feedback += f"• Decision Quality: {decision_quality}/10 - Making appropriate decisions\n"
detailed_feedback += f"• Conciseness: {conciseness}/10 - Communicating efficiently\n"
detailed_feedback += f"• Loop Avoidance: {loop_avoidance}/10 - Avoiding repetitive patterns\n\n"
detailed_feedback += f"Feedback:\n{feedback}\n\n"
detailed_feedback += f"Optimization Suggestions:\n{optimization_suggestions}"
return EvaluationScore(
score=float(overall_score),
feedback=detailed_feedback,
raw_response=response
)
except Exception as e:
logging.warning(f"Failed to parse reasoning efficiency evaluation: {e}")
return EvaluationScore(
score=None,
feedback=f"Failed to parse reasoning efficiency evaluation. Raw response: {response[:200]}...",
raw_response=response
)
def _detect_loops(self, llm_calls: List[Dict]) -> Tuple[bool, List[Dict]]:
loop_details = []
messages = []
for call in llm_calls:
content = call.get("response", "")
if isinstance(content, str):
messages.append(content)
elif isinstance(content, list) and len(content) > 0:
# Handle message list format
for msg in content:
if isinstance(msg, dict) and "content" in msg:
messages.append(msg["content"])
# Simple n-gram based similarity detection
# For a more robust implementation, consider using embedding-based similarity
for i in range(len(messages) - 2):
for j in range(i + 1, len(messages) - 1):
# Check for repeated patterns (simplistic approach)
# A more sophisticated approach would use semantic similarity
similarity = self._calculate_text_similarity(messages[i], messages[j])
if similarity > 0.7: # Arbitrary threshold
loop_details.append({
"first_occurrence": i,
"second_occurrence": j,
"similarity": similarity,
"snippet": messages[i][:100] + "..."
})
return len(loop_details) > 0, loop_details
def _calculate_text_similarity(self, text1: str, text2: str) -> float:
text1 = re.sub(r'\s+', ' ', text1.lower()).strip()
text2 = re.sub(r'\s+', ' ', text2.lower()).strip()
# Simple Jaccard similarity on word sets
words1 = set(text1.split())
words2 = set(text2.split())
intersection = len(words1.intersection(words2))
union = len(words1.union(words2))
return intersection / union if union > 0 else 0.0
def _analyze_reasoning_patterns(self, llm_calls: List[Dict]) -> Dict[str, Any]:
call_lengths = []
response_times = []
for call in llm_calls:
content = call.get("response", "")
if isinstance(content, str):
call_lengths.append(len(content))
elif isinstance(content, list) and len(content) > 0:
# Handle message list format
total_length = 0
for msg in content:
if isinstance(msg, dict) and "content" in msg:
total_length += len(msg["content"])
call_lengths.append(total_length)
start_time = call.get("start_time")
end_time = call.get("end_time")
if start_time and end_time:
try:
response_times.append(end_time - start_time)
except Exception:
pass
avg_length = np.mean(call_lengths) if call_lengths else 0
std_length = np.std(call_lengths) if call_lengths else 0
length_trend = self._calculate_trend(call_lengths)
primary_pattern = ReasoningPatternType.EFFICIENT
details = "Agent demonstrates efficient reasoning patterns."
loop_score = self._calculate_loop_likelihood(call_lengths, response_times)
if loop_score > 0.7:
primary_pattern = ReasoningPatternType.LOOP
details = "Agent appears to be stuck in repetitive thinking patterns."
elif avg_length > 1000 and std_length / avg_length < 0.3:
primary_pattern = ReasoningPatternType.VERBOSE
details = "Agent is consistently verbose across interactions."
elif len(llm_calls) > 10 and length_trend > 0.5:
primary_pattern = ReasoningPatternType.INDECISIVE
details = "Agent shows signs of indecisiveness with increasing message lengths."
elif std_length / avg_length > 0.8:
primary_pattern = ReasoningPatternType.SCATTERED
details = "Agent shows inconsistent reasoning flow with highly variable responses."
return {
"primary_pattern": primary_pattern,
"details": details,
"metrics": {
"avg_length": avg_length,
"std_length": std_length,
"length_trend": length_trend,
"loop_score": loop_score
}
}
def _calculate_trend(self, values: Sequence[float | int]) -> float:
if not values or len(values) < 2:
return 0.0
try:
x = np.arange(len(values))
y = np.array(values)
# Simple linear regression
slope = np.polyfit(x, y, 1)[0]
# Normalize slope to -1 to 1 range
max_possible_slope = max(values) - min(values)
if max_possible_slope > 0:
normalized_slope = slope / max_possible_slope
return max(min(normalized_slope, 1.0), -1.0)
return 0.0
except Exception:
return 0.0
def _calculate_loop_likelihood(self, call_lengths: Sequence[float], response_times: Sequence[float]) -> float:
if not call_lengths or len(call_lengths) < 3:
return 0.0
indicators = []
if len(call_lengths) >= 4:
repeated_lengths = 0
for i in range(len(call_lengths) - 2):
ratio = call_lengths[i] / call_lengths[i + 2] if call_lengths[i + 2] > 0 else 0
if 0.85 <= ratio <= 1.15:
repeated_lengths += 1
length_repetition_score = repeated_lengths / (len(call_lengths) - 2)
indicators.append(length_repetition_score)
if response_times and len(response_times) >= 3:
try:
std_time = np.std(response_times)
mean_time = np.mean(response_times)
if mean_time > 0:
time_consistency = 1.0 - (std_time / mean_time)
indicators.append(max(0, time_consistency - 0.3) * 1.5)
except Exception:
pass
return np.mean(indicators) if indicators else 0.0
def _get_call_samples(self, llm_calls: List[Dict]) -> str:
samples = []
if len(llm_calls) <= 6:
sample_indices = list(range(len(llm_calls)))
else:
sample_indices = [0, 1, len(llm_calls) // 2 - 1, len(llm_calls) // 2,
len(llm_calls) - 2, len(llm_calls) - 1]
for idx in sample_indices:
call = llm_calls[idx]
content = call.get("response", "")
if isinstance(content, str):
sample = content
elif isinstance(content, list) and len(content) > 0:
sample_parts = []
for msg in content:
if isinstance(msg, dict) and "content" in msg:
sample_parts.append(msg["content"])
sample = "\n".join(sample_parts)
else:
sample = str(content)
truncated = sample[:200] + "..." if len(sample) > 200 else sample
samples.append(f"Call {idx + 1}:\n{truncated}\n")
return "\n".join(samples)

View File

@@ -0,0 +1,65 @@
from typing import Any, Dict
from crewai.agent import Agent
from crewai.task import Task
from crewai.evaluation.base_evaluator import BaseEvaluator, EvaluationScore, MetricCategory
from crewai.evaluation.json_parser import extract_json_from_llm_response
class SemanticQualityEvaluator(BaseEvaluator):
@property
def metric_category(self) -> MetricCategory:
return MetricCategory.SEMANTIC_QUALITY
def evaluate(
self,
agent: Agent,
task: Task,
execution_trace: Dict[str, Any],
final_output: Any,
) -> EvaluationScore:
prompt = [
{"role": "system", "content": """You are an expert evaluator assessing the semantic quality of an AI agent's output.
Score the semantic quality on a scale from 0-10 where:
- 0: Completely incoherent, confusing, or logically flawed output
- 5: Moderately clear and logical output with some issues
- 10: Exceptionally clear, coherent, and logically sound output
Consider:
1. Is the output well-structured and organized?
2. Is the reasoning logical and well-supported?
3. Is the language clear, precise, and appropriate for the task?
4. Are claims supported by evidence when appropriate?
5. Is the output free from contradictions and logical fallacies?
Return your evaluation as JSON with fields 'score' (number) and 'feedback' (string).
"""},
{"role": "user", "content": f"""
Agent role: {agent.role}
Task description: {task.description}
Agent's final output:
{final_output}
Evaluate the semantic quality and reasoning of this output.
"""}
]
assert self.llm is not None
response = self.llm.call(prompt)
try:
evaluation_data: dict[str, Any] = extract_json_from_llm_response(response)
assert evaluation_data is not None
return EvaluationScore(
score=float(evaluation_data["score"]) if evaluation_data.get("score") is not None else None,
feedback=evaluation_data.get("feedback", response),
raw_response=response
)
except Exception:
return EvaluationScore(
score=None,
feedback=f"Failed to parse evaluation. Raw response: {response}",
raw_response=response
)

View File

@@ -0,0 +1,400 @@
import json
from typing import Dict, Any
from crewai.evaluation.base_evaluator import BaseEvaluator, EvaluationScore, MetricCategory
from crewai.evaluation.json_parser import extract_json_from_llm_response
from crewai.agent import Agent
from crewai.task import Task
class ToolSelectionEvaluator(BaseEvaluator):
@property
def metric_category(self) -> MetricCategory:
return MetricCategory.TOOL_SELECTION
def evaluate(
self,
agent: Agent,
task: Task,
execution_trace: Dict[str, Any],
final_output: str,
) -> EvaluationScore:
tool_uses = execution_trace.get("tool_uses", [])
tool_count = len(tool_uses)
unique_tool_types = set([tool.get("tool", "Unknown tool") for tool in tool_uses])
if tool_count == 0:
if not agent.tools:
return EvaluationScore(
score=None,
feedback="Agent had no tools available to use."
)
else:
return EvaluationScore(
score=None,
feedback="Agent had tools available but didn't use any."
)
available_tools_info = ""
if agent.tools:
for tool in agent.tools:
available_tools_info += f"- {tool.name}: {tool.description}\n"
else:
available_tools_info = "No tools available"
tool_types_summary = "Tools selected by the agent:\n"
for tool_type in sorted(unique_tool_types):
tool_types_summary += f"- {tool_type}\n"
prompt = [
{"role": "system", "content": """You are an expert evaluator assessing if an AI agent selected the most appropriate tools for a given task.
You must evaluate based on these 2 criteria:
1. Relevance (0-10): Were the tools chosen directly aligned with the task's goals?
2. Coverage (0-10): Did the agent select ALL appropriate tools from the AVAILABLE tools?
IMPORTANT:
- ONLY consider tools that are listed as available to the agent
- DO NOT suggest tools that aren't in the 'Available tools' list
- DO NOT evaluate the quality or accuracy of tool outputs/results
- DO NOT evaluate how many times each tool was used
- DO NOT evaluate how the agent used the parameters
- DO NOT evaluate whether the agent interpreted the task correctly
Focus ONLY on whether the correct CATEGORIES of tools were selected from what was available.
Return your evaluation as JSON with these fields:
- scores: {"relevance": number, "coverage": number}
- overall_score: number (average of all scores, 0-10)
- feedback: string (focused ONLY on tool selection decisions from available tools)
- improvement_suggestions: string (ONLY suggest better selection from the AVAILABLE tools list, NOT new tools)
"""},
{"role": "user", "content": f"""
Agent role: {agent.role}
Task description: {task.description}
Available tools for this agent:
{available_tools_info}
{tool_types_summary}
Based ONLY on the task description and comparing the AVAILABLE tools with those that were selected (listed above), evaluate if the agent selected the appropriate tool types for this task.
IMPORTANT:
- ONLY evaluate selection from tools listed as available
- DO NOT suggest new tools that aren't in the available tools list
- DO NOT evaluate tool usage or results
"""}
]
assert self.llm is not None
response = self.llm.call(prompt)
try:
evaluation_data = extract_json_from_llm_response(response)
assert evaluation_data is not None
scores = evaluation_data.get("scores", {})
relevance = scores.get("relevance", 5.0)
coverage = scores.get("coverage", 5.0)
overall_score = float(evaluation_data.get("overall_score", 5.0))
feedback = "Tool Selection Evaluation:\n"
feedback += f"• Relevance: {relevance}/10 - Selection of appropriate tool types for the task\n"
feedback += f"• Coverage: {coverage}/10 - Selection of all necessary tool types\n"
if "improvement_suggestions" in evaluation_data:
feedback += f"Improvement Suggestions:\n{evaluation_data['improvement_suggestions']}"
else:
feedback += evaluation_data.get("feedback", "No detailed feedback available.")
return EvaluationScore(
score=overall_score,
feedback=feedback,
raw_response=response
)
except Exception as e:
return EvaluationScore(
score=None,
feedback=f"Error evaluating tool selection: {e}",
raw_response=response
)
class ParameterExtractionEvaluator(BaseEvaluator):
@property
def metric_category(self) -> MetricCategory:
return MetricCategory.PARAMETER_EXTRACTION
def evaluate(
self,
agent: Agent,
task: Task,
execution_trace: Dict[str, Any],
final_output: str,
) -> EvaluationScore:
tool_uses = execution_trace.get("tool_uses", [])
tool_count = len(tool_uses)
if tool_count == 0:
return EvaluationScore(
score=None,
feedback="No tool usage detected. Cannot evaluate parameter extraction."
)
validation_errors = []
for tool_use in tool_uses:
if not tool_use.get("success", True) and tool_use.get("error_type") == "validation_error":
validation_errors.append({
"tool": tool_use.get("tool", "Unknown tool"),
"error": tool_use.get("result"),
"args": tool_use.get("args", {})
})
validation_error_rate = len(validation_errors) / tool_count if tool_count > 0 else 0
param_samples = []
for i, tool_use in enumerate(tool_uses[:5]):
tool_name = tool_use.get("tool", "Unknown tool")
tool_args = tool_use.get("args", {})
success = tool_use.get("success", True) and not tool_use.get("error", False)
error_type = tool_use.get("error_type", "") if not success else ""
is_validation_error = error_type == "validation_error"
sample = f"Tool use #{i+1} - {tool_name}:\n"
sample += f"- Parameters: {json.dumps(tool_args, indent=2)}\n"
sample += f"- Success: {'No' if not success else 'Yes'}"
if is_validation_error:
sample += " (PARAMETER VALIDATION ERROR)\n"
sample += f"- Error: {tool_use.get('result', 'Unknown error')}"
elif not success:
sample += f" (Other error: {error_type})\n"
param_samples.append(sample)
validation_errors_info = ""
if validation_errors:
validation_errors_info = f"\nParameter validation errors detected: {len(validation_errors)} ({validation_error_rate:.1%} of tool uses)\n"
for i, err in enumerate(validation_errors[:3]):
tool_name = err.get("tool", "Unknown tool")
error_msg = err.get("error", "Unknown error")
args = err.get("args", {})
validation_errors_info += f"\nValidation Error #{i+1}:\n- Tool: {tool_name}\n- Args: {json.dumps(args, indent=2)}\n- Error: {error_msg}"
if len(validation_errors) > 3:
validation_errors_info += f"\n...and {len(validation_errors) - 3} more validation errors."
param_samples_text = "\n\n".join(param_samples)
prompt = [
{"role": "system", "content": """You are an expert evaluator assessing how well an AI agent extracts and formats PARAMETER VALUES for tool calls.
Your job is to evaluate ONLY whether the agent used the correct parameter VALUES, not whether the right tools were selected or how the tools were invoked.
Evaluate parameter extraction based on these criteria:
1. Accuracy (0-10): Are parameter values correctly identified from the context/task?
2. Formatting (0-10): Are values formatted correctly for each tool's requirements?
3. Completeness (0-10): Are all required parameter values provided, with no missing information?
IMPORTANT: DO NOT evaluate:
- Whether the right tool was chosen (that's the ToolSelectionEvaluator's job)
- How the tools were structurally invoked (that's the ToolInvocationEvaluator's job)
- The quality of results from tools
Focus ONLY on the PARAMETER VALUES - whether they were correctly extracted from the context, properly formatted, and complete.
Validation errors are important signals that parameter values weren't properly extracted or formatted.
Return your evaluation as JSON with these fields:
- scores: {"accuracy": number, "formatting": number, "completeness": number}
- overall_score: number (average of all scores, 0-10)
- feedback: string (focused ONLY on parameter value extraction quality)
- improvement_suggestions: string (concrete suggestions for better parameter VALUE extraction)
"""},
{"role": "user", "content": f"""
Agent role: {agent.role}
Task description: {task.description}
Parameter extraction examples:
{param_samples_text}
{validation_errors_info}
Evaluate the quality of the agent's parameter extraction for this task.
"""}
]
assert self.llm is not None
response = self.llm.call(prompt)
try:
evaluation_data = extract_json_from_llm_response(response)
assert evaluation_data is not None
scores = evaluation_data.get("scores", {})
accuracy = scores.get("accuracy", 5.0)
formatting = scores.get("formatting", 5.0)
completeness = scores.get("completeness", 5.0)
overall_score = float(evaluation_data.get("overall_score", 5.0))
feedback = "Parameter Extraction Evaluation:\n"
feedback += f"• Accuracy: {accuracy}/10 - Correctly identifying required parameters\n"
feedback += f"• Formatting: {formatting}/10 - Properly formatting parameters for tools\n"
feedback += f"• Completeness: {completeness}/10 - Including all necessary information\n\n"
if "improvement_suggestions" in evaluation_data:
feedback += f"Improvement Suggestions:\n{evaluation_data['improvement_suggestions']}"
else:
feedback += evaluation_data.get("feedback", "No detailed feedback available.")
return EvaluationScore(
score=overall_score,
feedback=feedback,
raw_response=response
)
except Exception as e:
return EvaluationScore(
score=None,
feedback=f"Error evaluating parameter extraction: {e}",
raw_response=response
)
class ToolInvocationEvaluator(BaseEvaluator):
@property
def metric_category(self) -> MetricCategory:
return MetricCategory.TOOL_INVOCATION
def evaluate(
self,
agent: Agent,
task: Task,
execution_trace: Dict[str, Any],
final_output: str,
) -> EvaluationScore:
tool_uses = execution_trace.get("tool_uses", [])
tool_errors = []
tool_count = len(tool_uses)
if tool_count == 0:
return EvaluationScore(
score=None,
feedback="No tool usage detected. Cannot evaluate tool invocation."
)
for tool_use in tool_uses:
if not tool_use.get("success", True) or tool_use.get("error", False):
error_info = {
"tool": tool_use.get("tool", "Unknown tool"),
"error": tool_use.get("result"),
"error_type": tool_use.get("error_type", "unknown_error")
}
tool_errors.append(error_info)
error_rate = len(tool_errors) / tool_count if tool_count > 0 else 0
error_types = {}
for error in tool_errors:
error_type = error.get("error_type", "unknown_error")
if error_type not in error_types:
error_types[error_type] = 0
error_types[error_type] += 1
invocation_samples = []
for i, tool_use in enumerate(tool_uses[:5]):
tool_name = tool_use.get("tool", "Unknown tool")
tool_args = tool_use.get("args", {})
success = tool_use.get("success", True) and not tool_use.get("error", False)
error_type = tool_use.get("error_type", "") if not success else ""
error_msg = tool_use.get("result", "No error") if not success else "No error"
sample = f"Tool invocation #{i+1}:\n"
sample += f"- Tool: {tool_name}\n"
sample += f"- Parameters: {json.dumps(tool_args, indent=2)}\n"
sample += f"- Success: {'No' if not success else 'Yes'}\n"
if not success:
sample += f"- Error type: {error_type}\n"
sample += f"- Error: {error_msg}"
invocation_samples.append(sample)
error_type_summary = ""
if error_types:
error_type_summary = "Error type breakdown:\n"
for error_type, count in error_types.items():
error_type_summary += f"- {error_type}: {count} occurrences ({(count/tool_count):.1%})\n"
invocation_samples_text = "\n\n".join(invocation_samples)
prompt = [
{"role": "system", "content": """You are an expert evaluator assessing how correctly an AI agent's tool invocations are STRUCTURED.
Your job is to evaluate ONLY the structural and syntactical aspects of how the agent called tools, NOT which tools were selected or what parameter values were used.
Evaluate the agent's tool invocation based on these criteria:
1. Structure (0-10): Does the tool call follow the expected syntax and format?
2. Error Handling (0-10): Does the agent handle tool errors appropriately?
3. Invocation Patterns (0-10): Are tool calls properly sequenced, batched, or managed?
Error types that indicate invocation issues:
- execution_error: The tool was called correctly but failed during execution
- usage_error: General errors in how the tool was used structurally
IMPORTANT: DO NOT evaluate:
- Whether the right tool was chosen (that's the ToolSelectionEvaluator's job)
- Whether the parameter values are correct (that's the ParameterExtractionEvaluator's job)
- The quality of results from tools
Focus ONLY on HOW tools were invoked - the structure, format, and handling of the invocation process.
Return your evaluation as JSON with these fields:
- scores: {"structure": number, "error_handling": number, "invocation_patterns": number}
- overall_score: number (average of all scores, 0-10)
- feedback: string (focused ONLY on structural aspects of tool invocation)
- improvement_suggestions: string (concrete suggestions for better structuring of tool calls)
"""},
{"role": "user", "content": f"""
Agent role: {agent.role}
Task description: {task.description}
Tool invocation examples:
{invocation_samples_text}
Tool error rate: {error_rate:.2%} ({len(tool_errors)} errors out of {tool_count} invocations)
{error_type_summary}
Evaluate the quality of the agent's tool invocation structure during this task.
"""}
]
assert self.llm is not None
response = self.llm.call(prompt)
try:
evaluation_data = extract_json_from_llm_response(response)
assert evaluation_data is not None
scores = evaluation_data.get("scores", {})
structure = scores.get("structure", 5.0)
error_handling = scores.get("error_handling", 5.0)
invocation_patterns = scores.get("invocation_patterns", 5.0)
overall_score = float(evaluation_data.get("overall_score", 5.0))
feedback = "Tool Invocation Evaluation:\n"
feedback += f"• Structure: {structure}/10 - Following proper syntax and format\n"
feedback += f"• Error Handling: {error_handling}/10 - Appropriately handling tool errors\n"
feedback += f"• Invocation Patterns: {invocation_patterns}/10 - Proper sequencing and management of calls\n\n"
if "improvement_suggestions" in evaluation_data:
feedback += f"Improvement Suggestions:\n{evaluation_data['improvement_suggestions']}"
else:
feedback += evaluation_data.get("feedback", "No detailed feedback available.")
return EvaluationScore(
score=overall_score,
feedback=feedback,
raw_response=response
)
except Exception as e:
return EvaluationScore(
score=None,
feedback=f"Error evaluating tool invocation: {e}",
raw_response=response
)
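
For reference, a minimal sketch of driving one of the evaluators above directly, outside a crew run. The execution_trace shape (tool_uses entries with tool/args/success/result keys) mirrors the code above, and the ToolSelectionEvaluator import path and llm= constructor argument follow other files in this diff; the agent, task, and tool values are purely illustrative, and an OpenAI-compatible API key is assumed at call time.

from crewai import LLM, Agent, Task
from crewai.experimental.evaluation import ToolSelectionEvaluator
from crewai.tools import tool

@tool("web_search")
def web_search(query: str) -> str:
    """Toy search tool used only to give the agent something to select."""
    return "Portugal has roughly 10.5 million inhabitants."

agent = Agent(
    role="Research Assistant",
    goal="Answer questions using the available tools",
    backstory="Illustrative agent for exercising the evaluator.",
    tools=[web_search],
)
task = Task(
    description="Find the current population of Portugal.",
    expected_output="A single number with a short justification.",
    agent=agent,
)

# One entry per tool call, matching what the evaluators read from execution_trace.
execution_trace = {
    "tool_uses": [
        {
            "tool": "web_search",
            "args": {"query": "population of Portugal"},
            "success": True,
            "result": "Portugal has roughly 10.5 million inhabitants.",
        }
    ]
}

evaluator = ToolSelectionEvaluator(llm=LLM(model="gpt-4o-mini"))
score = evaluator.evaluate(
    agent=agent,
    task=task,
    execution_trace=execution_trace,
    final_output="Portugal has roughly 10.5 million inhabitants.",
)
print(score.score, score.feedback)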

View File

@@ -1,35 +1,34 @@
import threading
from typing import Any
from crewai.experimental.evaluation.base_evaluator import AgentEvaluationResult, AggregationStrategy
from crewai.agent import Agent
from crewai.task import Task
from crewai.experimental.evaluation.evaluation_display import EvaluationDisplayFormatter
from crewai.utilities.events.agent_events import AgentEvaluationStartedEvent, AgentEvaluationCompletedEvent, AgentEvaluationFailedEvent
from typing import Any, Dict
from collections import defaultdict
from crewai.experimental.evaluation import BaseEvaluator, create_evaluation_callbacks
from collections.abc import Sequence
from crewai.crew import Crew
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.utils.console_formatter import ConsoleFormatter
from crewai.utilities.events.task_events import TaskCompletedEvent
from crewai.utilities.events.agent_events import LiteAgentExecutionCompletedEvent
from crewai.experimental.evaluation.base_evaluator import AgentAggregatedEvaluationResult, EvaluationScore, MetricCategory
from crewai.experimental.evaluation.evaluation_display import AgentAggregatedEvaluationResult
from contextlib import contextmanager
import threading
class ExecutionState:
def __init__(self):
self.traces = {}
self.traces: dict[str, Any] = {}
self.current_agent_id: str | None = None
self.current_task_id: str | None = None
self.iteration = 1
self.iterations_results = {}
self.agent_evaluators = {}
self.iteration: int = 1
self.iterations_results: dict[int, dict[str, list[AgentEvaluationResult]]] = {}
class AgentEvaluator:
def __init__(
self,
agents: list[Agent],
evaluators: Sequence[BaseEvaluator] | None = None,
crew: Crew | None = None,
):
self.agents: list[Agent] = agents
self.crew: Crew | None = crew
self.evaluators: Sequence[BaseEvaluator] | None = evaluators
self.callback = create_evaluation_callbacks()
@@ -38,10 +37,19 @@ class AgentEvaluator:
self._thread_local: threading.local = threading.local()
for agent in self.agents:
self._execution_state.agent_evaluators[str(agent.id)] = self.evaluators
self.agent_evaluators: dict[str, Sequence[BaseEvaluator] | None] = {}
if crew is not None:
assert crew and crew.agents is not None
for agent in crew.agents:
self.agent_evaluators[str(agent.id)] = self.evaluators
self._subscribe_to_events()
@contextmanager
def execution_context(self):
state = ExecutionState()
try:
yield state
finally:
pass
@property
def _execution_state(self) -> ExecutionState:
@@ -49,100 +57,81 @@ class AgentEvaluator:
self._thread_local.execution_state = ExecutionState()
return self._thread_local.execution_state
def _subscribe_to_events(self) -> None:
from typing import cast
crewai_event_bus.register_handler(TaskCompletedEvent, cast(Any, self._handle_task_completed))
crewai_event_bus.register_handler(LiteAgentExecutionCompletedEvent, cast(Any, self._handle_lite_agent_completed))
def _handle_task_completed(self, source: Any, event: TaskCompletedEvent) -> None:
assert event.task is not None
agent = event.task.agent
if agent and str(getattr(agent, 'id', 'unknown')) in self._execution_state.agent_evaluators:
self.emit_evaluation_started_event(agent_role=agent.role, agent_id=str(agent.id), task_id=str(event.task.id))
state = ExecutionState()
state.current_agent_id = str(agent.id)
state.current_task_id = str(event.task.id)
assert state.current_agent_id is not None and state.current_task_id is not None
trace = self.callback.get_trace(state.current_agent_id, state.current_task_id)
if not trace:
return
result = self.evaluate(
agent=agent,
task=event.task,
execution_trace=trace,
final_output=event.output,
state=state
)
current_iteration = self._execution_state.iteration
if current_iteration not in self._execution_state.iterations_results:
self._execution_state.iterations_results[current_iteration] = {}
if agent.role not in self._execution_state.iterations_results[current_iteration]:
self._execution_state.iterations_results[current_iteration][agent.role] = []
self._execution_state.iterations_results[current_iteration][agent.role].append(result)
def _handle_lite_agent_completed(self, source: object, event: LiteAgentExecutionCompletedEvent) -> None:
agent_info = event.agent_info
agent_id = str(agent_info["id"])
if agent_id in self._execution_state.agent_evaluators:
state = ExecutionState()
state.current_agent_id = agent_id
state.current_task_id = "lite_task"
target_agent = None
for agent in self.agents:
if str(agent.id) == agent_id:
target_agent = agent
break
if not target_agent:
return
assert state.current_agent_id is not None and state.current_task_id is not None
trace = self.callback.get_trace(state.current_agent_id, state.current_task_id)
if not trace:
return
result = self.evaluate(
agent=target_agent,
execution_trace=trace,
final_output=event.output,
state=state
)
current_iteration = self._execution_state.iteration
if current_iteration not in self._execution_state.iterations_results:
self._execution_state.iterations_results[current_iteration] = {}
agent_role = target_agent.role
if agent_role not in self._execution_state.iterations_results[current_iteration]:
self._execution_state.iterations_results[current_iteration][agent_role] = []
self._execution_state.iterations_results[current_iteration][agent_role].append(result)
def set_iteration(self, iteration: int) -> None:
self._execution_state.iteration = iteration
def reset_iterations_results(self) -> None:
self._execution_state.iterations_results = {}
def evaluate_current_iteration(self) -> dict[str, list[AgentEvaluationResult]]:
if not self.crew:
raise ValueError("Cannot evaluate: no crew was provided to the evaluator.")
if not self.callback:
raise ValueError("Cannot evaluate: no callback was set. Use set_callback() method first.")
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn
evaluation_results: defaultdict[str, list[AgentEvaluationResult]] = defaultdict(list)
total_evals = 0
for agent in self.crew.agents:
for task in self.crew.tasks:
if task.agent and task.agent.id == agent.id and self.agent_evaluators.get(str(agent.id)):
total_evals += 1
with Progress(
SpinnerColumn(),
TextColumn("[bold blue]{task.description}[/bold blue]"),
BarColumn(),
TextColumn("{task.percentage:.0f}% completed"),
console=self.console_formatter.console
) as progress:
eval_task = progress.add_task(f"Evaluating agents (iteration {self._execution_state.iteration})...", total=total_evals)
with self.execution_context() as state:
state.iteration = self._execution_state.iteration
for agent in self.crew.agents:
evaluator = self.agent_evaluators.get(str(agent.id))
if not evaluator:
continue
for task in self.crew.tasks:
if task.agent and str(task.agent.id) != str(agent.id):
continue
trace = self.callback.get_trace(str(agent.id), str(task.id))
if not trace:
self.console_formatter.print(f"[yellow]Warning: No trace found for agent {agent.role} on task {task.description[:30]}...[/yellow]")
progress.update(eval_task, advance=1)
continue
state.current_agent_id = str(agent.id)
state.current_task_id = str(task.id)
with crewai_event_bus.scoped_handlers():
result = self.evaluate(
agent=agent,
task=task,
execution_trace=trace,
final_output=task.output,
state=state
)
evaluation_results[agent.role].append(result)
progress.update(eval_task, advance=1)
self._execution_state.iterations_results[self._execution_state.iteration] = evaluation_results
return evaluation_results
def get_evaluation_results(self) -> dict[str, list[AgentEvaluationResult]]:
if self._execution_state.iterations_results and self._execution_state.iteration in self._execution_state.iterations_results:
if self._execution_state.iteration in self._execution_state.iterations_results:
return self._execution_state.iterations_results[self._execution_state.iteration]
return {}
return self.evaluate_current_iteration()
def display_results_with_iterations(self) -> None:
self.display_formatter.display_summary_results(self._execution_state.iterations_results)
def get_agent_evaluation(self, strategy: AggregationStrategy = AggregationStrategy.SIMPLE_AVERAGE, include_evaluation_feedback: bool = True) -> dict[str, AgentAggregatedEvaluationResult]:
def get_agent_evaluation(self, strategy: AggregationStrategy = AggregationStrategy.SIMPLE_AVERAGE, include_evaluation_feedback: bool = False) -> Dict[str, AgentAggregatedEvaluationResult]:
agent_results = {}
with crewai_event_bus.scoped_handlers():
task_results = self.get_evaluation_results()
@@ -176,21 +165,19 @@ class AgentEvaluator:
def evaluate(
self,
agent: Agent,
task: Task,
execution_trace: dict[str, Any],
final_output: Any,
state: ExecutionState,
task: Task | None = None,
state: ExecutionState
) -> AgentEvaluationResult:
result = AgentEvaluationResult(
agent_id=state.current_agent_id or str(agent.id),
task_id=state.current_task_id or (str(task.id) if task else "unknown_task")
task_id=state.current_task_id or str(task.id)
)
assert self.evaluators is not None
task_id = str(task.id) if task else None
for evaluator in self.evaluators:
try:
self.emit_evaluation_started_event(agent_role=agent.role, agent_id=str(agent.id), task_id=task_id)
score = evaluator.evaluate(
agent=agent,
task=task,
@@ -198,32 +185,12 @@ class AgentEvaluator:
final_output=final_output
)
result.metrics[evaluator.metric_category] = score
self.emit_evaluation_completed_event(agent_role=agent.role, agent_id=str(agent.id), task_id=task_id, metric_category=evaluator.metric_category, score=score)
except Exception as e:
self.emit_evaluation_failed_event(agent_role=agent.role, agent_id=str(agent.id), task_id=task_id, error=str(e))
self.console_formatter.print(f"Error in {evaluator.metric_category.value} evaluator: {str(e)}")
return result
def emit_evaluation_started_event(self, agent_role: str, agent_id: str, task_id: str | None = None):
crewai_event_bus.emit(
self,
AgentEvaluationStartedEvent(agent_role=agent_role, agent_id=agent_id, task_id=task_id, iteration=self._execution_state.iteration)
)
def emit_evaluation_completed_event(self, agent_role: str, agent_id: str, task_id: str | None = None, metric_category: MetricCategory | None = None, score: EvaluationScore | None = None):
crewai_event_bus.emit(
self,
AgentEvaluationCompletedEvent(agent_role=agent_role, agent_id=agent_id, task_id=task_id, iteration=self._execution_state.iteration, metric_category=metric_category, score=score)
)
def emit_evaluation_failed_event(self, agent_role: str, agent_id: str, error: str, task_id: str | None = None):
crewai_event_bus.emit(
self,
AgentEvaluationFailedEvent(agent_role=agent_role, agent_id=agent_id, task_id=task_id, iteration=self._execution_state.iteration, error=error)
)
def create_default_evaluator(agents: list[Agent], llm: None = None):
def create_default_evaluator(crew, llm=None):
from crewai.experimental.evaluation import (
GoalAlignmentEvaluator,
SemanticQualityEvaluator,
@@ -242,4 +209,4 @@ def create_default_evaluator(agents: list[Agent], llm: None = None):
ReasoningEfficiencyEvaluator(llm=llm),
]
return AgentEvaluator(evaluators=evaluators, agents=agents)
return AgentEvaluator(evaluators=evaluators, crew=crew)
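
As a quick orientation for the hunk above, a hedged sketch of the crew-based entry points it touches (create_default_evaluator(crew=...), evaluate_current_iteration, get_agent_evaluation). The crew= form matches the last line of the hunk; adjacent lines in the same hunk show an agents= form, so the exact signature depends on which side of this compare your checkout is on. The small crew is illustrative.

from crewai import Agent, Crew, Task
from crewai.experimental.evaluation import create_default_evaluator

agent = Agent(role="Researcher", goal="Summarize topics", backstory="Illustrative agent.")
task = Task(description="Summarize event buses in two sentences.", expected_output="Two sentences.", agent=agent)
crew = Crew(agents=[agent], tasks=[task])

evaluator = create_default_evaluator(crew=crew)
crew.kickoff()

results = evaluator.evaluate_current_iteration()  # per-task results keyed by agent role
aggregated = evaluator.get_agent_evaluation()     # per-agent aggregate (SIMPLE_AVERAGE by default)
evaluator.display_results_with_iterations()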

View File

@@ -57,9 +57,9 @@ class BaseEvaluator(abc.ABC):
def evaluate(
self,
agent: Agent,
task: Task,
execution_trace: Dict[str, Any],
final_output: Any,
task: Task | None = None,
) -> EvaluationScore:
pass

View File

@@ -9,9 +9,7 @@ from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events.crewai_event_bus import CrewAIEventsBus
from crewai.utilities.events.agent_events import (
AgentExecutionStartedEvent,
AgentExecutionCompletedEvent,
LiteAgentExecutionStartedEvent,
LiteAgentExecutionCompletedEvent
AgentExecutionCompletedEvent
)
from crewai.utilities.events.tool_usage_events import (
ToolUsageFinishedEvent,
@@ -54,18 +52,10 @@ class EvaluationTraceCallback(BaseEventListener):
def on_agent_started(source, event: AgentExecutionStartedEvent):
self.on_agent_start(event.agent, event.task)
@event_bus.on(LiteAgentExecutionStartedEvent)
def on_lite_agent_started(source, event: LiteAgentExecutionStartedEvent):
self.on_lite_agent_start(event.agent_info)
@event_bus.on(AgentExecutionCompletedEvent)
def on_agent_completed(source, event: AgentExecutionCompletedEvent):
self.on_agent_finish(event.agent, event.task, event.output)
@event_bus.on(LiteAgentExecutionCompletedEvent)
def on_lite_agent_completed(source, event: LiteAgentExecutionCompletedEvent):
self.on_lite_agent_finish(event.output)
@event_bus.on(ToolUsageFinishedEvent)
def on_tool_completed(source, event: ToolUsageFinishedEvent):
self.on_tool_use(event.tool_name, event.tool_args, event.output, success=True)
@@ -98,38 +88,19 @@ class EvaluationTraceCallback(BaseEventListener):
def on_llm_call_completed(source, event: LLMCallCompletedEvent):
self.on_llm_call_end(event.messages, event.response)
def on_lite_agent_start(self, agent_info: dict[str, Any]):
self.current_agent_id = agent_info['id']
self.current_task_id = "lite_task"
trace_key = f"{self.current_agent_id}_{self.current_task_id}"
self._init_trace(
trace_key=trace_key,
agent_id=self.current_agent_id,
task_id=self.current_task_id,
tool_uses=[],
llm_calls=[],
start_time=datetime.now(),
final_output=None
)
def _init_trace(self, trace_key: str, **kwargs: Any):
self.traces[trace_key] = kwargs
def on_agent_start(self, agent: Agent, task: Task):
self.current_agent_id = agent.id
self.current_task_id = task.id
trace_key = f"{agent.id}_{task.id}"
self._init_trace(
trace_key=trace_key,
agent_id=agent.id,
task_id=task.id,
tool_uses=[],
llm_calls=[],
start_time=datetime.now(),
final_output=None
)
self.traces[trace_key] = {
"agent_id": agent.id,
"task_id": task.id,
"tool_uses": [],
"llm_calls": [],
"start_time": datetime.now(),
"final_output": None
}
def on_agent_finish(self, agent: Agent, task: Task, output: Any):
trace_key = f"{agent.id}_{task.id}"
@@ -137,20 +108,9 @@ class EvaluationTraceCallback(BaseEventListener):
self.traces[trace_key]["final_output"] = output
self.traces[trace_key]["end_time"] = datetime.now()
self._reset_current()
def _reset_current(self):
self.current_agent_id = None
self.current_task_id = None
def on_lite_agent_finish(self, output: Any):
trace_key = f"{self.current_agent_id}_lite_task"
if trace_key in self.traces:
self.traces[trace_key]["final_output"] = output
self.traces[trace_key]["end_time"] = datetime.now()
self._reset_current()
def on_tool_use(self, tool_name: str, tool_args: dict[str, Any] | str, result: Any,
success: bool = True, error_type: str | None = None):
if not self.current_agent_id or not self.current_task_id:
@@ -227,8 +187,4 @@ class EvaluationTraceCallback(BaseEventListener):
def create_evaluation_callbacks() -> EvaluationTraceCallback:
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
callback = EvaluationTraceCallback()
callback.setup_listeners(crewai_event_bus)
return callback
return EvaluationTraceCallback()

View File

@@ -2,7 +2,7 @@ from collections import defaultdict
from hashlib import md5
from typing import Any
from crewai import Crew, Agent
from crewai import Crew
from crewai.experimental.evaluation import AgentEvaluator, create_default_evaluator
from crewai.experimental.evaluation.experiment.result_display import ExperimentResultsDisplay
from crewai.experimental.evaluation.experiment.result import ExperimentResults, ExperimentResult
@@ -14,18 +14,14 @@ class ExperimentRunner:
self.evaluator: AgentEvaluator | None = None
self.display = ExperimentResultsDisplay()
def run(self, crew: Crew | None = None, agents: list[Agent] | None = None, print_summary: bool = False) -> ExperimentResults:
if crew and not agents:
agents = crew.agents
assert agents is not None
self.evaluator = create_default_evaluator(agents=agents)
def run(self, crew: Crew, print_summary: bool = False) -> ExperimentResults:
self.evaluator = create_default_evaluator(crew=crew)
results = []
for test_case in self.dataset:
self.evaluator.reset_iterations_results()
result = self._run_test_case(test_case=test_case, crew=crew, agents=agents)
result = self._run_test_case(test_case, crew)
results.append(result)
experiment_results = ExperimentResults(results)
@@ -35,7 +31,7 @@ class ExperimentRunner:
return experiment_results
def _run_test_case(self, test_case: dict[str, Any], agents: list[Agent], crew: Crew | None = None) -> ExperimentResult:
def _run_test_case(self, test_case: dict[str, Any], crew: Crew) -> ExperimentResult:
inputs = test_case["inputs"]
expected_score = test_case["expected_score"]
identifier = test_case.get("identifier") or md5(str(test_case).encode(), usedforsecurity=False).hexdigest()
@@ -43,11 +39,7 @@ class ExperimentRunner:
try:
self.display.console.print(f"[dim]Running crew with input: {str(inputs)[:50]}...[/dim]")
self.display.console.print("\n")
if crew:
crew.kickoff(inputs=inputs)
else:
for agent in agents:
agent.kickoff(**inputs)
crew.kickoff(inputs=inputs)
assert self.evaluator is not None
agent_evaluations = self.evaluator.get_agent_evaluation()

View File

@@ -14,14 +14,10 @@ class GoalAlignmentEvaluator(BaseEvaluator):
def evaluate(
self,
agent: Agent,
task: Task,
execution_trace: Dict[str, Any],
final_output: Any,
task: Task | None = None,
) -> EvaluationScore:
task_context = ""
if task is not None:
task_context = f"Task description: {task.description}\nExpected output: {task.expected_output}\n"
prompt = [
{"role": "system", "content": """You are an expert evaluator assessing how well an AI agent's output aligns with its assigned task goal.
@@ -41,7 +37,8 @@ Return your evaluation as JSON with fields 'score' (number) and 'feedback' (stri
{"role": "user", "content": f"""
Agent role: {agent.role}
Agent goal: {agent.goal}
{task_context}
Task description: {task.description}
Expected output: {task.expected_output}
Agent's final output:
{final_output}

View File

@@ -36,14 +36,10 @@ class ReasoningEfficiencyEvaluator(BaseEvaluator):
def evaluate(
self,
agent: Agent,
task: Task,
execution_trace: Dict[str, Any],
final_output: TaskOutput | str,
task: Task | None = None,
final_output: TaskOutput,
) -> EvaluationScore:
task_context = ""
if task is not None:
task_context = f"Task description: {task.description}\nExpected output: {task.expected_output}\n"
llm_calls = execution_trace.get("llm_calls", [])
if not llm_calls or len(llm_calls) < 2:
@@ -87,8 +83,6 @@ class ReasoningEfficiencyEvaluator(BaseEvaluator):
call_samples = self._get_call_samples(llm_calls)
final_output = final_output.raw if isinstance(final_output, TaskOutput) else final_output
prompt = [
{"role": "system", "content": """You are an expert evaluator assessing the reasoning efficiency of an AI agent's thought process.
@@ -123,7 +117,7 @@ Return your evaluation as JSON with the following structure:
}"""},
{"role": "user", "content": f"""
Agent role: {agent.role}
{task_context}
Task description: {task.description}
Reasoning efficiency metrics:
- Total LLM calls: {efficiency_metrics["total_llm_calls"]}
@@ -136,7 +130,7 @@ Sample of agent reasoning flow (chronological sequence):
{call_samples}
Agent's final output:
{final_output[:500]}... (truncated)
{final_output.raw[:500]}... (truncated)
Evaluate the reasoning efficiency of this agent based on these interaction patterns.
Identify any inefficient reasoning patterns and provide specific suggestions for optimization.

View File

@@ -14,13 +14,10 @@ class SemanticQualityEvaluator(BaseEvaluator):
def evaluate(
self,
agent: Agent,
task: Task,
execution_trace: Dict[str, Any],
final_output: Any,
task: Task | None = None,
) -> EvaluationScore:
task_context = ""
if task is not None:
task_context = f"Task description: {task.description}"
prompt = [
{"role": "system", "content": """You are an expert evaluator assessing the semantic quality of an AI agent's output.
@@ -40,7 +37,7 @@ Return your evaluation as JSON with fields 'score' (number) and 'feedback' (stri
"""},
{"role": "user", "content": f"""
Agent role: {agent.role}
{task_context}
Task description: {task.description}
Agent's final output:
{final_output}

View File

@@ -16,14 +16,10 @@ class ToolSelectionEvaluator(BaseEvaluator):
def evaluate(
self,
agent: Agent,
task: Task,
execution_trace: Dict[str, Any],
final_output: str,
task: Task | None = None,
) -> EvaluationScore:
task_context = ""
if task is not None:
task_context = f"Task description: {task.description}"
tool_uses = execution_trace.get("tool_uses", [])
tool_count = len(tool_uses)
unique_tool_types = set([tool.get("tool", "Unknown tool") for tool in tool_uses])
@@ -76,7 +72,7 @@ Return your evaluation as JSON with these fields:
"""},
{"role": "user", "content": f"""
Agent role: {agent.role}
{task_context}
Task description: {task.description}
Available tools for this agent:
{available_tools_info}
@@ -132,13 +128,10 @@ class ParameterExtractionEvaluator(BaseEvaluator):
def evaluate(
self,
agent: Agent,
task: Task,
execution_trace: Dict[str, Any],
final_output: str,
task: Task | None = None,
) -> EvaluationScore:
task_context = ""
if task is not None:
task_context = f"Task description: {task.description}"
tool_uses = execution_trace.get("tool_uses", [])
tool_count = len(tool_uses)
@@ -219,7 +212,7 @@ Return your evaluation as JSON with these fields:
"""},
{"role": "user", "content": f"""
Agent role: {agent.role}
{task_context}
Task description: {task.description}
Parameter extraction examples:
{param_samples_text}
@@ -274,13 +267,10 @@ class ToolInvocationEvaluator(BaseEvaluator):
def evaluate(
self,
agent: Agent,
task: Task,
execution_trace: Dict[str, Any],
final_output: str,
task: Task | None = None,
) -> EvaluationScore:
task_context = ""
if task is not None:
task_context = f"Task description: {task.description}"
tool_uses = execution_trace.get("tool_uses", [])
tool_errors = []
tool_count = len(tool_uses)
@@ -362,7 +352,7 @@ Return your evaluation as JSON with these fields:
"""},
{"role": "user", "content": f"""
Agent role: {agent.role}
{task_context}
Task description: {task.description}
Tool invocation examples:
{invocation_samples_text}

View File

@@ -3,7 +3,7 @@ import inspect
from typing_extensions import Any
import warnings
from crewai.experimental.evaluation.experiment import ExperimentResults, ExperimentRunner
from crewai import Crew, Agent
from crewai import Crew
def assert_experiment_successfully(experiment_results: ExperimentResults, baseline_filepath: str | None = None) -> None:
failed_tests = [result for result in experiment_results.results if not result.passed]
@@ -35,10 +35,10 @@ def assert_experiment_no_regression(comparison_result: dict[str, list[str]]) ->
UserWarning
)
def run_experiment(dataset: list[dict[str, Any]], crew: Crew | None = None, agents: list[Agent] | None = None, verbose: bool = False) -> ExperimentResults:
def run_experiment(dataset: list[dict[str, Any]], crew: Crew, verbose: bool = False) -> ExperimentResults:
runner = ExperimentRunner(dataset=dataset)
return runner.run(agents=agents, crew=crew, print_summary=verbose)
return runner.run(crew=crew, print_summary=verbose)
def _get_baseline_filepath_fallback() -> str:
test_func_name = "experiment_fallback"
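
A hedged sketch of the experiment surface touched above, using the crew-only run() signature from this hunk. The dataset keys (inputs, expected_score, optional identifier) follow _run_test_case in the ExperimentRunner diff; the concrete values, the expected_score scale, and the small crew are illustrative assumptions.

from crewai import Agent, Crew, Task
from crewai.experimental.evaluation.experiment import ExperimentRunner

dataset = [
    {
        "identifier": "population-lookup",
        "inputs": {"topic": "the population of Portugal"},
        "expected_score": 7,  # assumed to share the 0-10 scale used by the evaluators
    },
]

agent = Agent(role="Researcher", goal="Answer short factual questions", backstory="Illustrative agent.")
task = Task(description="Answer a question about {topic}.", expected_output="A short answer.", agent=agent)
crew = Crew(agents=[agent], tasks=[task])

runner = ExperimentRunner(dataset=dataset)
experiment_results = runner.run(crew=crew, print_summary=True)
for result in experiment_results.results:
    print(result.passed)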

View File

@@ -305,7 +305,6 @@ class LiteAgent(FlowTrackable, BaseModel):
"""
# Create agent info for event emission
agent_info = {
"id": self.id,
"role": self.role,
"goal": self.goal,
"backstory": self.backstory,

View File

@@ -311,7 +311,6 @@ class LLM(BaseLLM):
callbacks: List[Any] = [],
reasoning_effort: Optional[Literal["none", "low", "medium", "high"]] = None,
stream: bool = False,
extra_headers: Optional[Dict[str, str]] = None,
**kwargs,
):
self.model = model
@@ -338,7 +337,6 @@ class LLM(BaseLLM):
self.additional_params = kwargs
self.is_anthropic = self._is_anthropic_model(model)
self.stream = stream
self.extra_headers = extra_headers
litellm.drop_params = True
@@ -410,7 +408,6 @@ class LLM(BaseLLM):
"stream": self.stream,
"tools": tools,
"reasoning_effort": self.reasoning_effort,
"extra_headers": self.extra_headers,
**self.additional_params,
}

View File

@@ -17,9 +17,6 @@ from .agent_events import (
AgentExecutionStartedEvent,
AgentExecutionCompletedEvent,
AgentExecutionErrorEvent,
AgentEvaluationStartedEvent,
AgentEvaluationCompletedEvent,
AgentEvaluationFailedEvent,
)
from .task_events import (
TaskStartedEvent,
@@ -77,9 +74,6 @@ __all__ = [
"AgentExecutionStartedEvent",
"AgentExecutionCompletedEvent",
"AgentExecutionErrorEvent",
"AgentEvaluationStartedEvent",
"AgentEvaluationCompletedEvent",
"AgentEvaluationFailedEvent",
"TaskStartedEvent",
"TaskCompletedEvent",
"TaskFailedEvent",

View File

@@ -123,28 +123,3 @@ class AgentLogsExecutionEvent(BaseEvent):
type: str = "agent_logs_execution"
model_config = {"arbitrary_types_allowed": True}
# Agent Eval events
class AgentEvaluationStartedEvent(BaseEvent):
agent_id: str
agent_role: str
task_id: str | None = None
iteration: int
type: str = "agent_evaluation_started"
class AgentEvaluationCompletedEvent(BaseEvent):
agent_id: str
agent_role: str
task_id: str | None = None
iteration: int
metric_category: Any
score: Any
type: str = "agent_evaluation_completed"
class AgentEvaluationFailedEvent(BaseEvent):
agent_id: str
agent_role: str
task_id: str | None = None
iteration: int
error: str
type: str = "agent_evaluation_failed"

View File

@@ -1,6 +1,6 @@
import threading
from contextlib import contextmanager
from typing import Any, Callable, Dict, List, Type, TypeVar, cast
from typing import Any, Callable, Type, TypeVar, cast
from blinker import Signal
@@ -14,10 +14,13 @@ class CrewAIEventsBus:
"""
A singleton event bus that uses blinker signals for event handling.
Allows both internal (Flow/Crew) and external event handling.
Handlers are global by default for cross-thread communication,
with optional thread-local isolation for testing scenarios.
"""
_instance = None
_lock = threading.Lock()
_thread_local: threading.local = threading.local()
def __new__(cls):
if cls._instance is None:
@@ -30,7 +33,46 @@ class CrewAIEventsBus:
def _initialize(self) -> None:
"""Initialize the event bus internal state"""
self._signal = Signal("crewai_event_bus")
self._handlers: Dict[Type[BaseEvent], List[Callable]] = {}
self._global_handlers: dict[type[BaseEvent], list[Callable]] = {}
@property
def _handlers(self) -> dict[type[BaseEvent], list[Callable]]:
if not hasattr(CrewAIEventsBus._thread_local, "handlers"):
CrewAIEventsBus._thread_local.handlers = {}
return CrewAIEventsBus._thread_local.handlers
@_handlers.setter
def _handlers(self, value: dict[type[BaseEvent], list[Callable]]) -> None:
if not hasattr(CrewAIEventsBus._thread_local, "handlers"):
CrewAIEventsBus._thread_local.handlers = {}
CrewAIEventsBus._thread_local.handlers = value
def _add_handler_with_deduplication(
self, handlers_dict: dict, event_type: Type[BaseEvent], handler: Callable
) -> bool:
"""
Add a handler to the specified handlers dictionary with deduplication.
Args:
handlers_dict: The dictionary to add the handler to
event_type: The event type
handler: The handler function to add
Returns:
bool: True if handler was added, False if it was already present
"""
if event_type not in handlers_dict:
handlers_dict[event_type] = []
# Check if handler is already registered
for existing_handler in handlers_dict[event_type]:
if existing_handler is handler:
# Handler already exists, don't add duplicate
return False
# Add the handler
handlers_dict[event_type].append(handler)
return True
def on(
self, event_type: Type[EventT]
@@ -38,6 +80,13 @@ class CrewAIEventsBus:
"""
Decorator to register an event handler for a specific event type.
Handlers registered with this decorator are global by default,
allowing cross-thread event communication. Use scoped_handlers()
for thread-local isolation in testing scenarios.
Duplicate handlers are automatically prevented - the same handler
function will only be registered once per event type.
Usage:
@crewai_event_bus.on(AgentExecutionCompletedEvent)
def on_agent_execution_completed(
@@ -50,23 +99,38 @@ class CrewAIEventsBus:
def decorator(
handler: Callable[[Any, EventT], None],
) -> Callable[[Any, EventT], None]:
if event_type not in self._handlers:
self._handlers[event_type] = []
self._handlers[event_type].append(
cast(Callable[[Any, EventT], None], handler)
was_added = self._add_handler_with_deduplication(
self._global_handlers, event_type, handler
)
if not was_added:
# Log that duplicate was prevented (optional)
print(
f"[EventBus Info] Handler '{handler.__name__}' already registered for {event_type.__name__}"
)
return handler
return decorator
def emit(self, source: Any, event: BaseEvent) -> None:
"""
Emit an event to all registered handlers
Emit an event to all registered handlers (both global and thread-local)
Args:
source: The object emitting the event
event: The event instance to emit
"""
# Call global handlers (default behavior, cross-thread)
for event_type, handlers in self._global_handlers.items():
if isinstance(event, event_type):
for handler in handlers:
try:
handler(source, event)
except Exception as e:
print(
f"[EventBus Error] Global handler '{handler.__name__}' failed for event '{event_type.__name__}': {e}"
)
# Call thread-local handlers (for testing isolation)
for event_type, handlers in self._handlers.items():
if isinstance(event, event_type):
for handler in handlers:
@@ -74,32 +138,76 @@ class CrewAIEventsBus:
handler(source, event)
except Exception as e:
print(
f"[EventBus Error] Handler '{handler.__name__}' failed for event '{event_type.__name__}': {e}"
f"[EventBus Error] Thread-local handler '{handler.__name__}' failed for event '{event_type.__name__}': {e}"
)
# Send to blinker signal (existing mechanism)
self._signal.send(source, event=event)
def register_handler(
self, event_type: Type[EventTypes], handler: Callable[[Any, EventTypes], None]
) -> None:
"""Register an event handler for a specific event type"""
if event_type not in self._handlers:
self._handlers[event_type] = []
self._handlers[event_type].append(
cast(Callable[[Any, EventTypes], None], handler)
self, event_type: Type[BaseEvent], handler: Callable[[Any, BaseEvent], None]
) -> bool:
"""
Register an event handler for a specific event type (global)
Args:
event_type: The event type to handle
handler: The handler function to register
Returns:
bool: True if handler was added, False if it was already present
"""
return self._add_handler_with_deduplication(
self._global_handlers, event_type, handler
)
def unregister_handler(
self, event_type: Type[BaseEvent], handler: Callable[[Any, BaseEvent], None]
) -> bool:
"""
Unregister an event handler for a specific event type (global)
Args:
event_type: The event type
handler: The handler function to unregister
Returns:
bool: True if handler was removed, False if it wasn't found
"""
if event_type in self._global_handlers:
try:
self._global_handlers[event_type].remove(handler)
return True
except ValueError:
return False
return False
def get_handler_count(self, event_type: Type[BaseEvent]) -> int:
"""
Get the number of handlers registered for a specific event type
Args:
event_type: The event type to check
Returns:
int: Number of handlers registered for this event type
"""
return len(self._global_handlers.get(event_type, []))
@contextmanager
def scoped_handlers(self):
"""
Context manager for temporary event handling scope.
Useful for testing or temporary event handling.
Context manager for temporary thread-local event handling scope.
Useful for testing or temporary event handling with thread isolation.
This creates thread-local handlers that are isolated from global handlers,
making it useful for testing scenarios where you want to avoid interference.
Usage:
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(CrewKickoffStarted)
def temp_handler(source, event):
print("Temporary handler")
print("Temporary thread-local handler")
# Do stuff...
# Handlers are cleared after the context
"""
@@ -110,6 +218,25 @@ class CrewAIEventsBus:
finally:
self._handlers = previous_handlers
@contextmanager
def scoped_global_handlers(self):
"""
Context manager for temporary global event handling scope.
Useful for testing or temporary global event handling.
Usage:
with crewai_event_bus.scoped_global_handlers():
crewai_event_bus.register_handler(CrewKickoffStarted, temp_handler)
# Do stuff...
# Global handlers are cleared after the context
"""
previous_global_handlers = self._global_handlers.copy()
self._global_handlers.clear()
try:
yield
finally:
self._global_handlers = previous_global_handlers
# Global instance
crewai_event_bus = CrewAIEventsBus()
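
A short usage sketch of the global-handler surface added in this file; the bus instance, registration helpers, and the TaskCompletedEvent import path are all taken from this diff, and the handler body is illustrative.

from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.task_events import TaskCompletedEvent

def log_task_completed(source, event: TaskCompletedEvent) -> None:
    # Global handlers run for events emitted from any thread.
    print(f"task completed with output: {str(event.output)[:60]}")

# Registration is deduplicated: a second identical call returns False and is a no-op.
assert crewai_event_bus.register_handler(TaskCompletedEvent, log_task_completed)
assert crewai_event_bus.get_handler_count(TaskCompletedEvent) >= 1

# ... kick off crews here; the handler fires on each completed task ...

# Remove the handler again, or confine registrations to a block instead:
crewai_event_bus.unregister_handler(TaskCompletedEvent, log_task_completed)

with crewai_event_bus.scoped_global_handlers():
    crewai_event_bus.register_handler(TaskCompletedEvent, log_task_completed)
    # global handlers registered here are dropped when the block exits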

View File

@@ -4,7 +4,6 @@ from .agent_events import (
AgentExecutionCompletedEvent,
AgentExecutionErrorEvent,
AgentExecutionStartedEvent,
LiteAgentExecutionCompletedEvent,
)
from .crew_events import (
CrewKickoffCompletedEvent,
@@ -81,7 +80,6 @@ EventTypes = Union[
CrewTrainFailedEvent,
AgentExecutionStartedEvent,
AgentExecutionCompletedEvent,
LiteAgentExecutionCompletedEvent,
TaskStartedEvent,
TaskCompletedEvent,
TaskFailedEvent,

View File

@@ -1,237 +0,0 @@
interactions:
- request:
body: '{"messages": [{"role": "system", "content": "You are Test Agent. An agent
created for testing purposes\nYour personal goal is: Complete test tasks successfully\n\nTo
give my best complete final answer to the task respond using the exact following
format:\n\nThought: I now can give a great answer\nFinal Answer: Your final
answer must be the great and the most complete as possible, it must be outcome
described.\n\nI MUST use these formats, my job depends on it!"}, {"role": "user",
"content": "Complete this task successfully"}], "model": "gpt-4o-mini", "stop":
["\nObservation:"]}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate, zstd
connection:
- keep-alive
content-length:
- '583'
content-type:
- application/json
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.93.0
x-stainless-arch:
- arm64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- MacOS
x-stainless-package-version:
- 1.93.0
x-stainless-raw-response:
- 'true'
x-stainless-read-timeout:
- '600.0'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.11.12
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: !!binary |
H4sIAAAAAAAAAwAAAP//jFNNb9swDL3nVxA6J0U+HKTNbd0woMAOw7Bu6LbCUCXa1iqLgkgnzYr8
98FKWqdbB+wiQHx81OMj9TgCUM6qNSjTaDFt9JNL+TZ7N/dfrusPN01NyV6vPk3f/mrl5vLrXI17
Bt39RCNPrDNDbfQojsIBNgm1YF91tlrOl+fzxXKWgZYs+p5WR5kUNGldcJP5dF5MpqvJ7PzIbsgZ
ZLWG7yMAgMd89jqDxQe1hun4KdIis65RrZ+TAFQi30eUZnYsOogaD6ChIBiy9M8NdXUja7iCQFsw
OkDtNgga6l4/6MBbTAA/wnsXtIc3+b6Gjx41I8REG2cRWoStkwakQeCIxlXOgEXRzjNQgvzigwBV
OUU038OOOgiIFhr0MdPHoIOFK9g67wEDdwlBCI7OIjgB7oxB5qrzfpeznxRokIZS3wwk5EiB8ey0
54RVx7r3PXTenwA6BBLdzy27fXtE9s/+eqpjojv+g6oqFxw3ZULNFHovWSiqjO5HALd5jt2L0aiY
qI1SCt1jfu7i4lBODdszgEVxBIVE+yE+KxbjV8qVR79PFkEZbRq0A3XYGt1ZRyfA6KTpv9W8VvvQ
uAv1/5QfAGMwCtoyJrTOvOx4SEvYf65/pT2bnAUrxrRxBktxmPpBWKx05w8rr3jHgm1ZuVBjiskd
9r6K5aLQy0LjxcKo0X70GwAA//8DAMz2wVUFBAAA
headers:
CF-RAY:
- 95f93ea9af627e0b-GRU
Connection:
- keep-alive
Content-Encoding:
- gzip
Content-Type:
- application/json
Date:
- Tue, 15 Jul 2025 12:25:54 GMT
Server:
- cloudflare
Set-Cookie:
- __cf_bm=GRZmZLrjW5ZRHNmUJa4ccrMcy20D1rmeqK6Ptlv0mRY-1752582354-1.0.1.1-xKd_yga48Eedech5TRlThlEpDgsB2whxkWHlCyAGOVMqMcvH1Ju9FdXYbuQ9NdUQcVxPLgiGM35lYhqSLVQiXDyK01dnyp2Gvm560FBN9DY;
path=/; expires=Tue, 15-Jul-25 12:55:54 GMT; domain=.api.openai.com; HttpOnly;
Secure; SameSite=None
- _cfuvid=MYqswpSR7sqr4kGp6qZVkaL7HDYwMiww49PeN9QBP.A-1752582354973-0.0.1.1-604800000;
path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- nosniff
access-control-expose-headers:
- X-Request-ID
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- crewai-iuxna1
openai-processing-ms:
- '4047'
openai-version:
- '2020-10-01'
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
x-envoy-upstream-service-time:
- '4440'
x-ratelimit-limit-requests:
- '30000'
x-ratelimit-limit-tokens:
- '150000000'
x-ratelimit-remaining-requests:
- '29999'
x-ratelimit-remaining-tokens:
- '149999885'
x-ratelimit-reset-requests:
- 2ms
x-ratelimit-reset-tokens:
- 0s
x-request-id:
- req_5704c0f206a927ddc12aa1a19b612a75
status:
code: 200
message: OK
- request:
body: '{"messages": [{"role": "system", "content": "You are an expert evaluator
assessing how well an AI agent''s output aligns with its assigned task goal.\n\nScore
the agent''s goal alignment on a scale from 0-10 where:\n- 0: Complete misalignment,
agent did not understand or attempt the task goal\n- 5: Partial alignment, agent
attempted the task but missed key requirements\n- 10: Perfect alignment, agent
fully satisfied all task requirements\n\nConsider:\n1. Did the agent correctly
interpret the task goal?\n2. Did the final output directly address the requirements?\n3.
Did the agent focus on relevant aspects of the task?\n4. Did the agent provide
all requested information or deliverables?\n\nReturn your evaluation as JSON
with fields ''score'' (number) and ''feedback'' (string).\n"}, {"role": "user",
"content": "\nAgent role: Test Agent\nAgent goal: Complete test tasks successfully\n\n\nAgent''s
final output:\nPlease provide me with the specific details or context of the
task you need help with, and I will ensure to complete it successfully and provide
a thorough response.\n\nEvaluate how well the agent''s output aligns with the
assigned task goal.\n"}], "model": "gpt-4o-mini", "stop": []}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate, zstd
connection:
- keep-alive
content-length:
- '1196'
content-type:
- application/json
cookie:
- __cf_bm=GRZmZLrjW5ZRHNmUJa4ccrMcy20D1rmeqK6Ptlv0mRY-1752582354-1.0.1.1-xKd_yga48Eedech5TRlThlEpDgsB2whxkWHlCyAGOVMqMcvH1Ju9FdXYbuQ9NdUQcVxPLgiGM35lYhqSLVQiXDyK01dnyp2Gvm560FBN9DY;
_cfuvid=MYqswpSR7sqr4kGp6qZVkaL7HDYwMiww49PeN9QBP.A-1752582354973-0.0.1.1-604800000
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.93.0
x-stainless-arch:
- arm64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- MacOS
x-stainless-package-version:
- 1.93.0
x-stainless-raw-response:
- 'true'
x-stainless-read-timeout:
- '600.0'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.11.12
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: !!binary |
H4sIAAAAAAAAA4xUy27bQAy8+yuIPdtGbMdN4FvbSxM0QIsEKNA6MJhdSmK82hWWVFwj8L8XKz/k
9AH0ogOHnOFjVq8DAMPOLMDYCtXWjR990O+TT7dfZs/v5OtFy/ef7++mxfu7j83t/cONGeaK+PRM
Vo9VYxvrxpNyDHvYJkKlzDq5mk/n19PZfN4BdXTkc1nZ6OgyjmoOPJpeTC9HF1ejyfWhuopsScwC
fgwAAF67b+4zOPppFnAxPEZqEsGSzOKUBGBS9DliUIRFMagZ9qCNQSl0rb8uA8DSiI2JlmYB0+E+
UBC5J7TrHFuah4oASwoKjh2EqOCojkE0oRIgWE+YoA2OUhZzHEqIBWhFoChrKCP6IWwqthWwgEY4
bItASbRLEpDWWhIpWu+3Y7gJooRuCKyAsiYHRUxQx0TgSJG9DIGDY4ua5RA82nVW5cDKqPxCWYhC
iSXBhrU69TOGbxV7ysxSxY0Awoa951AGkq69/do67QLZk8vBJsUXdgQYtoBWW/SQSJoYpFPq2Ptp
MLjTttC51DFXVIPjRFb9drw0y7A7v0uiohXM3git92cAhhAVs7c6RzwekN3JAz6WTYpP8lupKTiw
VKtEKDHke4vGxnTobgDw2HmtfWMf06RYN7rSuKZObjo7eM30Fu/R6yOoUdH38dnkCLzhWx1ud+ZW
Y9FW5PrS3trYOo5nwOBs6j+7+Rv3fnIO5f/Q94C11Ci5VZPIsX07cZ+WKP8B/pV22nLXsBFKL2xp
pUwpX8JRga3fv0sjW1GqVwWHklKTuHuc+ZKD3eAXAAAA//8DADksFsafBAAA
headers:
CF-RAY:
- 95f93ec73a1c7e0b-GRU
Connection:
- keep-alive
Content-Encoding:
- gzip
Content-Type:
- application/json
Date:
- Tue, 15 Jul 2025 12:25:57 GMT
Server:
- cloudflare
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- nosniff
access-control-expose-headers:
- X-Request-ID
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- crewai-iuxna1
openai-processing-ms:
- '1544'
openai-version:
- '2020-10-01'
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
x-envoy-upstream-service-time:
- '1546'
x-ratelimit-limit-requests:
- '30000'
x-ratelimit-limit-tokens:
- '150000000'
x-ratelimit-remaining-requests:
- '29999'
x-ratelimit-remaining-tokens:
- '149999732'
x-ratelimit-reset-requests:
- 2ms
x-ratelimit-reset-tokens:
- 0s
x-request-id:
- req_44930ba12ad8d1e3f0beed1d5e3d8b0c
status:
code: 200
message: OK
version: 1

File diff suppressed because one or more lines are too long

View File

@@ -427,140 +427,4 @@ interactions:
status:
code: 200
message: OK
- request:
body: '{"messages": [{"role": "system", "content": "You are an expert evaluator
assessing how well an AI agent''s output aligns with its assigned task goal.\n\nScore
the agent''s goal alignment on a scale from 0-10 where:\n- 0: Complete misalignment,
agent did not understand or attempt the task goal\n- 5: Partial alignment, agent
attempted the task but missed key requirements\n- 10: Perfect alignment, agent
fully satisfied all task requirements\n\nConsider:\n1. Did the agent correctly
interpret the task goal?\n2. Did the final output directly address the requirements?\n3.
Did the agent focus on relevant aspects of the task?\n4. Did the agent provide
all requested information or deliverables?\n\nReturn your evaluation as JSON
with fields ''score'' (number) and ''feedback'' (string).\n"}, {"role": "user",
"content": "\nAgent role: Test Agent\nAgent goal: Complete test tasks successfully\nTask
description: Test task description\nExpected output: Expected test output\n\nAgent''s
final output:\nThe expected test output is a comprehensive document that outlines
the specific parameters and criteria that define success for the task at hand.
It should include detailed descriptions of the tasks, the goals that need to
be achieved, and any specific formatting or structural requirements necessary
for the output. Each component of the task must be analyzed and addressed, providing
context as well as examples where applicable. Additionally, any tools or methodologies
that are relevant to executing the tasks successfully should be outlined, including
any potential risks or challenges that may arise during the process. This document
serves as a guiding framework to ensure that all aspects of the task are thoroughly
considered and executed to meet the high standards expected.\n\nEvaluate how
well the agent''s output aligns with the assigned task goal.\n"}], "model":
"gpt-4o-mini", "stop": []}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate, zstd
connection:
- keep-alive
content-length:
- '1893'
content-type:
- application/json
cookie:
- _cfuvid=XwsgBfgvDGlKFQ4LiGYGIARIoSNTiwidqoo9UZcc.XY-1752087999227-0.0.1.1-604800000
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.93.0
x-stainless-arch:
- arm64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- MacOS
x-stainless-package-version:
- 1.93.0
x-stainless-raw-response:
- 'true'
x-stainless-read-timeout:
- '600.0'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.11.12
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: !!binary |
H4sIAAAAAAAAAwAAAP//jFRNbxs5DL37VxA6jwPHddrUxxwWi2BRtEAPRevCYCSOh41GUkWOnTTI
fy8kf4zT5rCXOfCRT4+P5DxNAAw7swRjO1TbJz+90dvFxy//vX0za7dfr29+3eo/n75++Mh0O/za
maZUxLsfZPVYdWFjnzwpx7CHbSZUKqyX767mV/PL2eKqAn105EvZJul0Eac9B57OZ/PFdPZuenl9
qO4iWxKzhG8TAICn+i06g6MHs4RZc4z0JIIbMstTEoDJ0ZeIQREWxaCmGUEbg1Ko0p9WAWBlxMZM
K7OEq2YfaIncHdr7EluZzx0BbigopBy37MgBgiNF9uTAkdjMqbQOsYVdhwraEdBDIqvkIA6aBgXp
4uAdcLB+cNTArmPbAQfHFpUEJPYEQ3CUi2LHYVPoCpOi3EOmnwNn6imoXMC/cUdbyk3FWw7oj8+4
SAIhKkgiyy1b9P4RHHneUn4pTEn0WIYC6YDX5866aqDH+yKHFRJm5cqInjeB3AWM7vQsUgzhTFb9
48GtUlloSwMkZ4bEDMetOaSg1QH9XldVwSrk2wY4iBLWSs/hmG47zGiVMouylZP7WHkzdRSEtwQu
2qH4dhyBjcWKHWsXhzJTEgpVAwagByySirgzRSfLDrtzsTKr8Hy+VJnaQbAsdhi8PwMwhKhYfKzr
/P2APJ8W2MdNyvFO/ig1LQeWbp0JJYayrKIxmYo+TwC+10MZXuy+STn2Sdca76k+92ax2POZ8T5H
9P31AdSo6Mf4YjFvXuFb71dezk7NWLQdubF0vEscHMczYHLW9d9qXuPed85h83/oR8BaSkpunTI5
ti87HtMy/agTfT3t5HIVbITyli2tlSmXSThqcfD7n4qRR1Hq1y2HDeWUuf5ZyiQnz5PfAAAA//8D
AEfUP8BcBQAA
headers:
CF-RAY:
- 95f365f1bfc87ded-GRU
Connection:
- keep-alive
Content-Encoding:
- gzip
Content-Type:
- application/json
Date:
- Mon, 14 Jul 2025 19:24:07 GMT
Server:
- cloudflare
Set-Cookie:
- __cf_bm=PcC3_3T8.MK_WpZlQLdZfwpNv9Pe45AIYmrXOSgJ65E-1752521047-1.0.1.1-eyqwSWfQC7ZV6.JwTsTihK1ZWCrEmxd52CtNcfe.fw1UjjBN9rdTU4G7hRZiNqHQYo4sVZMmgRgqM9k7HRSzN2zln0bKmMiOuSQTZh6xF_I;
path=/; expires=Mon, 14-Jul-25 19:54:07 GMT; domain=.api.openai.com; HttpOnly;
Secure; SameSite=None
- _cfuvid=JvQ1c4qYZefNwOPoVNgAtX8ET7ObU.JKDvGc43LOR6g-1752521047741-0.0.1.1-604800000;
path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- nosniff
access-control-expose-headers:
- X-Request-ID
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- crewai-iuxna1
openai-processing-ms:
- '2729'
openai-version:
- '2020-10-01'
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
x-envoy-upstream-service-time:
- '2789'
x-ratelimit-limit-requests:
- '30000'
x-ratelimit-limit-tokens:
- '150000000'
x-ratelimit-remaining-requests:
- '29999'
x-ratelimit-remaining-tokens:
- '149999559'
x-ratelimit-reset-requests:
- 2ms
x-ratelimit-reset-tokens:
- 0s
x-request-id:
- req_74f6e8ff49db25dbea3d3525cc149e8e
status:
code: 200
message: OK
version: 1

View File

@@ -1,123 +0,0 @@
interactions:
- request:
body: '{"messages": [{"role": "system", "content": "You are Test Agent. An agent
created for testing purposes\nYour personal goal is: Complete test tasks successfully\nTo
give my best complete final answer to the task respond using the exact following
format:\n\nThought: I now can give a great answer\nFinal Answer: Your final
answer must be the great and the most complete as possible, it must be outcome
described.\n\nI MUST use these formats, my job depends on it!"}, {"role": "user",
"content": "\nCurrent Task: Test task description\n\nThis is the expected criteria
for your final answer: Expected test output\nyou MUST return the actual complete
content as the final answer, not a summary.\n\nBegin! This is VERY important
to you, use the tools available and give your best Final Answer, your job depends
on it!\n\nThought:"}], "model": "gpt-4o-mini", "stop": ["\nObservation:"]}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate, zstd
connection:
- keep-alive
content-length:
- '879'
content-type:
- application/json
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.93.0
x-stainless-arch:
- arm64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- MacOS
x-stainless-package-version:
- 1.93.0
x-stainless-raw-response:
- 'true'
x-stainless-read-timeout:
- '600.0'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.11.12
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: !!binary |
H4sIAAAAAAAAAwAAAP//jFTBbhtHDL3rK4g5rwRbtaNYt9RoEaNoUaBODm0DgZnh7jKe5WyHXDmO
4X8vZiRLcupDLwvsPPLxPQ45jzMAx8GtwfkezQ9jnP9oeLv98N5+vfl9+4v89Mf76+XV7XDz8Yc/
r39T15SM9PkLeXvOWvg0jJGMk+xgnwmNCuv56nJ5+XZ1tbqswJACxZLWjTa/SPOBhefLs+XF/Gw1
P3+7z+4Te1K3hr9mAACP9Vt0SqCvbg1nzfPJQKrYkVsfggBcTrGcOFRlNRRzzRH0SYykSr8BSffg
UaDjLQFCV2QDit5TBvhbfmbBCO/q/xpue1ZgBesJ6OtI3iiAkRqkycbJGrjv2ffgk5S6CqkFhECG
HClAIPWZx9Kkgtz3aJVq37vChXoH2qcpBogp3UHkO1rAbU/QViW7Os8hLD5OgQBjBCFfOpEfgKVN
ecBSpoFAQxK1jMbSgY+Y2R6aWjJTT6K8JSHVBlACYOgpk3gCS4DyADqS55YpQDdxoMhCuoCbgwKf
tpSB0PeAJdaKseKpOsn0z8SZBhJrgESnXERY8S0JRsxWulkoilkKkDJ0JJQx8jcKi13DX3pWyuWm
FPDQN8jU7mW3KRfdSaj2r5ZLMEmgXOYg7K5OlcQYI1Cs4vSFavSVmLWnsDgdnEztpFiGV6YYTwAU
SVYbXkf20x55OgxpTN2Y02f9LtW1LKz9JhNqkjKQaml0FX2aAXyqyzC9mG835jSMtrF0R7Xc+Zvz
HZ877uARvXqzBy0ZxuP58nLVvMK32Q2rnqyT8+h7CsfU4+7hFDidALMT1/9V8xr3zjlL93/oj4D3
NBqFzZgpsH/p+BiW6Utd0dfDDl2ugl2ZK/a0MaZcbiJQi1PcPRxOH9Ro2LQsHeUxc309yk3Onmb/
AgAA//8DAAbYfvVABQAA
headers:
CF-RAY:
- 95f9c7ffa8331b11-GRU
Connection:
- keep-alive
Content-Encoding:
- gzip
Content-Type:
- application/json
Date:
- Tue, 15 Jul 2025 13:59:38 GMT
Server:
- cloudflare
Set-Cookie:
- __cf_bm=J_xe1AP.B5P6D2GVMCesyioeS5E9DnYT34rbwQUefFc-1752587978-1.0.1.1-5Dflk5cAj6YCsOSVbCFWWSpXpw_mXsczIdzWzs2h2OwDL01HQbduE5LAToy67sfjFjHeeO4xRrqPLUQpySy2QqyHXbI_fzX4UAt3.UdwHxU;
path=/; expires=Tue, 15-Jul-25 14:29:38 GMT; domain=.api.openai.com; HttpOnly;
Secure; SameSite=None
- _cfuvid=0rTD8RMpxBQQy42jzmum16_eoRtWNfaZMG_TJkhGS7I-1752587978437-0.0.1.1-604800000;
path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- nosniff
access-control-expose-headers:
- X-Request-ID
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- crewai-iuxna1
openai-processing-ms:
- '2623'
openai-version:
- '2020-10-01'
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
x-envoy-upstream-service-time:
- '2626'
x-ratelimit-limit-requests:
- '30000'
x-ratelimit-limit-tokens:
- '150000000'
x-ratelimit-remaining-requests:
- '29999'
x-ratelimit-remaining-tokens:
- '149999813'
x-ratelimit-reset-requests:
- 2ms
x-ratelimit-reset-tokens:
- 0s
x-request-id:
- req_ccc347e91010713379c920aa0efd1f4f
status:
code: 200
message: OK
version: 1
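
The YAML blocks above are VCR cassettes: recordings of OpenAI chat-completions requests and responses (the response bodies are gzip-compressed, hence the base64 !!binary blocks) that the test suite replays instead of calling the live API; the hunk header @@ -1,123 +0,0 @@ shows the second cassette being deleted outright. A minimal sketch of how a test consumes such a cassette — the decorator and filter_headers value match the tests further down, the agent fields come from the recorded request, and the cassette-path convention of the VCR pytest plugin is assumed:

    import pytest
    from crewai import Agent

    # A VCR pytest plugin replays the matching YAML cassette instead of
    # calling api.openai.com; filter_headers keeps the Authorization
    # header out of the recording.
    @pytest.mark.vcr(filter_headers=["authorization"])
    def test_agent_replays_recorded_completion():
        agent = Agent(
            role="Test Agent",
            goal="Complete test tasks successfully",
            backstory="An agent created for testing purposes",
        )
        result = agent.kickoff(messages="Complete this task successfully")
        assert result is not None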

View File

@@ -11,15 +11,10 @@ from crewai.experimental.evaluation import (
ToolSelectionEvaluator,
ParameterExtractionEvaluator,
ToolInvocationEvaluator,
ReasoningEfficiencyEvaluator,
MetricCategory,
EvaluationScore
ReasoningEfficiencyEvaluator
)
from crewai.utilities.events.agent_events import AgentEvaluationStartedEvent, AgentEvaluationCompletedEvent, AgentEvaluationFailedEvent
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.experimental.evaluation import create_default_evaluator
class TestAgentEvaluator:
@pytest.fixture
def mock_crew(self):
@@ -44,18 +39,18 @@ class TestAgentEvaluator:
return crew
def test_set_iteration(self):
agent_evaluator = AgentEvaluator(agents=[])
agent_evaluator = AgentEvaluator()
agent_evaluator.set_iteration(3)
assert agent_evaluator._execution_state.iteration == 3
@pytest.mark.vcr(filter_headers=["authorization"])
def test_evaluate_current_iteration(self, mock_crew):
agent_evaluator = AgentEvaluator(agents=mock_crew.agents, evaluators=[GoalAlignmentEvaluator()])
agent_evaluator = AgentEvaluator(crew=mock_crew, evaluators=[GoalAlignmentEvaluator()])
mock_crew.kickoff()
results = agent_evaluator.get_evaluation_results()
results = agent_evaluator.evaluate_current_iteration()
assert isinstance(results, dict)
@@ -75,16 +70,16 @@ class TestAgentEvaluator:
goal_alignment, = result.metrics.values()
assert goal_alignment.score == 5.0
expected_feedback = "The agent's output demonstrates an understanding of the need for a comprehensive document outlining task"
expected_feedback = "The agent's output demonstrates an understanding of the need for a comprehensive document"
assert expected_feedback in goal_alignment.feedback
assert goal_alignment.raw_response is not None
assert '"score": 5' in goal_alignment.raw_response
def test_create_default_evaluator(self, mock_crew):
agent_evaluator = create_default_evaluator(agents=mock_crew.agents)
agent_evaluator = create_default_evaluator(crew=mock_crew)
assert isinstance(agent_evaluator, AgentEvaluator)
assert agent_evaluator.agents == mock_crew.agents
assert agent_evaluator.crew == mock_crew
expected_types = [
GoalAlignmentEvaluator,
@@ -98,181 +93,3 @@ class TestAgentEvaluator:
assert len(agent_evaluator.evaluators) == len(expected_types)
for evaluator, expected_type in zip(agent_evaluator.evaluators, expected_types):
assert isinstance(evaluator, expected_type)
@pytest.mark.vcr(filter_headers=["authorization"])
def test_eval_lite_agent(self):
agent = Agent(
role="Test Agent",
goal="Complete test tasks successfully",
backstory="An agent created for testing purposes",
)
with crewai_event_bus.scoped_handlers():
events = {}
@crewai_event_bus.on(AgentEvaluationStartedEvent)
def capture_started(source, event):
events["started"] = event
@crewai_event_bus.on(AgentEvaluationCompletedEvent)
def capture_completed(source, event):
events["completed"] = event
@crewai_event_bus.on(AgentEvaluationFailedEvent)
def capture_failed(source, event):
events["failed"] = event
agent_evaluator = AgentEvaluator(agents=[agent], evaluators=[GoalAlignmentEvaluator()])
agent.kickoff(messages="Complete this task successfully")
assert events.keys() == {"started", "completed"}
assert events["started"].agent_id == str(agent.id)
assert events["started"].agent_role == agent.role
assert events["started"].task_id is None
assert events["started"].iteration == 1
assert events["completed"].agent_id == str(agent.id)
assert events["completed"].agent_role == agent.role
assert events["completed"].task_id is None
assert events["completed"].iteration == 1
assert events["completed"].metric_category == MetricCategory.GOAL_ALIGNMENT
assert isinstance(events["completed"].score, EvaluationScore)
assert events["completed"].score.score == 2.0
results = agent_evaluator.get_evaluation_results()
assert isinstance(results, dict)
result, = results[agent.role]
assert isinstance(result, AgentEvaluationResult)
assert result.agent_id == str(agent.id)
assert result.task_id == "lite_task"
goal_alignment, = result.metrics.values()
assert goal_alignment.score == 2.0
expected_feedback = "The agent did not demonstrate a clear understanding of the task goal, which is to complete test tasks successfully"
assert expected_feedback in goal_alignment.feedback
assert goal_alignment.raw_response is not None
assert '"score": 2' in goal_alignment.raw_response
@pytest.mark.vcr(filter_headers=["authorization"])
def test_eval_specific_agents_from_crew(self, mock_crew):
agent = Agent(
role="Test Agent Eval",
goal="Complete test tasks successfully",
backstory="An agent created for testing purposes",
)
task = Task(
description="Test task description",
agent=agent,
expected_output="Expected test output"
)
mock_crew.agents.append(agent)
mock_crew.tasks.append(task)
with crewai_event_bus.scoped_handlers():
events = {}
@crewai_event_bus.on(AgentEvaluationStartedEvent)
def capture_started(source, event):
events["started"] = event
@crewai_event_bus.on(AgentEvaluationCompletedEvent)
def capture_completed(source, event):
events["completed"] = event
@crewai_event_bus.on(AgentEvaluationFailedEvent)
def capture_failed(source, event):
events["failed"] = event
agent_evaluator = AgentEvaluator(agents=[agent], evaluators=[GoalAlignmentEvaluator()])
mock_crew.kickoff()
assert events.keys() == {"started", "completed"}
assert events["started"].agent_id == str(agent.id)
assert events["started"].agent_role == agent.role
assert events["started"].task_id == str(task.id)
assert events["started"].iteration == 1
assert events["completed"].agent_id == str(agent.id)
assert events["completed"].agent_role == agent.role
assert events["completed"].task_id == str(task.id)
assert events["completed"].iteration == 1
assert events["completed"].metric_category == MetricCategory.GOAL_ALIGNMENT
assert isinstance(events["completed"].score, EvaluationScore)
assert events["completed"].score.score == 5.0
results = agent_evaluator.get_evaluation_results()
assert isinstance(results, dict)
assert len(results.keys()) == 1
result, = results[agent.role]
assert isinstance(result, AgentEvaluationResult)
assert result.agent_id == str(agent.id)
assert result.task_id == str(task.id)
goal_alignment, = result.metrics.values()
assert goal_alignment.score == 5.0
expected_feedback = "The agent provided a thorough guide on how to conduct a test task but failed to produce specific expected output"
assert expected_feedback in goal_alignment.feedback
assert goal_alignment.raw_response is not None
assert '"score": 5' in goal_alignment.raw_response
@pytest.mark.vcr(filter_headers=["authorization"])
def test_failed_evaluation(self, mock_crew):
agent, = mock_crew.agents
task, = mock_crew.tasks
with crewai_event_bus.scoped_handlers():
events = {}
@crewai_event_bus.on(AgentEvaluationStartedEvent)
def capture_started(source, event):
events["started"] = event
@crewai_event_bus.on(AgentEvaluationCompletedEvent)
def capture_completed(source, event):
events["completed"] = event
@crewai_event_bus.on(AgentEvaluationFailedEvent)
def capture_failed(source, event):
events["failed"] = event
# Create a mock evaluator that will raise an exception
from crewai.experimental.evaluation.base_evaluator import BaseEvaluator
from crewai.experimental.evaluation import MetricCategory
class FailingEvaluator(BaseEvaluator):
metric_category = MetricCategory.GOAL_ALIGNMENT
def evaluate(self, agent, task, execution_trace, final_output):
raise ValueError("Forced evaluation failure")
agent_evaluator = AgentEvaluator(agents=[agent], evaluators=[FailingEvaluator()])
mock_crew.kickoff()
assert events.keys() == {"started", "failed"}
assert events["started"].agent_id == str(agent.id)
assert events["started"].agent_role == agent.role
assert events["started"].task_id == str(task.id)
assert events["started"].iteration == 1
assert events["failed"].agent_id == str(agent.id)
assert events["failed"].agent_role == agent.role
assert events["failed"].task_id == str(task.id)
assert events["failed"].iteration == 1
assert events["failed"].error == "Forced evaluation failure"
results = agent_evaluator.get_evaluation_results()
result, = results[agent.role]
assert isinstance(result, AgentEvaluationResult)
assert result.agent_id == str(agent.id)
assert result.task_id == str(task.id)
assert result.metrics == {}
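
The hunks above pair the two versions of each changed line without +/- markers, as the compare view renders them: one version constructs the evaluator from an explicit list of agents and reads results with get_evaluation_results(), the other binds it to the crew and calls evaluate_current_iteration(), with create_default_evaluator mirroring the same signature change. A minimal sketch of both call shapes, assuming AgentEvaluator is importable alongside the evaluators named in the hunk and that a crew object like the mock_crew fixture exists; which shape is the newer one depends on the direction of this comparison:

    from crewai.experimental.evaluation import (
        AgentEvaluator,
        GoalAlignmentEvaluator,
        create_default_evaluator,
    )

    # One side of the diff: evaluator scoped to an explicit list of agents.
    evaluator = AgentEvaluator(agents=crew.agents, evaluators=[GoalAlignmentEvaluator()])
    crew.kickoff()
    results = evaluator.get_evaluation_results()

    # The other side: evaluator bound to the crew itself, read per iteration.
    evaluator = AgentEvaluator(crew=crew, evaluators=[GoalAlignmentEvaluator()])
    crew.kickoff()
    results = evaluator.evaluate_current_iteration()

    # The factory helper follows the same change.
    evaluator = create_default_evaluator(crew=crew)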

View File

@@ -509,85 +509,6 @@ def test_deepseek_r1_with_open_router():
assert "Paris" in result
@pytest.mark.vcr(filter_headers=["authorization"])
def test_llm_passes_extra_headers():
"""Test that extra_headers parameter is passed to litellm.completion."""
extra_headers = {
"X-Custom-Auth": "bearer token123",
"X-API-Version": "v1.0"
}
llm = LLM(
model="gpt-4o-mini",
extra_headers=extra_headers,
)
messages = [{"role": "user", "content": "Hello, world!"}]
with patch("litellm.completion") as mocked_completion:
mock_message = MagicMock()
mock_message.content = "Test response"
mock_choice = MagicMock()
mock_choice.message = mock_message
mock_response = MagicMock()
mock_response.choices = [mock_choice]
mock_response.usage = {
"prompt_tokens": 5,
"completion_tokens": 5,
"total_tokens": 10,
}
mocked_completion.return_value = mock_response
result = llm.call(messages)
mocked_completion.assert_called_once()
_, kwargs = mocked_completion.call_args
assert kwargs["extra_headers"] == extra_headers
assert kwargs["model"] == "gpt-4o-mini"
assert kwargs["messages"] == messages
assert result == "Test response"
def test_llm_extra_headers_none_by_default():
"""Test that extra_headers defaults to None and doesn't break existing functionality."""
llm = LLM(model="gpt-4o-mini")
messages = [{"role": "user", "content": "Hello, world!"}]
with patch("litellm.completion") as mocked_completion:
mock_message = MagicMock()
mock_message.content = "Test response"
mock_choice = MagicMock()
mock_choice.message = mock_message
mock_response = MagicMock()
mock_response.choices = [mock_choice]
mock_response.usage = {
"prompt_tokens": 5,
"completion_tokens": 5,
"total_tokens": 10,
}
mocked_completion.return_value = mock_response
result = llm.call(messages)
mocked_completion.assert_called_once()
_, kwargs = mocked_completion.call_args
assert "extra_headers" not in kwargs
assert kwargs["model"] == "gpt-4o-mini"
assert kwargs["messages"] == messages
assert result == "Test response"
def assert_event_count(
mock_emit,
expected_completed_tool_call: int = 0,
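
This hunk deletes two tests for the extra_headers option on LLM. Per the removed assertions, a dict passed as extra_headers was handed through unchanged to litellm.completion, and the key was simply absent from the call when the option was not set. A minimal usage sketch reconstructed from the removed test (model name and header values are the test's own):

    from crewai import LLM

    llm = LLM(
        model="gpt-4o-mini",
        extra_headers={"X-Custom-Auth": "bearer token123", "X-API-Version": "v1.0"},
    )
    # The removed test asserted these headers reach litellm.completion
    # verbatim as kwargs["extra_headers"].
    result = llm.call([{"role": "user", "content": "Hello, world!"}])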

View File

@@ -1,13 +1,31 @@
import threading
from typing import Any, Callable, cast
from unittest.mock import Mock
import pytest
from crewai.utilities.events.base_events import BaseEvent
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
@pytest.fixture(autouse=True)
def scoped_event_handlers():
with crewai_event_bus.scoped_handlers():
yield
class TestEvent(BaseEvent):
pass
class AnotherThreadTestEvent(BaseEvent):
pass
class CrossThreadTestEvent(BaseEvent):
pass
def test_specific_event_handler():
mock_handler = Mock()
@@ -44,4 +62,444 @@ def test_event_bus_error_handling(capfd):
out, err = capfd.readouterr()
assert "Simulated handler failure" in out
assert "Handler 'broken_handler' failed" in out
assert "Global handler 'broken_handler' failed" in out
def test_singleton_pattern_across_threads():
instances = []
def get_instance():
instances.append(crewai_event_bus)
threads = []
for _ in range(10):
thread = threading.Thread(target=get_instance)
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
assert len(instances) == 10
for instance in instances:
assert instance is crewai_event_bus
assert instance is instances[0]
def test_default_handlers_are_global():
"""Test that handlers registered with @crewai_event_bus.on() are global by default."""
received_events = []
mock_handler = Mock()
@crewai_event_bus.on(CrossThreadTestEvent)
def global_handler(source, event):
received_events.append((source, event))
mock_handler(source, event)
def thread_worker(thread_id):
# Emit event from a different thread
event = CrossThreadTestEvent(type=f"cross_thread_event_{thread_id}")
crewai_event_bus.emit(f"thread_source_{thread_id}", event)
# Start multiple threads that emit events
threads = []
for i in range(3):
thread = threading.Thread(target=thread_worker, args=(i,))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
# Verify that the global handler received all events from different threads
assert len(received_events) == 3
assert mock_handler.call_count == 3
# Check that events from different threads were received
for i in range(3):
source, event = received_events[i]
assert source == f"thread_source_{i}"
assert event.type == f"cross_thread_event_{i}"
def test_scoped_handlers_thread_isolation():
"""Test that scoped_handlers() provides thread-local isolation for testing."""
global_events = []
scoped_events = []
# Register a global handler
@crewai_event_bus.on(CrossThreadTestEvent)
def global_handler(source, event):
global_events.append((source, event))
# Emit an event - should be received by global handler
event1 = CrossThreadTestEvent(type="event_1")
crewai_event_bus.emit("source_1", event1)
assert len(global_events) == 1
# Use scoped handlers for testing isolation
with crewai_event_bus.scoped_handlers():
# Register a handler in the scoped context (thread-local)
@crewai_event_bus.on(CrossThreadTestEvent)
def scoped_handler(source, event):
scoped_events.append((source, event))
# Emit event - should be received by scoped handler only
event2 = CrossThreadTestEvent(type="event_2")
crewai_event_bus.emit("source_2", event2)
# After scope, emit another event - should be received by global handler only
event3 = CrossThreadTestEvent(type="event_3")
crewai_event_bus.emit("source_3", event3)
# Verify events
assert len(global_events) == 2 # event_1 and event_3
assert len(scoped_events) == 1 # only event_2
assert global_events[0] == ("source_1", event1)
assert scoped_events[0] == ("source_2", event2)
assert global_events[1] == ("source_3", event3)
def test_scoped_handlers_thread_safety():
"""Test that scoped handlers work correctly across multiple threads."""
thread_results = {}
def thread_worker(thread_id):
with crewai_event_bus.scoped_handlers():
mock_handler = Mock()
@crewai_event_bus.on(AnotherThreadTestEvent)
def scoped_handler(source, event):
mock_handler(f"scoped_thread_{thread_id}", event)
scoped_event = AnotherThreadTestEvent(type=f"scoped_event_{thread_id}")
crewai_event_bus.emit(f"scoped_source_{thread_id}", scoped_event)
thread_results[thread_id] = {
"mock_handler": mock_handler,
"scoped_event": scoped_event,
}
# After scope, emit event - should not be received by scoped handler
post_scoped_event = AnotherThreadTestEvent(type=f"post_scoped_{thread_id}")
crewai_event_bus.emit(f"post_source_{thread_id}", post_scoped_event)
threads = []
for i in range(5):
thread = threading.Thread(target=thread_worker, args=(i,))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
for thread_id, result in thread_results.items():
result["mock_handler"].assert_called_once_with(
f"scoped_thread_{thread_id}", result["scoped_event"]
)
def test_register_handler_method():
"""Test the register_handler method works with global handlers."""
received_events = []
def handler(source, event):
received_events.append((source, event))
# Register handler using the method
crewai_event_bus.register_handler(CrossThreadTestEvent, handler)
# Emit event from different thread
def thread_worker():
event = CrossThreadTestEvent(type="test_event")
crewai_event_bus.emit("thread_source", event)
thread = threading.Thread(target=thread_worker)
thread.start()
thread.join()
# Verify handler received the event
assert len(received_events) == 1
assert received_events[0] == (
"thread_source",
CrossThreadTestEvent(type="test_event"),
)
def test_scoped_global_handlers():
"""Test the scoped_global_handlers context manager."""
global_events = []
def global_handler(source, event):
global_events.append((source, event))
# Register a global handler
crewai_event_bus.register_handler(CrossThreadTestEvent, global_handler)
# Emit an event - should be received
event1 = CrossThreadTestEvent(type="event_1")
crewai_event_bus.emit("source_1", event1)
assert len(global_events) == 1
# Use scoped global handlers
with crewai_event_bus.scoped_global_handlers():
# Register a different handler in scope
def scoped_handler(source, event):
global_events.append(("scoped", source, event))
crewai_event_bus.register_handler(CrossThreadTestEvent, scoped_handler)
# Emit event - should be received by scoped handler
event2 = CrossThreadTestEvent(type="event_2")
crewai_event_bus.emit("source_2", event2)
# After scope, original handler should be restored
event3 = CrossThreadTestEvent(type="event_3")
crewai_event_bus.emit("source_3", event3)
# Verify events
assert len(global_events) == 3
assert global_events[0] == ("source_1", event1)
assert global_events[1] == ("scoped", "source_2", event2)
assert global_events[2] == ("source_3", event3)
def test_handler_duplication_scenarios():
"""Test various scenarios where handler duplication can occur."""
call_counts = []
def handler(source, event):
call_counts.append(1)
# Scenario 1: Register the same handler multiple times
crewai_event_bus.register_handler(TestEvent, handler)
crewai_event_bus.register_handler(TestEvent, handler) # Duplicate registration
# Scenario 2: Use decorator multiple times on the same function
@crewai_event_bus.on(TestEvent)
def decorated_handler1(source, event):
call_counts.append(1)
@crewai_event_bus.on(TestEvent)
def decorated_handler2(source, event):  # a separate function object, registered independently
call_counts.append(1)
# Emit an event
event = TestEvent(type="test_event")
crewai_event_bus.emit("source", event)
# Currently, all handlers are called (including duplicates)
# This shows the current behavior - handlers can be duplicated
assert len(call_counts) >= 4 # At least 4 calls (2 direct + 2 decorated)
def test_module_reload_duplication():
"""Test duplication that could occur from module reloading."""
call_counts = []
def create_handler():
def handler(source, event):
call_counts.append(1)
return handler
# Simulate module reload scenario
handler1 = create_handler()
handler2 = create_handler() # Same function, different instance
crewai_event_bus.register_handler(TestEvent, handler1)
crewai_event_bus.register_handler(TestEvent, handler2)
event = TestEvent(type="test_event")
crewai_event_bus.emit("source", event)
# Both handlers are called (duplication)
assert len(call_counts) == 2
def test_listener_class_duplication():
"""Test duplication from multiple listener class instances."""
call_counts = []
class TestListener:
def __init__(self):
@crewai_event_bus.on(TestEvent)
def handler(source, event):
call_counts.append(1)
# Create multiple instances (simulating multiple imports)
listener1 = TestListener()
listener2 = TestListener()
event = TestEvent(type="test_event")
crewai_event_bus.emit("source", event)
# Both instances register handlers (duplication)
assert len(call_counts) == 2
def test_handler_deduplication():
"""Test that duplicate handlers are automatically prevented."""
call_counts = []
def handler(source, event):
call_counts.append(1)
# Register the same handler multiple times
result1 = crewai_event_bus.register_handler(TestEvent, handler)
result2 = crewai_event_bus.register_handler(
TestEvent, handler
) # Duplicate registration
# First registration should succeed, second should fail
assert result1 is True
assert result2 is False
# Emit an event
event = TestEvent(type="test_event")
crewai_event_bus.emit("source", event)
# Handler should only be called once (no duplication)
assert len(call_counts) == 1
def test_decorator_deduplication():
"""Test that decorator prevents duplicate registrations."""
call_counts = []
# Define the same handler function
def handler(source, event):
call_counts.append(1)
# Register using decorator
@crewai_event_bus.on(TestEvent)
def decorated_handler(source, event):
call_counts.append(1)
# Try to register the same function again using register_handler
result = crewai_event_bus.register_handler(
TestEvent, cast(Callable[[Any, BaseEvent], None], decorated_handler)
)
# Should fail because it's already registered
assert result is False
# Emit an event
event = TestEvent(type="test_event")
crewai_event_bus.emit("source", event)
# Should only be called once
assert len(call_counts) == 1
def test_handler_unregistration():
"""Test that handlers can be unregistered."""
call_counts = []
def handler(source, event):
call_counts.append(1)
# Register handler
crewai_event_bus.register_handler(TestEvent, handler)
# Verify it's registered
assert crewai_event_bus.get_handler_count(TestEvent) == 1
# Emit event - should be called
event = TestEvent(type="test_event")
crewai_event_bus.emit("source", event)
assert len(call_counts) == 1
# Unregister handler
result = crewai_event_bus.unregister_handler(TestEvent, handler)
assert result is True
assert crewai_event_bus.get_handler_count(TestEvent) == 0
# Emit event again - should not be called
crewai_event_bus.emit("source", event)
assert len(call_counts) == 1 # Still only 1 call
def test_handler_count_tracking():
"""Test that handler counts are tracked correctly."""
def handler1(source, event):
pass
def handler2(source, event):
pass
# Initially no handlers
assert crewai_event_bus.get_handler_count(TestEvent) == 0
# Register first handler
crewai_event_bus.register_handler(TestEvent, handler1)
assert crewai_event_bus.get_handler_count(TestEvent) == 1
# Register second handler
crewai_event_bus.register_handler(TestEvent, handler2)
assert crewai_event_bus.get_handler_count(TestEvent) == 2
# Try to register first handler again (should fail)
crewai_event_bus.register_handler(TestEvent, handler1)
assert crewai_event_bus.get_handler_count(TestEvent) == 2 # Count unchanged
# Unregister first handler
crewai_event_bus.unregister_handler(TestEvent, handler1)
assert crewai_event_bus.get_handler_count(TestEvent) == 1
# Unregister second handler
crewai_event_bus.unregister_handler(TestEvent, handler2)
assert crewai_event_bus.get_handler_count(TestEvent) == 0
def test_different_event_types_dont_conflict():
"""Test that handlers for different event types don't interfere."""
test_event_calls = []
cross_thread_calls = []
def test_event_handler(source, event):
test_event_calls.append(1)
def cross_thread_handler(source, event):
cross_thread_calls.append(1)
# Register handlers for different event types
crewai_event_bus.register_handler(TestEvent, test_event_handler)
crewai_event_bus.register_handler(CrossThreadTestEvent, cross_thread_handler)
# Emit TestEvent
test_event = TestEvent(type="test")
crewai_event_bus.emit("source", test_event)
assert len(test_event_calls) == 1
assert len(cross_thread_calls) == 0
# Emit CrossThreadTestEvent
cross_thread_event = CrossThreadTestEvent(type="cross_thread")
crewai_event_bus.emit("source", cross_thread_event)
assert len(test_event_calls) == 1 # Unchanged
assert len(cross_thread_calls) == 1
def test_scoped_handlers_with_deduplication():
"""Test that deduplication works within scoped handlers."""
call_counts = []
def handler(source, event):
call_counts.append(1)
# Register global handler
crewai_event_bus.register_handler(TestEvent, handler)
# Use scoped handlers
with crewai_event_bus.scoped_handlers():
# Try to register the same handler in scoped context
@crewai_event_bus.on(TestEvent)
def scoped_handler(source, event):
call_counts.append(1)
# Emit event - should be called by both global and scoped handlers
event = TestEvent(type="test_event")
crewai_event_bus.emit("source", event)
# Should have 2 calls (1 global + 1 scoped)
assert len(call_counts) == 2
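
Read together, the new tests pin down a small public surface on the singleton crewai_event_bus: an on() decorator, register_handler()/unregister_handler() returning booleans so duplicate registrations are rejected, get_handler_count() for introspection, emit(source, event), and the scoped_handlers()/scoped_global_handlers() context managers for test isolation. A minimal sketch of that surface, assuming it behaves exactly as the assertions above state; the event class and handler names here are hypothetical, chosen only for illustration:

    from crewai.utilities.events.base_events import BaseEvent
    from crewai.utilities.events.crewai_event_bus import crewai_event_bus


    class DeployFinishedEvent(BaseEvent):  # hypothetical event type
        pass


    def log_deploy(source, event):
        print(f"{source} emitted {event.type}")


    # register_handler returns True on first registration, False on duplicates.
    assert crewai_event_bus.register_handler(DeployFinishedEvent, log_deploy) is True
    assert crewai_event_bus.register_handler(DeployFinishedEvent, log_deploy) is False
    assert crewai_event_bus.get_handler_count(DeployFinishedEvent) == 1

    # Handlers registered with the decorator are global across threads.
    @crewai_event_bus.on(DeployFinishedEvent)
    def announce(source, event):
        print("deploy finished:", event.type)

    crewai_event_bus.emit("ci-pipeline", DeployFinishedEvent(type="deploy_finished"))

    # Handlers needed only inside a test live in a scoped context.
    with crewai_event_bus.scoped_handlers():
        @crewai_event_bus.on(DeployFinishedEvent)
        def scoped_handler(source, event):
            ...

    # Clean up when a handler is no longer needed.
    crewai_event_bus.unregister_handler(DeployFinishedEvent, log_deploy)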

uv.lock (generated) — 6390

File diff suppressed because it is too large.