mirror of
https://github.com/crewAIInc/crewAI.git
synced 2025-12-16 12:28:30 +00:00
Compare commits
12 Commits
1.6.1
...
lg-agent-e
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
064997464e | ||
|
|
6f0ed6642b | ||
|
|
43f339fa84 | ||
|
|
5ea221e54e | ||
|
|
d4c15ec25f | ||
|
|
37cfbe7389 | ||
|
|
6d7c7d940e | ||
|
|
80bd23a8a9 | ||
|
|
50593d1485 | ||
|
|
60084af745 | ||
|
|
be4ade8c45 | ||
|
|
6a49a24810 |
@@ -1313,6 +1313,7 @@ class Crew(FlowTrackable, BaseModel):
|
||||
n_iterations: int,
|
||||
eval_llm: Union[str, InstanceOf[BaseLLM]],
|
||||
inputs: Optional[Dict[str, Any]] = None,
|
||||
include_agent_eval: Optional[bool] = False
|
||||
) -> None:
|
||||
"""Test and evaluate the Crew with the given inputs for n iterations concurrently using concurrent.futures."""
|
||||
try:
|
||||
@@ -1331,13 +1332,29 @@ class Crew(FlowTrackable, BaseModel):
|
||||
),
|
||||
)
|
||||
test_crew = self.copy()
|
||||
|
||||
# TODO: Refactor to use a single Evaluator Manager class
|
||||
evaluator = CrewEvaluator(test_crew, llm_instance)
|
||||
|
||||
if include_agent_eval:
|
||||
from crewai.evaluation import create_default_evaluator
|
||||
agent_evaluator = create_default_evaluator(crew=test_crew)
|
||||
|
||||
for i in range(1, n_iterations + 1):
|
||||
evaluator.set_iteration(i)
|
||||
|
||||
if include_agent_eval:
|
||||
agent_evaluator.set_iteration(i)
|
||||
|
||||
test_crew.kickoff(inputs=inputs)
|
||||
|
||||
# TODO: Refactor to use ListenerEvents instead of trigger each iteration manually
|
||||
if include_agent_eval:
|
||||
agent_evaluator.evaluate_current_iteration()
|
||||
|
||||
evaluator.print_crew_evaluation_result()
|
||||
if include_agent_eval:
|
||||
agent_evaluator.get_agent_evaluation(include_evaluation_feedback=True)
|
||||
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
|
||||
53
src/crewai/evaluation/__init__.py
Normal file
53
src/crewai/evaluation/__init__.py
Normal file
@@ -0,0 +1,53 @@
|
||||
from crewai.evaluation.base_evaluator import (
|
||||
BaseEvaluator,
|
||||
EvaluationScore,
|
||||
MetricCategory,
|
||||
AgentEvaluationResult
|
||||
)
|
||||
|
||||
from crewai.evaluation.metrics.semantic_quality_metrics import (
|
||||
SemanticQualityEvaluator
|
||||
)
|
||||
|
||||
from crewai.evaluation.metrics.goal_metrics import (
|
||||
GoalAlignmentEvaluator
|
||||
)
|
||||
|
||||
from crewai.evaluation.metrics.reasoning_metrics import (
|
||||
ReasoningEfficiencyEvaluator
|
||||
)
|
||||
|
||||
|
||||
from crewai.evaluation.metrics.tools_metrics import (
|
||||
ToolSelectionEvaluator,
|
||||
ParameterExtractionEvaluator,
|
||||
ToolInvocationEvaluator
|
||||
)
|
||||
|
||||
from crewai.evaluation.evaluation_listener import (
|
||||
EvaluationTraceCallback,
|
||||
create_evaluation_callbacks
|
||||
)
|
||||
|
||||
|
||||
from crewai.evaluation.agent_evaluator import (
|
||||
AgentEvaluator,
|
||||
create_default_evaluator
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"BaseEvaluator",
|
||||
"EvaluationScore",
|
||||
"MetricCategory",
|
||||
"AgentEvaluationResult",
|
||||
"SemanticQualityEvaluator",
|
||||
"GoalAlignmentEvaluator",
|
||||
"ReasoningEfficiencyEvaluator",
|
||||
"ToolSelectionEvaluator",
|
||||
"ParameterExtractionEvaluator",
|
||||
"ToolInvocationEvaluator",
|
||||
"EvaluationTraceCallback",
|
||||
"create_evaluation_callbacks",
|
||||
"AgentEvaluator",
|
||||
"create_default_evaluator"
|
||||
]
|
||||
178
src/crewai/evaluation/agent_evaluator.py
Normal file
178
src/crewai/evaluation/agent_evaluator.py
Normal file
@@ -0,0 +1,178 @@
|
||||
from crewai.evaluation.base_evaluator import AgentEvaluationResult, AggregationStrategy
|
||||
from crewai.agent import Agent
|
||||
from crewai.task import Task
|
||||
from crewai.evaluation.evaluation_display import EvaluationDisplayFormatter
|
||||
|
||||
from typing import Any, Dict
|
||||
from collections import defaultdict
|
||||
from crewai.evaluation import BaseEvaluator, create_evaluation_callbacks
|
||||
from collections.abc import Sequence
|
||||
from crewai.crew import Crew
|
||||
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
|
||||
from crewai.utilities.events.utils.console_formatter import ConsoleFormatter
|
||||
|
||||
class AgentEvaluator:
    """Run a set of metric evaluators against the agents of a crew.

    For each agent of the crew, the execution trace recorded by the
    evaluation callback is looked up per task, every configured evaluator
    is applied to it, and the resulting ``AgentEvaluationResult`` objects
    are stored per iteration, keyed by agent role.
    """

    def __init__(
        self,
        evaluators: Sequence[BaseEvaluator] | None = None,
        crew: Crew | None = None,
    ):
        """Bind the evaluators to every agent of *crew*.

        Args:
            evaluators: Metric evaluators applied to each agent. Agents whose
                evaluator entry is empty or ``None`` are skipped during
                evaluation.
            crew: Crew whose agents and tasks are evaluated. Required before
                ``evaluate_current_iteration`` can be called.
        """
        self.crew: Crew | None = crew
        self.evaluators: Sequence[BaseEvaluator] | None = evaluators

        # Every agent currently shares the same evaluator sequence.
        self.agent_evaluators: dict[str, Sequence[BaseEvaluator] | None] = {}
        if crew is not None:
            assert crew and crew.agents is not None
            for agent in crew.agents:
                self.agent_evaluators[str(agent.id)] = self.evaluators

        self.callback = create_evaluation_callbacks()
        self.console_formatter = ConsoleFormatter()
        self.display_formatter = EvaluationDisplayFormatter()

        self.iteration = 1
        # iteration number -> agent role -> results of that iteration
        self.iterations_results: dict[int, dict[str, list[AgentEvaluationResult]]] = {}

    @staticmethod
    def _task_matches_agent(task: Task, agent: Agent) -> bool:
        """Return True when *task* must be processed for *agent*.

        A task without an assigned agent matches every agent; otherwise the
        ids must be equal (compared as strings).
        """
        return not task.agent or str(task.agent.id) == str(agent.id)

    def set_iteration(self, iteration: int) -> None:
        """Select the iteration number under which results are stored."""
        self.iteration = iteration

    def evaluate_current_iteration(self) -> dict[str, list[AgentEvaluationResult]]:
        """Evaluate every agent/task pair and store the results for the current iteration.

        Returns:
            Mapping of agent role to the list of results produced for that
            agent in this iteration.

        Raises:
            ValueError: If no crew or no callback is available.
        """
        if not self.crew:
            raise ValueError("Cannot evaluate: no crew was provided to the evaluator.")

        if not self.callback:
            raise ValueError("Cannot evaluate: no callback was set. Use set_callback() method first.")

        from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn

        evaluation_results: defaultdict[str, list[AgentEvaluationResult]] = defaultdict(list)

        # Build the work list once so the progress total is, by construction,
        # the exact number of steps the loop below performs. Previously the
        # total ignored tasks without an assigned agent although the loop
        # still processed (and advanced the bar for) them.
        pending = [
            (agent, task)
            for agent in self.crew.agents
            if self.agent_evaluators.get(str(agent.id))
            for task in self.crew.tasks
            if self._task_matches_agent(task, agent)
        ]

        with Progress(
            SpinnerColumn(),
            TextColumn("[bold blue]{task.description}[/bold blue]"),
            BarColumn(),
            TextColumn("{task.percentage:.0f}% completed"),
            console=self.console_formatter.console,
        ) as progress:
            eval_task = progress.add_task(
                f"Evaluating agents (iteration {self.iteration})...",
                total=len(pending),
            )

            for agent, task in pending:
                trace = self.callback.get_trace(str(agent.id), str(task.id))
                if not trace:
                    self.console_formatter.print(f"[yellow]Warning: No trace found for agent {agent.role} on task {task.description[:30]}...[/yellow]")
                    progress.update(eval_task, advance=1)
                    continue

                # NOTE(review): presumably scoped so that handlers registered
                # while the evaluators run do not leak onto the global bus —
                # confirm against crewai_event_bus.scoped_handlers.
                with crewai_event_bus.scoped_handlers():
                    result = self.evaluate(
                        agent=agent,
                        task=task,
                        execution_trace=trace,
                        final_output=task.output,
                    )
                evaluation_results[agent.role].append(result)
                progress.update(eval_task, advance=1)

        self.iterations_results[self.iteration] = evaluation_results
        return evaluation_results

    def get_evaluation_results(self):
        """Return the results of the current iteration, evaluating it first if needed."""
        if self.iteration in self.iterations_results:
            return self.iterations_results[self.iteration]

        return self.evaluate_current_iteration()

    def display_results_with_iterations(self):
        """Print the summary table covering every stored iteration."""
        self.display_formatter.display_summary_results(self.iterations_results)

    def get_agent_evaluation(self, strategy: AggregationStrategy = AggregationStrategy.SIMPLE_AVERAGE, include_evaluation_feedback: bool = False):
        """Aggregate the current iteration's results per agent and display them.

        Args:
            strategy: Aggregation strategy forwarded to the display formatter.
            include_evaluation_feedback: When True, also print the per-iteration
                tables that include evaluator feedback.

        Returns:
            Mapping of agent role to its aggregated evaluation result.
        """
        agent_results = {}
        with crewai_event_bus.scoped_handlers():
            task_results = self.get_evaluation_results()
            for agent_role, results in task_results.items():
                if not results:
                    continue

                agent_results[agent_role] = self.display_formatter._aggregate_agent_results(
                    agent_id=results[0].agent_id,
                    agent_role=agent_role,
                    results=results,
                    strategy=strategy,
                )

        # The summary table is only printed once the latest stored iteration
        # is the one being reported.
        if self.iteration == max(self.iterations_results.keys()):
            self.display_results_with_iterations()

        if include_evaluation_feedback:
            self.display_evaluation_with_feedback()

        return agent_results

    def display_evaluation_with_feedback(self):
        """Print per-iteration tables including the evaluators' feedback."""
        self.display_formatter.display_evaluation_with_feedback(self.iterations_results)

    def evaluate(
        self,
        agent: Agent,
        task: Task,
        execution_trace: Dict[str, Any],
        final_output: Any
    ) -> AgentEvaluationResult:
        """Apply every configured evaluator to one agent/task execution.

        An evaluator that raises is reported on the console and its metric is
        left out of the result; the remaining evaluators still run.

        Args:
            agent: Agent that executed the task.
            task: Task that was executed.
            execution_trace: Trace recorded for this agent/task pair.
            final_output: Output the task produced.

        Returns:
            Result holding one score per evaluator that completed.
        """
        result = AgentEvaluationResult(
            agent_id=str(agent.id),
            task_id=str(task.id)
        )
        assert self.evaluators is not None
        for evaluator in self.evaluators:
            try:
                score = evaluator.evaluate(
                    agent=agent,
                    task=task,
                    execution_trace=execution_trace,
                    final_output=final_output
                )
                result.metrics[evaluator.metric_category] = score
            except Exception as e:
                self.console_formatter.print(f"Error in {evaluator.metric_category.value} evaluator: {str(e)}")

        return result
|
||||
|
||||
def create_default_evaluator(crew, llm=None):
    """Build an ``AgentEvaluator`` for *crew* using all built-in metric evaluators.

    Args:
        crew: Crew whose agents will be evaluated.
        llm: Value forwarded as ``llm`` to every metric evaluator.

    Returns:
        An ``AgentEvaluator`` configured with one instance of each metric
        evaluator, in the order listed below.
    """
    # Imported inside the function rather than at module level.
    # NOTE(review): presumably to avoid a circular import with the
    # crewai.evaluation package — confirm.
    from crewai.evaluation import (
        GoalAlignmentEvaluator,
        SemanticQualityEvaluator,
        ToolSelectionEvaluator,
        ParameterExtractionEvaluator,
        ToolInvocationEvaluator,
        ReasoningEfficiencyEvaluator
    )

    evaluator_classes = (
        GoalAlignmentEvaluator,
        SemanticQualityEvaluator,
        ToolSelectionEvaluator,
        ParameterExtractionEvaluator,
        ToolInvocationEvaluator,
        ReasoningEfficiencyEvaluator,
    )
    default_evaluators = [evaluator_cls(llm=llm) for evaluator_cls in evaluator_classes]

    return AgentEvaluator(evaluators=default_evaluators, crew=crew)
|
||||
125
src/crewai/evaluation/base_evaluator.py
Normal file
125
src/crewai/evaluation/base_evaluator.py
Normal file
@@ -0,0 +1,125 @@
|
||||
import abc
|
||||
import enum
|
||||
from enum import Enum
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from crewai.agent import Agent
|
||||
from crewai.task import Task
|
||||
from crewai.llm import BaseLLM
|
||||
from crewai.utilities.llm_utils import create_llm
|
||||
|
||||
class MetricCategory(enum.Enum):
    """Categories of metrics an agent evaluation can report."""

    GOAL_ALIGNMENT = "goal_alignment"
    SEMANTIC_QUALITY = "semantic_quality"
    REASONING_EFFICIENCY = "reasoning_efficiency"
    TOOL_SELECTION = "tool_selection"
    PARAMETER_EXTRACTION = "parameter_extraction"
    TOOL_INVOCATION = "tool_invocation"

    def title(self) -> str:
        """Return the value as a title, e.g. ``"Goal Alignment"``."""
        words = self.value.split("_")
        return " ".join(words).title()
|
||||
|
||||
|
||||
class EvaluationScore(BaseModel):
    """Score and feedback produced by one evaluator for one metric."""

    # Constrained to the closed range [0, 10]; None marks "not applicable".
    score: float | None = Field(
        default=5.0,
        description="Numeric score from 0-10 where 0 is worst and 10 is best, None if not applicable",
        ge=0.0,
        le=10.0,
    )
    feedback: str = Field(
        default="",
        description="Detailed feedback explaining the evaluation score",
    )
    raw_response: str | None = Field(
        default=None,
        description="Raw response from the evaluator (e.g., LLM)",
    )

    def __str__(self) -> str:
        """Render as ``Score: <x.y>/10 - <feedback>``, or ``N/A`` without a score."""
        rendered_score = "N/A" if self.score is None else f"{self.score:.1f}/10"
        return f"Score: {rendered_score} - {self.feedback}"
|
||||
|
||||
|
||||
class BaseEvaluator(abc.ABC):
    """Abstract base for evaluators that score one metric of an agent run.

    Subclasses must provide the ``metric_category`` property and the
    ``evaluate`` method.
    """

    def __init__(self, llm: BaseLLM | None = None):
        """Store the LLM obtained by passing *llm* through ``create_llm``."""
        self.llm: BaseLLM | None = create_llm(llm)

    @property
    @abc.abstractmethod
    def metric_category(self) -> MetricCategory:
        """Metric category this evaluator reports its score under."""

    @abc.abstractmethod
    def evaluate(
        self,
        agent: Agent,
        task: Task,
        execution_trace: Dict[str, Any],
        final_output: Any,
    ) -> EvaluationScore:
        """Score one agent/task execution.

        Args:
            agent: Agent that executed the task.
            task: Task that was executed.
            execution_trace: Trace recorded for the execution.
            final_output: Output produced for the task.

        Returns:
            The score and feedback for this evaluator's metric.
        """
|
||||
|
||||
|
||||
class AgentEvaluationResult(BaseModel):
    """Scores obtained by one agent on one task, keyed by metric category."""

    agent_id: str = Field(description="ID of the evaluated agent")
    task_id: str = Field(description="ID of the task that was executed")
    # Starts empty; one entry is added per metric category.
    metrics: Dict[MetricCategory, EvaluationScore] = Field(
        description="Evaluation scores for each metric category",
        default_factory=dict,
    )
|
||||
|
||||
|
||||
class AggregationStrategy(Enum):
    """Strategies for combining an agent's results across several tasks."""

    # Equal weight to all tasks
    SIMPLE_AVERAGE = "simple_average"
    # Weight by task complexity
    WEIGHTED_BY_COMPLEXITY = "weighted_by_complexity"
    # Use best scores across tasks
    BEST_PERFORMANCE = "best_performance"
    # Use worst scores across tasks
    WORST_PERFORMANCE = "worst_performance"
|
||||
|
||||
|
||||
class AgentAggregatedEvaluationResult(BaseModel):
    """Evaluation of one agent aggregated over several task results."""

    agent_id: str = Field(
        default="",
        description="ID of the agent"
    )
    agent_role: str = Field(
        default="",
        description="Role of the agent"
    )
    task_count: int = Field(
        default=0,
        description="Number of tasks included in this aggregation"
    )
    aggregation_strategy: AggregationStrategy = Field(
        default=AggregationStrategy.SIMPLE_AVERAGE,
        description="Strategy used for aggregation"
    )
    metrics: Dict[MetricCategory, EvaluationScore] = Field(
        default_factory=dict,
        description="Aggregated metrics across all tasks"
    )
    task_results: List[str] = Field(
        default_factory=list,
        description="IDs of tasks included in this aggregation"
    )
    overall_score: Optional[float] = Field(
        default=None,
        description="Overall score for this agent"
    )

    def __str__(self) -> str:
        """Render a multi-line report: header lines, then one entry per metric.

        A metric whose score is ``None`` is shown as ``N/A`` (instead of the
        misleading ``None/10``) and numeric scores are rounded to one decimal,
        matching ``EvaluationScore.__str__``.
        """
        parts = [
            f"Agent Evaluation: {self.agent_role}\n",
            f"Strategy: {self.aggregation_strategy.value}\n",
            f"Tasks evaluated: {self.task_count}\n",
        ]

        for category, score in self.metrics.items():
            score_text = "N/A" if score.score is None else f"{score.score:.1f}/10"
            parts.append(f"\n\n- {category.value.upper()}: {score_text}\n")

            if score.feedback:
                # Indent continuation lines so multi-line feedback stays
                # visually attached to its metric.
                detailed_feedback = "\n ".join(score.feedback.split('\n'))
                parts.append(f" {detailed_feedback}\n")

        return "".join(parts)
|
||||
341
src/crewai/evaluation/evaluation_display.py
Normal file
341
src/crewai/evaluation/evaluation_display.py
Normal file
@@ -0,0 +1,341 @@
|
||||
from collections import defaultdict
|
||||
from typing import Dict, Any, List
|
||||
from rich.table import Table
|
||||
from rich.box import HEAVY_EDGE, ROUNDED
|
||||
from collections.abc import Sequence
|
||||
from crewai.evaluation.base_evaluator import AgentAggregatedEvaluationResult, AggregationStrategy, AgentEvaluationResult, MetricCategory
|
||||
from crewai.evaluation import EvaluationScore
|
||||
from crewai.utilities.events.utils.console_formatter import ConsoleFormatter
|
||||
from crewai.utilities.llm_utils import create_llm
|
||||
|
||||
class EvaluationDisplayFormatter:
    """Aggregate agent evaluation results and render them as console tables."""

    def __init__(self):
        self.console_formatter = ConsoleFormatter()

    @staticmethod
    def _score_color(score: float) -> str:
        """Map a score to the colour used to render it."""
        if score >= 8.0:
            return "green"
        if score >= 6.0:
            return "cyan"
        if score >= 4.0:
            return "yellow"
        return "red"

    def display_evaluation_with_feedback(self, iterations_results: Dict[int, Dict[str, List[Any]]]):
        """Print, per agent and iteration, a table of metric scores with feedback.

        Args:
            iterations_results: Mapping of iteration number to a mapping of
                agent role to that agent's results in the iteration.
        """
        if not iterations_results:
            self.console_formatter.print("[yellow]No evaluation results to display[/yellow]")
            return

        # Get all agent roles across all iterations
        all_agent_roles: set[str] = set()
        for iter_results in iterations_results.values():
            all_agent_roles.update(iter_results.keys())

        for agent_role in sorted(all_agent_roles):
            self.console_formatter.print(f"\n[bold cyan]Agent: {agent_role}[/bold cyan]")

            # Process each iteration
            for iter_num, results in sorted(iterations_results.items()):
                if agent_role not in results or not results[agent_role]:
                    continue

                agent_results = results[agent_role]
                agent_id = agent_results[0].agent_id

                # Aggregate results for this agent in this iteration
                aggregated_result = self._aggregate_agent_results(
                    agent_id=agent_id,
                    agent_role=agent_role,
                    results=agent_results,
                )

                # Display iteration header
                self.console_formatter.print(f"\n[bold]Iteration {iter_num}[/bold]")

                # Create table for this iteration
                table = Table(box=ROUNDED)
                table.add_column("Metric", style="cyan")
                table.add_column("Score (1-10)", justify="center")
                table.add_column("Feedback", style="green")

                # Add metrics to table
                if aggregated_result.metrics:
                    for metric, evaluation_score in aggregated_result.metrics.items():
                        score = evaluation_score.score

                        if isinstance(score, (int, float)):
                            color = self._score_color(score)
                            score_text = f"[{color}]{score:.1f}[/{color}]"
                        else:
                            score_text = "[dim]N/A[/dim]"

                        table.add_section()
                        table.add_row(
                            metric.title(),
                            score_text,
                            evaluation_score.feedback or ""
                        )

                if aggregated_result.overall_score is not None:
                    overall_score = aggregated_result.overall_score
                    overall_color = self._score_color(overall_score)

                    table.add_section()
                    table.add_row(
                        "Overall Score",
                        f"[{overall_color}]{overall_score:.1f}[/]",
                        "Overall agent evaluation score"
                    )

                # Print the table for this iteration
                self.console_formatter.print(table)

    def display_summary_results(self, iterations_results: Dict[int, Dict[str, List[AgentEvaluationResult]]]):
        """Print one table comparing every agent's scores across iterations.

        Each agent gets a row with its average score per iteration, followed
        by one row per metric; the last column averages across iterations.

        Args:
            iterations_results: Mapping of iteration number to a mapping of
                agent role to that agent's raw results in the iteration.
        """
        if not iterations_results:
            self.console_formatter.print("[yellow]No evaluation results to display[/yellow]")
            return

        self.console_formatter.print("\n")

        table = Table(title="Agent Performance Scores \n (1-10 Higher is better)", box=HEAVY_EDGE)

        table.add_column("Agent/Metric", style="cyan")

        iteration_numbers = sorted(iterations_results.keys())
        for iter_num in iteration_numbers:
            run_label = f"Run {iter_num}"
            table.add_column(run_label, justify="center")

        table.add_column("Avg. Total", justify="center")

        all_agent_roles: set[str] = set()
        for results in iterations_results.values():
            all_agent_roles.update(results.keys())

        for agent_role in sorted(all_agent_roles):
            agent_scores_by_iteration = {}
            agent_metrics_by_iteration = {}

            for iter_num, results in sorted(iterations_results.items()):
                if agent_role not in results or not results[agent_role]:
                    continue

                agent_results = results[agent_role]
                agent_id = agent_results[0].agent_id

                aggregated_result = self._aggregate_agent_results(
                    agent_id=agent_id,
                    agent_role=agent_role,
                    results=agent_results,
                    strategy=AggregationStrategy.SIMPLE_AVERAGE
                )

                valid_scores = [score.score for score in aggregated_result.metrics.values()
                                if score.score is not None]
                if valid_scores:
                    avg_score = sum(valid_scores) / len(valid_scores)
                    agent_scores_by_iteration[iter_num] = avg_score

                agent_metrics_by_iteration[iter_num] = aggregated_result.metrics

            # An agent without a single numeric score gets no rows at all.
            if not agent_scores_by_iteration:
                continue

            avg_across_iterations = sum(agent_scores_by_iteration.values()) / len(agent_scores_by_iteration)

            row = [f"[bold]{agent_role}[/bold]"]

            for iter_num in iteration_numbers:
                if iter_num in agent_scores_by_iteration:
                    score = agent_scores_by_iteration[iter_num]
                    color = self._score_color(score)
                    row.append(f"[bold {color}]{score:.1f}[/]")
                else:
                    row.append("-")

            color = self._score_color(avg_across_iterations)
            row.append(f"[bold {color}]{avg_across_iterations:.1f}[/]")

            table.add_row(*row)

            all_metrics: set[Any] = set()
            for metrics in agent_metrics_by_iteration.values():
                all_metrics.update(metrics.keys())

            for metric in sorted(all_metrics, key=lambda x: x.value):
                metric_scores = []

                row = [f" - {metric.title()}"]

                for iter_num in iteration_numbers:
                    if (iter_num in agent_metrics_by_iteration and
                            metric in agent_metrics_by_iteration[iter_num]):
                        metric_score = agent_metrics_by_iteration[iter_num][metric].score
                        if metric_score is not None:
                            metric_scores.append(metric_score)
                            color = self._score_color(metric_score)
                            row.append(f"[{color}]{metric_score:.1f}[/]")
                        else:
                            row.append("[dim]N/A[/dim]")
                    else:
                        row.append("-")

                if metric_scores:
                    avg = sum(metric_scores) / len(metric_scores)
                    color = self._score_color(avg)
                    row.append(f"[{color}]{avg:.1f}[/]")
                else:
                    row.append("-")

                table.add_row(*row)

            # Empty spacer row: one cell per column (label + runs + average).
            table.add_row(*([""] * (len(iteration_numbers) + 2)))

        self.console_formatter.print(table)
        self.console_formatter.print("\n")

    def _aggregate_agent_results(
        self,
        agent_id: str,
        agent_role: str,
        results: Sequence[AgentEvaluationResult],
        strategy: AggregationStrategy = AggregationStrategy.SIMPLE_AVERAGE,
    ) -> AgentAggregatedEvaluationResult:
        """Combine one agent's per-task results into a single aggregated result.

        Scores are always combined as a plain average of the non-``None``
        scores per metric; *strategy* only steers the feedback summary and is
        recorded on the returned object.

        Args:
            agent_id: ID of the agent the results belong to.
            agent_role: Role of the agent.
            results: Per-task results to aggregate.
            strategy: Strategy forwarded to the feedback summarisation.

        Returns:
            Aggregated metrics, their overall average (``None`` when no metric
            has a numeric score) and the number of aggregated results.
        """
        metrics_by_category: dict[MetricCategory, list[EvaluationScore]] = defaultdict(list)

        for result in results:
            for metric_name, evaluation_score in result.metrics.items():
                metrics_by_category[metric_name].append(evaluation_score)

        aggregated_metrics: dict[MetricCategory, EvaluationScore] = {}
        for category, scores in metrics_by_category.items():
            valid_scores = [s.score for s in scores if s.score is not None]
            avg_score = sum(valid_scores) / len(valid_scores) if valid_scores else None

            # Keep feedback and score together so the summary never pairs a
            # feedback text with the score of a different task.
            with_feedback = [s for s in scores if s.feedback]

            if len(with_feedback) > 1:
                # Use the summarization method for multiple feedbacks
                feedback_summary = self._summarize_feedbacks(
                    agent_role=agent_role,
                    metric=category.title(),
                    feedbacks=[s.feedback for s in with_feedback],
                    scores=[s.score for s in with_feedback],
                    strategy=strategy
                )
            elif with_feedback:
                feedback_summary = with_feedback[0].feedback
            else:
                # EvaluationScore.feedback is a plain ``str`` field; passing
                # None would be rejected by validation.
                feedback_summary = ""

            aggregated_metrics[category] = EvaluationScore(
                score=avg_score,
                feedback=feedback_summary
            )

        overall_score = None
        if aggregated_metrics:
            valid_scores = [m.score for m in aggregated_metrics.values() if m.score is not None]
            if valid_scores:
                overall_score = sum(valid_scores) / len(valid_scores)

        return AgentAggregatedEvaluationResult(
            agent_id=agent_id,
            agent_role=agent_role,
            metrics=aggregated_metrics,
            overall_score=overall_score,
            task_count=len(results),
            aggregation_strategy=strategy
        )

    def _summarize_feedbacks(
        self,
        agent_role: str,
        metric: str,
        feedbacks: List[str],
        scores: List[float | None],
        strategy: AggregationStrategy
    ) -> str:
        """Condense several feedback texts for one metric into one summary.

        Up to two short feedbacks (each under 200 characters) are simply
        concatenated. Otherwise an LLM is asked for a synthesis; if that
        fails for any reason, a plain bullet list of the feedbacks is
        returned instead.

        Args:
            agent_role: Role of the evaluated agent, quoted in the prompt.
            metric: Display name of the metric, quoted in the prompt.
            feedbacks: Feedback texts to summarise.
            scores: Scores paired positionally with *feedbacks*.
            strategy: Selects the guidance sentence added to the prompt.

        Returns:
            The summary text.
        """
        if len(feedbacks) <= 2 and all(len(fb) < 200 for fb in feedbacks):
            return "\n\n".join([f"Feedback {i+1}: {fb}" for i, fb in enumerate(feedbacks)])

        try:
            llm = create_llm()

            formatted_feedbacks = []
            for i, (feedback, score) in enumerate(zip(feedbacks, scores)):
                # Cap each feedback so the prompt stays bounded.
                if len(feedback) > 500:
                    feedback = feedback[:500] + "..."
                score_text = f"{score:.1f}" if score is not None else "N/A"
                formatted_feedbacks.append(f"Feedback #{i+1} (Score: {score_text}):\n{feedback}")

            all_feedbacks = "\n\n" + "\n\n---\n\n".join(formatted_feedbacks)

            if strategy == AggregationStrategy.BEST_PERFORMANCE:
                strategy_guidance = "Focus on the highest-scoring aspects and strengths demonstrated."
            elif strategy == AggregationStrategy.WORST_PERFORMANCE:
                strategy_guidance = "Focus on areas that need improvement and common issues across tasks."
            else:  # Default/average strategies
                strategy_guidance = "Provide a balanced analysis of strengths and weaknesses across all tasks."

            system_content = (
                "You are an expert evaluator creating a comprehensive summary of agent performance feedback.\n"
                "Your job is to synthesize multiple feedback points about the same metric across different tasks.\n"
                "\n"
                "Create a concise, insightful summary that captures the key patterns and themes from all feedback.\n"
                f"{strategy_guidance}\n"
                "\n"
                "Your summary should be:\n"
                "1. Specific and concrete (not vague or general)\n"
                "2. Focused on actionable insights\n"
                "3. Highlighting patterns across tasks\n"
                "4. 150-250 words in length\n"
                "\n"
                "The summary should be directly usable as final feedback for the agent's performance on this metric."
            )
            user_content = (
                "I need a synthesized summary of the following feedback for:\n"
                "\n"
                f"Agent Role: {agent_role}\n"
                f"Metric: {metric.title()}\n"
                "\n"
                f"{all_feedbacks}\n"
            )
            prompt = [
                {"role": "system", "content": system_content},
                {"role": "user", "content": user_content},
            ]
            assert llm is not None
            response = llm.call(prompt)

            return response

        except Exception:
            # Deliberate best-effort fallback: display must not fail because
            # the summarising LLM is unavailable.
            return "Synthesized from multiple tasks: " + "\n\n".join([f"- {fb[:500]}..." for fb in feedbacks])
|
||||
190
src/crewai/evaluation/evaluation_listener.py
Normal file
190
src/crewai/evaluation/evaluation_listener.py
Normal file
@@ -0,0 +1,190 @@
|
||||
from datetime import datetime
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
from collections.abc import Sequence
|
||||
|
||||
from crewai.agent import Agent
|
||||
from crewai.task import Task
|
||||
from crewai.utilities.events.base_event_listener import BaseEventListener
|
||||
from crewai.utilities.events.crewai_event_bus import CrewAIEventsBus
|
||||
from crewai.utilities.events.agent_events import (
|
||||
AgentExecutionStartedEvent,
|
||||
AgentExecutionCompletedEvent
|
||||
)
|
||||
from crewai.utilities.events.tool_usage_events import (
|
||||
ToolUsageFinishedEvent,
|
||||
ToolUsageErrorEvent,
|
||||
ToolExecutionErrorEvent,
|
||||
ToolSelectionErrorEvent,
|
||||
ToolValidateInputErrorEvent
|
||||
)
|
||||
from crewai.utilities.events.llm_events import (
|
||||
LLMCallStartedEvent,
|
||||
LLMCallCompletedEvent
|
||||
)
|
||||
|
||||
class EvaluationTraceCallback(BaseEventListener):
    """Event listener that records execution traces for agent evaluation.

    Traces are stored per agent/task pair under the key
    ``"<agent.id>_<task.id>"`` and hold the tool uses, LLM calls, timestamps
    and final output observed between an agent's start and finish events.
    The class keeps a single shared instance: every construction returns the
    same object and initialises it only once.
    """

    _instance = None

    def __new__(cls):
        # Create the shared instance on first use, flagged as not yet set up.
        if cls._instance is None:
            instance = super().__new__(cls)
            instance._initialized = False
            cls._instance = instance
        return cls._instance

    def __init__(self):
        # __init__ runs on every construction; only the first one may reset state.
        if getattr(self, "_initialized", False):
            return

        super().__init__()
        self.traces = {}
        self.current_agent_id = None
        self.current_task_id = None
        self._initialized = True

    def setup_listeners(self, event_bus: CrewAIEventsBus):
        """Register handlers on *event_bus* that forward events to the ``on_*`` methods."""

        @event_bus.on(AgentExecutionStartedEvent)
        def on_agent_started(source, event: AgentExecutionStartedEvent):
            self.on_agent_start(event.agent, event.task)

        @event_bus.on(AgentExecutionCompletedEvent)
        def on_agent_completed(source, event: AgentExecutionCompletedEvent):
            self.on_agent_finish(event.agent, event.task, event.output)

        @event_bus.on(ToolUsageFinishedEvent)
        def on_tool_completed(source, event: ToolUsageFinishedEvent):
            self.on_tool_use(event.tool_name, event.tool_args, event.output, success=True)

        # The four error events below are all recorded as failed tool uses,
        # distinguished only by their ``error_type`` label.
        @event_bus.on(ToolUsageErrorEvent)
        def on_tool_usage_error(source, event: ToolUsageErrorEvent):
            self.on_tool_use(
                event.tool_name, event.tool_args, event.error,
                success=False, error_type="usage_error",
            )

        @event_bus.on(ToolExecutionErrorEvent)
        def on_tool_execution_error(source, event: ToolExecutionErrorEvent):
            self.on_tool_use(
                event.tool_name, event.tool_args, event.error,
                success=False, error_type="execution_error",
            )

        @event_bus.on(ToolSelectionErrorEvent)
        def on_tool_selection_error(source, event: ToolSelectionErrorEvent):
            self.on_tool_use(
                event.tool_name, event.tool_args, event.error,
                success=False, error_type="selection_error",
            )

        @event_bus.on(ToolValidateInputErrorEvent)
        def on_tool_validate_input_error(source, event: ToolValidateInputErrorEvent):
            self.on_tool_use(
                event.tool_name, event.tool_args, event.error,
                success=False, error_type="validation_error",
            )

        @event_bus.on(LLMCallStartedEvent)
        def on_llm_call_started(source, event: LLMCallStartedEvent):
            self.on_llm_call_start(event.messages, event.tools)

        @event_bus.on(LLMCallCompletedEvent)
        def on_llm_call_completed(source, event: LLMCallCompletedEvent):
            self.on_llm_call_end(event.messages, event.response)

    def on_agent_start(self, agent: Agent, task: Task):
        """Mark *agent*/*task* as current and start a fresh trace for the pair."""
        self.current_agent_id, self.current_task_id = agent.id, task.id

        # Any earlier trace stored under the same key is replaced.
        self.traces[f"{agent.id}_{task.id}"] = {
            "agent_id": agent.id,
            "task_id": task.id,
            "tool_uses": [],
            "llm_calls": [],
            "start_time": datetime.now(),
            "final_output": None
        }

    def on_agent_finish(self, agent: Agent, task: Task, output: Any):
        """Store the final output and end time, then clear the current pair."""
        trace = self.traces.get(f"{agent.id}_{task.id}")
        if trace is not None:
            trace["final_output"] = output
            trace["end_time"] = datetime.now()

        self.current_agent_id = None
        self.current_task_id = None

    def on_tool_use(self, tool_name: str, tool_args: dict[str, Any] | str, result: Any,
                    success: bool = True, error_type: str | None = None):
        """Append a tool-use record to the current trace.

        Nothing is recorded when no agent/task pair is current or when the
        pair has no trace.
        """
        if not (self.current_agent_id and self.current_task_id):
            return

        trace = self.traces.get(f"{self.current_agent_id}_{self.current_task_id}")
        if trace is None:
            return

        record = {
            "tool": tool_name,
            "args": tool_args,
            "result": result,
            "success": success,
            "timestamp": datetime.now()
        }

        # Add error information if applicable
        if error_type and not success:
            record["error"] = True
            record["error_type"] = error_type

        trace["tool_uses"].append(record)

    def on_llm_call_start(self, messages: str | Sequence[dict[str, Any]] | None, tools: Sequence[dict[str, Any]] | None = None):
        """Remember the pending LLM call so its start time is known when it ends."""
        if not (self.current_agent_id and self.current_task_id):
            return

        if f"{self.current_agent_id}_{self.current_task_id}" not in self.traces:
            return

        self.current_llm_call = {
            "messages": messages,
            "tools": tools,
            "start_time": datetime.now(),
            "response": None,
            "end_time": None
        }

    def on_llm_call_end(self, messages: str | list[dict[str, Any]] | None, response: Any):
        """Append a completed LLM call to the current trace.

        The token count is read from ``response.usage.total_tokens`` when
        present, otherwise recorded as 0. Without a pending start record the
        start time falls back to the end time.
        """
        if not (self.current_agent_id and self.current_task_id):
            return

        trace = self.traces.get(f"{self.current_agent_id}_{self.current_task_id}")
        if trace is None:
            return

        total_tokens = getattr(getattr(response, "usage", None), "total_tokens", 0)

        current_time = datetime.now()
        pending_call = getattr(self, "current_llm_call", None)
        recorded_start = pending_call.get("start_time") if pending_call else None

        trace["llm_calls"].append({
            "messages": messages,
            "response": response,
            "start_time": recorded_start or current_time,
            "end_time": current_time,
            "total_tokens": total_tokens
        })

        # Only reset the pending record if one was ever created.
        if hasattr(self, "current_llm_call"):
            self.current_llm_call = {}

    def get_trace(self, agent_id: str, task_id: str) -> Optional[Dict[str, Any]]:
        """Return the trace stored for the pair, or ``None`` if there is none."""
        return self.traces.get(f"{agent_id}_{task_id}")
|
||||
|
||||
|
||||
def create_evaluation_callbacks() -> EvaluationTraceCallback:
    """Construct and return an ``EvaluationTraceCallback``."""
    callback = EvaluationTraceCallback()
    return callback
|
||||
30
src/crewai/evaluation/json_parser.py
Normal file
30
src/crewai/evaluation/json_parser.py
Normal file
@@ -0,0 +1,30 @@
|
||||
"""Robust JSON parsing utilities for evaluation responses."""
|
||||
|
||||
import json
|
||||
import re
|
||||
from typing import Any
|
||||
|
||||
|
||||
def extract_json_from_llm_response(text: str) -> dict[str, Any]:
    """Extract a JSON payload from an LLM response.

    Strategies are tried in order and the first success wins:

    1. Parse the whole text as JSON.
    2. Parse the contents of a fenced code block tagged ``json``, then any
       fenced block, then an inline-code span that looks like JSON.
    3. Scan the text for the first ``{`` that starts a decodable JSON
       object, so answers such as ``Here is my verdict: {...}`` are handled
       even without code fences.

    Args:
        text: Raw text returned by the LLM.

    Returns:
        The decoded JSON value. Strategies 1 and 2 return whatever top-level
        value decodes, which is expected to be an object; strategy 3 only
        returns objects.

    Raises:
        ValueError: If no strategy yields valid JSON.
    """
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass

    json_patterns = [
        # Standard markdown code blocks with json
        r'```json\s*([\s\S]*?)\s*```',
        # Code blocks without language specifier
        r'```\s*([\s\S]*?)\s*```',
        # Inline code with JSON
        r'`([{\\[].*[}\]])`',
    ]

    for pattern in json_patterns:
        for match in re.findall(pattern, text, re.IGNORECASE | re.DOTALL):
            try:
                return json.loads(match.strip())
            except json.JSONDecodeError:
                continue

    # Fallback: models often wrap the JSON object in prose without fences.
    # raw_decode parses one value starting at the given index and ignores
    # whatever text follows it.
    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            candidate, _ = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            pass
        else:
            if isinstance(candidate, dict):
                return candidate
        start = text.find("{", start + 1)

    raise ValueError("No valid JSON found in the response")
|
||||
0
src/crewai/evaluation/metrics/__init__.py
Normal file
0
src/crewai/evaluation/metrics/__init__.py
Normal file
66
src/crewai/evaluation/metrics/goal_metrics.py
Normal file
66
src/crewai/evaluation/metrics/goal_metrics.py
Normal file
@@ -0,0 +1,66 @@
|
||||
from typing import Any, Dict
|
||||
|
||||
from crewai.agent import Agent
|
||||
from crewai.task import Task
|
||||
|
||||
from crewai.evaluation.base_evaluator import BaseEvaluator, EvaluationScore, MetricCategory
|
||||
from crewai.evaluation.json_parser import extract_json_from_llm_response
|
||||
|
||||
class GoalAlignmentEvaluator(BaseEvaluator):
    """LLM-judged evaluator of how well an agent's output matches its task goal."""

    @property
    def metric_category(self) -> MetricCategory:
        """Metric category reported by this evaluator: goal alignment."""
        return MetricCategory.GOAL_ALIGNMENT

    def evaluate(
        self,
        agent: Agent,
        task: Task,
        execution_trace: Dict[str, Any],
        final_output: Any,
    ) -> EvaluationScore:
        """Ask the evaluator LLM to score goal alignment on a 0-10 scale.

        Args:
            agent: Agent whose role and goal are shown to the judge.
            task: Task whose description and expected output are shown.
            execution_trace: Not used by this evaluator.
            final_output: The agent's output, interpolated into the prompt.

        Returns:
            An ``EvaluationScore`` built from the judge's JSON reply. If the
            reply cannot be parsed or the score cannot be built, the score is
            ``None`` and the feedback carries the raw response.
        """
        system_content = """You are an expert evaluator assessing how well an AI agent's output aligns with its assigned task goal.

Score the agent's goal alignment on a scale from 0-10 where:
- 0: Complete misalignment, agent did not understand or attempt the task goal
- 5: Partial alignment, agent attempted the task but missed key requirements
- 10: Perfect alignment, agent fully satisfied all task requirements

Consider:
1. Did the agent correctly interpret the task goal?
2. Did the final output directly address the requirements?
3. Did the agent focus on relevant aspects of the task?
4. Did the agent provide all requested information or deliverables?

Return your evaluation as JSON with fields 'score' (number) and 'feedback' (string).
"""
        user_content = f"""
Agent role: {agent.role}
Agent goal: {agent.goal}
Task description: {task.description}
Expected output: {task.expected_output}

Agent's final output:
{final_output}

Evaluate how well the agent's output aligns with the assigned task goal.
"""
        prompt = [
            {"role": "system", "content": system_content},
            {"role": "user", "content": user_content},
        ]

        assert self.llm is not None
        response = self.llm.call(prompt)

        try:
            evaluation_data: dict[str, Any] = extract_json_from_llm_response(response)
            assert evaluation_data is not None

            parsed_score = evaluation_data.get("score", 0)
            parsed_feedback = evaluation_data.get("feedback", response)
            return EvaluationScore(
                score=parsed_score,
                feedback=parsed_feedback,
                raw_response=response,
            )
        except Exception:
            # Parsing is best-effort: report an unscored result with the raw reply.
            failure_note = f"Failed to parse evaluation. Raw response: {response}"
            return EvaluationScore(
                score=None,
                feedback=failure_note,
                raw_response=response,
            )
|
||||
355
src/crewai/evaluation/metrics/reasoning_metrics.py
Normal file
355
src/crewai/evaluation/metrics/reasoning_metrics.py
Normal file
@@ -0,0 +1,355 @@
|
||||
"""Agent reasoning efficiency evaluators.
|
||||
|
||||
This module provides evaluator implementations for:
|
||||
- Reasoning efficiency
|
||||
- Loop detection
|
||||
- Thinking-to-action ratio
|
||||
"""
|
||||
|
||||
import logging
|
||||
import re
|
||||
from enum import Enum
|
||||
from typing import Any, Dict, List, Tuple
|
||||
import numpy as np
|
||||
from collections.abc import Sequence
|
||||
|
||||
from crewai.agent import Agent
|
||||
from crewai.task import Task
|
||||
|
||||
from crewai.evaluation.base_evaluator import BaseEvaluator, EvaluationScore, MetricCategory
|
||||
from crewai.evaluation.json_parser import extract_json_from_llm_response
|
||||
from crewai.tasks.task_output import TaskOutput
|
||||
|
||||
class ReasoningPatternType(Enum):
    """Labels for the dominant reasoning pattern seen across an agent's LLM calls."""

    # Good reasoning flow
    EFFICIENT = "efficient"
    # Agent is stuck in a loop
    LOOP = "loop"
    # Agent is unnecessarily verbose
    VERBOSE = "verbose"
    # Agent struggles to make decisions
    INDECISIVE = "indecisive"
    # Agent jumps between topics without focus
    SCATTERED = "scattered"
|
||||
|
||||
|
||||
class ReasoningEfficiencyEvaluator(BaseEvaluator):
|
||||
@property
def metric_category(self) -> MetricCategory:
    """Metric category reported by this evaluator: reasoning efficiency."""
    category = MetricCategory.REASONING_EFFICIENCY
    return category
|
||||
|
||||
def evaluate(
    self,
    agent: Agent,
    task: Task,
    execution_trace: Dict[str, Any],
    final_output: TaskOutput,
) -> EvaluationScore:
    """Score the agent's reasoning efficiency with the evaluator LLM.

    Summary statistics (call count, tokens, time between calls, loop
    detection, primary pattern) are computed from the ``llm_calls`` entry of
    ``execution_trace`` and given to the judge together with sampled
    responses and the first 500 characters of ``final_output.raw``.

    Returns:
        An ``EvaluationScore`` with the judge's overall score and a feedback
        text listing the five sub-scores. The score is ``None`` when fewer
        than two LLM calls were traced or the judge's reply cannot be parsed.
    """
    llm_calls = execution_trace.get("llm_calls", [])

    if not llm_calls or len(llm_calls) < 2:
        return EvaluationScore(
            score=None,
            feedback="Insufficient LLM calls to evaluate reasoning efficiency."
        )

    total_calls = len(llm_calls)
    total_tokens = sum(call.get("total_tokens", 0) for call in llm_calls)
    avg_tokens_per_call = total_tokens / total_calls

    # Gaps between the end of one call and the start of the next one.
    time_intervals = []
    has_reliable_timing = True
    for previous_call, next_call in zip(llm_calls, llm_calls[1:]):
        gap_start = previous_call.get("end_time")
        gap_end = next_call.get("start_time")
        if not (gap_start and gap_end and gap_start != gap_end):
            has_reliable_timing = False
            continue
        try:
            gap = gap_end - gap_start
            time_intervals.append(gap.total_seconds() if hasattr(gap, 'total_seconds') else 0)
        except Exception:
            has_reliable_timing = False

    loop_detected, loop_details = self._detect_loops(llm_calls)
    pattern_analysis = self._analyze_reasoning_patterns(llm_calls)
    reasoning_pattern = pattern_analysis["primary_pattern"].value

    # The timing line is only shown when every gap could be measured.
    timing_line = ""
    if has_reliable_timing and time_intervals:
        avg_time_between_calls = np.mean(time_intervals)
        timing_line = f"- Average time between calls: {avg_time_between_calls:.2f} seconds"

    if loop_detected:
        loop_info = f"Detected {len(loop_details)} potential reasoning loops."
    else:
        loop_info = "No significant reasoning loops detected."

    call_samples = self._get_call_samples(llm_calls)

    system_content = """You are an expert evaluator assessing the reasoning efficiency of an AI agent's thought process.

Evaluate the agent's reasoning efficiency across these five key subcategories:

1. Focus (0-10): How well the agent stays on topic and avoids unnecessary tangents
2. Progression (0-10): How effectively the agent builds on previous thoughts rather than repeating or circling
3. Decision Quality (0-10): How decisively and appropriately the agent makes decisions
4. Conciseness (0-10): How efficiently the agent communicates without unnecessary verbosity
5. Loop Avoidance (0-10): How well the agent avoids getting stuck in repetitive thinking patterns

For each subcategory, provide a score from 0-10 where:
- 0: Completely inefficient
- 5: Moderately efficient
- 10: Highly efficient

The overall score should be a weighted average of these subcategories.

Return your evaluation as JSON with the following structure:
{
"overall_score": float,
"scores": {
"focus": float,
"progression": float,
"decision_quality": float,
"conciseness": float,
"loop_avoidance": float
},
"feedback": string (general feedback about overall reasoning efficiency),
"optimization_suggestions": string (concrete suggestions for improving reasoning efficiency),
"detected_patterns": string (describe any inefficient reasoning patterns you observe)
}"""
    user_content = f"""
Agent role: {agent.role}
Task description: {task.description}

Reasoning efficiency metrics:
- Total LLM calls: {total_calls}
- Average tokens per call: {avg_tokens_per_call:.1f}
- Primary reasoning pattern: {reasoning_pattern}
- {loop_info}
{timing_line}

Sample of agent reasoning flow (chronological sequence):
{call_samples}

Agent's final output:
{final_output.raw[:500]}... (truncated)

Evaluate the reasoning efficiency of this agent based on these interaction patterns.
Identify any inefficient reasoning patterns and provide specific suggestions for optimization.
"""
    prompt = [
        {"role": "system", "content": system_content},
        {"role": "user", "content": user_content},
    ]

    assert self.llm is not None
    response = self.llm.call(prompt)

    try:
        evaluation_data = extract_json_from_llm_response(response)

        # Sub-scores the judge left out default to the scale midpoint.
        scores = evaluation_data.get("scores", {})
        focus = scores.get("focus", 5.0)
        progression = scores.get("progression", 5.0)
        decision_quality = scores.get("decision_quality", 5.0)
        conciseness = scores.get("conciseness", 5.0)
        loop_avoidance = scores.get("loop_avoidance", 5.0)

        overall_score = evaluation_data.get("overall_score", evaluation_data.get("score", 5.0))
        feedback = evaluation_data.get("feedback", "No detailed feedback provided.")
        optimization_suggestions = evaluation_data.get("optimization_suggestions", "No specific suggestions provided.")

        detailed_feedback = "".join([
            "Reasoning Efficiency Evaluation:\n",
            f"• Focus: {focus}/10 - Staying on topic without tangents\n",
            f"• Progression: {progression}/10 - Building on previous thinking\n",
            f"• Decision Quality: {decision_quality}/10 - Making appropriate decisions\n",
            f"• Conciseness: {conciseness}/10 - Communicating efficiently\n",
            f"• Loop Avoidance: {loop_avoidance}/10 - Avoiding repetitive patterns\n\n",
            f"Feedback:\n{feedback}\n\n",
            f"Optimization Suggestions:\n{optimization_suggestions}",
        ])

        return EvaluationScore(
            score=float(overall_score),
            feedback=detailed_feedback,
            raw_response=response
        )
    except Exception as e:
        logging.warning(f"Failed to parse reasoning efficiency evaluation: {e}")
        return EvaluationScore(
            score=None,
            feedback=f"Failed to parse reasoning efficiency evaluation. Raw response: {response[:200]}...",
            raw_response=response
        )
|
||||
|
||||
def _detect_loops(self, llm_calls: List[Dict]) -> Tuple[bool, List[Dict]]:
|
||||
loop_details = []
|
||||
|
||||
messages = []
|
||||
for call in llm_calls:
|
||||
content = call.get("response", "")
|
||||
if isinstance(content, str):
|
||||
messages.append(content)
|
||||
elif isinstance(content, list) and len(content) > 0:
|
||||
# Handle message list format
|
||||
for msg in content:
|
||||
if isinstance(msg, dict) and "content" in msg:
|
||||
messages.append(msg["content"])
|
||||
|
||||
# Simple n-gram based similarity detection
|
||||
# For a more robust implementation, consider using embedding-based similarity
|
||||
for i in range(len(messages) - 2):
|
||||
for j in range(i + 1, len(messages) - 1):
|
||||
# Check for repeated patterns (simplistic approach)
|
||||
# A more sophisticated approach would use semantic similarity
|
||||
similarity = self._calculate_text_similarity(messages[i], messages[j])
|
||||
if similarity > 0.7: # Arbitrary threshold
|
||||
loop_details.append({
|
||||
"first_occurrence": i,
|
||||
"second_occurrence": j,
|
||||
"similarity": similarity,
|
||||
"snippet": messages[i][:100] + "..."
|
||||
})
|
||||
|
||||
return len(loop_details) > 0, loop_details
|
||||
|
||||
def _calculate_text_similarity(self, text1: str, text2: str) -> float:
|
||||
text1 = re.sub(r'\s+', ' ', text1.lower()).strip()
|
||||
text2 = re.sub(r'\s+', ' ', text2.lower()).strip()
|
||||
|
||||
# Simple Jaccard similarity on word sets
|
||||
words1 = set(text1.split())
|
||||
words2 = set(text2.split())
|
||||
|
||||
intersection = len(words1.intersection(words2))
|
||||
union = len(words1.union(words2))
|
||||
|
||||
return intersection / union if union > 0 else 0.0
|
||||
|
||||
def _analyze_reasoning_patterns(self, llm_calls: List[Dict]) -> Dict[str, Any]:
    """Classify the dominant reasoning pattern from response-length statistics.

    Response lengths and per-call durations are collected from ``llm_calls``
    and the first matching rule wins: LOOP (loop likelihood above 0.7),
    VERBOSE (long and uniform responses), INDECISIVE (more than ten calls
    with a rising length trend), SCATTERED (highly variable lengths),
    otherwise EFFICIENT.

    The length variation ratio is now computed once and treated as 0.0 when
    the average length is zero. Previously the ratio was evaluated inline
    and raised ``ZeroDivisionError`` when no response length could be
    collected (responses that are neither strings nor non-empty lists).

    Returns:
        A dict with ``primary_pattern`` (a ``ReasoningPatternType``),
        ``details`` (a sentence describing it) and ``metrics``.
    """
    call_lengths = []
    response_times = []

    for call in llm_calls:
        content = call.get("response", "")
        if isinstance(content, str):
            call_lengths.append(len(content))
        elif isinstance(content, list) and len(content) > 0:
            # Handle message list format; non-string contents have no length
            # that is meaningful here and are skipped.
            call_lengths.append(sum(
                len(msg["content"])
                for msg in content
                if isinstance(msg, dict) and isinstance(msg.get("content"), str)
            ))

        start_time = call.get("start_time")
        end_time = call.get("end_time")
        if start_time and end_time:
            try:
                response_times.append(end_time - start_time)
            except TypeError:
                # Timestamps of incompatible types: timing is best-effort, skip.
                pass

    avg_length = np.mean(call_lengths) if call_lengths else 0
    std_length = np.std(call_lengths) if call_lengths else 0
    length_trend = self._calculate_trend(call_lengths)

    # Coefficient of variation of the response lengths; undefined (and
    # treated as "no variation") when nothing was measured.
    length_variation = std_length / avg_length if avg_length > 0 else 0.0

    primary_pattern = ReasoningPatternType.EFFICIENT
    details = "Agent demonstrates efficient reasoning patterns."

    loop_score = self._calculate_loop_likelihood(call_lengths, response_times)
    if loop_score > 0.7:
        primary_pattern = ReasoningPatternType.LOOP
        details = "Agent appears to be stuck in repetitive thinking patterns."
    elif avg_length > 1000 and length_variation < 0.3:
        primary_pattern = ReasoningPatternType.VERBOSE
        details = "Agent is consistently verbose across interactions."
    elif len(llm_calls) > 10 and length_trend > 0.5:
        primary_pattern = ReasoningPatternType.INDECISIVE
        details = "Agent shows signs of indecisiveness with increasing message lengths."
    elif length_variation > 0.8:
        primary_pattern = ReasoningPatternType.SCATTERED
        details = "Agent shows inconsistent reasoning flow with highly variable responses."

    return {
        "primary_pattern": primary_pattern,
        "details": details,
        "metrics": {
            "avg_length": avg_length,
            "std_length": std_length,
            "length_trend": length_trend,
            "loop_score": loop_score
        }
    }
|
||||
|
||||
def _calculate_trend(self, values: Sequence[float | int]) -> float:
|
||||
if not values or len(values) < 2:
|
||||
return 0.0
|
||||
|
||||
try:
|
||||
x = np.arange(len(values))
|
||||
y = np.array(values)
|
||||
|
||||
# Simple linear regression
|
||||
slope = np.polyfit(x, y, 1)[0]
|
||||
|
||||
# Normalize slope to -1 to 1 range
|
||||
max_possible_slope = max(values) - min(values)
|
||||
if max_possible_slope > 0:
|
||||
normalized_slope = slope / max_possible_slope
|
||||
return max(min(normalized_slope, 1.0), -1.0)
|
||||
return 0.0
|
||||
except Exception:
|
||||
return 0.0
|
||||
|
||||
def _calculate_loop_likelihood(self, call_lengths: Sequence[float], response_times: Sequence[float]) -> float:
|
||||
if not call_lengths or len(call_lengths) < 3:
|
||||
return 0.0
|
||||
|
||||
indicators = []
|
||||
|
||||
if len(call_lengths) >= 4:
|
||||
repeated_lengths = 0
|
||||
for i in range(len(call_lengths) - 2):
|
||||
ratio = call_lengths[i] / call_lengths[i + 2] if call_lengths[i + 2] > 0 else 0
|
||||
if 0.85 <= ratio <= 1.15:
|
||||
repeated_lengths += 1
|
||||
|
||||
length_repetition_score = repeated_lengths / (len(call_lengths) - 2)
|
||||
indicators.append(length_repetition_score)
|
||||
|
||||
if response_times and len(response_times) >= 3:
|
||||
try:
|
||||
std_time = np.std(response_times)
|
||||
mean_time = np.mean(response_times)
|
||||
if mean_time > 0:
|
||||
time_consistency = 1.0 - (std_time / mean_time)
|
||||
indicators.append(max(0, time_consistency - 0.3) * 1.5)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return np.mean(indicators) if indicators else 0.0
|
||||
|
||||
def _get_call_samples(self, llm_calls: List[Dict]) -> str:
|
||||
samples = []
|
||||
|
||||
if len(llm_calls) <= 6:
|
||||
sample_indices = list(range(len(llm_calls)))
|
||||
else:
|
||||
sample_indices = [0, 1, len(llm_calls) // 2 - 1, len(llm_calls) // 2,
|
||||
len(llm_calls) - 2, len(llm_calls) - 1]
|
||||
|
||||
for idx in sample_indices:
|
||||
call = llm_calls[idx]
|
||||
content = call.get("response", "")
|
||||
|
||||
if isinstance(content, str):
|
||||
sample = content
|
||||
elif isinstance(content, list) and len(content) > 0:
|
||||
sample_parts = []
|
||||
for msg in content:
|
||||
if isinstance(msg, dict) and "content" in msg:
|
||||
sample_parts.append(msg["content"])
|
||||
sample = "\n".join(sample_parts)
|
||||
else:
|
||||
sample = str(content)
|
||||
|
||||
truncated = sample[:200] + "..." if len(sample) > 200 else sample
|
||||
samples.append(f"Call {idx + 1}:\n{truncated}\n")
|
||||
|
||||
return "\n".join(samples)
|
||||
65
src/crewai/evaluation/metrics/semantic_quality_metrics.py
Normal file
65
src/crewai/evaluation/metrics/semantic_quality_metrics.py
Normal file
@@ -0,0 +1,65 @@
|
||||
from typing import Any, Dict
|
||||
|
||||
from crewai.agent import Agent
|
||||
from crewai.task import Task
|
||||
|
||||
from crewai.evaluation.base_evaluator import BaseEvaluator, EvaluationScore, MetricCategory
|
||||
from crewai.evaluation.json_parser import extract_json_from_llm_response
|
||||
|
||||
class SemanticQualityEvaluator(BaseEvaluator):
    """LLM-judged evaluator of the clarity and logical soundness of an agent's output."""

    @property
    def metric_category(self) -> MetricCategory:
        """Metric category reported by this evaluator: semantic quality."""
        return MetricCategory.SEMANTIC_QUALITY

    def evaluate(
        self,
        agent: Agent,
        task: Task,
        execution_trace: Dict[str, Any],
        final_output: Any,
    ) -> EvaluationScore:
        """Ask the evaluator LLM to score semantic quality on a 0-10 scale.

        Args:
            agent: Agent whose role is shown to the judge.
            task: Task whose description is shown to the judge.
            execution_trace: Not used by this evaluator.
            final_output: The agent's output, interpolated into the prompt.

        Returns:
            An ``EvaluationScore`` with the judge's score as a float, or
            ``None`` when the reply has no score. If the reply cannot be
            parsed, the score is ``None`` and the feedback carries the raw
            response.
        """
        system_content = """You are an expert evaluator assessing the semantic quality of an AI agent's output.

Score the semantic quality on a scale from 0-10 where:
- 0: Completely incoherent, confusing, or logically flawed output
- 5: Moderately clear and logical output with some issues
- 10: Exceptionally clear, coherent, and logically sound output

Consider:
1. Is the output well-structured and organized?
2. Is the reasoning logical and well-supported?
3. Is the language clear, precise, and appropriate for the task?
4. Are claims supported by evidence when appropriate?
5. Is the output free from contradictions and logical fallacies?

Return your evaluation as JSON with fields 'score' (number) and 'feedback' (string).
"""
        user_content = f"""
Agent role: {agent.role}
Task description: {task.description}

Agent's final output:
{final_output}

Evaluate the semantic quality and reasoning of this output.
"""
        prompt = [
            {"role": "system", "content": system_content},
            {"role": "user", "content": user_content},
        ]

        assert self.llm is not None
        response = self.llm.call(prompt)

        try:
            evaluation_data: dict[str, Any] = extract_json_from_llm_response(response)
            assert evaluation_data is not None

            # A missing or null score stays None instead of being coerced.
            if evaluation_data.get("score") is not None:
                parsed_score = float(evaluation_data["score"])
            else:
                parsed_score = None

            return EvaluationScore(
                score=parsed_score,
                feedback=evaluation_data.get("feedback", response),
                raw_response=response,
            )
        except Exception:
            failure_note = f"Failed to parse evaluation. Raw response: {response}"
            return EvaluationScore(
                score=None,
                feedback=failure_note,
                raw_response=response,
            )
|
||||
400
src/crewai/evaluation/metrics/tools_metrics.py
Normal file
400
src/crewai/evaluation/metrics/tools_metrics.py
Normal file
@@ -0,0 +1,400 @@
|
||||
import json
|
||||
from typing import Dict, Any
|
||||
|
||||
from crewai.evaluation.base_evaluator import BaseEvaluator, EvaluationScore, MetricCategory
|
||||
from crewai.evaluation.json_parser import extract_json_from_llm_response
|
||||
from crewai.agent import Agent
|
||||
from crewai.task import Task
|
||||
|
||||
|
||||
class ToolSelectionEvaluator(BaseEvaluator):
    """LLM-judged evaluator of whether the agent picked suitable tools from those available."""

    @property
    def metric_category(self) -> MetricCategory:
        """Metric category reported by this evaluator: tool selection."""
        return MetricCategory.TOOL_SELECTION

    def evaluate(
        self,
        agent: Agent,
        task: Task,
        execution_trace: Dict[str, Any],
        final_output: str,
    ) -> EvaluationScore:
        """Score the relevance and coverage of the tools the agent selected.

        The judge sees the agent's available tools (name and description)
        and the distinct tool names found in the ``tool_uses`` entry of
        ``execution_trace``.

        Returns:
            An ``EvaluationScore`` with the judge's overall score and a
            feedback text. The score is ``None`` when no tool was used or
            when the judge's reply cannot be processed.
        """
        tool_uses = execution_trace.get("tool_uses", [])

        if len(tool_uses) == 0:
            no_use_feedback = (
                "Agent had tools available but didn't use any."
                if agent.tools
                else "Agent had no tools available to use."
            )
            return EvaluationScore(score=None, feedback=no_use_feedback)

        unique_tool_types = {tool.get("tool", "Unknown tool") for tool in tool_uses}

        if agent.tools:
            available_tools_info = "".join(
                f"- {tool.name}: {tool.description}\n" for tool in agent.tools
            )
        else:
            available_tools_info = "No tools available"

        tool_types_summary = "Tools selected by the agent:\n" + "".join(
            f"- {tool_type}\n" for tool_type in sorted(unique_tool_types)
        )

        system_content = """You are an expert evaluator assessing if an AI agent selected the most appropriate tools for a given task.

You must evaluate based on these 2 criteria:
1. Relevance (0-10): Were the tools chosen directly aligned with the task's goals?
2. Coverage (0-10): Did the agent select ALL appropriate tools from the AVAILABLE tools?

IMPORTANT:
- ONLY consider tools that are listed as available to the agent
- DO NOT suggest tools that aren't in the 'Available tools' list
- DO NOT evaluate the quality or accuracy of tool outputs/results
- DO NOT evaluate how many times each tool was used
- DO NOT evaluate how the agent used the parameters
- DO NOT evaluate whether the agent interpreted the task correctly

Focus ONLY on whether the correct CATEGORIES of tools were selected from what was available.

Return your evaluation as JSON with these fields:
- scores: {"relevance": number, "coverage": number}
- overall_score: number (average of all scores, 0-10)
- feedback: string (focused ONLY on tool selection decisions from available tools)
- improvement_suggestions: string (ONLY suggest better selection from the AVAILABLE tools list, NOT new tools)
"""
        user_content = f"""
Agent role: {agent.role}
Task description: {task.description}

Available tools for this agent:
{available_tools_info}

{tool_types_summary}

Based ONLY on the task description and comparing the AVAILABLE tools with those that were selected (listed above), evaluate if the agent selected the appropriate tool types for this task.

IMPORTANT:
- ONLY evaluate selection from tools listed as available
- DO NOT suggest new tools that aren't in the available tools list
- DO NOT evaluate tool usage or results
"""
        prompt = [
            {"role": "system", "content": system_content},
            {"role": "user", "content": user_content},
        ]

        assert self.llm is not None
        response = self.llm.call(prompt)

        try:
            evaluation_data = extract_json_from_llm_response(response)
            assert evaluation_data is not None

            # Sub-scores the judge left out default to the scale midpoint.
            scores = evaluation_data.get("scores", {})
            relevance = scores.get("relevance", 5.0)
            coverage = scores.get("coverage", 5.0)
            overall_score = float(evaluation_data.get("overall_score", 5.0))

            # Suggestions take precedence over the general feedback text.
            if "improvement_suggestions" in evaluation_data:
                closing = f"Improvement Suggestions:\n{evaluation_data['improvement_suggestions']}"
            else:
                closing = evaluation_data.get("feedback", "No detailed feedback available.")

            feedback = (
                "Tool Selection Evaluation:\n"
                f"• Relevance: {relevance}/10 - Selection of appropriate tool types for the task\n"
                f"• Coverage: {coverage}/10 - Selection of all necessary tool types\n"
            ) + closing

            return EvaluationScore(
                score=overall_score,
                feedback=feedback,
                raw_response=response
            )
        except Exception as e:
            return EvaluationScore(
                score=None,
                feedback=f"Error evaluating tool selection: {e}",
                raw_response=response
            )
|
||||
|
||||
|
||||
class ParameterExtractionEvaluator(BaseEvaluator):
    """LLM-judged evaluator of the parameter values the agent passed to its tools."""

    @property
    def metric_category(self) -> MetricCategory:
        """Metric category reported by this evaluator: parameter extraction."""
        return MetricCategory.PARAMETER_EXTRACTION

    def evaluate(
        self,
        agent: Agent,
        task: Task,
        execution_trace: Dict[str, Any],
        final_output: str,
    ) -> EvaluationScore:
        """Score how well the agent extracted and formatted tool parameters.

        The judge sees the first five tool uses from the ``tool_uses`` entry
        of ``execution_trace`` and up to three parameter validation errors.

        Tool arguments are rendered with ``json.dumps(..., default=str)``.
        The rendering happens before the guarded parsing section, so an
        argument value that is not JSON-serializable previously raised
        ``TypeError`` and aborted the whole evaluation; such values are now
        shown via ``str()``.

        Returns:
            An ``EvaluationScore`` with the judge's overall score and a
            feedback text. The score is ``None`` when no tool was used or
            when the judge's reply cannot be processed.
        """
        def render_args(args: Any) -> str:
            # Display-only serialization for the prompt; never fed back to tools.
            return json.dumps(args, indent=2, default=str)

        tool_uses = execution_trace.get("tool_uses", [])
        tool_count = len(tool_uses)

        if tool_count == 0:
            return EvaluationScore(
                score=None,
                feedback="No tool usage detected. Cannot evaluate parameter extraction."
            )

        validation_errors = [
            {
                "tool": tool_use.get("tool", "Unknown tool"),
                "error": tool_use.get("result"),
                "args": tool_use.get("args", {})
            }
            for tool_use in tool_uses
            if not tool_use.get("success", True) and tool_use.get("error_type") == "validation_error"
        ]

        validation_error_rate = len(validation_errors) / tool_count

        param_samples = []
        for i, tool_use in enumerate(tool_uses[:5], start=1):
            tool_name = tool_use.get("tool", "Unknown tool")
            tool_args = tool_use.get("args", {})
            success = tool_use.get("success", True) and not tool_use.get("error", False)
            error_type = tool_use.get("error_type", "") if not success else ""

            is_validation_error = error_type == "validation_error"

            sample = f"Tool use #{i} - {tool_name}:\n"
            sample += f"- Parameters: {render_args(tool_args)}\n"
            sample += f"- Success: {'No' if not success else 'Yes'}"

            if is_validation_error:
                sample += " (PARAMETER VALIDATION ERROR)\n"
                sample += f"- Error: {tool_use.get('result', 'Unknown error')}"
            elif not success:
                sample += f" (Other error: {error_type})\n"

            param_samples.append(sample)

        validation_errors_info = ""
        if validation_errors:
            validation_errors_info = f"\nParameter validation errors detected: {len(validation_errors)} ({validation_error_rate:.1%} of tool uses)\n"
            for i, err in enumerate(validation_errors[:3], start=1):
                tool_name = err.get("tool", "Unknown tool")
                error_msg = err.get("error", "Unknown error")
                args = err.get("args", {})
                validation_errors_info += f"\nValidation Error #{i}:\n- Tool: {tool_name}\n- Args: {render_args(args)}\n- Error: {error_msg}"

            if len(validation_errors) > 3:
                validation_errors_info += f"\n...and {len(validation_errors) - 3} more validation errors."

        param_samples_text = "\n\n".join(param_samples)
        prompt = [
            {"role": "system", "content": """You are an expert evaluator assessing how well an AI agent extracts and formats PARAMETER VALUES for tool calls.

Your job is to evaluate ONLY whether the agent used the correct parameter VALUES, not whether the right tools were selected or how the tools were invoked.

Evaluate parameter extraction based on these criteria:
1. Accuracy (0-10): Are parameter values correctly identified from the context/task?
2. Formatting (0-10): Are values formatted correctly for each tool's requirements?
3. Completeness (0-10): Are all required parameter values provided, with no missing information?

IMPORTANT: DO NOT evaluate:
- Whether the right tool was chosen (that's the ToolSelectionEvaluator's job)
- How the tools were structurally invoked (that's the ToolInvocationEvaluator's job)
- The quality of results from tools

Focus ONLY on the PARAMETER VALUES - whether they were correctly extracted from the context, properly formatted, and complete.

Validation errors are important signals that parameter values weren't properly extracted or formatted.

Return your evaluation as JSON with these fields:
- scores: {"accuracy": number, "formatting": number, "completeness": number}
- overall_score: number (average of all scores, 0-10)
- feedback: string (focused ONLY on parameter value extraction quality)
- improvement_suggestions: string (concrete suggestions for better parameter VALUE extraction)
"""},
            {"role": "user", "content": f"""
Agent role: {agent.role}
Task description: {task.description}

Parameter extraction examples:
{param_samples_text}
{validation_errors_info}

Evaluate the quality of the agent's parameter extraction for this task.
"""}
        ]

        assert self.llm is not None
        response = self.llm.call(prompt)

        try:
            evaluation_data = extract_json_from_llm_response(response)
            assert evaluation_data is not None

            # Sub-scores the judge left out default to the scale midpoint.
            scores = evaluation_data.get("scores", {})
            accuracy = scores.get("accuracy", 5.0)
            formatting = scores.get("formatting", 5.0)
            completeness = scores.get("completeness", 5.0)

            overall_score = float(evaluation_data.get("overall_score", 5.0))

            feedback = "Parameter Extraction Evaluation:\n"
            feedback += f"• Accuracy: {accuracy}/10 - Correctly identifying required parameters\n"
            feedback += f"• Formatting: {formatting}/10 - Properly formatting parameters for tools\n"
            feedback += f"• Completeness: {completeness}/10 - Including all necessary information\n\n"

            # Suggestions take precedence over the general feedback text.
            if "improvement_suggestions" in evaluation_data:
                feedback += f"Improvement Suggestions:\n{evaluation_data['improvement_suggestions']}"
            else:
                feedback += evaluation_data.get("feedback", "No detailed feedback available.")

            return EvaluationScore(
                score=overall_score,
                feedback=feedback,
                raw_response=response
            )
        except Exception as e:
            return EvaluationScore(
                score=None,
                feedback=f"Error evaluating parameter extraction: {e}",
                raw_response=response
            )
|
||||
|
||||
|
||||
class ToolInvocationEvaluator(BaseEvaluator):
    """Judge how an agent's tool calls were structured, using an LLM as the grader.

    The evaluator summarises the tool uses recorded in the execution trace
    (up to five samples, the overall error rate, and a per-error-type
    breakdown), sends that summary to ``self.llm`` and converts the JSON
    reply into an ``EvaluationScore``.
    """

    @property
    def metric_category(self) -> MetricCategory:
        """Return the metric category this evaluator reports under."""
        return MetricCategory.TOOL_INVOCATION

    def evaluate(
        self,
        agent: Agent,
        task: Task,
        execution_trace: Dict[str, Any],
        final_output: str,
    ) -> EvaluationScore:
        """Evaluate the structure of the tool invocations found in the trace.

        Args:
            agent: Agent whose ``role`` is quoted in the prompt.
            task: Task whose ``description`` is quoted in the prompt.
            execution_trace: Trace dict; only its ``"tool_uses"`` list is read.
                Each entry is read through the keys ``"tool"``, ``"args"``,
                ``"result"``, ``"success"``, ``"error"`` and ``"error_type"``.
            final_output: Unused by this evaluator.

        Returns:
            An ``EvaluationScore``. ``score`` is ``None`` when the trace holds
            no tool uses or when the LLM reply cannot be turned into a score.
        """
        tool_uses = execution_trace.get("tool_uses", [])
        tool_count = len(tool_uses)

        if tool_count == 0:
            return EvaluationScore(
                score=None,
                feedback="No tool usage detected. Cannot evaluate tool invocation."
            )

        # A use counts as failed when it is flagged unsuccessful or carries a
        # truthy "error" entry.
        tool_errors = [
            {
                "tool": tool_use.get("tool", "Unknown tool"),
                "error": tool_use.get("result"),
                "error_type": tool_use.get("error_type", "unknown_error"),
            }
            for tool_use in tool_uses
            if not tool_use.get("success", True) or tool_use.get("error", False)
        ]

        # tool_count is non-zero here thanks to the early return above.
        error_rate = len(tool_errors) / tool_count

        error_types: Dict[str, int] = {}
        for error in tool_errors:
            error_type = error["error_type"]
            error_types[error_type] = error_types.get(error_type, 0) + 1

        invocation_samples = []
        for index, tool_use in enumerate(tool_uses[:5], start=1):
            tool_name = tool_use.get("tool", "Unknown tool")
            tool_args = tool_use.get("args", {})
            success = tool_use.get("success", True) and not tool_use.get("error", False)
            error_type = tool_use.get("error_type", "") if not success else ""
            error_msg = tool_use.get("result", "No error") if not success else "No error"

            sample = f"Tool invocation #{index}:\n"
            sample += f"- Tool: {tool_name}\n"
            # default=str is deliberate: the dump is only shown to the judge
            # LLM, and an argument value that is not JSON-serializable must
            # not abort the whole evaluation with a TypeError.
            sample += f"- Parameters: {json.dumps(tool_args, indent=2, default=str)}\n"
            sample += f"- Success: {'No' if not success else 'Yes'}\n"
            if not success:
                sample += f"- Error type: {error_type}\n"
                sample += f"- Error: {error_msg}"
            invocation_samples.append(sample)

        error_type_summary = ""
        if error_types:
            error_type_summary = "Error type breakdown:\n" + "".join(
                f"- {error_type}: {count} occurrences ({(count/tool_count):.1%})\n"
                for error_type, count in error_types.items()
            )

        invocation_samples_text = "\n\n".join(invocation_samples)
        prompt = [
            {"role": "system", "content": """You are an expert evaluator assessing how correctly an AI agent's tool invocations are STRUCTURED.

Your job is to evaluate ONLY the structural and syntactical aspects of how the agent called tools, NOT which tools were selected or what parameter values were used.

Evaluate the agent's tool invocation based on these criteria:
1. Structure (0-10): Does the tool call follow the expected syntax and format?
2. Error Handling (0-10): Does the agent handle tool errors appropriately?
3. Invocation Patterns (0-10): Are tool calls properly sequenced, batched, or managed?

Error types that indicate invocation issues:
- execution_error: The tool was called correctly but failed during execution
- usage_error: General errors in how the tool was used structurally

IMPORTANT: DO NOT evaluate:
- Whether the right tool was chosen (that's the ToolSelectionEvaluator's job)
- Whether the parameter values are correct (that's the ParameterExtractionEvaluator's job)
- The quality of results from tools

Focus ONLY on HOW tools were invoked - the structure, format, and handling of the invocation process.

Return your evaluation as JSON with these fields:
- scores: {"structure": number, "error_handling": number, "invocation_patterns": number}
- overall_score: number (average of all scores, 0-10)
- feedback: string (focused ONLY on structural aspects of tool invocation)
- improvement_suggestions: string (concrete suggestions for better structuring of tool calls)
"""},
            {"role": "user", "content": f"""
Agent role: {agent.role}
Task description: {task.description}

Tool invocation examples:
{invocation_samples_text}

Tool error rate: {error_rate:.2%} ({len(tool_errors)} errors out of {tool_count} invocations)
{error_type_summary}

Evaluate the quality of the agent's tool invocation structure during this task.
"""}
        ]

        # Kept as an assert so the failure mode for a missing LLM is unchanged.
        assert self.llm is not None
        response = self.llm.call(prompt)

        try:
            evaluation_data = extract_json_from_llm_response(response)
            if evaluation_data is None:
                # Explicit raise instead of assert: survives `python -O` and
                # gives the error feedback below a meaningful message.
                raise ValueError("no JSON object found in the LLM response")

            scores = evaluation_data.get("scores", {})
            structure = scores.get("structure", 5.0)
            error_handling = scores.get("error_handling", 5.0)
            invocation_patterns = scores.get("invocation_patterns", 5.0)

            overall_score = float(evaluation_data.get("overall_score", 5.0))

            feedback = "Tool Invocation Evaluation:\n"
            feedback += f"• Structure: {structure}/10 - Following proper syntax and format\n"
            feedback += f"• Error Handling: {error_handling}/10 - Appropriately handling tool errors\n"
            feedback += f"• Invocation Patterns: {invocation_patterns}/10 - Proper sequencing and management of calls\n\n"

            # Suggestions, when present, take the place of the free-form feedback.
            if "improvement_suggestions" in evaluation_data:
                feedback += f"Improvement Suggestions:\n{evaluation_data['improvement_suggestions']}"
            else:
                feedback += evaluation_data.get("feedback", "No detailed feedback available.")

            return EvaluationScore(
                score=overall_score,
                feedback=feedback,
                raw_response=response
            )
        except Exception as e:
            # Any parsing or conversion problem is reported as an unscored
            # result rather than propagated to the caller.
            return EvaluationScore(
                score=None,
                feedback=f"Error evaluating tool invocation: {e}",
                raw_response=response
            )
|
||||
@@ -537,6 +537,7 @@ class LiteAgent(FlowTrackable, BaseModel):
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
event=LLMCallCompletedEvent(
|
||||
messages=self._messages,
|
||||
response=answer,
|
||||
call_type=LLMCallType.LLM_CALL,
|
||||
from_agent=self,
|
||||
|
||||
@@ -508,7 +508,6 @@ class LLM(BaseLLM):
|
||||
# Enable tool calls using streaming
|
||||
if "tool_calls" in delta:
|
||||
tool_calls = delta["tool_calls"]
|
||||
|
||||
if tool_calls:
|
||||
result = self._handle_streaming_tool_calls(
|
||||
tool_calls=tool_calls,
|
||||
@@ -517,6 +516,7 @@ class LLM(BaseLLM):
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
)
|
||||
|
||||
if result is not None:
|
||||
chunk_content = result
|
||||
|
||||
@@ -631,7 +631,7 @@ class LLM(BaseLLM):
|
||||
# Log token usage if available in streaming mode
|
||||
self._handle_streaming_callbacks(callbacks, usage_info, last_chunk)
|
||||
# Emit completion event and return response
|
||||
self._handle_emit_call_events(full_response, LLMCallType.LLM_CALL, from_task, from_agent)
|
||||
self._handle_emit_call_events(response=full_response, call_type=LLMCallType.LLM_CALL, from_task=from_task, from_agent=from_agent, messages=params["messages"])
|
||||
return full_response
|
||||
|
||||
# --- 9) Handle tool calls if present
|
||||
@@ -643,7 +643,7 @@ class LLM(BaseLLM):
|
||||
self._handle_streaming_callbacks(callbacks, usage_info, last_chunk)
|
||||
|
||||
# --- 11) Emit completion event and return response
|
||||
self._handle_emit_call_events(full_response, LLMCallType.LLM_CALL, from_task, from_agent)
|
||||
self._handle_emit_call_events(response=full_response, call_type=LLMCallType.LLM_CALL, from_task=from_task, from_agent=from_agent, messages=params["messages"])
|
||||
return full_response
|
||||
|
||||
except ContextWindowExceededError as e:
|
||||
@@ -655,7 +655,7 @@ class LLM(BaseLLM):
|
||||
logging.error(f"Error in streaming response: {str(e)}")
|
||||
if full_response.strip():
|
||||
logging.warning(f"Returning partial response despite error: {str(e)}")
|
||||
self._handle_emit_call_events(full_response, LLMCallType.LLM_CALL, from_task, from_agent)
|
||||
self._handle_emit_call_events(response=full_response, call_type=LLMCallType.LLM_CALL, from_task=from_task, from_agent=from_agent, messages=params["messages"])
|
||||
return full_response
|
||||
|
||||
# Emit failed event and re-raise the exception
|
||||
@@ -809,7 +809,7 @@ class LLM(BaseLLM):
|
||||
|
||||
# --- 5) If no tool calls or no available functions, return the text response directly
|
||||
if not tool_calls or not available_functions:
|
||||
self._handle_emit_call_events(text_response, LLMCallType.LLM_CALL, from_task, from_agent)
|
||||
self._handle_emit_call_events(response=text_response, call_type=LLMCallType.LLM_CALL, from_task=from_task, from_agent=from_agent, messages=params["messages"])
|
||||
return text_response
|
||||
|
||||
# --- 6) Handle tool calls if present
|
||||
@@ -818,7 +818,7 @@ class LLM(BaseLLM):
|
||||
return tool_result
|
||||
|
||||
# --- 7) If tool call handling didn't return a result, emit completion event and return text response
|
||||
self._handle_emit_call_events(text_response, LLMCallType.LLM_CALL, from_task, from_agent)
|
||||
self._handle_emit_call_events(response=text_response, call_type=LLMCallType.LLM_CALL, from_task=from_task, from_agent=from_agent, messages=params["messages"])
|
||||
return text_response
|
||||
|
||||
def _handle_tool_call(
|
||||
@@ -861,6 +861,7 @@ class LLM(BaseLLM):
|
||||
tool_args=function_args,
|
||||
),
|
||||
)
|
||||
|
||||
result = fn(**function_args)
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
@@ -874,7 +875,7 @@ class LLM(BaseLLM):
|
||||
)
|
||||
|
||||
# --- 3.3) Emit success event
|
||||
self._handle_emit_call_events(result, LLMCallType.TOOL_CALL)
|
||||
self._handle_emit_call_events(response=result, call_type=LLMCallType.TOOL_CALL)
|
||||
return result
|
||||
except Exception as e:
|
||||
# --- 3.4) Handle execution errors
|
||||
@@ -991,17 +992,20 @@ class LLM(BaseLLM):
|
||||
logging.error(f"LiteLLM call failed: {str(e)}")
|
||||
raise
|
||||
|
||||
def _handle_emit_call_events(self, response: Any, call_type: LLMCallType, from_task: Optional[Any] = None, from_agent: Optional[Any] = None):
|
||||
def _handle_emit_call_events(self, response: Any, call_type: LLMCallType, from_task: Optional[Any] = None, from_agent: Optional[Any] = None, messages: str | list[dict[str, Any]] | None = None):
|
||||
"""Handle the events for the LLM call.
|
||||
|
||||
Args:
|
||||
response (str): The response from the LLM call.
|
||||
call_type (str): The type of call, either "tool_call" or "llm_call".
|
||||
from_task: Optional task object
|
||||
from_agent: Optional agent object
|
||||
messages: Optional messages object
|
||||
"""
|
||||
assert hasattr(crewai_event_bus, "emit")
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
event=LLMCallCompletedEvent(response=response, call_type=call_type, from_task=from_task, from_agent=from_agent),
|
||||
event=LLMCallCompletedEvent(messages=messages, response=response, call_type=call_type, from_task=from_task, from_agent=from_agent),
|
||||
)
|
||||
|
||||
def _format_messages_for_provider(
|
||||
|
||||
@@ -155,6 +155,7 @@ class CrewEvaluator:
|
||||
)
|
||||
|
||||
console = Console()
|
||||
console.print("\n")
|
||||
console.print(table)
|
||||
|
||||
def evaluate(self, task_output: TaskOutput):
|
||||
|
||||
@@ -48,8 +48,8 @@ class LLMCallStartedEvent(LLMEventBase):
|
||||
"""
|
||||
|
||||
type: str = "llm_call_started"
|
||||
messages: Union[str, List[Dict[str, Any]]]
|
||||
tools: Optional[List[dict]] = None
|
||||
messages: Optional[Union[str, List[Dict[str, Any]]]] = None
|
||||
tools: Optional[List[dict[str, Any]]] = None
|
||||
callbacks: Optional[List[Any]] = None
|
||||
available_functions: Optional[Dict[str, Any]] = None
|
||||
|
||||
@@ -58,10 +58,10 @@ class LLMCallCompletedEvent(LLMEventBase):
|
||||
"""Event emitted when a LLM call completes"""
|
||||
|
||||
type: str = "llm_call_completed"
|
||||
messages: str | list[dict[str, Any]] | None = None
|
||||
response: Any
|
||||
call_type: LLMCallType
|
||||
|
||||
|
||||
class LLMCallFailedEvent(LLMEventBase):
|
||||
"""Event emitted when a LLM call fails"""
|
||||
|
||||
|
||||
File diff suppressed because one or more lines are too long
0
tests/evaluation/__init__.py
Normal file
0
tests/evaluation/__init__.py
Normal file
0
tests/evaluation/metrics/__init__.py
Normal file
0
tests/evaluation/metrics/__init__.py
Normal file
28
tests/evaluation/metrics/base_evaluation_metrics_test.py
Normal file
28
tests/evaluation/metrics/base_evaluation_metrics_test.py
Normal file
@@ -0,0 +1,28 @@
|
||||
import pytest
|
||||
from unittest.mock import MagicMock
|
||||
from crewai.agent import Agent
|
||||
from crewai.task import Task
|
||||
|
||||
class BaseEvaluationMetricsTest:
    """Shared pytest fixtures inherited by the evaluation-metric test classes."""

    @pytest.fixture
    def mock_agent(self):
        """Provide an Agent-spec'd mock with a fixed id, role, goal and no tools."""
        return MagicMock(
            spec=Agent,
            id="test_agent_id",
            role="Test Agent",
            goal="Test goal",
            tools=[],
        )

    @pytest.fixture
    def mock_task(self):
        """Provide a Task-spec'd mock with a fixed description and expected output."""
        return MagicMock(
            spec=Task,
            description="Test task description",
            expected_output="Test expected output",
        )

    @pytest.fixture
    def execution_trace(self):
        """Provide a minimal trace holding one thought and two actions."""
        return dict(
            thinking=["I need to analyze this data carefully"],
            actions=["Gathered information", "Analyzed data"],
        )
|
||||
59
tests/evaluation/metrics/test_goal_metrics.py
Normal file
59
tests/evaluation/metrics/test_goal_metrics.py
Normal file
@@ -0,0 +1,59 @@
|
||||
from unittest.mock import patch, MagicMock
|
||||
from tests.evaluation.metrics.base_evaluation_metrics_test import BaseEvaluationMetricsTest
|
||||
|
||||
from crewai.evaluation.base_evaluator import EvaluationScore
|
||||
from crewai.evaluation.metrics.goal_metrics import GoalAlignmentEvaluator
|
||||
from crewai.utilities.llm_utils import LLM
|
||||
|
||||
|
||||
class TestGoalAlignmentEvaluator(BaseEvaluationMetricsTest):
    """Exercise GoalAlignmentEvaluator against a mocked LLM."""

    @patch("crewai.utilities.llm_utils.create_llm")
    def test_evaluate_success(self, mock_create_llm, mock_agent, mock_task, execution_trace):
        """A JSON reply is turned into its score/feedback and the prompt names agent and task."""
        judge = MagicMock(spec=LLM)
        judge.call.return_value = """
{
"score": 8.5,
"feedback": "The agent correctly understood the task and produced relevant output."
}
"""
        mock_create_llm.return_value = judge

        outcome = GoalAlignmentEvaluator(llm=judge).evaluate(
            agent=mock_agent,
            task=mock_task,
            execution_trace=execution_trace,
            final_output="This is the final output",
        )

        assert isinstance(outcome, EvaluationScore)
        assert outcome.score == 8.5
        assert "correctly understood the task" in outcome.feedback

        judge.call.assert_called_once()
        # The prompt is the first positional argument handed to the LLM.
        messages = judge.call.call_args[0][0]
        assert len(messages) >= 2
        system_message, user_message = messages[0], messages[1]
        assert "system" in system_message["role"]
        assert "user" in user_message["role"]
        assert mock_agent.role in user_message["content"]
        assert mock_task.description in user_message["content"]

    @patch("crewai.utilities.llm_utils.create_llm")
    def test_evaluate_error_handling(self, mock_create_llm, mock_agent, mock_task, execution_trace):
        """A reply that is not JSON produces an unscored result with a parse-failure message."""
        judge = MagicMock(spec=LLM)
        judge.call.return_value = "Invalid JSON response"
        mock_create_llm.return_value = judge

        outcome = GoalAlignmentEvaluator(llm=judge).evaluate(
            agent=mock_agent,
            task=mock_task,
            execution_trace=execution_trace,
            final_output="This is the final output",
        )

        assert isinstance(outcome, EvaluationScore)
        assert outcome.score is None
        assert "Failed to parse" in outcome.feedback
|
||||
166
tests/evaluation/metrics/test_reasoning_metrics.py
Normal file
166
tests/evaluation/metrics/test_reasoning_metrics.py
Normal file
@@ -0,0 +1,166 @@
|
||||
import pytest
|
||||
from unittest.mock import patch, MagicMock
|
||||
from typing import List, Dict, Any
|
||||
|
||||
from crewai.tasks.task_output import TaskOutput
|
||||
from crewai.evaluation.metrics.reasoning_metrics import (
|
||||
ReasoningEfficiencyEvaluator,
|
||||
)
|
||||
from tests.evaluation.metrics.base_evaluation_metrics_test import BaseEvaluationMetricsTest
|
||||
from crewai.utilities.llm_utils import LLM
|
||||
from crewai.evaluation.base_evaluator import EvaluationScore
|
||||
|
||||
class TestReasoningEfficiencyEvaluator(BaseEvaluationMetricsTest):
    """Exercise ReasoningEfficiencyEvaluator with mocked LLM replies and call traces."""

    @pytest.fixture
    def mock_output(self):
        """Provide a TaskOutput-spec'd mock whose raw text is fixed."""
        return MagicMock(spec=TaskOutput, raw="This is the test output")

    @pytest.fixture
    def llm_calls(self) -> List[Dict[str, Any]]:
        """Provide three distinct prompt/response exchanges, 100 seconds apart."""
        exchanges = [
            (
                "How should I approach this task?",
                "I'll first research the topic, then compile findings.",
                1626987654,
            ),
            (
                "What resources should I use?",
                "I'll use relevant academic papers and reliable websites.",
                1626987754,
            ),
            (
                "How should I structure the output?",
                "I'll organize information clearly with headings and bullet points.",
                1626987854,
            ),
        ]
        return [
            {"prompt": prompt, "response": response, "timestamp": timestamp}
            for prompt, response, timestamp in exchanges
        ]

    def test_insufficient_llm_calls(self, mock_agent, mock_task, mock_output):
        """An empty list of LLM calls yields an unscored 'insufficient' result."""
        outcome = ReasoningEfficiencyEvaluator().evaluate(
            agent=mock_agent,
            task=mock_task,
            execution_trace={"llm_calls": []},
            final_output=mock_output,
        )

        assert isinstance(outcome, EvaluationScore)
        assert outcome.score is None
        assert "Insufficient LLM calls" in outcome.feedback

    @patch("crewai.utilities.llm_utils.create_llm")
    def test_successful_evaluation(self, mock_create_llm, mock_agent, mock_task, mock_output, llm_calls):
        """A full JSON reply is reflected in the score and the formatted feedback."""
        judge = MagicMock(spec=LLM)
        judge.call.return_value = """
{
"scores": {
"focus": 8.0,
"progression": 7.0,
"decision_quality": 7.5,
"conciseness": 8.0,
"loop_avoidance": 9.0
},
"overall_score": 7.9,
"feedback": "The agent demonstrated good reasoning efficiency.",
"optimization_suggestions": "The agent could improve by being more concise."
}
"""
        mock_create_llm.return_value = judge

        evaluator = ReasoningEfficiencyEvaluator(llm=judge)
        # Loop detection is stubbed out so only the LLM-driven path is exercised.
        evaluator._detect_loops = MagicMock(return_value=(False, []))

        outcome = evaluator.evaluate(
            agent=mock_agent,
            task=mock_task,
            execution_trace={"llm_calls": llm_calls},
            final_output=mock_output,
        )

        assert isinstance(outcome, EvaluationScore)
        assert outcome.score == 7.9
        for expected_fragment in (
            "The agent demonstrated good reasoning efficiency",
            "Reasoning Efficiency Evaluation:",
            "• Focus: 8.0/10",
        ):
            assert expected_fragment in outcome.feedback

        judge.call.assert_called_once()

    @patch("crewai.utilities.llm_utils.create_llm")
    def test_parse_error_handling(self, mock_create_llm, mock_agent, mock_task, mock_output, llm_calls):
        """A reply that is not JSON yields an unscored result with a parse-failure message."""
        judge = MagicMock(spec=LLM)
        judge.call.return_value = "Invalid JSON response"
        mock_create_llm.return_value = judge

        evaluator = ReasoningEfficiencyEvaluator(llm=judge)
        # Loop detection is stubbed out, as in the success case.
        evaluator._detect_loops = MagicMock(return_value=(False, []))

        outcome = evaluator.evaluate(
            agent=mock_agent,
            task=mock_task,
            execution_trace={"llm_calls": llm_calls},
            final_output=mock_output,
        )

        assert isinstance(outcome, EvaluationScore)
        assert outcome.score is None
        assert "Failed to parse reasoning efficiency evaluation" in outcome.feedback

    @patch("crewai.utilities.llm_utils.create_llm")
    def test_loop_detection(self, mock_create_llm, mock_agent, mock_task, mock_output):
        """A repetitive trace runs through the real loop detector and reports the LLM's low score."""
        # The same two prompts alternate, imitating an agent stuck on one method.
        repeating_trace = {
            "llm_calls": [
                {"prompt": "How to solve?", "response": "I'll try method A", "timestamp": 1000},
                {"prompt": "Let me try method A", "response": "It didn't work", "timestamp": 1100},
                {"prompt": "How to solve?", "response": "I'll try method A again", "timestamp": 1200},
                {"prompt": "Let me try method A", "response": "It didn't work", "timestamp": 1300},
                {"prompt": "How to solve?", "response": "I'll try method A one more time", "timestamp": 1400},
            ]
        }

        judge = MagicMock(spec=LLM)
        judge.call.return_value = """
{
"scores": {
"focus": 6.0,
"progression": 3.0,
"decision_quality": 4.0,
"conciseness": 6.0,
"loop_avoidance": 2.0
},
"overall_score": 4.2,
"feedback": "The agent is stuck in a reasoning loop.",
"optimization_suggestions": "The agent should try different approaches when one fails."
}
"""
        mock_create_llm.return_value = judge

        outcome = ReasoningEfficiencyEvaluator(llm=judge).evaluate(
            agent=mock_agent,
            task=mock_task,
            execution_trace=repeating_trace,
            final_output=mock_output,
        )

        assert isinstance(outcome, EvaluationScore)
        assert outcome.score == 4.2
        assert "• Loop Avoidance: 2.0/10" in outcome.feedback
|
||||
82
tests/evaluation/metrics/test_semantic_quality_metrics.py
Normal file
82
tests/evaluation/metrics/test_semantic_quality_metrics.py
Normal file
@@ -0,0 +1,82 @@
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
from crewai.evaluation.base_evaluator import EvaluationScore
|
||||
from crewai.evaluation.metrics.semantic_quality_metrics import SemanticQualityEvaluator
|
||||
from tests.evaluation.metrics.base_evaluation_metrics_test import BaseEvaluationMetricsTest
|
||||
from crewai.utilities.llm_utils import LLM
|
||||
|
||||
class TestSemanticQualityEvaluator(BaseEvaluationMetricsTest):
    """Exercise SemanticQualityEvaluator against a mocked LLM."""

    @patch("crewai.utilities.llm_utils.create_llm")
    def test_evaluate_success(self, mock_create_llm, mock_agent, mock_task, execution_trace):
        """A JSON reply is turned into its score/feedback and the prompt names agent and task."""
        judge = MagicMock(spec=LLM)
        judge.call.return_value = """
{
"score": 8.5,
"feedback": "The output is clear, coherent, and logically structured."
}
"""
        mock_create_llm.return_value = judge

        outcome = SemanticQualityEvaluator(llm=judge).evaluate(
            agent=mock_agent,
            task=mock_task,
            execution_trace=execution_trace,
            final_output="This is a well-structured analysis of the data.",
        )

        assert isinstance(outcome, EvaluationScore)
        assert outcome.score == 8.5
        assert "clear, coherent" in outcome.feedback

        judge.call.assert_called_once()
        # The prompt is the first positional argument handed to the LLM.
        messages = judge.call.call_args[0][0]
        assert len(messages) >= 2
        system_message, user_message = messages[0], messages[1]
        assert "system" in system_message["role"]
        assert "user" in user_message["role"]
        assert mock_agent.role in user_message["content"]
        assert mock_task.description in user_message["content"]

    @patch("crewai.utilities.llm_utils.create_llm")
    def test_evaluate_with_empty_output(self, mock_create_llm, mock_agent, mock_task, execution_trace):
        """An empty final output is still evaluated and the LLM's verdict is returned."""
        judge = MagicMock(spec=LLM)
        judge.call.return_value = """
{
"score": 2.0,
"feedback": "The output is empty or minimal, lacking substance."
}
"""
        mock_create_llm.return_value = judge

        outcome = SemanticQualityEvaluator(llm=judge).evaluate(
            agent=mock_agent,
            task=mock_task,
            execution_trace=execution_trace,
            final_output="",
        )

        assert isinstance(outcome, EvaluationScore)
        assert outcome.score == 2.0
        assert "empty or minimal" in outcome.feedback

    @patch("crewai.utilities.llm_utils.create_llm")
    def test_evaluate_error_handling(self, mock_create_llm, mock_agent, mock_task, execution_trace):
        """A reply that is not JSON produces an unscored result with a parse-failure message."""
        judge = MagicMock(spec=LLM)
        judge.call.return_value = "Invalid JSON response"
        mock_create_llm.return_value = judge

        outcome = SemanticQualityEvaluator(llm=judge).evaluate(
            agent=mock_agent,
            task=mock_task,
            execution_trace=execution_trace,
            final_output="This is the output.",
        )

        assert isinstance(outcome, EvaluationScore)
        assert outcome.score is None
        assert "Failed to parse" in outcome.feedback
|
||||
230
tests/evaluation/metrics/test_tools_metrics.py
Normal file
230
tests/evaluation/metrics/test_tools_metrics.py
Normal file
@@ -0,0 +1,230 @@
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
from crewai.evaluation.metrics.tools_metrics import (
|
||||
ToolSelectionEvaluator,
|
||||
ParameterExtractionEvaluator,
|
||||
ToolInvocationEvaluator
|
||||
)
|
||||
from crewai.utilities.llm_utils import LLM
|
||||
from tests.evaluation.metrics.base_evaluation_metrics_test import BaseEvaluationMetricsTest
|
||||
|
||||
class TestToolSelectionEvaluator(BaseEvaluationMetricsTest):
    """Exercise ToolSelectionEvaluator with and without tools and tool uses."""

    def test_no_tools_available(self, mock_task, mock_agent):
        """An agent without tools gets an unscored 'no tools available' result."""
        mock_agent.tools = []

        outcome = ToolSelectionEvaluator().evaluate(
            agent=mock_agent,
            task=mock_task,
            execution_trace={"tool_uses": []},
            final_output="Final output",
        )

        assert outcome.score is None
        assert "no tools available" in outcome.feedback.lower()

    def test_tools_available_but_none_used(self, mock_agent, mock_task):
        """An agent that has tools but used none gets an unscored result saying so."""
        mock_agent.tools = ["tool1", "tool2"]

        outcome = ToolSelectionEvaluator().evaluate(
            agent=mock_agent,
            task=mock_task,
            execution_trace={"tool_uses": []},
            final_output="Final output",
        )

        assert outcome.score is None
        assert "had tools available but didn't use any" in outcome.feedback.lower()

    @patch("crewai.utilities.llm_utils.create_llm")
    def test_successful_evaluation(self, mock_create_llm, mock_agent, mock_task):
        """With recorded tool uses, the LLM's score and feedback are returned."""
        judge = MagicMock(spec=LLM)
        judge.call.return_value = """
{
"overall_score": 8.5,
"feedback": "The agent made good tool selections."
}
"""
        mock_create_llm.return_value = judge

        recorded_uses = [
            {"tool": "search_tool", "input": {"query": "test query"}, "output": "search results"},
            {"tool": "calculator", "input": {"expression": "2+2"}, "output": "4"},
        ]

        outcome = ToolSelectionEvaluator(llm=judge).evaluate(
            agent=mock_agent,
            task=mock_task,
            execution_trace={"tool_uses": recorded_uses},
            final_output="Final output",
        )

        assert outcome.score == 8.5
        assert "The agent made good tool selections" in outcome.feedback

        judge.call.assert_called_once()
        # The prompt is the first positional argument handed to the LLM.
        messages = judge.call.call_args[0][0]
        assert isinstance(messages, list)
        assert len(messages) >= 2
        assert "system" in messages[0]["role"]
        assert "user" in messages[1]["role"]
|
||||
|
||||
|
||||
class TestParameterExtractionEvaluator(BaseEvaluationMetricsTest):
    """Exercise ParameterExtractionEvaluator with empty and populated traces."""

    def test_no_tool_uses(self, mock_agent, mock_task):
        """A trace without tool uses yields an unscored 'no tool usage' result."""
        outcome = ParameterExtractionEvaluator().evaluate(
            agent=mock_agent,
            task=mock_task,
            execution_trace={"tool_uses": []},
            final_output="Final output",
        )

        assert outcome.score is None
        assert "no tool usage" in outcome.feedback.lower()

    @patch("crewai.utilities.llm_utils.create_llm")
    def test_successful_evaluation(self, mock_create_llm, mock_agent, mock_task):
        """With recorded tool uses, the LLM's score and feedback are returned."""
        mock_agent.tools = ["tool1", "tool2"]

        judge = MagicMock(spec=LLM)
        judge.call.return_value = """
{
"overall_score": 9.0,
"feedback": "The agent extracted parameters correctly."
}
"""
        mock_create_llm.return_value = judge

        search_use = {
            "tool": "search_tool",
            "input": {"query": "test query"},
            "output": "search results",
            "error": None,
        }
        calculator_use = {
            "tool": "calculator",
            "input": {"expression": "2+2"},
            "output": "4",
            "error": None,
        }

        outcome = ParameterExtractionEvaluator(llm=judge).evaluate(
            agent=mock_agent,
            task=mock_task,
            execution_trace={"tool_uses": [search_use, calculator_use]},
            final_output="Final output",
        )

        assert outcome.score == 9.0
        assert "The agent extracted parameters correctly" in outcome.feedback
|
||||
|
||||
|
||||
class TestToolInvocationEvaluator(BaseEvaluationMetricsTest):
    """Exercise ToolInvocationEvaluator with empty, clean and failing traces.

    Trace entries use the keys ToolInvocationEvaluator actually reads
    ("tool", "args", "result", "success", "error", "error_type"), so the
    prompt assertions below cover parameter rendering and error reporting
    rather than only the mocked score.
    """

    def test_no_tool_uses(self, mock_agent, mock_task):
        """A trace without tool uses yields an unscored 'no tool usage' result."""
        execution_trace = {"tool_uses": []}

        evaluator = ToolInvocationEvaluator()
        result = evaluator.evaluate(
            agent=mock_agent,
            task=mock_task,
            execution_trace=execution_trace,
            final_output="Final output"
        )

        assert result.score is None
        assert "no tool usage" in result.feedback.lower()

    @patch("crewai.utilities.llm_utils.create_llm")
    def test_successful_evaluation(self, mock_create_llm, mock_agent, mock_task):
        """Clean tool uses are rendered into the prompt and the LLM's verdict is returned."""
        mock_agent.tools = ["tool1", "tool2"]
        mock_llm = MagicMock(spec=LLM)
        mock_llm.call.return_value = """
{
"overall_score": 8.0,
"feedback": "The agent invoked tools correctly."
}
"""
        mock_create_llm.return_value = mock_llm

        execution_trace = {
            "tool_uses": [
                {"tool": "search_tool", "args": {"query": "test query"}, "result": "search results"},
                {"tool": "calculator", "args": {"expression": "2+2"}, "result": "4"}
            ]
        }

        evaluator = ToolInvocationEvaluator(llm=mock_llm)
        result = evaluator.evaluate(
            agent=mock_agent,
            task=mock_task,
            execution_trace=execution_trace,
            final_output="Final output"
        )

        assert result.score == 8.0
        assert "The agent invoked tools correctly" in result.feedback

        # The user message must show the recorded tools, their arguments and
        # a zero error count.
        mock_llm.call.assert_called_once()
        user_content = mock_llm.call.call_args[0][0][1]["content"]
        assert "search_tool" in user_content
        assert '"query": "test query"' in user_content
        assert "0 errors out of 2 invocations" in user_content

    @patch("crewai.utilities.llm_utils.create_llm")
    def test_evaluation_with_errors(self, mock_create_llm, mock_agent, mock_task):
        """A failed tool use is counted and described in the prompt sent to the LLM."""
        mock_agent.tools = ["tool1", "tool2"]
        mock_llm = MagicMock(spec=LLM)
        mock_llm.call.return_value = """
{
"overall_score": 5.5,
"feedback": "The agent had some errors in tool invocation."
}
"""
        mock_create_llm.return_value = mock_llm

        # For a failed use the evaluator takes the error message from
        # "result" and the category from "error_type".
        execution_trace = {
            "tool_uses": [
                {
                    "tool": "search_tool",
                    "args": {"query": "test query"},
                    "result": "search results",
                    "success": True
                },
                {
                    "tool": "calculator",
                    "args": {"expression": "2+"},
                    "result": "Invalid expression",
                    "success": False,
                    "error": True,
                    "error_type": "usage_error"
                }
            ]
        }

        evaluator = ToolInvocationEvaluator(llm=mock_llm)
        result = evaluator.evaluate(
            agent=mock_agent,
            task=mock_task,
            execution_trace=execution_trace,
            final_output="Final output"
        )

        assert result.score == 5.5
        assert "The agent had some errors in tool invocation" in result.feedback

        mock_llm.call.assert_called_once()
        user_content = mock_llm.call.call_args[0][0][1]["content"]
        assert "1 errors out of 2 invocations" in user_content
        assert "usage_error" in user_content
        assert "Invalid expression" in user_content
|
||||
95
tests/evaluation/test_agent_evaluator.py
Normal file
95
tests/evaluation/test_agent_evaluator.py
Normal file
@@ -0,0 +1,95 @@
|
||||
import pytest
|
||||
|
||||
from crewai.agent import Agent
|
||||
from crewai.task import Task
|
||||
from crewai.crew import Crew
|
||||
from crewai.evaluation.agent_evaluator import AgentEvaluator
|
||||
from crewai.evaluation.base_evaluator import AgentEvaluationResult
|
||||
from crewai.evaluation import (
|
||||
GoalAlignmentEvaluator,
|
||||
SemanticQualityEvaluator,
|
||||
ToolSelectionEvaluator,
|
||||
ParameterExtractionEvaluator,
|
||||
ToolInvocationEvaluator,
|
||||
ReasoningEfficiencyEvaluator
|
||||
)
|
||||
|
||||
from crewai.evaluation import create_default_evaluator
|
||||
class TestAgentEvaluator:
    """Tests for ``AgentEvaluator`` and the ``create_default_evaluator`` factory."""

    @pytest.fixture
    def mock_crew(self):
        """Provide a crew made of a single agent with a single task."""
        test_agent = Agent(
            role="Test Agent",
            goal="Complete test tasks successfully",
            backstory="An agent created for testing purposes",
            allow_delegation=False,
            verbose=False,
        )
        test_task = Task(
            description="Test task description",
            agent=test_agent,
            expected_output="Expected test output",
        )
        return Crew(agents=[test_agent], tasks=[test_task])

    def test_set_iteration(self):
        """``set_iteration`` stores the given number on ``iteration``."""
        runner = AgentEvaluator()
        runner.set_iteration(3)

        assert runner.iteration == 3

    @pytest.mark.vcr(filter_headers=["authorization"])
    def test_evaluate_current_iteration(self, mock_crew):
        """After a kickoff, results hold one goal-alignment entry under the agent's role."""
        # Build the evaluator before kickoff, in the same order as the original test.
        runner = AgentEvaluator(
            crew=mock_crew,
            evaluators=[GoalAlignmentEvaluator()],
        )

        mock_crew.kickoff()

        results = runner.evaluate_current_iteration()
        assert isinstance(results, dict)

        # Single-target unpacking fails unless there is exactly one element.
        [agent] = mock_crew.agents
        [task] = mock_crew.tasks

        assert len(mock_crew.agents) == 1
        assert agent.role in results
        assert len(results[agent.role]) == 1

        [result] = results[agent.role]
        assert isinstance(result, AgentEvaluationResult)

        assert result.agent_id == str(agent.id)
        assert result.task_id == str(task.id)

        [goal_alignment] = result.metrics.values()
        assert goal_alignment.score == 5.0

        expected_feedback = "The agent's output demonstrates an understanding of the need for a comprehensive document"
        assert expected_feedback in goal_alignment.feedback

        assert goal_alignment.raw_response is not None
        assert '"score": 5' in goal_alignment.raw_response

    def test_create_default_evaluator(self, mock_crew):
        """The factory returns an ``AgentEvaluator`` on the crew with six evaluators in order."""
        default_runner = create_default_evaluator(crew=mock_crew)

        assert isinstance(default_runner, AgentEvaluator)
        assert default_runner.crew == mock_crew

        expected_types = [
            GoalAlignmentEvaluator,
            SemanticQualityEvaluator,
            ToolSelectionEvaluator,
            ParameterExtractionEvaluator,
            ToolInvocationEvaluator,
            ReasoningEfficiencyEvaluator,
        ]
        built = default_runner.evaluators

        assert len(built) == len(expected_types)
        # Position matters: each evaluator is checked against the type at the same index.
        for position, wanted_type in enumerate(expected_types):
            assert isinstance(built[position], wanted_type)
|
||||
@@ -601,7 +601,7 @@ def test_handle_streaming_tool_calls(get_weather_tool_schema, mock_emit):
|
||||
def test_handle_streaming_tool_calls_with_error(get_weather_tool_schema, mock_emit):
|
||||
def get_weather_error(location):
|
||||
raise Exception("Error")
|
||||
|
||||
|
||||
llm = LLM(model="openai/gpt-4o", stream=True)
|
||||
response = llm.call(
|
||||
messages=[
|
||||
@@ -619,7 +619,7 @@ def test_handle_streaming_tool_calls_with_error(get_weather_tool_schema, mock_em
|
||||
expected_stream_chunk=9,
|
||||
expected_completed_llm_call=1,
|
||||
expected_tool_usage_started=1,
|
||||
expected_tool_usage_error=1,
|
||||
expected_tool_usage_error=1,
|
||||
expected_final_chunk_result=expected_final_chunk_result,
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user