diff --git a/src/crewai/evaluation/__init__.py b/src/crewai/evaluation/__init__.py deleted file mode 100644 index 0c9f626b6..000000000 --- a/src/crewai/evaluation/__init__.py +++ /dev/null @@ -1,53 +0,0 @@ -from crewai.evaluation.base_evaluator import ( - BaseEvaluator, - EvaluationScore, - MetricCategory, - AgentEvaluationResult -) - -from crewai.evaluation.metrics.semantic_quality_metrics import ( - SemanticQualityEvaluator -) - -from crewai.evaluation.metrics.goal_metrics import ( - GoalAlignmentEvaluator -) - -from crewai.evaluation.metrics.reasoning_metrics import ( - ReasoningEfficiencyEvaluator -) - - -from crewai.evaluation.metrics.tools_metrics import ( - ToolSelectionEvaluator, - ParameterExtractionEvaluator, - ToolInvocationEvaluator -) - -from crewai.evaluation.evaluation_listener import ( - EvaluationTraceCallback, - create_evaluation_callbacks -) - - -from crewai.evaluation.agent_evaluator import ( - AgentEvaluator, - create_default_evaluator -) - -__all__ = [ - "BaseEvaluator", - "EvaluationScore", - "MetricCategory", - "AgentEvaluationResult", - "SemanticQualityEvaluator", - "GoalAlignmentEvaluator", - "ReasoningEfficiencyEvaluator", - "ToolSelectionEvaluator", - "ParameterExtractionEvaluator", - "ToolInvocationEvaluator", - "EvaluationTraceCallback", - "create_evaluation_callbacks", - "AgentEvaluator", - "create_default_evaluator" -] \ No newline at end of file diff --git a/src/crewai/evaluation/agent_evaluator.py b/src/crewai/evaluation/agent_evaluator.py deleted file mode 100644 index 430196d14..000000000 --- a/src/crewai/evaluation/agent_evaluator.py +++ /dev/null @@ -1,178 +0,0 @@ -from crewai.evaluation.base_evaluator import AgentEvaluationResult, AggregationStrategy -from crewai.agent import Agent -from crewai.task import Task -from crewai.evaluation.evaluation_display import EvaluationDisplayFormatter - -from typing import Any, Dict -from collections import defaultdict -from crewai.evaluation import BaseEvaluator, create_evaluation_callbacks -from collections.abc import Sequence -from crewai.crew import Crew -from crewai.utilities.events.crewai_event_bus import crewai_event_bus -from crewai.utilities.events.utils.console_formatter import ConsoleFormatter - -class AgentEvaluator: - def __init__( - self, - evaluators: Sequence[BaseEvaluator] | None = None, - crew: Crew | None = None, - ): - self.crew: Crew | None = crew - self.evaluators: Sequence[BaseEvaluator] | None = evaluators - - self.agent_evaluators: dict[str, Sequence[BaseEvaluator] | None] = {} - if crew is not None: - assert crew and crew.agents is not None - for agent in crew.agents: - self.agent_evaluators[str(agent.id)] = self.evaluators - - self.callback = create_evaluation_callbacks() - self.console_formatter = ConsoleFormatter() - self.display_formatter = EvaluationDisplayFormatter() - - self.iteration = 1 - self.iterations_results: dict[int, dict[str, list[AgentEvaluationResult]]] = {} - - def set_iteration(self, iteration: int) -> None: - self.iteration = iteration - - def evaluate_current_iteration(self) -> dict[str, list[AgentEvaluationResult]]: - if not self.crew: - raise ValueError("Cannot evaluate: no crew was provided to the evaluator.") - - if not self.callback: - raise ValueError("Cannot evaluate: no callback was set. 
Use set_callback() method first.") - - from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn - evaluation_results: defaultdict[str, list[AgentEvaluationResult]] = defaultdict(list) - - total_evals = 0 - for agent in self.crew.agents: - for task in self.crew.tasks: - if task.agent and task.agent.id == agent.id and self.agent_evaluators.get(str(agent.id)): - total_evals += 1 - - with Progress( - SpinnerColumn(), - TextColumn("[bold blue]{task.description}[/bold blue]"), - BarColumn(), - TextColumn("{task.percentage:.0f}% completed"), - console=self.console_formatter.console - ) as progress: - eval_task = progress.add_task(f"Evaluating agents (iteration {self.iteration})...", total=total_evals) - - for agent in self.crew.agents: - evaluator = self.agent_evaluators.get(str(agent.id)) - if not evaluator: - continue - - for task in self.crew.tasks: - - if task.agent and str(task.agent.id) != str(agent.id): - continue - - trace = self.callback.get_trace(str(agent.id), str(task.id)) - if not trace: - self.console_formatter.print(f"[yellow]Warning: No trace found for agent {agent.role} on task {task.description[:30]}...[/yellow]") - progress.update(eval_task, advance=1) - continue - - with crewai_event_bus.scoped_handlers(): - result = self.evaluate( - agent=agent, - task=task, - execution_trace=trace, - final_output=task.output - ) - evaluation_results[agent.role].append(result) - progress.update(eval_task, advance=1) - - self.iterations_results[self.iteration] = evaluation_results - return evaluation_results - - def get_evaluation_results(self): - if self.iteration in self.iterations_results: - return self.iterations_results[self.iteration] - - return self.evaluate_current_iteration() - - def display_results_with_iterations(self): - self.display_formatter.display_summary_results(self.iterations_results) - - def get_agent_evaluation(self, strategy: AggregationStrategy = AggregationStrategy.SIMPLE_AVERAGE, include_evaluation_feedback: bool = False): - agent_results = {} - with crewai_event_bus.scoped_handlers(): - task_results = self.get_evaluation_results() - for agent_role, results in task_results.items(): - if not results: - continue - - agent_id = results[0].agent_id - - aggregated_result = self.display_formatter._aggregate_agent_results( - agent_id=agent_id, - agent_role=agent_role, - results=results, - strategy=strategy - ) - - agent_results[agent_role] = aggregated_result - - - if self.iteration == max(self.iterations_results.keys()): - self.display_results_with_iterations() - - if include_evaluation_feedback: - self.display_evaluation_with_feedback() - - return agent_results - - def display_evaluation_with_feedback(self): - self.display_formatter.display_evaluation_with_feedback(self.iterations_results) - - def evaluate( - self, - agent: Agent, - task: Task, - execution_trace: Dict[str, Any], - final_output: Any - ) -> AgentEvaluationResult: - result = AgentEvaluationResult( - agent_id=str(agent.id), - task_id=str(task.id) - ) - assert self.evaluators is not None - for evaluator in self.evaluators: - try: - score = evaluator.evaluate( - agent=agent, - task=task, - execution_trace=execution_trace, - final_output=final_output - ) - result.metrics[evaluator.metric_category] = score - except Exception as e: - self.console_formatter.print(f"Error in {evaluator.metric_category.value} evaluator: {str(e)}") - - return result - -def create_default_evaluator(crew, llm=None): - from crewai.evaluation import ( - GoalAlignmentEvaluator, - SemanticQualityEvaluator, - 
ToolSelectionEvaluator, - ParameterExtractionEvaluator, - ToolInvocationEvaluator, - ReasoningEfficiencyEvaluator - ) - - evaluators = [ - GoalAlignmentEvaluator(llm=llm), - SemanticQualityEvaluator(llm=llm), - ToolSelectionEvaluator(llm=llm), - ParameterExtractionEvaluator(llm=llm), - ToolInvocationEvaluator(llm=llm), - ReasoningEfficiencyEvaluator(llm=llm), - ] - - return AgentEvaluator(evaluators=evaluators, crew=crew) diff --git a/src/crewai/evaluation/base_evaluator.py b/src/crewai/evaluation/base_evaluator.py deleted file mode 100644 index b11c61973..000000000 --- a/src/crewai/evaluation/base_evaluator.py +++ /dev/null @@ -1,125 +0,0 @@ -import abc -import enum -from enum import Enum -from typing import Any, Dict, List, Optional - -from pydantic import BaseModel, Field - -from crewai.agent import Agent -from crewai.task import Task -from crewai.llm import BaseLLM -from crewai.utilities.llm_utils import create_llm - -class MetricCategory(enum.Enum): - GOAL_ALIGNMENT = "goal_alignment" - SEMANTIC_QUALITY = "semantic_quality" - REASONING_EFFICIENCY = "reasoning_efficiency" - TOOL_SELECTION = "tool_selection" - PARAMETER_EXTRACTION = "parameter_extraction" - TOOL_INVOCATION = "tool_invocation" - - def title(self): - return self.value.replace('_', ' ').title() - - -class EvaluationScore(BaseModel): - score: float | None = Field( - default=5.0, - description="Numeric score from 0-10 where 0 is worst and 10 is best, None if not applicable", - ge=0.0, - le=10.0 - ) - feedback: str = Field( - default="", - description="Detailed feedback explaining the evaluation score" - ) - raw_response: str | None = Field( - default=None, - description="Raw response from the evaluator (e.g., LLM)" - ) - - def __str__(self) -> str: - if self.score is None: - return f"Score: N/A - {self.feedback}" - return f"Score: {self.score:.1f}/10 - {self.feedback}" - - -class BaseEvaluator(abc.ABC): - def __init__(self, llm: BaseLLM | None = None): - self.llm: BaseLLM | None = create_llm(llm) - - @property - @abc.abstractmethod - def metric_category(self) -> MetricCategory: - pass - - @abc.abstractmethod - def evaluate( - self, - agent: Agent, - task: Task, - execution_trace: Dict[str, Any], - final_output: Any, - ) -> EvaluationScore: - pass - - -class AgentEvaluationResult(BaseModel): - agent_id: str = Field(description="ID of the evaluated agent") - task_id: str = Field(description="ID of the task that was executed") - metrics: Dict[MetricCategory, EvaluationScore] = Field( - default_factory=dict, - description="Evaluation scores for each metric category" - ) - - -class AggregationStrategy(Enum): - SIMPLE_AVERAGE = "simple_average" # Equal weight to all tasks - WEIGHTED_BY_COMPLEXITY = "weighted_by_complexity" # Weight by task complexity - BEST_PERFORMANCE = "best_performance" # Use best scores across tasks - WORST_PERFORMANCE = "worst_performance" # Use worst scores across tasks - - -class AgentAggregatedEvaluationResult(BaseModel): - agent_id: str = Field( - default="", - description="ID of the agent" - ) - agent_role: str = Field( - default="", - description="Role of the agent" - ) - task_count: int = Field( - default=0, - description="Number of tasks included in this aggregation" - ) - aggregation_strategy: AggregationStrategy = Field( - default=AggregationStrategy.SIMPLE_AVERAGE, - description="Strategy used for aggregation" - ) - metrics: Dict[MetricCategory, EvaluationScore] = Field( - default_factory=dict, - description="Aggregated metrics across all tasks" - ) - task_results: List[str] = Field( - 
default_factory=list, - description="IDs of tasks included in this aggregation" - ) - overall_score: Optional[float] = Field( - default=None, - description="Overall score for this agent" - ) - - def __str__(self) -> str: - result = f"Agent Evaluation: {self.agent_role}\n" - result += f"Strategy: {self.aggregation_strategy.value}\n" - result += f"Tasks evaluated: {self.task_count}\n" - - for category, score in self.metrics.items(): - result += f"\n\n- {category.value.upper()}: {score.score}/10\n" - - if score.feedback: - detailed_feedback = "\n ".join(score.feedback.split('\n')) - result += f" {detailed_feedback}\n" - - return result \ No newline at end of file diff --git a/src/crewai/evaluation/evaluation_display.py b/src/crewai/evaluation/evaluation_display.py deleted file mode 100644 index 0e30c53f0..000000000 --- a/src/crewai/evaluation/evaluation_display.py +++ /dev/null @@ -1,341 +0,0 @@ -from collections import defaultdict -from typing import Dict, Any, List -from rich.table import Table -from rich.box import HEAVY_EDGE, ROUNDED -from collections.abc import Sequence -from crewai.evaluation.base_evaluator import AgentAggregatedEvaluationResult, AggregationStrategy, AgentEvaluationResult, MetricCategory -from crewai.evaluation import EvaluationScore -from crewai.utilities.events.utils.console_formatter import ConsoleFormatter -from crewai.utilities.llm_utils import create_llm - -class EvaluationDisplayFormatter: - def __init__(self): - self.console_formatter = ConsoleFormatter() - - def display_evaluation_with_feedback(self, iterations_results: Dict[int, Dict[str, List[Any]]]): - if not iterations_results: - self.console_formatter.print("[yellow]No evaluation results to display[/yellow]") - return - - # Get all agent roles across all iterations - all_agent_roles: set[str] = set() - for iter_results in iterations_results.values(): - all_agent_roles.update(iter_results.keys()) - - for agent_role in sorted(all_agent_roles): - self.console_formatter.print(f"\n[bold cyan]Agent: {agent_role}[/bold cyan]") - - # Process each iteration - for iter_num, results in sorted(iterations_results.items()): - if agent_role not in results or not results[agent_role]: - continue - - agent_results = results[agent_role] - agent_id = agent_results[0].agent_id - - # Aggregate results for this agent in this iteration - aggregated_result = self._aggregate_agent_results( - agent_id=agent_id, - agent_role=agent_role, - results=agent_results, - ) - - # Display iteration header - self.console_formatter.print(f"\n[bold]Iteration {iter_num}[/bold]") - - # Create table for this iteration - table = Table(box=ROUNDED) - table.add_column("Metric", style="cyan") - table.add_column("Score (1-10)", justify="center") - table.add_column("Feedback", style="green") - - # Add metrics to table - if aggregated_result.metrics: - for metric, evaluation_score in aggregated_result.metrics.items(): - score = evaluation_score.score - - if isinstance(score, (int, float)): - if score >= 8.0: - score_text = f"[green]{score:.1f}[/green]" - elif score >= 6.0: - score_text = f"[cyan]{score:.1f}[/cyan]" - elif score >= 4.0: - score_text = f"[yellow]{score:.1f}[/yellow]" - else: - score_text = f"[red]{score:.1f}[/red]" - else: - score_text = "[dim]N/A[/dim]" - - table.add_section() - table.add_row( - metric.title(), - score_text, - evaluation_score.feedback or "" - ) - - if aggregated_result.overall_score is not None: - overall_score = aggregated_result.overall_score - if overall_score >= 8.0: - overall_color = "green" - elif overall_score >= 
6.0: - overall_color = "cyan" - elif overall_score >= 4.0: - overall_color = "yellow" - else: - overall_color = "red" - - table.add_section() - table.add_row( - "Overall Score", - f"[{overall_color}]{overall_score:.1f}[/]", - "Overall agent evaluation score" - ) - - # Print the table for this iteration - self.console_formatter.print(table) - - def display_summary_results(self, iterations_results: Dict[int, Dict[str, List[AgentAggregatedEvaluationResult]]]): - if not iterations_results: - self.console_formatter.print("[yellow]No evaluation results to display[/yellow]") - return - - self.console_formatter.print("\n") - - table = Table(title="Agent Performance Scores \n (1-10 Higher is better)", box=HEAVY_EDGE) - - table.add_column("Agent/Metric", style="cyan") - - for iter_num in sorted(iterations_results.keys()): - run_label = f"Run {iter_num}" - table.add_column(run_label, justify="center") - - table.add_column("Avg. Total", justify="center") - - all_agent_roles: set[str] = set() - for results in iterations_results.values(): - all_agent_roles.update(results.keys()) - - for agent_role in sorted(all_agent_roles): - agent_scores_by_iteration = {} - agent_metrics_by_iteration = {} - - for iter_num, results in sorted(iterations_results.items()): - if agent_role not in results or not results[agent_role]: - continue - - agent_results = results[agent_role] - agent_id = agent_results[0].agent_id - - aggregated_result = self._aggregate_agent_results( - agent_id=agent_id, - agent_role=agent_role, - results=agent_results, - strategy=AggregationStrategy.SIMPLE_AVERAGE - ) - - valid_scores = [score.score for score in aggregated_result.metrics.values() - if score.score is not None] - if valid_scores: - avg_score = sum(valid_scores) / len(valid_scores) - agent_scores_by_iteration[iter_num] = avg_score - - agent_metrics_by_iteration[iter_num] = aggregated_result.metrics - - if not agent_scores_by_iteration: - continue - - avg_across_iterations = sum(agent_scores_by_iteration.values()) / len(agent_scores_by_iteration) - - row = [f"[bold]{agent_role}[/bold]"] - - for iter_num in sorted(iterations_results.keys()): - if iter_num in agent_scores_by_iteration: - score = agent_scores_by_iteration[iter_num] - if score >= 8.0: - color = "green" - elif score >= 6.0: - color = "cyan" - elif score >= 4.0: - color = "yellow" - else: - color = "red" - row.append(f"[bold {color}]{score:.1f}[/]") - else: - row.append("-") - - if avg_across_iterations >= 8.0: - color = "green" - elif avg_across_iterations >= 6.0: - color = "cyan" - elif avg_across_iterations >= 4.0: - color = "yellow" - else: - color = "red" - row.append(f"[bold {color}]{avg_across_iterations:.1f}[/]") - - table.add_row(*row) - - all_metrics: set[Any] = set() - for metrics in agent_metrics_by_iteration.values(): - all_metrics.update(metrics.keys()) - - for metric in sorted(all_metrics, key=lambda x: x.value): - metric_scores = [] - - row = [f" - {metric.title()}"] - - for iter_num in sorted(iterations_results.keys()): - if (iter_num in agent_metrics_by_iteration and - metric in agent_metrics_by_iteration[iter_num]): - metric_score = agent_metrics_by_iteration[iter_num][metric].score - if metric_score is not None: - metric_scores.append(metric_score) - if metric_score >= 8.0: - color = "green" - elif metric_score >= 6.0: - color = "cyan" - elif metric_score >= 4.0: - color = "yellow" - else: - color = "red" - row.append(f"[{color}]{metric_score:.1f}[/]") - else: - row.append("[dim]N/A[/dim]") - else: - row.append("-") - - if metric_scores: - avg = 
sum(metric_scores) / len(metric_scores) - if avg >= 8.0: - color = "green" - elif avg >= 6.0: - color = "cyan" - elif avg >= 4.0: - color = "yellow" - else: - color = "red" - row.append(f"[{color}]{avg:.1f}[/]") - else: - row.append("-") - - table.add_row(*row) - - table.add_row(*[""] * (len(sorted(iterations_results.keys())) + 2)) - - self.console_formatter.print(table) - self.console_formatter.print("\n") - - def _aggregate_agent_results( - self, - agent_id: str, - agent_role: str, - results: Sequence[AgentEvaluationResult], - strategy: AggregationStrategy = AggregationStrategy.SIMPLE_AVERAGE, - ) -> AgentAggregatedEvaluationResult: - metrics_by_category: dict[MetricCategory, list[EvaluationScore]] = defaultdict(list) - - for result in results: - for metric_name, evaluation_score in result.metrics.items(): - metrics_by_category[metric_name].append(evaluation_score) - - aggregated_metrics: dict[MetricCategory, EvaluationScore] = {} - for category, scores in metrics_by_category.items(): - valid_scores = [s.score for s in scores if s.score is not None] - avg_score = sum(valid_scores) / len(valid_scores) if valid_scores else None - - feedbacks = [s.feedback for s in scores if s.feedback] - - feedback_summary = None - if feedbacks: - if len(feedbacks) > 1: - # Use the summarization method for multiple feedbacks - feedback_summary = self._summarize_feedbacks( - agent_role=agent_role, - metric=category.title(), - feedbacks=feedbacks, - scores=[s.score for s in scores], - strategy=strategy - ) - else: - feedback_summary = feedbacks[0] - - aggregated_metrics[category] = EvaluationScore( - score=avg_score, - feedback=feedback_summary - ) - - overall_score = None - if aggregated_metrics: - valid_scores = [m.score for m in aggregated_metrics.values() if m.score is not None] - if valid_scores: - overall_score = sum(valid_scores) / len(valid_scores) - - return AgentAggregatedEvaluationResult( - agent_id=agent_id, - agent_role=agent_role, - metrics=aggregated_metrics, - overall_score=overall_score, - task_count=len(results), - aggregation_strategy=strategy - ) - - def _summarize_feedbacks( - self, - agent_role: str, - metric: str, - feedbacks: List[str], - scores: List[float | None], - strategy: AggregationStrategy - ) -> str: - if len(feedbacks) <= 2 and all(len(fb) < 200 for fb in feedbacks): - return "\n\n".join([f"Feedback {i+1}: {fb}" for i, fb in enumerate(feedbacks)]) - - try: - llm = create_llm() - - formatted_feedbacks = [] - for i, (feedback, score) in enumerate(zip(feedbacks, scores)): - if len(feedback) > 500: - feedback = feedback[:500] + "..." - score_text = f"{score:.1f}" if score is not None else "N/A" - formatted_feedbacks.append(f"Feedback #{i+1} (Score: {score_text}):\n{feedback}") - - all_feedbacks = "\n\n" + "\n\n---\n\n".join(formatted_feedbacks) - - strategy_guidance = "" - if strategy == AggregationStrategy.BEST_PERFORMANCE: - strategy_guidance = "Focus on the highest-scoring aspects and strengths demonstrated." - elif strategy == AggregationStrategy.WORST_PERFORMANCE: - strategy_guidance = "Focus on areas that need improvement and common issues across tasks." - else: # Default/average strategies - strategy_guidance = "Provide a balanced analysis of strengths and weaknesses across all tasks." - - prompt = [ - {"role": "system", "content": f"""You are an expert evaluator creating a comprehensive summary of agent performance feedback. - Your job is to synthesize multiple feedback points about the same metric across different tasks. 
- - Create a concise, insightful summary that captures the key patterns and themes from all feedback. - {strategy_guidance} - - Your summary should be: - 1. Specific and concrete (not vague or general) - 2. Focused on actionable insights - 3. Highlighting patterns across tasks - 4. 150-250 words in length - - The summary should be directly usable as final feedback for the agent's performance on this metric."""}, - {"role": "user", "content": f"""I need a synthesized summary of the following feedback for: - - Agent Role: {agent_role} - Metric: {metric.title()} - - {all_feedbacks} - """} - ] - assert llm is not None - response = llm.call(prompt) - - return response - - except Exception: - return "Synthesized from multiple tasks: " + "\n\n".join([f"- {fb[:500]}..." for fb in feedbacks]) diff --git a/src/crewai/evaluation/evaluation_listener.py b/src/crewai/evaluation/evaluation_listener.py deleted file mode 100644 index 8fba03e14..000000000 --- a/src/crewai/evaluation/evaluation_listener.py +++ /dev/null @@ -1,190 +0,0 @@ -from datetime import datetime -from typing import Any, Dict, Optional - -from collections.abc import Sequence - -from crewai.agent import Agent -from crewai.task import Task -from crewai.utilities.events.base_event_listener import BaseEventListener -from crewai.utilities.events.crewai_event_bus import CrewAIEventsBus -from crewai.utilities.events.agent_events import ( - AgentExecutionStartedEvent, - AgentExecutionCompletedEvent -) -from crewai.utilities.events.tool_usage_events import ( - ToolUsageFinishedEvent, - ToolUsageErrorEvent, - ToolExecutionErrorEvent, - ToolSelectionErrorEvent, - ToolValidateInputErrorEvent -) -from crewai.utilities.events.llm_events import ( - LLMCallStartedEvent, - LLMCallCompletedEvent -) - -class EvaluationTraceCallback(BaseEventListener): - """Event listener for collecting execution traces for evaluation. - - This listener attaches to the event bus to collect detailed information - about the execution process, including agent steps, tool uses, knowledge - retrievals, and final output - all for use in agent evaluation. 
- """ - - _instance = None - - def __new__(cls): - if cls._instance is None: - cls._instance = super().__new__(cls) - cls._instance._initialized = False - return cls._instance - - def __init__(self): - if not hasattr(self, "_initialized") or not self._initialized: - super().__init__() - self.traces = {} - self.current_agent_id = None - self.current_task_id = None - self._initialized = True - - def setup_listeners(self, event_bus: CrewAIEventsBus): - @event_bus.on(AgentExecutionStartedEvent) - def on_agent_started(source, event: AgentExecutionStartedEvent): - self.on_agent_start(event.agent, event.task) - - @event_bus.on(AgentExecutionCompletedEvent) - def on_agent_completed(source, event: AgentExecutionCompletedEvent): - self.on_agent_finish(event.agent, event.task, event.output) - - @event_bus.on(ToolUsageFinishedEvent) - def on_tool_completed(source, event: ToolUsageFinishedEvent): - self.on_tool_use(event.tool_name, event.tool_args, event.output, success=True) - - @event_bus.on(ToolUsageErrorEvent) - def on_tool_usage_error(source, event: ToolUsageErrorEvent): - self.on_tool_use(event.tool_name, event.tool_args, event.error, - success=False, error_type="usage_error") - - @event_bus.on(ToolExecutionErrorEvent) - def on_tool_execution_error(source, event: ToolExecutionErrorEvent): - self.on_tool_use(event.tool_name, event.tool_args, event.error, - success=False, error_type="execution_error") - - @event_bus.on(ToolSelectionErrorEvent) - def on_tool_selection_error(source, event: ToolSelectionErrorEvent): - self.on_tool_use(event.tool_name, event.tool_args, event.error, - success=False, error_type="selection_error") - - @event_bus.on(ToolValidateInputErrorEvent) - def on_tool_validate_input_error(source, event: ToolValidateInputErrorEvent): - self.on_tool_use(event.tool_name, event.tool_args, event.error, - success=False, error_type="validation_error") - - @event_bus.on(LLMCallStartedEvent) - def on_llm_call_started(source, event: LLMCallStartedEvent): - self.on_llm_call_start(event.messages, event.tools) - - @event_bus.on(LLMCallCompletedEvent) - def on_llm_call_completed(source, event: LLMCallCompletedEvent): - self.on_llm_call_end(event.messages, event.response) - - def on_agent_start(self, agent: Agent, task: Task): - self.current_agent_id = agent.id - self.current_task_id = task.id - - trace_key = f"{agent.id}_{task.id}" - self.traces[trace_key] = { - "agent_id": agent.id, - "task_id": task.id, - "tool_uses": [], - "llm_calls": [], - "start_time": datetime.now(), - "final_output": None - } - - def on_agent_finish(self, agent: Agent, task: Task, output: Any): - trace_key = f"{agent.id}_{task.id}" - if trace_key in self.traces: - self.traces[trace_key]["final_output"] = output - self.traces[trace_key]["end_time"] = datetime.now() - - self.current_agent_id = None - self.current_task_id = None - - def on_tool_use(self, tool_name: str, tool_args: dict[str, Any] | str, result: Any, - success: bool = True, error_type: str | None = None): - if not self.current_agent_id or not self.current_task_id: - return - - trace_key = f"{self.current_agent_id}_{self.current_task_id}" - if trace_key in self.traces: - tool_use = { - "tool": tool_name, - "args": tool_args, - "result": result, - "success": success, - "timestamp": datetime.now() - } - - # Add error information if applicable - if not success and error_type: - tool_use["error"] = True - tool_use["error_type"] = error_type - - self.traces[trace_key]["tool_uses"].append(tool_use) - - def on_llm_call_start(self, messages: str | Sequence[dict[str, 
Any]] | None, tools: Sequence[dict[str, Any]] | None = None): - if not self.current_agent_id or not self.current_task_id: - return - - trace_key = f"{self.current_agent_id}_{self.current_task_id}" - if trace_key not in self.traces: - return - - self.current_llm_call = { - "messages": messages, - "tools": tools, - "start_time": datetime.now(), - "response": None, - "end_time": None - } - - def on_llm_call_end(self, messages: str | list[dict[str, Any]] | None, response: Any): - if not self.current_agent_id or not self.current_task_id: - return - - trace_key = f"{self.current_agent_id}_{self.current_task_id}" - if trace_key not in self.traces: - return - - total_tokens = 0 - if hasattr(response, "usage") and hasattr(response.usage, "total_tokens"): - total_tokens = response.usage.total_tokens - - current_time = datetime.now() - start_time = None - if hasattr(self, "current_llm_call") and self.current_llm_call: - start_time = self.current_llm_call.get("start_time") - - if not start_time: - start_time = current_time - llm_call = { - "messages": messages, - "response": response, - "start_time": start_time, - "end_time": current_time, - "total_tokens": total_tokens - } - - self.traces[trace_key]["llm_calls"].append(llm_call) - - if hasattr(self, "current_llm_call"): - self.current_llm_call = {} - - def get_trace(self, agent_id: str, task_id: str) -> Optional[Dict[str, Any]]: - trace_key = f"{agent_id}_{task_id}" - return self.traces.get(trace_key) - - -def create_evaluation_callbacks() -> EvaluationTraceCallback: - return EvaluationTraceCallback() \ No newline at end of file diff --git a/src/crewai/evaluation/experiment/testing.py b/src/crewai/evaluation/experiment/testing.py deleted file mode 100644 index 11de59a80..000000000 --- a/src/crewai/evaluation/experiment/testing.py +++ /dev/null @@ -1,49 +0,0 @@ -import warnings -from crewai.experimental.evaluation import ExperimentResults - -def assert_experiment_successfully(experiment_results: ExperimentResults) -> None: - """ - Assert that all experiment results passed successfully. - - Args: - experiment_results: The experiment results to check - - Raises: - AssertionError: If any test case failed - """ - failed_tests = [result for result in experiment_results.results if not result.passed] - - if failed_tests: - detailed_failures: list[str] = [] - - for result in failed_tests: - expected = result.expected_score - actual = result.score - detailed_failures.append(f"- {result.identifier}: expected {expected}, got {actual}") - - failure_details = "\n".join(detailed_failures) - raise AssertionError(f"The following test cases failed:\n{failure_details}") - -def assert_experiment_no_regression(comparison_result: dict[str, list[str]]) -> None: - """ - Assert that there are no regressions in the experiment results compared to baseline. - Also warns if there are missing tests. - - Args: - comparison_result: The result from compare_with_baseline() - - Raises: - AssertionError: If there are regressions - """ - # Check for regressions - regressed = comparison_result.get("regressed", []) - if regressed: - raise AssertionError(f"Regression detected! 
The following tests that previously passed now fail: {regressed}") - - # Check for missing tests and warn - missing_tests = comparison_result.get("missing_tests", []) - if missing_tests: - warnings.warn( - f"Warning: {len(missing_tests)} tests from the baseline are missing in the current run: {missing_tests}", - UserWarning - ) \ No newline at end of file diff --git a/src/crewai/evaluation/json_parser.py b/src/crewai/evaluation/json_parser.py deleted file mode 100644 index ce7303cde..000000000 --- a/src/crewai/evaluation/json_parser.py +++ /dev/null @@ -1,30 +0,0 @@ -"""Robust JSON parsing utilities for evaluation responses.""" - -import json -import re -from typing import Any - - -def extract_json_from_llm_response(text: str) -> dict[str, Any]: - try: - return json.loads(text) - except json.JSONDecodeError: - pass - - json_patterns = [ - # Standard markdown code blocks with json - r'```json\s*([\s\S]*?)\s*```', - # Code blocks without language specifier - r'```\s*([\s\S]*?)\s*```', - # Inline code with JSON - r'`([{\\[].*[}\]])`', - ] - - for pattern in json_patterns: - matches = re.findall(pattern, text, re.IGNORECASE | re.DOTALL) - for match in matches: - try: - return json.loads(match.strip()) - except json.JSONDecodeError: - continue - raise ValueError("No valid JSON found in the response") diff --git a/src/crewai/evaluation/metrics/__init__.py b/src/crewai/evaluation/metrics/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/src/crewai/evaluation/metrics/goal_metrics.py b/src/crewai/evaluation/metrics/goal_metrics.py deleted file mode 100644 index bc6c63801..000000000 --- a/src/crewai/evaluation/metrics/goal_metrics.py +++ /dev/null @@ -1,66 +0,0 @@ -from typing import Any, Dict - -from crewai.agent import Agent -from crewai.task import Task - -from crewai.evaluation.base_evaluator import BaseEvaluator, EvaluationScore, MetricCategory -from crewai.evaluation.json_parser import extract_json_from_llm_response - -class GoalAlignmentEvaluator(BaseEvaluator): - @property - def metric_category(self) -> MetricCategory: - return MetricCategory.GOAL_ALIGNMENT - - def evaluate( - self, - agent: Agent, - task: Task, - execution_trace: Dict[str, Any], - final_output: Any, - ) -> EvaluationScore: - prompt = [ - {"role": "system", "content": """You are an expert evaluator assessing how well an AI agent's output aligns with its assigned task goal. - -Score the agent's goal alignment on a scale from 0-10 where: -- 0: Complete misalignment, agent did not understand or attempt the task goal -- 5: Partial alignment, agent attempted the task but missed key requirements -- 10: Perfect alignment, agent fully satisfied all task requirements - -Consider: -1. Did the agent correctly interpret the task goal? -2. Did the final output directly address the requirements? -3. Did the agent focus on relevant aspects of the task? -4. Did the agent provide all requested information or deliverables? - -Return your evaluation as JSON with fields 'score' (number) and 'feedback' (string). -"""}, - {"role": "user", "content": f""" -Agent role: {agent.role} -Agent goal: {agent.goal} -Task description: {task.description} -Expected output: {task.expected_output} - -Agent's final output: -{final_output} - -Evaluate how well the agent's output aligns with the assigned task goal. 
-"""} - ] - assert self.llm is not None - response = self.llm.call(prompt) - - try: - evaluation_data: dict[str, Any] = extract_json_from_llm_response(response) - assert evaluation_data is not None - - return EvaluationScore( - score=evaluation_data.get("score", 0), - feedback=evaluation_data.get("feedback", response), - raw_response=response - ) - except Exception: - return EvaluationScore( - score=None, - feedback=f"Failed to parse evaluation. Raw response: {response}", - raw_response=response - ) diff --git a/src/crewai/evaluation/metrics/reasoning_metrics.py b/src/crewai/evaluation/metrics/reasoning_metrics.py deleted file mode 100644 index e1ce06c23..000000000 --- a/src/crewai/evaluation/metrics/reasoning_metrics.py +++ /dev/null @@ -1,355 +0,0 @@ -"""Agent reasoning efficiency evaluators. - -This module provides evaluator implementations for: -- Reasoning efficiency -- Loop detection -- Thinking-to-action ratio -""" - -import logging -import re -from enum import Enum -from typing import Any, Dict, List, Tuple -import numpy as np -from collections.abc import Sequence - -from crewai.agent import Agent -from crewai.task import Task - -from crewai.evaluation.base_evaluator import BaseEvaluator, EvaluationScore, MetricCategory -from crewai.evaluation.json_parser import extract_json_from_llm_response -from crewai.tasks.task_output import TaskOutput - -class ReasoningPatternType(Enum): - EFFICIENT = "efficient" # Good reasoning flow - LOOP = "loop" # Agent is stuck in a loop - VERBOSE = "verbose" # Agent is unnecessarily verbose - INDECISIVE = "indecisive" # Agent struggles to make decisions - SCATTERED = "scattered" # Agent jumps between topics without focus - - -class ReasoningEfficiencyEvaluator(BaseEvaluator): - @property - def metric_category(self) -> MetricCategory: - return MetricCategory.REASONING_EFFICIENCY - - def evaluate( - self, - agent: Agent, - task: Task, - execution_trace: Dict[str, Any], - final_output: TaskOutput, - ) -> EvaluationScore: - llm_calls = execution_trace.get("llm_calls", []) - - if not llm_calls or len(llm_calls) < 2: - return EvaluationScore( - score=None, - feedback="Insufficient LLM calls to evaluate reasoning efficiency." - ) - - total_calls = len(llm_calls) - total_tokens = sum(call.get("total_tokens", 0) for call in llm_calls) - avg_tokens_per_call = total_tokens / total_calls if total_calls > 0 else 0 - time_intervals = [] - has_reliable_timing = True - for i in range(1, len(llm_calls)): - start_time = llm_calls[i-1].get("end_time") - end_time = llm_calls[i].get("start_time") - if start_time and end_time and start_time != end_time: - try: - interval = end_time - start_time - time_intervals.append(interval.total_seconds() if hasattr(interval, 'total_seconds') else 0) - except Exception: - has_reliable_timing = False - else: - has_reliable_timing = False - - loop_detected, loop_details = self._detect_loops(llm_calls) - pattern_analysis = self._analyze_reasoning_patterns(llm_calls) - - efficiency_metrics = { - "total_llm_calls": total_calls, - "total_tokens": total_tokens, - "avg_tokens_per_call": avg_tokens_per_call, - "reasoning_pattern": pattern_analysis["primary_pattern"].value, - "loops_detected": loop_detected, - } - - if has_reliable_timing and time_intervals: - efficiency_metrics["avg_time_between_calls"] = np.mean(time_intervals) - - loop_info = f"Detected {len(loop_details)} potential reasoning loops." if loop_detected else "No significant reasoning loops detected." 
- - call_samples = self._get_call_samples(llm_calls) - - prompt = [ - {"role": "system", "content": """You are an expert evaluator assessing the reasoning efficiency of an AI agent's thought process. - -Evaluate the agent's reasoning efficiency across these five key subcategories: - -1. Focus (0-10): How well the agent stays on topic and avoids unnecessary tangents -2. Progression (0-10): How effectively the agent builds on previous thoughts rather than repeating or circling -3. Decision Quality (0-10): How decisively and appropriately the agent makes decisions -4. Conciseness (0-10): How efficiently the agent communicates without unnecessary verbosity -5. Loop Avoidance (0-10): How well the agent avoids getting stuck in repetitive thinking patterns - -For each subcategory, provide a score from 0-10 where: -- 0: Completely inefficient -- 5: Moderately efficient -- 10: Highly efficient - -The overall score should be a weighted average of these subcategories. - -Return your evaluation as JSON with the following structure: -{ - "overall_score": float, - "scores": { - "focus": float, - "progression": float, - "decision_quality": float, - "conciseness": float, - "loop_avoidance": float - }, - "feedback": string (general feedback about overall reasoning efficiency), - "optimization_suggestions": string (concrete suggestions for improving reasoning efficiency), - "detected_patterns": string (describe any inefficient reasoning patterns you observe) -}"""}, - {"role": "user", "content": f""" -Agent role: {agent.role} -Task description: {task.description} - -Reasoning efficiency metrics: -- Total LLM calls: {efficiency_metrics["total_llm_calls"]} -- Average tokens per call: {efficiency_metrics["avg_tokens_per_call"]:.1f} -- Primary reasoning pattern: {efficiency_metrics["reasoning_pattern"]} -- {loop_info} -{"- Average time between calls: {:.2f} seconds".format(efficiency_metrics.get("avg_time_between_calls", 0)) if "avg_time_between_calls" in efficiency_metrics else ""} - -Sample of agent reasoning flow (chronological sequence): -{call_samples} - -Agent's final output: -{final_output.raw[:500]}... (truncated) - -Evaluate the reasoning efficiency of this agent based on these interaction patterns. -Identify any inefficient reasoning patterns and provide specific suggestions for optimization. 
-"""} - ] - - assert self.llm is not None - response = self.llm.call(prompt) - - try: - evaluation_data = extract_json_from_llm_response(response) - - scores = evaluation_data.get("scores", {}) - focus = scores.get("focus", 5.0) - progression = scores.get("progression", 5.0) - decision_quality = scores.get("decision_quality", 5.0) - conciseness = scores.get("conciseness", 5.0) - loop_avoidance = scores.get("loop_avoidance", 5.0) - - overall_score = evaluation_data.get("overall_score", evaluation_data.get("score", 5.0)) - feedback = evaluation_data.get("feedback", "No detailed feedback provided.") - optimization_suggestions = evaluation_data.get("optimization_suggestions", "No specific suggestions provided.") - - detailed_feedback = "Reasoning Efficiency Evaluation:\n" - detailed_feedback += f"• Focus: {focus}/10 - Staying on topic without tangents\n" - detailed_feedback += f"• Progression: {progression}/10 - Building on previous thinking\n" - detailed_feedback += f"• Decision Quality: {decision_quality}/10 - Making appropriate decisions\n" - detailed_feedback += f"• Conciseness: {conciseness}/10 - Communicating efficiently\n" - detailed_feedback += f"• Loop Avoidance: {loop_avoidance}/10 - Avoiding repetitive patterns\n\n" - - detailed_feedback += f"Feedback:\n{feedback}\n\n" - detailed_feedback += f"Optimization Suggestions:\n{optimization_suggestions}" - - return EvaluationScore( - score=float(overall_score), - feedback=detailed_feedback, - raw_response=response - ) - except Exception as e: - logging.warning(f"Failed to parse reasoning efficiency evaluation: {e}") - return EvaluationScore( - score=None, - feedback=f"Failed to parse reasoning efficiency evaluation. Raw response: {response[:200]}...", - raw_response=response - ) - - def _detect_loops(self, llm_calls: List[Dict]) -> Tuple[bool, List[Dict]]: - loop_details = [] - - messages = [] - for call in llm_calls: - content = call.get("response", "") - if isinstance(content, str): - messages.append(content) - elif isinstance(content, list) and len(content) > 0: - # Handle message list format - for msg in content: - if isinstance(msg, dict) and "content" in msg: - messages.append(msg["content"]) - - # Simple n-gram based similarity detection - # For a more robust implementation, consider using embedding-based similarity - for i in range(len(messages) - 2): - for j in range(i + 1, len(messages) - 1): - # Check for repeated patterns (simplistic approach) - # A more sophisticated approach would use semantic similarity - similarity = self._calculate_text_similarity(messages[i], messages[j]) - if similarity > 0.7: # Arbitrary threshold - loop_details.append({ - "first_occurrence": i, - "second_occurrence": j, - "similarity": similarity, - "snippet": messages[i][:100] + "..." 
- }) - - return len(loop_details) > 0, loop_details - - def _calculate_text_similarity(self, text1: str, text2: str) -> float: - text1 = re.sub(r'\s+', ' ', text1.lower()).strip() - text2 = re.sub(r'\s+', ' ', text2.lower()).strip() - - # Simple Jaccard similarity on word sets - words1 = set(text1.split()) - words2 = set(text2.split()) - - intersection = len(words1.intersection(words2)) - union = len(words1.union(words2)) - - return intersection / union if union > 0 else 0.0 - - def _analyze_reasoning_patterns(self, llm_calls: List[Dict]) -> Dict[str, Any]: - call_lengths = [] - response_times = [] - - for call in llm_calls: - content = call.get("response", "") - if isinstance(content, str): - call_lengths.append(len(content)) - elif isinstance(content, list) and len(content) > 0: - # Handle message list format - total_length = 0 - for msg in content: - if isinstance(msg, dict) and "content" in msg: - total_length += len(msg["content"]) - call_lengths.append(total_length) - - start_time = call.get("start_time") - end_time = call.get("end_time") - if start_time and end_time: - try: - response_times.append(end_time - start_time) - except Exception: - pass - - avg_length = np.mean(call_lengths) if call_lengths else 0 - std_length = np.std(call_lengths) if call_lengths else 0 - length_trend = self._calculate_trend(call_lengths) - - primary_pattern = ReasoningPatternType.EFFICIENT - details = "Agent demonstrates efficient reasoning patterns." - - loop_score = self._calculate_loop_likelihood(call_lengths, response_times) - if loop_score > 0.7: - primary_pattern = ReasoningPatternType.LOOP - details = "Agent appears to be stuck in repetitive thinking patterns." - elif avg_length > 1000 and std_length / avg_length < 0.3: - primary_pattern = ReasoningPatternType.VERBOSE - details = "Agent is consistently verbose across interactions." - elif len(llm_calls) > 10 and length_trend > 0.5: - primary_pattern = ReasoningPatternType.INDECISIVE - details = "Agent shows signs of indecisiveness with increasing message lengths." - elif std_length / avg_length > 0.8: - primary_pattern = ReasoningPatternType.SCATTERED - details = "Agent shows inconsistent reasoning flow with highly variable responses." 
- - return { - "primary_pattern": primary_pattern, - "details": details, - "metrics": { - "avg_length": avg_length, - "std_length": std_length, - "length_trend": length_trend, - "loop_score": loop_score - } - } - - def _calculate_trend(self, values: Sequence[float | int]) -> float: - if not values or len(values) < 2: - return 0.0 - - try: - x = np.arange(len(values)) - y = np.array(values) - - # Simple linear regression - slope = np.polyfit(x, y, 1)[0] - - # Normalize slope to -1 to 1 range - max_possible_slope = max(values) - min(values) - if max_possible_slope > 0: - normalized_slope = slope / max_possible_slope - return max(min(normalized_slope, 1.0), -1.0) - return 0.0 - except Exception: - return 0.0 - - def _calculate_loop_likelihood(self, call_lengths: Sequence[float], response_times: Sequence[float]) -> float: - if not call_lengths or len(call_lengths) < 3: - return 0.0 - - indicators = [] - - if len(call_lengths) >= 4: - repeated_lengths = 0 - for i in range(len(call_lengths) - 2): - ratio = call_lengths[i] / call_lengths[i + 2] if call_lengths[i + 2] > 0 else 0 - if 0.85 <= ratio <= 1.15: - repeated_lengths += 1 - - length_repetition_score = repeated_lengths / (len(call_lengths) - 2) - indicators.append(length_repetition_score) - - if response_times and len(response_times) >= 3: - try: - std_time = np.std(response_times) - mean_time = np.mean(response_times) - if mean_time > 0: - time_consistency = 1.0 - (std_time / mean_time) - indicators.append(max(0, time_consistency - 0.3) * 1.5) - except Exception: - pass - - return np.mean(indicators) if indicators else 0.0 - - def _get_call_samples(self, llm_calls: List[Dict]) -> str: - samples = [] - - if len(llm_calls) <= 6: - sample_indices = list(range(len(llm_calls))) - else: - sample_indices = [0, 1, len(llm_calls) // 2 - 1, len(llm_calls) // 2, - len(llm_calls) - 2, len(llm_calls) - 1] - - for idx in sample_indices: - call = llm_calls[idx] - content = call.get("response", "") - - if isinstance(content, str): - sample = content - elif isinstance(content, list) and len(content) > 0: - sample_parts = [] - for msg in content: - if isinstance(msg, dict) and "content" in msg: - sample_parts.append(msg["content"]) - sample = "\n".join(sample_parts) - else: - sample = str(content) - - truncated = sample[:200] + "..." if len(sample) > 200 else sample - samples.append(f"Call {idx + 1}:\n{truncated}\n") - - return "\n".join(samples) diff --git a/src/crewai/evaluation/metrics/semantic_quality_metrics.py b/src/crewai/evaluation/metrics/semantic_quality_metrics.py deleted file mode 100644 index a12c62ae3..000000000 --- a/src/crewai/evaluation/metrics/semantic_quality_metrics.py +++ /dev/null @@ -1,65 +0,0 @@ -from typing import Any, Dict - -from crewai.agent import Agent -from crewai.task import Task - -from crewai.evaluation.base_evaluator import BaseEvaluator, EvaluationScore, MetricCategory -from crewai.evaluation.json_parser import extract_json_from_llm_response - -class SemanticQualityEvaluator(BaseEvaluator): - @property - def metric_category(self) -> MetricCategory: - return MetricCategory.SEMANTIC_QUALITY - - def evaluate( - self, - agent: Agent, - task: Task, - execution_trace: Dict[str, Any], - final_output: Any, - ) -> EvaluationScore: - prompt = [ - {"role": "system", "content": """You are an expert evaluator assessing the semantic quality of an AI agent's output. 
- -Score the semantic quality on a scale from 0-10 where: -- 0: Completely incoherent, confusing, or logically flawed output -- 5: Moderately clear and logical output with some issues -- 10: Exceptionally clear, coherent, and logically sound output - -Consider: -1. Is the output well-structured and organized? -2. Is the reasoning logical and well-supported? -3. Is the language clear, precise, and appropriate for the task? -4. Are claims supported by evidence when appropriate? -5. Is the output free from contradictions and logical fallacies? - -Return your evaluation as JSON with fields 'score' (number) and 'feedback' (string). -"""}, - {"role": "user", "content": f""" -Agent role: {agent.role} -Task description: {task.description} - -Agent's final output: -{final_output} - -Evaluate the semantic quality and reasoning of this output. -"""} - ] - - assert self.llm is not None - response = self.llm.call(prompt) - - try: - evaluation_data: dict[str, Any] = extract_json_from_llm_response(response) - assert evaluation_data is not None - return EvaluationScore( - score=float(evaluation_data["score"]) if evaluation_data.get("score") is not None else None, - feedback=evaluation_data.get("feedback", response), - raw_response=response - ) - except Exception: - return EvaluationScore( - score=None, - feedback=f"Failed to parse evaluation. Raw response: {response}", - raw_response=response - ) \ No newline at end of file diff --git a/src/crewai/evaluation/metrics/tools_metrics.py b/src/crewai/evaluation/metrics/tools_metrics.py deleted file mode 100644 index 00762fc76..000000000 --- a/src/crewai/evaluation/metrics/tools_metrics.py +++ /dev/null @@ -1,400 +0,0 @@ -import json -from typing import Dict, Any - -from crewai.evaluation.base_evaluator import BaseEvaluator, EvaluationScore, MetricCategory -from crewai.evaluation.json_parser import extract_json_from_llm_response -from crewai.agent import Agent -from crewai.task import Task - - -class ToolSelectionEvaluator(BaseEvaluator): - - @property - def metric_category(self) -> MetricCategory: - return MetricCategory.TOOL_SELECTION - - def evaluate( - self, - agent: Agent, - task: Task, - execution_trace: Dict[str, Any], - final_output: str, - ) -> EvaluationScore: - tool_uses = execution_trace.get("tool_uses", []) - tool_count = len(tool_uses) - unique_tool_types = set([tool.get("tool", "Unknown tool") for tool in tool_uses]) - - if tool_count == 0: - if not agent.tools: - return EvaluationScore( - score=None, - feedback="Agent had no tools available to use." - ) - else: - return EvaluationScore( - score=None, - feedback="Agent had tools available but didn't use any." - ) - - available_tools_info = "" - if agent.tools: - for tool in agent.tools: - available_tools_info += f"- {tool.name}: {tool.description}\n" - else: - available_tools_info = "No tools available" - - tool_types_summary = "Tools selected by the agent:\n" - for tool_type in sorted(unique_tool_types): - tool_types_summary += f"- {tool_type}\n" - - prompt = [ - {"role": "system", "content": """You are an expert evaluator assessing if an AI agent selected the most appropriate tools for a given task. - -You must evaluate based on these 2 criteria: -1. Relevance (0-10): Were the tools chosen directly aligned with the task's goals? -2. Coverage (0-10): Did the agent select ALL appropriate tools from the AVAILABLE tools? 
- -IMPORTANT: -- ONLY consider tools that are listed as available to the agent -- DO NOT suggest tools that aren't in the 'Available tools' list -- DO NOT evaluate the quality or accuracy of tool outputs/results -- DO NOT evaluate how many times each tool was used -- DO NOT evaluate how the agent used the parameters -- DO NOT evaluate whether the agent interpreted the task correctly - -Focus ONLY on whether the correct CATEGORIES of tools were selected from what was available. - -Return your evaluation as JSON with these fields: -- scores: {"relevance": number, "coverage": number} -- overall_score: number (average of all scores, 0-10) -- feedback: string (focused ONLY on tool selection decisions from available tools) -- improvement_suggestions: string (ONLY suggest better selection from the AVAILABLE tools list, NOT new tools) -"""}, - {"role": "user", "content": f""" -Agent role: {agent.role} -Task description: {task.description} - -Available tools for this agent: -{available_tools_info} - -{tool_types_summary} - -Based ONLY on the task description and comparing the AVAILABLE tools with those that were selected (listed above), evaluate if the agent selected the appropriate tool types for this task. - -IMPORTANT: -- ONLY evaluate selection from tools listed as available -- DO NOT suggest new tools that aren't in the available tools list -- DO NOT evaluate tool usage or results -"""} - ] - assert self.llm is not None - response = self.llm.call(prompt) - - try: - evaluation_data = extract_json_from_llm_response(response) - assert evaluation_data is not None - - scores = evaluation_data.get("scores", {}) - relevance = scores.get("relevance", 5.0) - coverage = scores.get("coverage", 5.0) - overall_score = float(evaluation_data.get("overall_score", 5.0)) - - feedback = "Tool Selection Evaluation:\n" - feedback += f"• Relevance: {relevance}/10 - Selection of appropriate tool types for the task\n" - feedback += f"• Coverage: {coverage}/10 - Selection of all necessary tool types\n" - if "improvement_suggestions" in evaluation_data: - feedback += f"Improvement Suggestions:\n{evaluation_data['improvement_suggestions']}" - else: - feedback += evaluation_data.get("feedback", "No detailed feedback available.") - - return EvaluationScore( - score=overall_score, - feedback=feedback, - raw_response=response - ) - except Exception as e: - return EvaluationScore( - score=None, - feedback=f"Error evaluating tool selection: {e}", - raw_response=response - ) - - -class ParameterExtractionEvaluator(BaseEvaluator): - @property - def metric_category(self) -> MetricCategory: - return MetricCategory.PARAMETER_EXTRACTION - - def evaluate( - self, - agent: Agent, - task: Task, - execution_trace: Dict[str, Any], - final_output: str, - ) -> EvaluationScore: - tool_uses = execution_trace.get("tool_uses", []) - tool_count = len(tool_uses) - - if tool_count == 0: - return EvaluationScore( - score=None, - feedback="No tool usage detected. Cannot evaluate parameter extraction." 
- ) - - validation_errors = [] - for tool_use in tool_uses: - if not tool_use.get("success", True) and tool_use.get("error_type") == "validation_error": - validation_errors.append({ - "tool": tool_use.get("tool", "Unknown tool"), - "error": tool_use.get("result"), - "args": tool_use.get("args", {}) - }) - - validation_error_rate = len(validation_errors) / tool_count if tool_count > 0 else 0 - - param_samples = [] - for i, tool_use in enumerate(tool_uses[:5]): - tool_name = tool_use.get("tool", "Unknown tool") - tool_args = tool_use.get("args", {}) - success = tool_use.get("success", True) and not tool_use.get("error", False) - error_type = tool_use.get("error_type", "") if not success else "" - - is_validation_error = error_type == "validation_error" - - sample = f"Tool use #{i+1} - {tool_name}:\n" - sample += f"- Parameters: {json.dumps(tool_args, indent=2)}\n" - sample += f"- Success: {'No' if not success else 'Yes'}" - - if is_validation_error: - sample += " (PARAMETER VALIDATION ERROR)\n" - sample += f"- Error: {tool_use.get('result', 'Unknown error')}" - elif not success: - sample += f" (Other error: {error_type})\n" - - param_samples.append(sample) - - validation_errors_info = "" - if validation_errors: - validation_errors_info = f"\nParameter validation errors detected: {len(validation_errors)} ({validation_error_rate:.1%} of tool uses)\n" - for i, err in enumerate(validation_errors[:3]): - tool_name = err.get("tool", "Unknown tool") - error_msg = err.get("error", "Unknown error") - args = err.get("args", {}) - validation_errors_info += f"\nValidation Error #{i+1}:\n- Tool: {tool_name}\n- Args: {json.dumps(args, indent=2)}\n- Error: {error_msg}" - - if len(validation_errors) > 3: - validation_errors_info += f"\n...and {len(validation_errors) - 3} more validation errors." - param_samples_text = "\n\n".join(param_samples) - prompt = [ - {"role": "system", "content": """You are an expert evaluator assessing how well an AI agent extracts and formats PARAMETER VALUES for tool calls. - -Your job is to evaluate ONLY whether the agent used the correct parameter VALUES, not whether the right tools were selected or how the tools were invoked. - -Evaluate parameter extraction based on these criteria: -1. Accuracy (0-10): Are parameter values correctly identified from the context/task? -2. Formatting (0-10): Are values formatted correctly for each tool's requirements? -3. Completeness (0-10): Are all required parameter values provided, with no missing information? - -IMPORTANT: DO NOT evaluate: -- Whether the right tool was chosen (that's the ToolSelectionEvaluator's job) -- How the tools were structurally invoked (that's the ToolInvocationEvaluator's job) -- The quality of results from tools - -Focus ONLY on the PARAMETER VALUES - whether they were correctly extracted from the context, properly formatted, and complete. - -Validation errors are important signals that parameter values weren't properly extracted or formatted. 
- -Return your evaluation as JSON with these fields: -- scores: {"accuracy": number, "formatting": number, "completeness": number} -- overall_score: number (average of all scores, 0-10) -- feedback: string (focused ONLY on parameter value extraction quality) -- improvement_suggestions: string (concrete suggestions for better parameter VALUE extraction) -"""}, - {"role": "user", "content": f""" -Agent role: {agent.role} -Task description: {task.description} - -Parameter extraction examples: -{param_samples_text} -{validation_errors_info} - -Evaluate the quality of the agent's parameter extraction for this task. -"""} - ] - - assert self.llm is not None - response = self.llm.call(prompt) - - try: - evaluation_data = extract_json_from_llm_response(response) - assert evaluation_data is not None - - scores = evaluation_data.get("scores", {}) - accuracy = scores.get("accuracy", 5.0) - formatting = scores.get("formatting", 5.0) - completeness = scores.get("completeness", 5.0) - - overall_score = float(evaluation_data.get("overall_score", 5.0)) - - feedback = "Parameter Extraction Evaluation:\n" - feedback += f"• Accuracy: {accuracy}/10 - Correctly identifying required parameters\n" - feedback += f"• Formatting: {formatting}/10 - Properly formatting parameters for tools\n" - feedback += f"• Completeness: {completeness}/10 - Including all necessary information\n\n" - - if "improvement_suggestions" in evaluation_data: - feedback += f"Improvement Suggestions:\n{evaluation_data['improvement_suggestions']}" - else: - feedback += evaluation_data.get("feedback", "No detailed feedback available.") - - return EvaluationScore( - score=overall_score, - feedback=feedback, - raw_response=response - ) - except Exception as e: - return EvaluationScore( - score=None, - feedback=f"Error evaluating parameter extraction: {e}", - raw_response=response - ) - - -class ToolInvocationEvaluator(BaseEvaluator): - @property - def metric_category(self) -> MetricCategory: - return MetricCategory.TOOL_INVOCATION - - def evaluate( - self, - agent: Agent, - task: Task, - execution_trace: Dict[str, Any], - final_output: str, - ) -> EvaluationScore: - tool_uses = execution_trace.get("tool_uses", []) - tool_errors = [] - tool_count = len(tool_uses) - - if tool_count == 0: - return EvaluationScore( - score=None, - feedback="No tool usage detected. Cannot evaluate tool invocation." 
- ) - - for tool_use in tool_uses: - if not tool_use.get("success", True) or tool_use.get("error", False): - error_info = { - "tool": tool_use.get("tool", "Unknown tool"), - "error": tool_use.get("result"), - "error_type": tool_use.get("error_type", "unknown_error") - } - tool_errors.append(error_info) - - error_rate = len(tool_errors) / tool_count if tool_count > 0 else 0 - - error_types = {} - for error in tool_errors: - error_type = error.get("error_type", "unknown_error") - if error_type not in error_types: - error_types[error_type] = 0 - error_types[error_type] += 1 - - invocation_samples = [] - for i, tool_use in enumerate(tool_uses[:5]): - tool_name = tool_use.get("tool", "Unknown tool") - tool_args = tool_use.get("args", {}) - success = tool_use.get("success", True) and not tool_use.get("error", False) - error_type = tool_use.get("error_type", "") if not success else "" - error_msg = tool_use.get("result", "No error") if not success else "No error" - - sample = f"Tool invocation #{i+1}:\n" - sample += f"- Tool: {tool_name}\n" - sample += f"- Parameters: {json.dumps(tool_args, indent=2)}\n" - sample += f"- Success: {'No' if not success else 'Yes'}\n" - if not success: - sample += f"- Error type: {error_type}\n" - sample += f"- Error: {error_msg}" - invocation_samples.append(sample) - - error_type_summary = "" - if error_types: - error_type_summary = "Error type breakdown:\n" - for error_type, count in error_types.items(): - error_type_summary += f"- {error_type}: {count} occurrences ({(count/tool_count):.1%})\n" - - invocation_samples_text = "\n\n".join(invocation_samples) - prompt = [ - {"role": "system", "content": """You are an expert evaluator assessing how correctly an AI agent's tool invocations are STRUCTURED. - -Your job is to evaluate ONLY the structural and syntactical aspects of how the agent called tools, NOT which tools were selected or what parameter values were used. - -Evaluate the agent's tool invocation based on these criteria: -1. Structure (0-10): Does the tool call follow the expected syntax and format? -2. Error Handling (0-10): Does the agent handle tool errors appropriately? -3. Invocation Patterns (0-10): Are tool calls properly sequenced, batched, or managed? - -Error types that indicate invocation issues: -- execution_error: The tool was called correctly but failed during execution -- usage_error: General errors in how the tool was used structurally - -IMPORTANT: DO NOT evaluate: -- Whether the right tool was chosen (that's the ToolSelectionEvaluator's job) -- Whether the parameter values are correct (that's the ParameterExtractionEvaluator's job) -- The quality of results from tools - -Focus ONLY on HOW tools were invoked - the structure, format, and handling of the invocation process. - -Return your evaluation as JSON with these fields: -- scores: {"structure": number, "error_handling": number, "invocation_patterns": number} -- overall_score: number (average of all scores, 0-10) -- feedback: string (focused ONLY on structural aspects of tool invocation) -- improvement_suggestions: string (concrete suggestions for better structuring of tool calls) -"""}, - {"role": "user", "content": f""" -Agent role: {agent.role} -Task description: {task.description} - -Tool invocation examples: -{invocation_samples_text} - -Tool error rate: {error_rate:.2%} ({len(tool_errors)} errors out of {tool_count} invocations) -{error_type_summary} - -Evaluate the quality of the agent's tool invocation structure during this task. 
-"""} - ] - - assert self.llm is not None - response = self.llm.call(prompt) - - try: - evaluation_data = extract_json_from_llm_response(response) - assert evaluation_data is not None - scores = evaluation_data.get("scores", {}) - structure = scores.get("structure", 5.0) - error_handling = scores.get("error_handling", 5.0) - invocation_patterns = scores.get("invocation_patterns", 5.0) - - overall_score = float(evaluation_data.get("overall_score", 5.0)) - - feedback = "Tool Invocation Evaluation:\n" - feedback += f"• Structure: {structure}/10 - Following proper syntax and format\n" - feedback += f"• Error Handling: {error_handling}/10 - Appropriately handling tool errors\n" - feedback += f"• Invocation Patterns: {invocation_patterns}/10 - Proper sequencing and management of calls\n\n" - - if "improvement_suggestions" in evaluation_data: - feedback += f"Improvement Suggestions:\n{evaluation_data['improvement_suggestions']}" - else: - feedback += evaluation_data.get("feedback", "No detailed feedback available.") - - return EvaluationScore( - score=overall_score, - feedback=feedback, - raw_response=response - ) - except Exception as e: - return EvaluationScore( - score=None, - feedback=f"Error evaluating tool invocation: {e}", - raw_response=response - )