Compare commits

...

12 Commits

Author SHA1 Message Date
Lucas Gomide
064997464e fix: allow messages be empty on LLMCallCompletedEvent 2025-07-11 14:05:25 -03:00
Lucas Gomide
6f0ed6642b style: fix mypy issues 2025-07-11 13:02:34 -03:00
Lucas Gomide
43f339fa84 style: resolve linter issues 2025-07-11 13:02:34 -03:00
Lucas Gomide
5ea221e54e fix: render all feedback per iteration 2025-07-11 13:02:34 -03:00
Lucas Gomide
d4c15ec25f test: add Agent eval tests 2025-07-11 13:02:34 -03:00
Lucas Gomide
37cfbe7389 fix: do not evaluate Agent by default
This is a experimental feature we still need refine it further
2025-07-11 13:02:34 -03:00
Lucas Gomide
6d7c7d940e feat: add AgentEvaluator class
This class will evaluate Agent' results and report to user
2025-07-11 13:02:34 -03:00
Lucas Gomide
80bd23a8a9 feat: add Reasoning Metrics for Agent evaluation, still in progress 2025-07-11 13:02:34 -03:00
Lucas Gomide
50593d1485 feat: add Tool Metrics for Agent evaluation 2025-07-11 13:02:34 -03:00
Lucas Gomide
60084af745 feat: add SemanticQuality metric for Agent evaluation 2025-07-11 13:02:34 -03:00
Lucas Gomide
be4ade8c45 feat: add GoalAlignment metric for Agent evaluation 2025-07-11 13:02:34 -03:00
Lucas Gomide
6a49a24810 feat: add exchanged messages in LLMCallCompletedEvent 2025-07-11 13:02:34 -03:00
26 changed files with 2930 additions and 14 deletions

View File

@@ -1313,6 +1313,7 @@ class Crew(FlowTrackable, BaseModel):
n_iterations: int,
eval_llm: Union[str, InstanceOf[BaseLLM]],
inputs: Optional[Dict[str, Any]] = None,
include_agent_eval: Optional[bool] = False
) -> None:
"""Test and evaluate the Crew with the given inputs for n iterations concurrently using concurrent.futures."""
try:
@@ -1331,13 +1332,29 @@ class Crew(FlowTrackable, BaseModel):
),
)
test_crew = self.copy()
# TODO: Refator to use a single Evaluator Manage class
evaluator = CrewEvaluator(test_crew, llm_instance)
if include_agent_eval:
from crewai.evaluation import create_default_evaluator
agent_evaluator = create_default_evaluator(crew=test_crew)
for i in range(1, n_iterations + 1):
evaluator.set_iteration(i)
if include_agent_eval:
agent_evaluator.set_iteration(i)
test_crew.kickoff(inputs=inputs)
# TODO: Refactor to use ListenerEvents instead of trigger each iteration manually
if include_agent_eval:
agent_evaluator.evaluate_current_iteration()
evaluator.print_crew_evaluation_result()
if include_agent_eval:
agent_evaluator.get_agent_evaluation(include_evaluation_feedback=True)
crewai_event_bus.emit(
self,

View File

@@ -0,0 +1,53 @@
from crewai.evaluation.base_evaluator import (
BaseEvaluator,
EvaluationScore,
MetricCategory,
AgentEvaluationResult
)
from crewai.evaluation.metrics.semantic_quality_metrics import (
SemanticQualityEvaluator
)
from crewai.evaluation.metrics.goal_metrics import (
GoalAlignmentEvaluator
)
from crewai.evaluation.metrics.reasoning_metrics import (
ReasoningEfficiencyEvaluator
)
from crewai.evaluation.metrics.tools_metrics import (
ToolSelectionEvaluator,
ParameterExtractionEvaluator,
ToolInvocationEvaluator
)
from crewai.evaluation.evaluation_listener import (
EvaluationTraceCallback,
create_evaluation_callbacks
)
from crewai.evaluation.agent_evaluator import (
AgentEvaluator,
create_default_evaluator
)
__all__ = [
"BaseEvaluator",
"EvaluationScore",
"MetricCategory",
"AgentEvaluationResult",
"SemanticQualityEvaluator",
"GoalAlignmentEvaluator",
"ReasoningEfficiencyEvaluator",
"ToolSelectionEvaluator",
"ParameterExtractionEvaluator",
"ToolInvocationEvaluator",
"EvaluationTraceCallback",
"create_evaluation_callbacks",
"AgentEvaluator",
"create_default_evaluator"
]

View File

@@ -0,0 +1,178 @@
from crewai.evaluation.base_evaluator import AgentEvaluationResult, AggregationStrategy
from crewai.agent import Agent
from crewai.task import Task
from crewai.evaluation.evaluation_display import EvaluationDisplayFormatter
from typing import Any, Dict
from collections import defaultdict
from crewai.evaluation import BaseEvaluator, create_evaluation_callbacks
from collections.abc import Sequence
from crewai.crew import Crew
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.utils.console_formatter import ConsoleFormatter
class AgentEvaluator:
def __init__(
self,
evaluators: Sequence[BaseEvaluator] | None = None,
crew: Crew | None = None,
):
self.crew: Crew | None = crew
self.evaluators: Sequence[BaseEvaluator] | None = evaluators
self.agent_evaluators: dict[str, Sequence[BaseEvaluator] | None] = {}
if crew is not None:
assert crew and crew.agents is not None
for agent in crew.agents:
self.agent_evaluators[str(agent.id)] = self.evaluators
self.callback = create_evaluation_callbacks()
self.console_formatter = ConsoleFormatter()
self.display_formatter = EvaluationDisplayFormatter()
self.iteration = 1
self.iterations_results: dict[int, dict[str, list[AgentEvaluationResult]]] = {}
def set_iteration(self, iteration: int) -> None:
self.iteration = iteration
def evaluate_current_iteration(self) -> dict[str, list[AgentEvaluationResult]]:
if not self.crew:
raise ValueError("Cannot evaluate: no crew was provided to the evaluator.")
if not self.callback:
raise ValueError("Cannot evaluate: no callback was set. Use set_callback() method first.")
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn
evaluation_results: defaultdict[str, list[AgentEvaluationResult]] = defaultdict(list)
total_evals = 0
for agent in self.crew.agents:
for task in self.crew.tasks:
if task.agent and task.agent.id == agent.id and self.agent_evaluators.get(str(agent.id)):
total_evals += 1
with Progress(
SpinnerColumn(),
TextColumn("[bold blue]{task.description}[/bold blue]"),
BarColumn(),
TextColumn("{task.percentage:.0f}% completed"),
console=self.console_formatter.console
) as progress:
eval_task = progress.add_task(f"Evaluating agents (iteration {self.iteration})...", total=total_evals)
for agent in self.crew.agents:
evaluator = self.agent_evaluators.get(str(agent.id))
if not evaluator:
continue
for task in self.crew.tasks:
if task.agent and str(task.agent.id) != str(agent.id):
continue
trace = self.callback.get_trace(str(agent.id), str(task.id))
if not trace:
self.console_formatter.print(f"[yellow]Warning: No trace found for agent {agent.role} on task {task.description[:30]}...[/yellow]")
progress.update(eval_task, advance=1)
continue
with crewai_event_bus.scoped_handlers():
result = self.evaluate(
agent=agent,
task=task,
execution_trace=trace,
final_output=task.output
)
evaluation_results[agent.role].append(result)
progress.update(eval_task, advance=1)
self.iterations_results[self.iteration] = evaluation_results
return evaluation_results
def get_evaluation_results(self):
if self.iteration in self.iterations_results:
return self.iterations_results[self.iteration]
return self.evaluate_current_iteration()
def display_results_with_iterations(self):
self.display_formatter.display_summary_results(self.iterations_results)
def get_agent_evaluation(self, strategy: AggregationStrategy = AggregationStrategy.SIMPLE_AVERAGE, include_evaluation_feedback: bool = False):
agent_results = {}
with crewai_event_bus.scoped_handlers():
task_results = self.get_evaluation_results()
for agent_role, results in task_results.items():
if not results:
continue
agent_id = results[0].agent_id
aggregated_result = self.display_formatter._aggregate_agent_results(
agent_id=agent_id,
agent_role=agent_role,
results=results,
strategy=strategy
)
agent_results[agent_role] = aggregated_result
if self.iteration == max(self.iterations_results.keys()):
self.display_results_with_iterations()
if include_evaluation_feedback:
self.display_evaluation_with_feedback()
return agent_results
def display_evaluation_with_feedback(self):
self.display_formatter.display_evaluation_with_feedback(self.iterations_results)
def evaluate(
self,
agent: Agent,
task: Task,
execution_trace: Dict[str, Any],
final_output: Any
) -> AgentEvaluationResult:
result = AgentEvaluationResult(
agent_id=str(agent.id),
task_id=str(task.id)
)
assert self.evaluators is not None
for evaluator in self.evaluators:
try:
score = evaluator.evaluate(
agent=agent,
task=task,
execution_trace=execution_trace,
final_output=final_output
)
result.metrics[evaluator.metric_category] = score
except Exception as e:
self.console_formatter.print(f"Error in {evaluator.metric_category.value} evaluator: {str(e)}")
return result
def create_default_evaluator(crew, llm=None):
from crewai.evaluation import (
GoalAlignmentEvaluator,
SemanticQualityEvaluator,
ToolSelectionEvaluator,
ParameterExtractionEvaluator,
ToolInvocationEvaluator,
ReasoningEfficiencyEvaluator
)
evaluators = [
GoalAlignmentEvaluator(llm=llm),
SemanticQualityEvaluator(llm=llm),
ToolSelectionEvaluator(llm=llm),
ParameterExtractionEvaluator(llm=llm),
ToolInvocationEvaluator(llm=llm),
ReasoningEfficiencyEvaluator(llm=llm),
]
return AgentEvaluator(evaluators=evaluators, crew=crew)

View File

@@ -0,0 +1,125 @@
import abc
import enum
from enum import Enum
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
from crewai.agent import Agent
from crewai.task import Task
from crewai.llm import BaseLLM
from crewai.utilities.llm_utils import create_llm
class MetricCategory(enum.Enum):
GOAL_ALIGNMENT = "goal_alignment"
SEMANTIC_QUALITY = "semantic_quality"
REASONING_EFFICIENCY = "reasoning_efficiency"
TOOL_SELECTION = "tool_selection"
PARAMETER_EXTRACTION = "parameter_extraction"
TOOL_INVOCATION = "tool_invocation"
def title(self):
return self.value.replace('_', ' ').title()
class EvaluationScore(BaseModel):
score: float | None = Field(
default=5.0,
description="Numeric score from 0-10 where 0 is worst and 10 is best, None if not applicable",
ge=0.0,
le=10.0
)
feedback: str = Field(
default="",
description="Detailed feedback explaining the evaluation score"
)
raw_response: str | None = Field(
default=None,
description="Raw response from the evaluator (e.g., LLM)"
)
def __str__(self) -> str:
if self.score is None:
return f"Score: N/A - {self.feedback}"
return f"Score: {self.score:.1f}/10 - {self.feedback}"
class BaseEvaluator(abc.ABC):
def __init__(self, llm: BaseLLM | None = None):
self.llm: BaseLLM | None = create_llm(llm)
@property
@abc.abstractmethod
def metric_category(self) -> MetricCategory:
pass
@abc.abstractmethod
def evaluate(
self,
agent: Agent,
task: Task,
execution_trace: Dict[str, Any],
final_output: Any,
) -> EvaluationScore:
pass
class AgentEvaluationResult(BaseModel):
agent_id: str = Field(description="ID of the evaluated agent")
task_id: str = Field(description="ID of the task that was executed")
metrics: Dict[MetricCategory, EvaluationScore] = Field(
default_factory=dict,
description="Evaluation scores for each metric category"
)
class AggregationStrategy(Enum):
SIMPLE_AVERAGE = "simple_average" # Equal weight to all tasks
WEIGHTED_BY_COMPLEXITY = "weighted_by_complexity" # Weight by task complexity
BEST_PERFORMANCE = "best_performance" # Use best scores across tasks
WORST_PERFORMANCE = "worst_performance" # Use worst scores across tasks
class AgentAggregatedEvaluationResult(BaseModel):
agent_id: str = Field(
default="",
description="ID of the agent"
)
agent_role: str = Field(
default="",
description="Role of the agent"
)
task_count: int = Field(
default=0,
description="Number of tasks included in this aggregation"
)
aggregation_strategy: AggregationStrategy = Field(
default=AggregationStrategy.SIMPLE_AVERAGE,
description="Strategy used for aggregation"
)
metrics: Dict[MetricCategory, EvaluationScore] = Field(
default_factory=dict,
description="Aggregated metrics across all tasks"
)
task_results: List[str] = Field(
default_factory=list,
description="IDs of tasks included in this aggregation"
)
overall_score: Optional[float] = Field(
default=None,
description="Overall score for this agent"
)
def __str__(self) -> str:
result = f"Agent Evaluation: {self.agent_role}\n"
result += f"Strategy: {self.aggregation_strategy.value}\n"
result += f"Tasks evaluated: {self.task_count}\n"
for category, score in self.metrics.items():
result += f"\n\n- {category.value.upper()}: {score.score}/10\n"
if score.feedback:
detailed_feedback = "\n ".join(score.feedback.split('\n'))
result += f" {detailed_feedback}\n"
return result

View File

@@ -0,0 +1,341 @@
from collections import defaultdict
from typing import Dict, Any, List
from rich.table import Table
from rich.box import HEAVY_EDGE, ROUNDED
from collections.abc import Sequence
from crewai.evaluation.base_evaluator import AgentAggregatedEvaluationResult, AggregationStrategy, AgentEvaluationResult, MetricCategory
from crewai.evaluation import EvaluationScore
from crewai.utilities.events.utils.console_formatter import ConsoleFormatter
from crewai.utilities.llm_utils import create_llm
class EvaluationDisplayFormatter:
def __init__(self):
self.console_formatter = ConsoleFormatter()
def display_evaluation_with_feedback(self, iterations_results: Dict[int, Dict[str, List[Any]]]):
if not iterations_results:
self.console_formatter.print("[yellow]No evaluation results to display[/yellow]")
return
# Get all agent roles across all iterations
all_agent_roles: set[str] = set()
for iter_results in iterations_results.values():
all_agent_roles.update(iter_results.keys())
for agent_role in sorted(all_agent_roles):
self.console_formatter.print(f"\n[bold cyan]Agent: {agent_role}[/bold cyan]")
# Process each iteration
for iter_num, results in sorted(iterations_results.items()):
if agent_role not in results or not results[agent_role]:
continue
agent_results = results[agent_role]
agent_id = agent_results[0].agent_id
# Aggregate results for this agent in this iteration
aggregated_result = self._aggregate_agent_results(
agent_id=agent_id,
agent_role=agent_role,
results=agent_results,
)
# Display iteration header
self.console_formatter.print(f"\n[bold]Iteration {iter_num}[/bold]")
# Create table for this iteration
table = Table(box=ROUNDED)
table.add_column("Metric", style="cyan")
table.add_column("Score (1-10)", justify="center")
table.add_column("Feedback", style="green")
# Add metrics to table
if aggregated_result.metrics:
for metric, evaluation_score in aggregated_result.metrics.items():
score = evaluation_score.score
if isinstance(score, (int, float)):
if score >= 8.0:
score_text = f"[green]{score:.1f}[/green]"
elif score >= 6.0:
score_text = f"[cyan]{score:.1f}[/cyan]"
elif score >= 4.0:
score_text = f"[yellow]{score:.1f}[/yellow]"
else:
score_text = f"[red]{score:.1f}[/red]"
else:
score_text = "[dim]N/A[/dim]"
table.add_section()
table.add_row(
metric.title(),
score_text,
evaluation_score.feedback or ""
)
if aggregated_result.overall_score is not None:
overall_score = aggregated_result.overall_score
if overall_score >= 8.0:
overall_color = "green"
elif overall_score >= 6.0:
overall_color = "cyan"
elif overall_score >= 4.0:
overall_color = "yellow"
else:
overall_color = "red"
table.add_section()
table.add_row(
"Overall Score",
f"[{overall_color}]{overall_score:.1f}[/]",
"Overall agent evaluation score"
)
# Print the table for this iteration
self.console_formatter.print(table)
def display_summary_results(self, iterations_results: Dict[int, Dict[str, List[AgentAggregatedEvaluationResult]]]):
if not iterations_results:
self.console_formatter.print("[yellow]No evaluation results to display[/yellow]")
return
self.console_formatter.print("\n")
table = Table(title="Agent Performance Scores \n (1-10 Higher is better)", box=HEAVY_EDGE)
table.add_column("Agent/Metric", style="cyan")
for iter_num in sorted(iterations_results.keys()):
run_label = f"Run {iter_num}"
table.add_column(run_label, justify="center")
table.add_column("Avg. Total", justify="center")
all_agent_roles: set[str] = set()
for results in iterations_results.values():
all_agent_roles.update(results.keys())
for agent_role in sorted(all_agent_roles):
agent_scores_by_iteration = {}
agent_metrics_by_iteration = {}
for iter_num, results in sorted(iterations_results.items()):
if agent_role not in results or not results[agent_role]:
continue
agent_results = results[agent_role]
agent_id = agent_results[0].agent_id
aggregated_result = self._aggregate_agent_results(
agent_id=agent_id,
agent_role=agent_role,
results=agent_results,
strategy=AggregationStrategy.SIMPLE_AVERAGE
)
valid_scores = [score.score for score in aggregated_result.metrics.values()
if score.score is not None]
if valid_scores:
avg_score = sum(valid_scores) / len(valid_scores)
agent_scores_by_iteration[iter_num] = avg_score
agent_metrics_by_iteration[iter_num] = aggregated_result.metrics
if not agent_scores_by_iteration:
continue
avg_across_iterations = sum(agent_scores_by_iteration.values()) / len(agent_scores_by_iteration)
row = [f"[bold]{agent_role}[/bold]"]
for iter_num in sorted(iterations_results.keys()):
if iter_num in agent_scores_by_iteration:
score = agent_scores_by_iteration[iter_num]
if score >= 8.0:
color = "green"
elif score >= 6.0:
color = "cyan"
elif score >= 4.0:
color = "yellow"
else:
color = "red"
row.append(f"[bold {color}]{score:.1f}[/]")
else:
row.append("-")
if avg_across_iterations >= 8.0:
color = "green"
elif avg_across_iterations >= 6.0:
color = "cyan"
elif avg_across_iterations >= 4.0:
color = "yellow"
else:
color = "red"
row.append(f"[bold {color}]{avg_across_iterations:.1f}[/]")
table.add_row(*row)
all_metrics: set[Any] = set()
for metrics in agent_metrics_by_iteration.values():
all_metrics.update(metrics.keys())
for metric in sorted(all_metrics, key=lambda x: x.value):
metric_scores = []
row = [f" - {metric.title()}"]
for iter_num in sorted(iterations_results.keys()):
if (iter_num in agent_metrics_by_iteration and
metric in agent_metrics_by_iteration[iter_num]):
metric_score = agent_metrics_by_iteration[iter_num][metric].score
if metric_score is not None:
metric_scores.append(metric_score)
if metric_score >= 8.0:
color = "green"
elif metric_score >= 6.0:
color = "cyan"
elif metric_score >= 4.0:
color = "yellow"
else:
color = "red"
row.append(f"[{color}]{metric_score:.1f}[/]")
else:
row.append("[dim]N/A[/dim]")
else:
row.append("-")
if metric_scores:
avg = sum(metric_scores) / len(metric_scores)
if avg >= 8.0:
color = "green"
elif avg >= 6.0:
color = "cyan"
elif avg >= 4.0:
color = "yellow"
else:
color = "red"
row.append(f"[{color}]{avg:.1f}[/]")
else:
row.append("-")
table.add_row(*row)
table.add_row(*[""] * (len(sorted(iterations_results.keys())) + 2))
self.console_formatter.print(table)
self.console_formatter.print("\n")
def _aggregate_agent_results(
self,
agent_id: str,
agent_role: str,
results: Sequence[AgentEvaluationResult],
strategy: AggregationStrategy = AggregationStrategy.SIMPLE_AVERAGE,
) -> AgentAggregatedEvaluationResult:
metrics_by_category: dict[MetricCategory, list[EvaluationScore]] = defaultdict(list)
for result in results:
for metric_name, evaluation_score in result.metrics.items():
metrics_by_category[metric_name].append(evaluation_score)
aggregated_metrics: dict[MetricCategory, EvaluationScore] = {}
for category, scores in metrics_by_category.items():
valid_scores = [s.score for s in scores if s.score is not None]
avg_score = sum(valid_scores) / len(valid_scores) if valid_scores else None
feedbacks = [s.feedback for s in scores if s.feedback]
feedback_summary = None
if feedbacks:
if len(feedbacks) > 1:
# Use the summarization method for multiple feedbacks
feedback_summary = self._summarize_feedbacks(
agent_role=agent_role,
metric=category.title(),
feedbacks=feedbacks,
scores=[s.score for s in scores],
strategy=strategy
)
else:
feedback_summary = feedbacks[0]
aggregated_metrics[category] = EvaluationScore(
score=avg_score,
feedback=feedback_summary
)
overall_score = None
if aggregated_metrics:
valid_scores = [m.score for m in aggregated_metrics.values() if m.score is not None]
if valid_scores:
overall_score = sum(valid_scores) / len(valid_scores)
return AgentAggregatedEvaluationResult(
agent_id=agent_id,
agent_role=agent_role,
metrics=aggregated_metrics,
overall_score=overall_score,
task_count=len(results),
aggregation_strategy=strategy
)
def _summarize_feedbacks(
self,
agent_role: str,
metric: str,
feedbacks: List[str],
scores: List[float | None],
strategy: AggregationStrategy
) -> str:
if len(feedbacks) <= 2 and all(len(fb) < 200 for fb in feedbacks):
return "\n\n".join([f"Feedback {i+1}: {fb}" for i, fb in enumerate(feedbacks)])
try:
llm = create_llm()
formatted_feedbacks = []
for i, (feedback, score) in enumerate(zip(feedbacks, scores)):
if len(feedback) > 500:
feedback = feedback[:500] + "..."
score_text = f"{score:.1f}" if score is not None else "N/A"
formatted_feedbacks.append(f"Feedback #{i+1} (Score: {score_text}):\n{feedback}")
all_feedbacks = "\n\n" + "\n\n---\n\n".join(formatted_feedbacks)
strategy_guidance = ""
if strategy == AggregationStrategy.BEST_PERFORMANCE:
strategy_guidance = "Focus on the highest-scoring aspects and strengths demonstrated."
elif strategy == AggregationStrategy.WORST_PERFORMANCE:
strategy_guidance = "Focus on areas that need improvement and common issues across tasks."
else: # Default/average strategies
strategy_guidance = "Provide a balanced analysis of strengths and weaknesses across all tasks."
prompt = [
{"role": "system", "content": f"""You are an expert evaluator creating a comprehensive summary of agent performance feedback.
Your job is to synthesize multiple feedback points about the same metric across different tasks.
Create a concise, insightful summary that captures the key patterns and themes from all feedback.
{strategy_guidance}
Your summary should be:
1. Specific and concrete (not vague or general)
2. Focused on actionable insights
3. Highlighting patterns across tasks
4. 150-250 words in length
The summary should be directly usable as final feedback for the agent's performance on this metric."""},
{"role": "user", "content": f"""I need a synthesized summary of the following feedback for:
Agent Role: {agent_role}
Metric: {metric.title()}
{all_feedbacks}
"""}
]
assert llm is not None
response = llm.call(prompt)
return response
except Exception:
return "Synthesized from multiple tasks: " + "\n\n".join([f"- {fb[:500]}..." for fb in feedbacks])

View File

@@ -0,0 +1,190 @@
from datetime import datetime
from typing import Any, Dict, Optional
from collections.abc import Sequence
from crewai.agent import Agent
from crewai.task import Task
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events.crewai_event_bus import CrewAIEventsBus
from crewai.utilities.events.agent_events import (
AgentExecutionStartedEvent,
AgentExecutionCompletedEvent
)
from crewai.utilities.events.tool_usage_events import (
ToolUsageFinishedEvent,
ToolUsageErrorEvent,
ToolExecutionErrorEvent,
ToolSelectionErrorEvent,
ToolValidateInputErrorEvent
)
from crewai.utilities.events.llm_events import (
LLMCallStartedEvent,
LLMCallCompletedEvent
)
class EvaluationTraceCallback(BaseEventListener):
"""Event listener for collecting execution traces for evaluation.
This listener attaches to the event bus to collect detailed information
about the execution process, including agent steps, tool uses, knowledge
retrievals, and final output - all for use in agent evaluation.
"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if not hasattr(self, "_initialized") or not self._initialized:
super().__init__()
self.traces = {}
self.current_agent_id = None
self.current_task_id = None
self._initialized = True
def setup_listeners(self, event_bus: CrewAIEventsBus):
@event_bus.on(AgentExecutionStartedEvent)
def on_agent_started(source, event: AgentExecutionStartedEvent):
self.on_agent_start(event.agent, event.task)
@event_bus.on(AgentExecutionCompletedEvent)
def on_agent_completed(source, event: AgentExecutionCompletedEvent):
self.on_agent_finish(event.agent, event.task, event.output)
@event_bus.on(ToolUsageFinishedEvent)
def on_tool_completed(source, event: ToolUsageFinishedEvent):
self.on_tool_use(event.tool_name, event.tool_args, event.output, success=True)
@event_bus.on(ToolUsageErrorEvent)
def on_tool_usage_error(source, event: ToolUsageErrorEvent):
self.on_tool_use(event.tool_name, event.tool_args, event.error,
success=False, error_type="usage_error")
@event_bus.on(ToolExecutionErrorEvent)
def on_tool_execution_error(source, event: ToolExecutionErrorEvent):
self.on_tool_use(event.tool_name, event.tool_args, event.error,
success=False, error_type="execution_error")
@event_bus.on(ToolSelectionErrorEvent)
def on_tool_selection_error(source, event: ToolSelectionErrorEvent):
self.on_tool_use(event.tool_name, event.tool_args, event.error,
success=False, error_type="selection_error")
@event_bus.on(ToolValidateInputErrorEvent)
def on_tool_validate_input_error(source, event: ToolValidateInputErrorEvent):
self.on_tool_use(event.tool_name, event.tool_args, event.error,
success=False, error_type="validation_error")
@event_bus.on(LLMCallStartedEvent)
def on_llm_call_started(source, event: LLMCallStartedEvent):
self.on_llm_call_start(event.messages, event.tools)
@event_bus.on(LLMCallCompletedEvent)
def on_llm_call_completed(source, event: LLMCallCompletedEvent):
self.on_llm_call_end(event.messages, event.response)
def on_agent_start(self, agent: Agent, task: Task):
self.current_agent_id = agent.id
self.current_task_id = task.id
trace_key = f"{agent.id}_{task.id}"
self.traces[trace_key] = {
"agent_id": agent.id,
"task_id": task.id,
"tool_uses": [],
"llm_calls": [],
"start_time": datetime.now(),
"final_output": None
}
def on_agent_finish(self, agent: Agent, task: Task, output: Any):
trace_key = f"{agent.id}_{task.id}"
if trace_key in self.traces:
self.traces[trace_key]["final_output"] = output
self.traces[trace_key]["end_time"] = datetime.now()
self.current_agent_id = None
self.current_task_id = None
def on_tool_use(self, tool_name: str, tool_args: dict[str, Any] | str, result: Any,
success: bool = True, error_type: str | None = None):
if not self.current_agent_id or not self.current_task_id:
return
trace_key = f"{self.current_agent_id}_{self.current_task_id}"
if trace_key in self.traces:
tool_use = {
"tool": tool_name,
"args": tool_args,
"result": result,
"success": success,
"timestamp": datetime.now()
}
# Add error information if applicable
if not success and error_type:
tool_use["error"] = True
tool_use["error_type"] = error_type
self.traces[trace_key]["tool_uses"].append(tool_use)
def on_llm_call_start(self, messages: str | Sequence[dict[str, Any]] | None, tools: Sequence[dict[str, Any]] | None = None):
if not self.current_agent_id or not self.current_task_id:
return
trace_key = f"{self.current_agent_id}_{self.current_task_id}"
if trace_key not in self.traces:
return
self.current_llm_call = {
"messages": messages,
"tools": tools,
"start_time": datetime.now(),
"response": None,
"end_time": None
}
def on_llm_call_end(self, messages: str | list[dict[str, Any]] | None, response: Any):
if not self.current_agent_id or not self.current_task_id:
return
trace_key = f"{self.current_agent_id}_{self.current_task_id}"
if trace_key not in self.traces:
return
total_tokens = 0
if hasattr(response, "usage") and hasattr(response.usage, "total_tokens"):
total_tokens = response.usage.total_tokens
current_time = datetime.now()
start_time = None
if hasattr(self, "current_llm_call") and self.current_llm_call:
start_time = self.current_llm_call.get("start_time")
if not start_time:
start_time = current_time
llm_call = {
"messages": messages,
"response": response,
"start_time": start_time,
"end_time": current_time,
"total_tokens": total_tokens
}
self.traces[trace_key]["llm_calls"].append(llm_call)
if hasattr(self, "current_llm_call"):
self.current_llm_call = {}
def get_trace(self, agent_id: str, task_id: str) -> Optional[Dict[str, Any]]:
trace_key = f"{agent_id}_{task_id}"
return self.traces.get(trace_key)
def create_evaluation_callbacks() -> EvaluationTraceCallback:
return EvaluationTraceCallback()

View File

@@ -0,0 +1,30 @@
"""Robust JSON parsing utilities for evaluation responses."""
import json
import re
from typing import Any
def extract_json_from_llm_response(text: str) -> dict[str, Any]:
try:
return json.loads(text)
except json.JSONDecodeError:
pass
json_patterns = [
# Standard markdown code blocks with json
r'```json\s*([\s\S]*?)\s*```',
# Code blocks without language specifier
r'```\s*([\s\S]*?)\s*```',
# Inline code with JSON
r'`([{\\[].*[}\]])`',
]
for pattern in json_patterns:
matches = re.findall(pattern, text, re.IGNORECASE | re.DOTALL)
for match in matches:
try:
return json.loads(match.strip())
except json.JSONDecodeError:
continue
raise ValueError("No valid JSON found in the response")

View File

@@ -0,0 +1,66 @@
from typing import Any, Dict
from crewai.agent import Agent
from crewai.task import Task
from crewai.evaluation.base_evaluator import BaseEvaluator, EvaluationScore, MetricCategory
from crewai.evaluation.json_parser import extract_json_from_llm_response
class GoalAlignmentEvaluator(BaseEvaluator):
@property
def metric_category(self) -> MetricCategory:
return MetricCategory.GOAL_ALIGNMENT
def evaluate(
self,
agent: Agent,
task: Task,
execution_trace: Dict[str, Any],
final_output: Any,
) -> EvaluationScore:
prompt = [
{"role": "system", "content": """You are an expert evaluator assessing how well an AI agent's output aligns with its assigned task goal.
Score the agent's goal alignment on a scale from 0-10 where:
- 0: Complete misalignment, agent did not understand or attempt the task goal
- 5: Partial alignment, agent attempted the task but missed key requirements
- 10: Perfect alignment, agent fully satisfied all task requirements
Consider:
1. Did the agent correctly interpret the task goal?
2. Did the final output directly address the requirements?
3. Did the agent focus on relevant aspects of the task?
4. Did the agent provide all requested information or deliverables?
Return your evaluation as JSON with fields 'score' (number) and 'feedback' (string).
"""},
{"role": "user", "content": f"""
Agent role: {agent.role}
Agent goal: {agent.goal}
Task description: {task.description}
Expected output: {task.expected_output}
Agent's final output:
{final_output}
Evaluate how well the agent's output aligns with the assigned task goal.
"""}
]
assert self.llm is not None
response = self.llm.call(prompt)
try:
evaluation_data: dict[str, Any] = extract_json_from_llm_response(response)
assert evaluation_data is not None
return EvaluationScore(
score=evaluation_data.get("score", 0),
feedback=evaluation_data.get("feedback", response),
raw_response=response
)
except Exception:
return EvaluationScore(
score=None,
feedback=f"Failed to parse evaluation. Raw response: {response}",
raw_response=response
)

View File

@@ -0,0 +1,355 @@
"""Agent reasoning efficiency evaluators.
This module provides evaluator implementations for:
- Reasoning efficiency
- Loop detection
- Thinking-to-action ratio
"""
import logging
import re
from enum import Enum
from typing import Any, Dict, List, Tuple
import numpy as np
from collections.abc import Sequence
from crewai.agent import Agent
from crewai.task import Task
from crewai.evaluation.base_evaluator import BaseEvaluator, EvaluationScore, MetricCategory
from crewai.evaluation.json_parser import extract_json_from_llm_response
from crewai.tasks.task_output import TaskOutput
class ReasoningPatternType(Enum):
EFFICIENT = "efficient" # Good reasoning flow
LOOP = "loop" # Agent is stuck in a loop
VERBOSE = "verbose" # Agent is unnecessarily verbose
INDECISIVE = "indecisive" # Agent struggles to make decisions
SCATTERED = "scattered" # Agent jumps between topics without focus
class ReasoningEfficiencyEvaluator(BaseEvaluator):
@property
def metric_category(self) -> MetricCategory:
return MetricCategory.REASONING_EFFICIENCY
def evaluate(
self,
agent: Agent,
task: Task,
execution_trace: Dict[str, Any],
final_output: TaskOutput,
) -> EvaluationScore:
llm_calls = execution_trace.get("llm_calls", [])
if not llm_calls or len(llm_calls) < 2:
return EvaluationScore(
score=None,
feedback="Insufficient LLM calls to evaluate reasoning efficiency."
)
total_calls = len(llm_calls)
total_tokens = sum(call.get("total_tokens", 0) for call in llm_calls)
avg_tokens_per_call = total_tokens / total_calls if total_calls > 0 else 0
time_intervals = []
has_reliable_timing = True
for i in range(1, len(llm_calls)):
start_time = llm_calls[i-1].get("end_time")
end_time = llm_calls[i].get("start_time")
if start_time and end_time and start_time != end_time:
try:
interval = end_time - start_time
time_intervals.append(interval.total_seconds() if hasattr(interval, 'total_seconds') else 0)
except Exception:
has_reliable_timing = False
else:
has_reliable_timing = False
loop_detected, loop_details = self._detect_loops(llm_calls)
pattern_analysis = self._analyze_reasoning_patterns(llm_calls)
efficiency_metrics = {
"total_llm_calls": total_calls,
"total_tokens": total_tokens,
"avg_tokens_per_call": avg_tokens_per_call,
"reasoning_pattern": pattern_analysis["primary_pattern"].value,
"loops_detected": loop_detected,
}
if has_reliable_timing and time_intervals:
efficiency_metrics["avg_time_between_calls"] = np.mean(time_intervals)
loop_info = f"Detected {len(loop_details)} potential reasoning loops." if loop_detected else "No significant reasoning loops detected."
call_samples = self._get_call_samples(llm_calls)
prompt = [
{"role": "system", "content": """You are an expert evaluator assessing the reasoning efficiency of an AI agent's thought process.
Evaluate the agent's reasoning efficiency across these five key subcategories:
1. Focus (0-10): How well the agent stays on topic and avoids unnecessary tangents
2. Progression (0-10): How effectively the agent builds on previous thoughts rather than repeating or circling
3. Decision Quality (0-10): How decisively and appropriately the agent makes decisions
4. Conciseness (0-10): How efficiently the agent communicates without unnecessary verbosity
5. Loop Avoidance (0-10): How well the agent avoids getting stuck in repetitive thinking patterns
For each subcategory, provide a score from 0-10 where:
- 0: Completely inefficient
- 5: Moderately efficient
- 10: Highly efficient
The overall score should be a weighted average of these subcategories.
Return your evaluation as JSON with the following structure:
{
"overall_score": float,
"scores": {
"focus": float,
"progression": float,
"decision_quality": float,
"conciseness": float,
"loop_avoidance": float
},
"feedback": string (general feedback about overall reasoning efficiency),
"optimization_suggestions": string (concrete suggestions for improving reasoning efficiency),
"detected_patterns": string (describe any inefficient reasoning patterns you observe)
}"""},
{"role": "user", "content": f"""
Agent role: {agent.role}
Task description: {task.description}
Reasoning efficiency metrics:
- Total LLM calls: {efficiency_metrics["total_llm_calls"]}
- Average tokens per call: {efficiency_metrics["avg_tokens_per_call"]:.1f}
- Primary reasoning pattern: {efficiency_metrics["reasoning_pattern"]}
- {loop_info}
{"- Average time between calls: {:.2f} seconds".format(efficiency_metrics.get("avg_time_between_calls", 0)) if "avg_time_between_calls" in efficiency_metrics else ""}
Sample of agent reasoning flow (chronological sequence):
{call_samples}
Agent's final output:
{final_output.raw[:500]}... (truncated)
Evaluate the reasoning efficiency of this agent based on these interaction patterns.
Identify any inefficient reasoning patterns and provide specific suggestions for optimization.
"""}
]
assert self.llm is not None
response = self.llm.call(prompt)
try:
evaluation_data = extract_json_from_llm_response(response)
scores = evaluation_data.get("scores", {})
focus = scores.get("focus", 5.0)
progression = scores.get("progression", 5.0)
decision_quality = scores.get("decision_quality", 5.0)
conciseness = scores.get("conciseness", 5.0)
loop_avoidance = scores.get("loop_avoidance", 5.0)
overall_score = evaluation_data.get("overall_score", evaluation_data.get("score", 5.0))
feedback = evaluation_data.get("feedback", "No detailed feedback provided.")
optimization_suggestions = evaluation_data.get("optimization_suggestions", "No specific suggestions provided.")
detailed_feedback = "Reasoning Efficiency Evaluation:\n"
detailed_feedback += f"• Focus: {focus}/10 - Staying on topic without tangents\n"
detailed_feedback += f"• Progression: {progression}/10 - Building on previous thinking\n"
detailed_feedback += f"• Decision Quality: {decision_quality}/10 - Making appropriate decisions\n"
detailed_feedback += f"• Conciseness: {conciseness}/10 - Communicating efficiently\n"
detailed_feedback += f"• Loop Avoidance: {loop_avoidance}/10 - Avoiding repetitive patterns\n\n"
detailed_feedback += f"Feedback:\n{feedback}\n\n"
detailed_feedback += f"Optimization Suggestions:\n{optimization_suggestions}"
return EvaluationScore(
score=float(overall_score),
feedback=detailed_feedback,
raw_response=response
)
except Exception as e:
logging.warning(f"Failed to parse reasoning efficiency evaluation: {e}")
return EvaluationScore(
score=None,
feedback=f"Failed to parse reasoning efficiency evaluation. Raw response: {response[:200]}...",
raw_response=response
)
def _detect_loops(self, llm_calls: List[Dict]) -> Tuple[bool, List[Dict]]:
loop_details = []
messages = []
for call in llm_calls:
content = call.get("response", "")
if isinstance(content, str):
messages.append(content)
elif isinstance(content, list) and len(content) > 0:
# Handle message list format
for msg in content:
if isinstance(msg, dict) and "content" in msg:
messages.append(msg["content"])
# Simple n-gram based similarity detection
# For a more robust implementation, consider using embedding-based similarity
for i in range(len(messages) - 2):
for j in range(i + 1, len(messages) - 1):
# Check for repeated patterns (simplistic approach)
# A more sophisticated approach would use semantic similarity
similarity = self._calculate_text_similarity(messages[i], messages[j])
if similarity > 0.7: # Arbitrary threshold
loop_details.append({
"first_occurrence": i,
"second_occurrence": j,
"similarity": similarity,
"snippet": messages[i][:100] + "..."
})
return len(loop_details) > 0, loop_details
def _calculate_text_similarity(self, text1: str, text2: str) -> float:
text1 = re.sub(r'\s+', ' ', text1.lower()).strip()
text2 = re.sub(r'\s+', ' ', text2.lower()).strip()
# Simple Jaccard similarity on word sets
words1 = set(text1.split())
words2 = set(text2.split())
intersection = len(words1.intersection(words2))
union = len(words1.union(words2))
return intersection / union if union > 0 else 0.0
def _analyze_reasoning_patterns(self, llm_calls: List[Dict]) -> Dict[str, Any]:
call_lengths = []
response_times = []
for call in llm_calls:
content = call.get("response", "")
if isinstance(content, str):
call_lengths.append(len(content))
elif isinstance(content, list) and len(content) > 0:
# Handle message list format
total_length = 0
for msg in content:
if isinstance(msg, dict) and "content" in msg:
total_length += len(msg["content"])
call_lengths.append(total_length)
start_time = call.get("start_time")
end_time = call.get("end_time")
if start_time and end_time:
try:
response_times.append(end_time - start_time)
except Exception:
pass
avg_length = np.mean(call_lengths) if call_lengths else 0
std_length = np.std(call_lengths) if call_lengths else 0
length_trend = self._calculate_trend(call_lengths)
primary_pattern = ReasoningPatternType.EFFICIENT
details = "Agent demonstrates efficient reasoning patterns."
loop_score = self._calculate_loop_likelihood(call_lengths, response_times)
if loop_score > 0.7:
primary_pattern = ReasoningPatternType.LOOP
details = "Agent appears to be stuck in repetitive thinking patterns."
elif avg_length > 1000 and std_length / avg_length < 0.3:
primary_pattern = ReasoningPatternType.VERBOSE
details = "Agent is consistently verbose across interactions."
elif len(llm_calls) > 10 and length_trend > 0.5:
primary_pattern = ReasoningPatternType.INDECISIVE
details = "Agent shows signs of indecisiveness with increasing message lengths."
elif std_length / avg_length > 0.8:
primary_pattern = ReasoningPatternType.SCATTERED
details = "Agent shows inconsistent reasoning flow with highly variable responses."
return {
"primary_pattern": primary_pattern,
"details": details,
"metrics": {
"avg_length": avg_length,
"std_length": std_length,
"length_trend": length_trend,
"loop_score": loop_score
}
}
def _calculate_trend(self, values: Sequence[float | int]) -> float:
if not values or len(values) < 2:
return 0.0
try:
x = np.arange(len(values))
y = np.array(values)
# Simple linear regression
slope = np.polyfit(x, y, 1)[0]
# Normalize slope to -1 to 1 range
max_possible_slope = max(values) - min(values)
if max_possible_slope > 0:
normalized_slope = slope / max_possible_slope
return max(min(normalized_slope, 1.0), -1.0)
return 0.0
except Exception:
return 0.0
def _calculate_loop_likelihood(self, call_lengths: Sequence[float], response_times: Sequence[float]) -> float:
if not call_lengths or len(call_lengths) < 3:
return 0.0
indicators = []
if len(call_lengths) >= 4:
repeated_lengths = 0
for i in range(len(call_lengths) - 2):
ratio = call_lengths[i] / call_lengths[i + 2] if call_lengths[i + 2] > 0 else 0
if 0.85 <= ratio <= 1.15:
repeated_lengths += 1
length_repetition_score = repeated_lengths / (len(call_lengths) - 2)
indicators.append(length_repetition_score)
if response_times and len(response_times) >= 3:
try:
std_time = np.std(response_times)
mean_time = np.mean(response_times)
if mean_time > 0:
time_consistency = 1.0 - (std_time / mean_time)
indicators.append(max(0, time_consistency - 0.3) * 1.5)
except Exception:
pass
return np.mean(indicators) if indicators else 0.0
def _get_call_samples(self, llm_calls: List[Dict]) -> str:
samples = []
if len(llm_calls) <= 6:
sample_indices = list(range(len(llm_calls)))
else:
sample_indices = [0, 1, len(llm_calls) // 2 - 1, len(llm_calls) // 2,
len(llm_calls) - 2, len(llm_calls) - 1]
for idx in sample_indices:
call = llm_calls[idx]
content = call.get("response", "")
if isinstance(content, str):
sample = content
elif isinstance(content, list) and len(content) > 0:
sample_parts = []
for msg in content:
if isinstance(msg, dict) and "content" in msg:
sample_parts.append(msg["content"])
sample = "\n".join(sample_parts)
else:
sample = str(content)
truncated = sample[:200] + "..." if len(sample) > 200 else sample
samples.append(f"Call {idx + 1}:\n{truncated}\n")
return "\n".join(samples)

View File

@@ -0,0 +1,65 @@
from typing import Any, Dict
from crewai.agent import Agent
from crewai.task import Task
from crewai.evaluation.base_evaluator import BaseEvaluator, EvaluationScore, MetricCategory
from crewai.evaluation.json_parser import extract_json_from_llm_response
class SemanticQualityEvaluator(BaseEvaluator):
@property
def metric_category(self) -> MetricCategory:
return MetricCategory.SEMANTIC_QUALITY
def evaluate(
self,
agent: Agent,
task: Task,
execution_trace: Dict[str, Any],
final_output: Any,
) -> EvaluationScore:
prompt = [
{"role": "system", "content": """You are an expert evaluator assessing the semantic quality of an AI agent's output.
Score the semantic quality on a scale from 0-10 where:
- 0: Completely incoherent, confusing, or logically flawed output
- 5: Moderately clear and logical output with some issues
- 10: Exceptionally clear, coherent, and logically sound output
Consider:
1. Is the output well-structured and organized?
2. Is the reasoning logical and well-supported?
3. Is the language clear, precise, and appropriate for the task?
4. Are claims supported by evidence when appropriate?
5. Is the output free from contradictions and logical fallacies?
Return your evaluation as JSON with fields 'score' (number) and 'feedback' (string).
"""},
{"role": "user", "content": f"""
Agent role: {agent.role}
Task description: {task.description}
Agent's final output:
{final_output}
Evaluate the semantic quality and reasoning of this output.
"""}
]
assert self.llm is not None
response = self.llm.call(prompt)
try:
evaluation_data: dict[str, Any] = extract_json_from_llm_response(response)
assert evaluation_data is not None
return EvaluationScore(
score=float(evaluation_data["score"]) if evaluation_data.get("score") is not None else None,
feedback=evaluation_data.get("feedback", response),
raw_response=response
)
except Exception:
return EvaluationScore(
score=None,
feedback=f"Failed to parse evaluation. Raw response: {response}",
raw_response=response
)

View File

@@ -0,0 +1,400 @@
import json
from typing import Dict, Any
from crewai.evaluation.base_evaluator import BaseEvaluator, EvaluationScore, MetricCategory
from crewai.evaluation.json_parser import extract_json_from_llm_response
from crewai.agent import Agent
from crewai.task import Task
class ToolSelectionEvaluator(BaseEvaluator):
@property
def metric_category(self) -> MetricCategory:
return MetricCategory.TOOL_SELECTION
def evaluate(
self,
agent: Agent,
task: Task,
execution_trace: Dict[str, Any],
final_output: str,
) -> EvaluationScore:
tool_uses = execution_trace.get("tool_uses", [])
tool_count = len(tool_uses)
unique_tool_types = set([tool.get("tool", "Unknown tool") for tool in tool_uses])
if tool_count == 0:
if not agent.tools:
return EvaluationScore(
score=None,
feedback="Agent had no tools available to use."
)
else:
return EvaluationScore(
score=None,
feedback="Agent had tools available but didn't use any."
)
available_tools_info = ""
if agent.tools:
for tool in agent.tools:
available_tools_info += f"- {tool.name}: {tool.description}\n"
else:
available_tools_info = "No tools available"
tool_types_summary = "Tools selected by the agent:\n"
for tool_type in sorted(unique_tool_types):
tool_types_summary += f"- {tool_type}\n"
prompt = [
{"role": "system", "content": """You are an expert evaluator assessing if an AI agent selected the most appropriate tools for a given task.
You must evaluate based on these 2 criteria:
1. Relevance (0-10): Were the tools chosen directly aligned with the task's goals?
2. Coverage (0-10): Did the agent select ALL appropriate tools from the AVAILABLE tools?
IMPORTANT:
- ONLY consider tools that are listed as available to the agent
- DO NOT suggest tools that aren't in the 'Available tools' list
- DO NOT evaluate the quality or accuracy of tool outputs/results
- DO NOT evaluate how many times each tool was used
- DO NOT evaluate how the agent used the parameters
- DO NOT evaluate whether the agent interpreted the task correctly
Focus ONLY on whether the correct CATEGORIES of tools were selected from what was available.
Return your evaluation as JSON with these fields:
- scores: {"relevance": number, "coverage": number}
- overall_score: number (average of all scores, 0-10)
- feedback: string (focused ONLY on tool selection decisions from available tools)
- improvement_suggestions: string (ONLY suggest better selection from the AVAILABLE tools list, NOT new tools)
"""},
{"role": "user", "content": f"""
Agent role: {agent.role}
Task description: {task.description}
Available tools for this agent:
{available_tools_info}
{tool_types_summary}
Based ONLY on the task description and comparing the AVAILABLE tools with those that were selected (listed above), evaluate if the agent selected the appropriate tool types for this task.
IMPORTANT:
- ONLY evaluate selection from tools listed as available
- DO NOT suggest new tools that aren't in the available tools list
- DO NOT evaluate tool usage or results
"""}
]
assert self.llm is not None
response = self.llm.call(prompt)
try:
evaluation_data = extract_json_from_llm_response(response)
assert evaluation_data is not None
scores = evaluation_data.get("scores", {})
relevance = scores.get("relevance", 5.0)
coverage = scores.get("coverage", 5.0)
overall_score = float(evaluation_data.get("overall_score", 5.0))
feedback = "Tool Selection Evaluation:\n"
feedback += f"• Relevance: {relevance}/10 - Selection of appropriate tool types for the task\n"
feedback += f"• Coverage: {coverage}/10 - Selection of all necessary tool types\n"
if "improvement_suggestions" in evaluation_data:
feedback += f"Improvement Suggestions:\n{evaluation_data['improvement_suggestions']}"
else:
feedback += evaluation_data.get("feedback", "No detailed feedback available.")
return EvaluationScore(
score=overall_score,
feedback=feedback,
raw_response=response
)
except Exception as e:
return EvaluationScore(
score=None,
feedback=f"Error evaluating tool selection: {e}",
raw_response=response
)
class ParameterExtractionEvaluator(BaseEvaluator):
@property
def metric_category(self) -> MetricCategory:
return MetricCategory.PARAMETER_EXTRACTION
def evaluate(
self,
agent: Agent,
task: Task,
execution_trace: Dict[str, Any],
final_output: str,
) -> EvaluationScore:
tool_uses = execution_trace.get("tool_uses", [])
tool_count = len(tool_uses)
if tool_count == 0:
return EvaluationScore(
score=None,
feedback="No tool usage detected. Cannot evaluate parameter extraction."
)
validation_errors = []
for tool_use in tool_uses:
if not tool_use.get("success", True) and tool_use.get("error_type") == "validation_error":
validation_errors.append({
"tool": tool_use.get("tool", "Unknown tool"),
"error": tool_use.get("result"),
"args": tool_use.get("args", {})
})
validation_error_rate = len(validation_errors) / tool_count if tool_count > 0 else 0
param_samples = []
for i, tool_use in enumerate(tool_uses[:5]):
tool_name = tool_use.get("tool", "Unknown tool")
tool_args = tool_use.get("args", {})
success = tool_use.get("success", True) and not tool_use.get("error", False)
error_type = tool_use.get("error_type", "") if not success else ""
is_validation_error = error_type == "validation_error"
sample = f"Tool use #{i+1} - {tool_name}:\n"
sample += f"- Parameters: {json.dumps(tool_args, indent=2)}\n"
sample += f"- Success: {'No' if not success else 'Yes'}"
if is_validation_error:
sample += " (PARAMETER VALIDATION ERROR)\n"
sample += f"- Error: {tool_use.get('result', 'Unknown error')}"
elif not success:
sample += f" (Other error: {error_type})\n"
param_samples.append(sample)
validation_errors_info = ""
if validation_errors:
validation_errors_info = f"\nParameter validation errors detected: {len(validation_errors)} ({validation_error_rate:.1%} of tool uses)\n"
for i, err in enumerate(validation_errors[:3]):
tool_name = err.get("tool", "Unknown tool")
error_msg = err.get("error", "Unknown error")
args = err.get("args", {})
validation_errors_info += f"\nValidation Error #{i+1}:\n- Tool: {tool_name}\n- Args: {json.dumps(args, indent=2)}\n- Error: {error_msg}"
if len(validation_errors) > 3:
validation_errors_info += f"\n...and {len(validation_errors) - 3} more validation errors."
param_samples_text = "\n\n".join(param_samples)
prompt = [
{"role": "system", "content": """You are an expert evaluator assessing how well an AI agent extracts and formats PARAMETER VALUES for tool calls.
Your job is to evaluate ONLY whether the agent used the correct parameter VALUES, not whether the right tools were selected or how the tools were invoked.
Evaluate parameter extraction based on these criteria:
1. Accuracy (0-10): Are parameter values correctly identified from the context/task?
2. Formatting (0-10): Are values formatted correctly for each tool's requirements?
3. Completeness (0-10): Are all required parameter values provided, with no missing information?
IMPORTANT: DO NOT evaluate:
- Whether the right tool was chosen (that's the ToolSelectionEvaluator's job)
- How the tools were structurally invoked (that's the ToolInvocationEvaluator's job)
- The quality of results from tools
Focus ONLY on the PARAMETER VALUES - whether they were correctly extracted from the context, properly formatted, and complete.
Validation errors are important signals that parameter values weren't properly extracted or formatted.
Return your evaluation as JSON with these fields:
- scores: {"accuracy": number, "formatting": number, "completeness": number}
- overall_score: number (average of all scores, 0-10)
- feedback: string (focused ONLY on parameter value extraction quality)
- improvement_suggestions: string (concrete suggestions for better parameter VALUE extraction)
"""},
{"role": "user", "content": f"""
Agent role: {agent.role}
Task description: {task.description}
Parameter extraction examples:
{param_samples_text}
{validation_errors_info}
Evaluate the quality of the agent's parameter extraction for this task.
"""}
]
assert self.llm is not None
response = self.llm.call(prompt)
try:
evaluation_data = extract_json_from_llm_response(response)
assert evaluation_data is not None
scores = evaluation_data.get("scores", {})
accuracy = scores.get("accuracy", 5.0)
formatting = scores.get("formatting", 5.0)
completeness = scores.get("completeness", 5.0)
overall_score = float(evaluation_data.get("overall_score", 5.0))
feedback = "Parameter Extraction Evaluation:\n"
feedback += f"• Accuracy: {accuracy}/10 - Correctly identifying required parameters\n"
feedback += f"• Formatting: {formatting}/10 - Properly formatting parameters for tools\n"
feedback += f"• Completeness: {completeness}/10 - Including all necessary information\n\n"
if "improvement_suggestions" in evaluation_data:
feedback += f"Improvement Suggestions:\n{evaluation_data['improvement_suggestions']}"
else:
feedback += evaluation_data.get("feedback", "No detailed feedback available.")
return EvaluationScore(
score=overall_score,
feedback=feedback,
raw_response=response
)
except Exception as e:
return EvaluationScore(
score=None,
feedback=f"Error evaluating parameter extraction: {e}",
raw_response=response
)
class ToolInvocationEvaluator(BaseEvaluator):
@property
def metric_category(self) -> MetricCategory:
return MetricCategory.TOOL_INVOCATION
def evaluate(
self,
agent: Agent,
task: Task,
execution_trace: Dict[str, Any],
final_output: str,
) -> EvaluationScore:
tool_uses = execution_trace.get("tool_uses", [])
tool_errors = []
tool_count = len(tool_uses)
if tool_count == 0:
return EvaluationScore(
score=None,
feedback="No tool usage detected. Cannot evaluate tool invocation."
)
for tool_use in tool_uses:
if not tool_use.get("success", True) or tool_use.get("error", False):
error_info = {
"tool": tool_use.get("tool", "Unknown tool"),
"error": tool_use.get("result"),
"error_type": tool_use.get("error_type", "unknown_error")
}
tool_errors.append(error_info)
error_rate = len(tool_errors) / tool_count if tool_count > 0 else 0
error_types = {}
for error in tool_errors:
error_type = error.get("error_type", "unknown_error")
if error_type not in error_types:
error_types[error_type] = 0
error_types[error_type] += 1
invocation_samples = []
for i, tool_use in enumerate(tool_uses[:5]):
tool_name = tool_use.get("tool", "Unknown tool")
tool_args = tool_use.get("args", {})
success = tool_use.get("success", True) and not tool_use.get("error", False)
error_type = tool_use.get("error_type", "") if not success else ""
error_msg = tool_use.get("result", "No error") if not success else "No error"
sample = f"Tool invocation #{i+1}:\n"
sample += f"- Tool: {tool_name}\n"
sample += f"- Parameters: {json.dumps(tool_args, indent=2)}\n"
sample += f"- Success: {'No' if not success else 'Yes'}\n"
if not success:
sample += f"- Error type: {error_type}\n"
sample += f"- Error: {error_msg}"
invocation_samples.append(sample)
error_type_summary = ""
if error_types:
error_type_summary = "Error type breakdown:\n"
for error_type, count in error_types.items():
error_type_summary += f"- {error_type}: {count} occurrences ({(count/tool_count):.1%})\n"
invocation_samples_text = "\n\n".join(invocation_samples)
prompt = [
{"role": "system", "content": """You are an expert evaluator assessing how correctly an AI agent's tool invocations are STRUCTURED.
Your job is to evaluate ONLY the structural and syntactical aspects of how the agent called tools, NOT which tools were selected or what parameter values were used.
Evaluate the agent's tool invocation based on these criteria:
1. Structure (0-10): Does the tool call follow the expected syntax and format?
2. Error Handling (0-10): Does the agent handle tool errors appropriately?
3. Invocation Patterns (0-10): Are tool calls properly sequenced, batched, or managed?
Error types that indicate invocation issues:
- execution_error: The tool was called correctly but failed during execution
- usage_error: General errors in how the tool was used structurally
IMPORTANT: DO NOT evaluate:
- Whether the right tool was chosen (that's the ToolSelectionEvaluator's job)
- Whether the parameter values are correct (that's the ParameterExtractionEvaluator's job)
- The quality of results from tools
Focus ONLY on HOW tools were invoked - the structure, format, and handling of the invocation process.
Return your evaluation as JSON with these fields:
- scores: {"structure": number, "error_handling": number, "invocation_patterns": number}
- overall_score: number (average of all scores, 0-10)
- feedback: string (focused ONLY on structural aspects of tool invocation)
- improvement_suggestions: string (concrete suggestions for better structuring of tool calls)
"""},
{"role": "user", "content": f"""
Agent role: {agent.role}
Task description: {task.description}
Tool invocation examples:
{invocation_samples_text}
Tool error rate: {error_rate:.2%} ({len(tool_errors)} errors out of {tool_count} invocations)
{error_type_summary}
Evaluate the quality of the agent's tool invocation structure during this task.
"""}
]
assert self.llm is not None
response = self.llm.call(prompt)
try:
evaluation_data = extract_json_from_llm_response(response)
assert evaluation_data is not None
scores = evaluation_data.get("scores", {})
structure = scores.get("structure", 5.0)
error_handling = scores.get("error_handling", 5.0)
invocation_patterns = scores.get("invocation_patterns", 5.0)
overall_score = float(evaluation_data.get("overall_score", 5.0))
feedback = "Tool Invocation Evaluation:\n"
feedback += f"• Structure: {structure}/10 - Following proper syntax and format\n"
feedback += f"• Error Handling: {error_handling}/10 - Appropriately handling tool errors\n"
feedback += f"• Invocation Patterns: {invocation_patterns}/10 - Proper sequencing and management of calls\n\n"
if "improvement_suggestions" in evaluation_data:
feedback += f"Improvement Suggestions:\n{evaluation_data['improvement_suggestions']}"
else:
feedback += evaluation_data.get("feedback", "No detailed feedback available.")
return EvaluationScore(
score=overall_score,
feedback=feedback,
raw_response=response
)
except Exception as e:
return EvaluationScore(
score=None,
feedback=f"Error evaluating tool invocation: {e}",
raw_response=response
)

View File

@@ -537,6 +537,7 @@ class LiteAgent(FlowTrackable, BaseModel):
crewai_event_bus.emit(
self,
event=LLMCallCompletedEvent(
messages=self._messages,
response=answer,
call_type=LLMCallType.LLM_CALL,
from_agent=self,

View File

@@ -508,7 +508,6 @@ class LLM(BaseLLM):
# Enable tool calls using streaming
if "tool_calls" in delta:
tool_calls = delta["tool_calls"]
if tool_calls:
result = self._handle_streaming_tool_calls(
tool_calls=tool_calls,
@@ -517,6 +516,7 @@ class LLM(BaseLLM):
from_task=from_task,
from_agent=from_agent,
)
if result is not None:
chunk_content = result
@@ -631,7 +631,7 @@ class LLM(BaseLLM):
# Log token usage if available in streaming mode
self._handle_streaming_callbacks(callbacks, usage_info, last_chunk)
# Emit completion event and return response
self._handle_emit_call_events(full_response, LLMCallType.LLM_CALL, from_task, from_agent)
self._handle_emit_call_events(response=full_response, call_type=LLMCallType.LLM_CALL, from_task=from_task, from_agent=from_agent, messages=params["messages"])
return full_response
# --- 9) Handle tool calls if present
@@ -643,7 +643,7 @@ class LLM(BaseLLM):
self._handle_streaming_callbacks(callbacks, usage_info, last_chunk)
# --- 11) Emit completion event and return response
self._handle_emit_call_events(full_response, LLMCallType.LLM_CALL, from_task, from_agent)
self._handle_emit_call_events(response=full_response, call_type=LLMCallType.LLM_CALL, from_task=from_task, from_agent=from_agent, messages=params["messages"])
return full_response
except ContextWindowExceededError as e:
@@ -655,7 +655,7 @@ class LLM(BaseLLM):
logging.error(f"Error in streaming response: {str(e)}")
if full_response.strip():
logging.warning(f"Returning partial response despite error: {str(e)}")
self._handle_emit_call_events(full_response, LLMCallType.LLM_CALL, from_task, from_agent)
self._handle_emit_call_events(response=full_response, call_type=LLMCallType.LLM_CALL, from_task=from_task, from_agent=from_agent, messages=params["messages"])
return full_response
# Emit failed event and re-raise the exception
@@ -809,7 +809,7 @@ class LLM(BaseLLM):
# --- 5) If no tool calls or no available functions, return the text response directly
if not tool_calls or not available_functions:
self._handle_emit_call_events(text_response, LLMCallType.LLM_CALL, from_task, from_agent)
self._handle_emit_call_events(response=text_response, call_type=LLMCallType.LLM_CALL, from_task=from_task, from_agent=from_agent, messages=params["messages"])
return text_response
# --- 6) Handle tool calls if present
@@ -818,7 +818,7 @@ class LLM(BaseLLM):
return tool_result
# --- 7) If tool call handling didn't return a result, emit completion event and return text response
self._handle_emit_call_events(text_response, LLMCallType.LLM_CALL, from_task, from_agent)
self._handle_emit_call_events(response=text_response, call_type=LLMCallType.LLM_CALL, from_task=from_task, from_agent=from_agent, messages=params["messages"])
return text_response
def _handle_tool_call(
@@ -861,6 +861,7 @@ class LLM(BaseLLM):
tool_args=function_args,
),
)
result = fn(**function_args)
crewai_event_bus.emit(
self,
@@ -874,7 +875,7 @@ class LLM(BaseLLM):
)
# --- 3.3) Emit success event
self._handle_emit_call_events(result, LLMCallType.TOOL_CALL)
self._handle_emit_call_events(response=result, call_type=LLMCallType.TOOL_CALL)
return result
except Exception as e:
# --- 3.4) Handle execution errors
@@ -991,17 +992,20 @@ class LLM(BaseLLM):
logging.error(f"LiteLLM call failed: {str(e)}")
raise
def _handle_emit_call_events(self, response: Any, call_type: LLMCallType, from_task: Optional[Any] = None, from_agent: Optional[Any] = None):
def _handle_emit_call_events(self, response: Any, call_type: LLMCallType, from_task: Optional[Any] = None, from_agent: Optional[Any] = None, messages: str | list[dict[str, Any]] | None = None):
"""Handle the events for the LLM call.
Args:
response (str): The response from the LLM call.
call_type (str): The type of call, either "tool_call" or "llm_call".
from_task: Optional task object
from_agent: Optional agent object
messages: Optional messages object
"""
assert hasattr(crewai_event_bus, "emit")
crewai_event_bus.emit(
self,
event=LLMCallCompletedEvent(response=response, call_type=call_type, from_task=from_task, from_agent=from_agent),
event=LLMCallCompletedEvent(messages=messages, response=response, call_type=call_type, from_task=from_task, from_agent=from_agent),
)
def _format_messages_for_provider(

View File

@@ -155,6 +155,7 @@ class CrewEvaluator:
)
console = Console()
console.print("\n")
console.print(table)
def evaluate(self, task_output: TaskOutput):

View File

@@ -48,8 +48,8 @@ class LLMCallStartedEvent(LLMEventBase):
"""
type: str = "llm_call_started"
messages: Union[str, List[Dict[str, Any]]]
tools: Optional[List[dict]] = None
messages: Optional[Union[str, List[Dict[str, Any]]]] = None
tools: Optional[List[dict[str, Any]]] = None
callbacks: Optional[List[Any]] = None
available_functions: Optional[Dict[str, Any]] = None
@@ -58,10 +58,10 @@ class LLMCallCompletedEvent(LLMEventBase):
"""Event emitted when a LLM call completes"""
type: str = "llm_call_completed"
messages: str | list[dict[str, Any]] | None = None
response: Any
call_type: LLMCallType
class LLMCallFailedEvent(LLMEventBase):
"""Event emitted when a LLM call fails"""

File diff suppressed because one or more lines are too long

View File

View File

View File

@@ -0,0 +1,28 @@
import pytest
from unittest.mock import MagicMock
from crewai.agent import Agent
from crewai.task import Task
class BaseEvaluationMetricsTest:
@pytest.fixture
def mock_agent(self):
agent = MagicMock(spec=Agent)
agent.id = "test_agent_id"
agent.role = "Test Agent"
agent.goal = "Test goal"
agent.tools = []
return agent
@pytest.fixture
def mock_task(self):
task = MagicMock(spec=Task)
task.description = "Test task description"
task.expected_output = "Test expected output"
return task
@pytest.fixture
def execution_trace(self):
return {
"thinking": ["I need to analyze this data carefully"],
"actions": ["Gathered information", "Analyzed data"]
}

View File

@@ -0,0 +1,59 @@
from unittest.mock import patch, MagicMock
from tests.evaluation.metrics.base_evaluation_metrics_test import BaseEvaluationMetricsTest
from crewai.evaluation.base_evaluator import EvaluationScore
from crewai.evaluation.metrics.goal_metrics import GoalAlignmentEvaluator
from crewai.utilities.llm_utils import LLM
class TestGoalAlignmentEvaluator(BaseEvaluationMetricsTest):
@patch("crewai.utilities.llm_utils.create_llm")
def test_evaluate_success(self, mock_create_llm, mock_agent, mock_task, execution_trace):
mock_llm = MagicMock(spec=LLM)
mock_llm.call.return_value = """
{
"score": 8.5,
"feedback": "The agent correctly understood the task and produced relevant output."
}
"""
mock_create_llm.return_value = mock_llm
evaluator = GoalAlignmentEvaluator(llm=mock_llm)
result = evaluator.evaluate(
agent=mock_agent,
task=mock_task,
execution_trace=execution_trace,
final_output="This is the final output"
)
assert isinstance(result, EvaluationScore)
assert result.score == 8.5
assert "correctly understood the task" in result.feedback
mock_llm.call.assert_called_once()
prompt = mock_llm.call.call_args[0][0]
assert len(prompt) >= 2
assert "system" in prompt[0]["role"]
assert "user" in prompt[1]["role"]
assert mock_agent.role in prompt[1]["content"]
assert mock_task.description in prompt[1]["content"]
@patch("crewai.utilities.llm_utils.create_llm")
def test_evaluate_error_handling(self, mock_create_llm, mock_agent, mock_task, execution_trace):
mock_llm = MagicMock(spec=LLM)
mock_llm.call.return_value = "Invalid JSON response"
mock_create_llm.return_value = mock_llm
evaluator = GoalAlignmentEvaluator(llm=mock_llm)
result = evaluator.evaluate(
agent=mock_agent,
task=mock_task,
execution_trace=execution_trace,
final_output="This is the final output"
)
assert isinstance(result, EvaluationScore)
assert result.score is None
assert "Failed to parse" in result.feedback

View File

@@ -0,0 +1,166 @@
import pytest
from unittest.mock import patch, MagicMock
from typing import List, Dict, Any
from crewai.tasks.task_output import TaskOutput
from crewai.evaluation.metrics.reasoning_metrics import (
ReasoningEfficiencyEvaluator,
)
from tests.evaluation.metrics.base_evaluation_metrics_test import BaseEvaluationMetricsTest
from crewai.utilities.llm_utils import LLM
from crewai.evaluation.base_evaluator import EvaluationScore
class TestReasoningEfficiencyEvaluator(BaseEvaluationMetricsTest):
@pytest.fixture
def mock_output(self):
output = MagicMock(spec=TaskOutput)
output.raw = "This is the test output"
return output
@pytest.fixture
def llm_calls(self) -> List[Dict[str, Any]]:
return [
{
"prompt": "How should I approach this task?",
"response": "I'll first research the topic, then compile findings.",
"timestamp": 1626987654
},
{
"prompt": "What resources should I use?",
"response": "I'll use relevant academic papers and reliable websites.",
"timestamp": 1626987754
},
{
"prompt": "How should I structure the output?",
"response": "I'll organize information clearly with headings and bullet points.",
"timestamp": 1626987854
}
]
def test_insufficient_llm_calls(self, mock_agent, mock_task, mock_output):
execution_trace = {"llm_calls": []}
evaluator = ReasoningEfficiencyEvaluator()
result = evaluator.evaluate(
agent=mock_agent,
task=mock_task,
execution_trace=execution_trace,
final_output=mock_output
)
assert isinstance(result, EvaluationScore)
assert result.score is None
assert "Insufficient LLM calls" in result.feedback
@patch("crewai.utilities.llm_utils.create_llm")
def test_successful_evaluation(self, mock_create_llm, mock_agent, mock_task, mock_output, llm_calls):
mock_llm = MagicMock(spec=LLM)
mock_llm.call.return_value = """
{
"scores": {
"focus": 8.0,
"progression": 7.0,
"decision_quality": 7.5,
"conciseness": 8.0,
"loop_avoidance": 9.0
},
"overall_score": 7.9,
"feedback": "The agent demonstrated good reasoning efficiency.",
"optimization_suggestions": "The agent could improve by being more concise."
}
"""
mock_create_llm.return_value = mock_llm
# Setup execution trace with sufficient LLM calls
execution_trace = {"llm_calls": llm_calls}
# Mock the _detect_loops method to return a simple result
evaluator = ReasoningEfficiencyEvaluator(llm=mock_llm)
evaluator._detect_loops = MagicMock(return_value=(False, []))
# Evaluate
result = evaluator.evaluate(
agent=mock_agent,
task=mock_task,
execution_trace=execution_trace,
final_output=mock_output
)
# Assertions
assert isinstance(result, EvaluationScore)
assert result.score == 7.9
assert "The agent demonstrated good reasoning efficiency" in result.feedback
assert "Reasoning Efficiency Evaluation:" in result.feedback
assert "• Focus: 8.0/10" in result.feedback
# Verify LLM was called
mock_llm.call.assert_called_once()
@patch("crewai.utilities.llm_utils.create_llm")
def test_parse_error_handling(self, mock_create_llm, mock_agent, mock_task, mock_output, llm_calls):
mock_llm = MagicMock(spec=LLM)
mock_llm.call.return_value = "Invalid JSON response"
mock_create_llm.return_value = mock_llm
# Setup execution trace
execution_trace = {"llm_calls": llm_calls}
# Mock the _detect_loops method
evaluator = ReasoningEfficiencyEvaluator(llm=mock_llm)
evaluator._detect_loops = MagicMock(return_value=(False, []))
# Evaluate
result = evaluator.evaluate(
agent=mock_agent,
task=mock_task,
execution_trace=execution_trace,
final_output=mock_output
)
# Assertions for error handling
assert isinstance(result, EvaluationScore)
assert result.score is None
assert "Failed to parse reasoning efficiency evaluation" in result.feedback
@patch("crewai.utilities.llm_utils.create_llm")
def test_loop_detection(self, mock_create_llm, mock_agent, mock_task, mock_output):
# Setup LLM calls with a repeating pattern
repetitive_llm_calls = [
{"prompt": "How to solve?", "response": "I'll try method A", "timestamp": 1000},
{"prompt": "Let me try method A", "response": "It didn't work", "timestamp": 1100},
{"prompt": "How to solve?", "response": "I'll try method A again", "timestamp": 1200},
{"prompt": "Let me try method A", "response": "It didn't work", "timestamp": 1300},
{"prompt": "How to solve?", "response": "I'll try method A one more time", "timestamp": 1400}
]
mock_llm = MagicMock(spec=LLM)
mock_llm.call.return_value = """
{
"scores": {
"focus": 6.0,
"progression": 3.0,
"decision_quality": 4.0,
"conciseness": 6.0,
"loop_avoidance": 2.0
},
"overall_score": 4.2,
"feedback": "The agent is stuck in a reasoning loop.",
"optimization_suggestions": "The agent should try different approaches when one fails."
}
"""
mock_create_llm.return_value = mock_llm
execution_trace = {"llm_calls": repetitive_llm_calls}
evaluator = ReasoningEfficiencyEvaluator(llm=mock_llm)
result = evaluator.evaluate(
agent=mock_agent,
task=mock_task,
execution_trace=execution_trace,
final_output=mock_output
)
assert isinstance(result, EvaluationScore)
assert result.score == 4.2
assert "• Loop Avoidance: 2.0/10" in result.feedback

View File

@@ -0,0 +1,82 @@
from unittest.mock import patch, MagicMock
from crewai.evaluation.base_evaluator import EvaluationScore
from crewai.evaluation.metrics.semantic_quality_metrics import SemanticQualityEvaluator
from tests.evaluation.metrics.base_evaluation_metrics_test import BaseEvaluationMetricsTest
from crewai.utilities.llm_utils import LLM
class TestSemanticQualityEvaluator(BaseEvaluationMetricsTest):
@patch("crewai.utilities.llm_utils.create_llm")
def test_evaluate_success(self, mock_create_llm, mock_agent, mock_task, execution_trace):
mock_llm = MagicMock(spec=LLM)
mock_llm.call.return_value = """
{
"score": 8.5,
"feedback": "The output is clear, coherent, and logically structured."
}
"""
mock_create_llm.return_value = mock_llm
evaluator = SemanticQualityEvaluator(llm=mock_llm)
result = evaluator.evaluate(
agent=mock_agent,
task=mock_task,
execution_trace=execution_trace,
final_output="This is a well-structured analysis of the data."
)
assert isinstance(result, EvaluationScore)
assert result.score == 8.5
assert "clear, coherent" in result.feedback
mock_llm.call.assert_called_once()
prompt = mock_llm.call.call_args[0][0]
assert len(prompt) >= 2
assert "system" in prompt[0]["role"]
assert "user" in prompt[1]["role"]
assert mock_agent.role in prompt[1]["content"]
assert mock_task.description in prompt[1]["content"]
@patch("crewai.utilities.llm_utils.create_llm")
def test_evaluate_with_empty_output(self, mock_create_llm, mock_agent, mock_task, execution_trace):
mock_llm = MagicMock(spec=LLM)
mock_llm.call.return_value = """
{
"score": 2.0,
"feedback": "The output is empty or minimal, lacking substance."
}
"""
mock_create_llm.return_value = mock_llm
evaluator = SemanticQualityEvaluator(llm=mock_llm)
result = evaluator.evaluate(
agent=mock_agent,
task=mock_task,
execution_trace=execution_trace,
final_output=""
)
assert isinstance(result, EvaluationScore)
assert result.score == 2.0
assert "empty or minimal" in result.feedback
@patch("crewai.utilities.llm_utils.create_llm")
def test_evaluate_error_handling(self, mock_create_llm, mock_agent, mock_task, execution_trace):
mock_llm = MagicMock(spec=LLM)
mock_llm.call.return_value = "Invalid JSON response"
mock_create_llm.return_value = mock_llm
evaluator = SemanticQualityEvaluator(llm=mock_llm)
result = evaluator.evaluate(
agent=mock_agent,
task=mock_task,
execution_trace=execution_trace,
final_output="This is the output."
)
assert isinstance(result, EvaluationScore)
assert result.score is None
assert "Failed to parse" in result.feedback

View File

@@ -0,0 +1,230 @@
from unittest.mock import patch, MagicMock
from crewai.evaluation.metrics.tools_metrics import (
ToolSelectionEvaluator,
ParameterExtractionEvaluator,
ToolInvocationEvaluator
)
from crewai.utilities.llm_utils import LLM
from tests.evaluation.metrics.base_evaluation_metrics_test import BaseEvaluationMetricsTest
class TestToolSelectionEvaluator(BaseEvaluationMetricsTest):
def test_no_tools_available(self, mock_task, mock_agent):
# Create agent with no tools
mock_agent.tools = []
execution_trace = {"tool_uses": []}
evaluator = ToolSelectionEvaluator()
result = evaluator.evaluate(
agent=mock_agent,
task=mock_task,
execution_trace=execution_trace,
final_output="Final output"
)
assert result.score is None
assert "no tools available" in result.feedback.lower()
def test_tools_available_but_none_used(self, mock_agent, mock_task):
mock_agent.tools = ["tool1", "tool2"]
execution_trace = {"tool_uses": []}
evaluator = ToolSelectionEvaluator()
result = evaluator.evaluate(
agent=mock_agent,
task=mock_task,
execution_trace=execution_trace,
final_output="Final output"
)
assert result.score is None
assert "had tools available but didn't use any" in result.feedback.lower()
@patch("crewai.utilities.llm_utils.create_llm")
def test_successful_evaluation(self, mock_create_llm, mock_agent, mock_task):
# Setup mock LLM response
mock_llm = MagicMock(spec=LLM)
mock_llm.call.return_value = """
{
"overall_score": 8.5,
"feedback": "The agent made good tool selections."
}
"""
mock_create_llm.return_value = mock_llm
# Setup execution trace with tool uses
execution_trace = {
"tool_uses": [
{"tool": "search_tool", "input": {"query": "test query"}, "output": "search results"},
{"tool": "calculator", "input": {"expression": "2+2"}, "output": "4"}
]
}
evaluator = ToolSelectionEvaluator(llm=mock_llm)
result = evaluator.evaluate(
agent=mock_agent,
task=mock_task,
execution_trace=execution_trace,
final_output="Final output"
)
assert result.score == 8.5
assert "The agent made good tool selections" in result.feedback
# Verify LLM was called with correct prompt
mock_llm.call.assert_called_once()
prompt = mock_llm.call.call_args[0][0]
assert isinstance(prompt, list)
assert len(prompt) >= 2
assert "system" in prompt[0]["role"]
assert "user" in prompt[1]["role"]
class TestParameterExtractionEvaluator(BaseEvaluationMetricsTest):
def test_no_tool_uses(self, mock_agent, mock_task):
execution_trace = {"tool_uses": []}
evaluator = ParameterExtractionEvaluator()
result = evaluator.evaluate(
agent=mock_agent,
task=mock_task,
execution_trace=execution_trace,
final_output="Final output"
)
assert result.score is None
assert "no tool usage" in result.feedback.lower()
@patch("crewai.utilities.llm_utils.create_llm")
def test_successful_evaluation(self, mock_create_llm, mock_agent, mock_task):
mock_agent.tools = ["tool1", "tool2"]
# Setup mock LLM response
mock_llm = MagicMock(spec=LLM)
mock_llm.call.return_value = """
{
"overall_score": 9.0,
"feedback": "The agent extracted parameters correctly."
}
"""
mock_create_llm.return_value = mock_llm
# Setup execution trace with tool uses
execution_trace = {
"tool_uses": [
{
"tool": "search_tool",
"input": {"query": "test query"},
"output": "search results",
"error": None
},
{
"tool": "calculator",
"input": {"expression": "2+2"},
"output": "4",
"error": None
}
]
}
evaluator = ParameterExtractionEvaluator(llm=mock_llm)
result = evaluator.evaluate(
agent=mock_agent,
task=mock_task,
execution_trace=execution_trace,
final_output="Final output"
)
assert result.score == 9.0
assert "The agent extracted parameters correctly" in result.feedback
class TestToolInvocationEvaluator(BaseEvaluationMetricsTest):
def test_no_tool_uses(self, mock_agent, mock_task):
execution_trace = {"tool_uses": []}
evaluator = ToolInvocationEvaluator()
result = evaluator.evaluate(
agent=mock_agent,
task=mock_task,
execution_trace=execution_trace,
final_output="Final output"
)
assert result.score is None
assert "no tool usage" in result.feedback.lower()
@patch("crewai.utilities.llm_utils.create_llm")
def test_successful_evaluation(self, mock_create_llm, mock_agent, mock_task):
mock_agent.tools = ["tool1", "tool2"]
# Setup mock LLM response
mock_llm = MagicMock(spec=LLM)
mock_llm.call.return_value = """
{
"overall_score": 8.0,
"feedback": "The agent invoked tools correctly."
}
"""
mock_create_llm.return_value = mock_llm
# Setup execution trace with tool uses
execution_trace = {
"tool_uses": [
{"tool": "search_tool", "input": {"query": "test query"}, "output": "search results"},
{"tool": "calculator", "input": {"expression": "2+2"}, "output": "4"}
]
}
evaluator = ToolInvocationEvaluator(llm=mock_llm)
result = evaluator.evaluate(
agent=mock_agent,
task=mock_task,
execution_trace=execution_trace,
final_output="Final output"
)
assert result.score == 8.0
assert "The agent invoked tools correctly" in result.feedback
@patch("crewai.utilities.llm_utils.create_llm")
def test_evaluation_with_errors(self, mock_create_llm, mock_agent, mock_task):
mock_agent.tools = ["tool1", "tool2"]
# Setup mock LLM response
mock_llm = MagicMock(spec=LLM)
mock_llm.call.return_value = """
{
"overall_score": 5.5,
"feedback": "The agent had some errors in tool invocation."
}
"""
mock_create_llm.return_value = mock_llm
# Setup execution trace with tool uses including errors
execution_trace = {
"tool_uses": [
{
"tool": "search_tool",
"input": {"query": "test query"},
"output": "search results",
"error": None
},
{
"tool": "calculator",
"input": {"expression": "2+"},
"output": None,
"error": "Invalid expression"
}
]
}
evaluator = ToolInvocationEvaluator(llm=mock_llm)
result = evaluator.evaluate(
agent=mock_agent,
task=mock_task,
execution_trace=execution_trace,
final_output="Final output"
)
assert result.score == 5.5
assert "The agent had some errors in tool invocation" in result.feedback

View File

@@ -0,0 +1,95 @@
import pytest
from crewai.agent import Agent
from crewai.task import Task
from crewai.crew import Crew
from crewai.evaluation.agent_evaluator import AgentEvaluator
from crewai.evaluation.base_evaluator import AgentEvaluationResult
from crewai.evaluation import (
GoalAlignmentEvaluator,
SemanticQualityEvaluator,
ToolSelectionEvaluator,
ParameterExtractionEvaluator,
ToolInvocationEvaluator,
ReasoningEfficiencyEvaluator
)
from crewai.evaluation import create_default_evaluator
class TestAgentEvaluator:
@pytest.fixture
def mock_crew(self):
agent = Agent(
role="Test Agent",
goal="Complete test tasks successfully",
backstory="An agent created for testing purposes",
allow_delegation=False,
verbose=False
)
task = Task(
description="Test task description",
agent=agent,
expected_output="Expected test output"
)
crew = Crew(
agents=[agent],
tasks=[task]
)
return crew
def test_set_iteration(self):
agent_evaluator = AgentEvaluator()
agent_evaluator.set_iteration(3)
assert agent_evaluator.iteration == 3
@pytest.mark.vcr(filter_headers=["authorization"])
def test_evaluate_current_iteration(self, mock_crew):
agent_evaluator = AgentEvaluator(crew=mock_crew, evaluators=[GoalAlignmentEvaluator()])
mock_crew.kickoff()
results = agent_evaluator.evaluate_current_iteration()
assert isinstance(results, dict)
agent, = mock_crew.agents
task, = mock_crew.tasks
assert len(mock_crew.agents) == 1
assert agent.role in results
assert len(results[agent.role]) == 1
result, = results[agent.role]
assert isinstance(result, AgentEvaluationResult)
assert result.agent_id == str(agent.id)
assert result.task_id == str(task.id)
goal_alignment, = result.metrics.values()
assert goal_alignment.score == 5.0
expected_feedback = "The agent's output demonstrates an understanding of the need for a comprehensive document"
assert expected_feedback in goal_alignment.feedback
assert goal_alignment.raw_response is not None
assert '"score": 5' in goal_alignment.raw_response
def test_create_default_evaluator(self, mock_crew):
agent_evaluator = create_default_evaluator(crew=mock_crew)
assert isinstance(agent_evaluator, AgentEvaluator)
assert agent_evaluator.crew == mock_crew
expected_types = [
GoalAlignmentEvaluator,
SemanticQualityEvaluator,
ToolSelectionEvaluator,
ParameterExtractionEvaluator,
ToolInvocationEvaluator,
ReasoningEfficiencyEvaluator
]
assert len(agent_evaluator.evaluators) == len(expected_types)
for evaluator, expected_type in zip(agent_evaluator.evaluators, expected_types):
assert isinstance(evaluator, expected_type)

View File

@@ -601,7 +601,7 @@ def test_handle_streaming_tool_calls(get_weather_tool_schema, mock_emit):
def test_handle_streaming_tool_calls_with_error(get_weather_tool_schema, mock_emit):
def get_weather_error(location):
raise Exception("Error")
llm = LLM(model="openai/gpt-4o", stream=True)
response = llm.call(
messages=[
@@ -619,7 +619,7 @@ def test_handle_streaming_tool_calls_with_error(get_weather_tool_schema, mock_em
expected_stream_chunk=9,
expected_completed_llm_call=1,
expected_tool_usage_started=1,
expected_tool_usage_error=1,
expected_tool_usage_error=1,
expected_final_chunk_result=expected_final_chunk_result,
)