diff --git a/src/crewai/evaluation/__init__.py b/src/crewai/evaluation/__init__.py new file mode 100644 index 000000000..d9389e6e8 --- /dev/null +++ b/src/crewai/evaluation/__init__.py @@ -0,0 +1,39 @@ +# First, import the core base classes without AgentEvaluator +from crewai.evaluation.base_evaluator import ( + BaseEvaluator, + EvaluationScore, + MetricCategory, + AgentEvaluationResult +) + +# Now import the evaluators which depend on base classes +from crewai.evaluation.metrics.semantic_quality_metrics import ( + SemanticQualityEvaluator +) + +from crewai.evaluation.metrics.goal_metrics import ( + GoalAlignmentEvaluator +) + +from crewai.evaluation.metrics.reasoning_metrics import ( + ReasoningEfficiencyEvaluator +) + + +from crewai.evaluation.metrics.tools_metrics import ( + ToolSelectionEvaluator, + ParameterExtractionEvaluator, + ToolInvocationEvaluator +) + +# Next import integration which uses the base classes but not AgentEvaluator +from crewai.evaluation.evaluation_listener import ( + EvaluationTraceCallback, + create_evaluation_callbacks +) + + +from crewai.evaluation.agent_evaluator import ( + AgentEvaluator, + create_default_evaluator +) \ No newline at end of file diff --git a/src/crewai/evaluation/agent_evaluator.py b/src/crewai/evaluation/agent_evaluator.py new file mode 100644 index 000000000..3627571b0 --- /dev/null +++ b/src/crewai/evaluation/agent_evaluator.py @@ -0,0 +1,178 @@ +from crewai.evaluation.base_evaluator import AgentEvaluationResult, AgentAggregatedEvaluationResult, AggregationStrategy +from crewai.utilities.events.base_event_listener import BaseEventListener +from crewai.agent import Agent +from crewai.task import Task +from crewai.utilities.llm_utils import create_llm +from crewai.evaluation.evaluation_display import EvaluationDisplayFormatter + +from typing import List, Optional, Dict, Any, Tuple +from collections import defaultdict +from crewai.evaluation import EvaluationScore, BaseEvaluator, create_evaluation_callbacks +from crewai.crew import Crew +from rich.table import Table +from crewai.utilities.events.crewai_event_bus import crewai_event_bus +from crewai.utilities.events.utils.console_formatter import ConsoleFormatter + +class AgentEvaluator: + def __init__( + self, + evaluators: Optional[List[BaseEvaluator]] = None, + crew: Optional[Any] = None, + ): + self.crew: Crew = crew + self.evaluators = evaluators + + self.agent_evaluators = {} + if crew is not None: + for agent in crew.agents: + self.agent_evaluators[agent.id] = self.evaluators.copy() + + self.callback = create_evaluation_callbacks() + self.console_formatter = ConsoleFormatter() + self.display_formatter = EvaluationDisplayFormatter() + + self.iteration = 1 + self.iterations_results = {} + + def set_iteration(self, iteration: int) -> None: + self.iteration = iteration + + def evaluate_current_iteration(self): + if not self.crew: + raise ValueError("Cannot evaluate: no crew was provided to the evaluator.") + + if not self.callback: + raise ValueError("Cannot evaluate: no callback was set. 
Use set_callback() method first.") + + from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn + self.console_formatter.print(f"\n[bold blue]📊 Running agent evaluations for iteration {self.iteration}...[/bold blue]\n") + + evaluation_results = defaultdict(list) + + total_evals = 0 + for agent in self.crew.agents: + for task in self.crew.tasks: + if task.agent.id == agent.id and self.agent_evaluators.get(agent.id): + total_evals += 1 + + with Progress( + SpinnerColumn(), + TextColumn("[bold blue]{task.description}[/bold blue]"), + BarColumn(), + TextColumn("{task.percentage:.0f}% completed"), + console=self.console_formatter.console + ) as progress: + eval_task = progress.add_task(f"Evaluating agents (iteration {self.iteration})...", total=total_evals) + + for agent in self.crew.agents: + evaluator = self.agent_evaluators.get(agent.id) + if not evaluator: + continue + + for task in self.crew.tasks: + if task.agent.id != agent.id: + continue + + trace = self.callback.get_trace(agent.id, task.id) + if not trace: + self.console_formatter.print(f"[yellow]Warning: No trace found for agent {agent.role} on task {task.description[:30]}...[/yellow]") + progress.update(eval_task, advance=1) + continue + + with crewai_event_bus.scoped_handlers(): + result = self.evaluate( + agent=agent, + task=task, + execution_trace=trace, + final_output=task.output + ) + evaluation_results[agent.role].append(result) + progress.update(eval_task, advance=1) + + self.iterations_results[self.iteration] = evaluation_results + return evaluation_results + + def get_evaluation_results(self): + if self.iteration in self.iterations_results: + return self.iterations_results[self.iteration] + + return self.evaluate_current_iteration() + + def display_results_with_iterations(self): + self.display_formatter.display_summary_results(self.iterations_results) + + def get_agent_evaluation(self, strategy: AggregationStrategy = AggregationStrategy.SIMPLE_AVERAGE): + agent_results = {} + with crewai_event_bus.scoped_handlers(): + task_results = self.get_evaluation_results() + for agent_role, results in task_results.items(): + if not results: + continue + + agent_id = results[0].agent_id + + aggregated_result = self.display_formatter._aggregate_agent_results( + agent_id=agent_id, + agent_role=agent_role, + results=results, + strategy=strategy + ) + + agent_results[agent_role] = aggregated_result + + if len(self.iterations_results) > 1 and self.iteration == max(self.iterations_results.keys()): + self.display_results_with_iterations() + elif agent_results: + self.display_evaluation_results(agent_results) + + return agent_results + + def display_evaluation_results(self, agent_results: Dict[str, AgentAggregatedEvaluationResult]): + self.display_formatter.display_evaluation_results(agent_results) + + def evaluate( + self, + agent: Agent, + task: Task, + execution_trace: Dict[str, Any], + final_output: Any + ) -> AgentEvaluationResult: + result = AgentEvaluationResult( + agent_id=str(agent.id), + task_id=str(task.id) + ) + + for evaluator in self.evaluators: + try: + score = evaluator.evaluate( + agent=agent, + task=task, + execution_trace=execution_trace, + final_output=final_output + ) + result.metrics[evaluator.metric_category] = score + except Exception as e: + self.console_formatter.print(f"Error in {evaluator.metric_category.value} evaluator: {str(e)}") + + return result + +def create_default_evaluator(crew, llm=None): + from crewai.evaluation import ( + GoalAlignmentEvaluator, + SemanticQualityEvaluator, + 
ToolSelectionEvaluator, + ParameterExtractionEvaluator, + ToolInvocationEvaluator, + ReasoningEfficiencyEvaluator + ) + + evaluators = [ + GoalAlignmentEvaluator(llm=llm), + SemanticQualityEvaluator(llm=llm), + ToolSelectionEvaluator(llm=llm), + ParameterExtractionEvaluator(llm=llm), + ToolInvocationEvaluator(llm=llm), + ReasoningEfficiencyEvaluator(llm=llm), + ] + + return AgentEvaluator(evaluators=evaluators, crew=crew) diff --git a/src/crewai/evaluation/base_evaluator.py b/src/crewai/evaluation/base_evaluator.py new file mode 100644 index 000000000..848e08468 --- /dev/null +++ b/src/crewai/evaluation/base_evaluator.py @@ -0,0 +1,125 @@ +import abc +import enum +from enum import Enum +from typing import Any, Dict, List, Optional + +from pydantic import BaseModel, Field + +from crewai.agent import Agent +from crewai.task import Task +from crewai.llm import BaseLLM +from crewai.utilities.llm_utils import create_llm + +class MetricCategory(enum.Enum): + GOAL_ALIGNMENT = "goal_alignment" + SEMANTIC_QUALITY = "semantic_quality" + REASONING_EFFICIENCY = "reasoning_efficiency" + TOOL_SELECTION = "tool_selection" + PARAMETER_EXTRACTION = "parameter_extraction" + TOOL_INVOCATION = "tool_invocation" + + def title(self): + return self.value.replace('_', ' ').title() + + +class EvaluationScore(BaseModel): + score: Optional[float] = Field( + default=5.0, + description="Numeric score from 0-10 where 0 is worst and 10 is best, None if not applicable", + ge=0.0, + le=10.0 + ) + feedback: str = Field( + default="", + description="Detailed feedback explaining the evaluation score" + ) + raw_response: Optional[str] = Field( + default=None, + description="Raw response from the evaluator (e.g., LLM)" + ) + + def __str__(self) -> str: + if self.score is None: + return f"Score: N/A - {self.feedback}" + return f"Score: {self.score:.1f}/10 - {self.feedback}" + + +class BaseEvaluator(abc.ABC): + def __init__(self, llm: Optional[BaseLLM] = None): + self.llm = create_llm(llm) + + @property + @abc.abstractmethod + def metric_category(self) -> MetricCategory: + pass + + @abc.abstractmethod + def evaluate( + self, + agent: Agent, + task: Task, + execution_trace: Dict[str, Any], + final_output: Any, + ) -> EvaluationScore: + pass + + +class AgentEvaluationResult(BaseModel): + agent_id: str = Field(description="ID of the evaluated agent") + task_id: str = Field(description="ID of the task that was executed") + metrics: Dict[MetricCategory, EvaluationScore] = Field( + default_factory=dict, + description="Evaluation scores for each metric category" + ) + + +class AggregationStrategy(Enum): + SIMPLE_AVERAGE = "simple_average" # Equal weight to all tasks + WEIGHTED_BY_COMPLEXITY = "weighted_by_complexity" # Weight by task complexity + BEST_PERFORMANCE = "best_performance" # Use best scores across tasks + WORST_PERFORMANCE = "worst_performance" # Use worst scores across tasks + + +class AgentAggregatedEvaluationResult(BaseModel): + agent_id: str = Field( + default="", + description="ID of the agent" + ) + agent_role: str = Field( + default="", + description="Role of the agent" + ) + task_count: int = Field( + default=0, + description="Number of tasks included in this aggregation" + ) + aggregation_strategy: AggregationStrategy = Field( + default=AggregationStrategy.SIMPLE_AVERAGE, + description="Strategy used for aggregation" + ) + metrics: Dict[MetricCategory, EvaluationScore] = Field( + default_factory=dict, + description="Aggregated metrics across all tasks" + ) + task_results: List[str] = Field( + default_factory=list, + 
description="IDs of tasks included in this aggregation" + ) + overall_score: Optional[float] = Field( + default=None, + description="Overall score for this agent" + ) + + def __str__(self) -> str: + result = f"Agent Evaluation: {self.agent_role}\n" + result += f"Strategy: {self.aggregation_strategy.value}\n" + result += f"Tasks evaluated: {self.task_count}\n" + + for category, score in self.metrics.items(): + result += f"\n\n- {category.value.upper()}: {score.score}/10\n" + + if score.feedback: + detailed_feedback = "\n ".join(score.feedback.split('\n')) + result += f" {detailed_feedback}\n" + + return result \ No newline at end of file diff --git a/src/crewai/evaluation/evaluation_display.py b/src/crewai/evaluation/evaluation_display.py new file mode 100644 index 000000000..0b2c21e3f --- /dev/null +++ b/src/crewai/evaluation/evaluation_display.py @@ -0,0 +1,323 @@ +from typing import Dict, Any, List +from rich.table import Table +from rich.box import HEAVY_EDGE, ROUNDED +from rich.panel import Panel +from crewai.evaluation.base_evaluator import AgentAggregatedEvaluationResult, AggregationStrategy +from crewai.evaluation import EvaluationScore +from crewai.utilities.events.utils.console_formatter import ConsoleFormatter +from crewai.utilities.llm_utils import create_llm + +class EvaluationDisplayFormatter: + def __init__(self): + self.console_formatter = ConsoleFormatter() + + def display_evaluation_results(self, agent_results: Dict[str, AgentAggregatedEvaluationResult]): + if not agent_results: + self.console_formatter.print("[yellow]No evaluation results to display[/yellow]") + return + + for agent_role, result in agent_results.items(): + self.console_formatter.print(f"\n[bold cyan]Agent: {agent_role}[/bold cyan]\n") + + table = Table(title=f"{agent_role} Evaluation Results", box=ROUNDED) + table.add_column("Metric", style="cyan") + table.add_column("Score (1-10)", justify="center") + table.add_column("Feedback", style="green") + + for metric, evaluation_score in result.metrics.items(): + score = evaluation_score.score if evaluation_score.score is not None else "N/A" + + if isinstance(score, (int, float)) and score is not None: + if score >= 8.0: + score_text = f"[green]{score:.1f}[/green]" + elif score >= 6.0: + score_text = f"[cyan]{score:.1f}[/cyan]" + elif score >= 4.0: + score_text = f"[yellow]{score:.1f}[/yellow]" + else: + score_text = f"[red]{score:.1f}[/red]" + else: + score_text = "[dim]N/A[/dim]" + + table.add_section() + table.add_row( + metric.title(), + score_text, + evaluation_score.feedback or "" + ) + + if result.overall_score is not None: + if result.overall_score >= 8.0: + color = "green" + elif result.overall_score >= 6.0: + color = "cyan" + elif result.overall_score >= 4.0: + color = "yellow" + else: + color = "red" + + table.add_section() + table.add_row( + "[bold]Overall Score[/bold]", + f"[bold {color}]{result.overall_score:.1f}[/bold {color}]", + "" + ) + + self.console_formatter.print(table) + + def display_summary_results(self, iterations_results: Dict[int, Dict[str, List[AgentAggregatedEvaluationResult]]]): + if not iterations_results: + self.console_formatter.print("[yellow]No evaluation results to display[/yellow]") + return + + title = Panel( + "[bold]Agent Evaluation Summary[/bold]", + style="blue", + box=ROUNDED + ) + self.console_formatter.print(title, justify="center") + self.console_formatter.print("\n") + + table = Table(title="Agent Performance Scores \n (1-10 Higher is better)", box=HEAVY_EDGE) + + table.add_column("Agent/Metric", style="cyan") + + 
for iter_num in sorted(iterations_results.keys()): + run_label = f"Run {iter_num}" + table.add_column(run_label, justify="center") + + table.add_column("Avg. Total", justify="center") + + all_agent_roles = set() + for results in iterations_results.values(): + all_agent_roles.update(results.keys()) + + for agent_role in sorted(all_agent_roles): + agent_scores_by_iteration = {} + agent_metrics_by_iteration = {} + + for iter_num, results in sorted(iterations_results.items()): + if agent_role not in results or not results[agent_role]: + continue + + agent_results = results[agent_role] + agent_id = agent_results[0].agent_id + + aggregated_result = self._aggregate_agent_results( + agent_id=agent_id, + agent_role=agent_role, + results=agent_results, + strategy=AggregationStrategy.SIMPLE_AVERAGE + ) + + valid_scores = [score.score for score in aggregated_result.metrics.values() + if score.score is not None] + if valid_scores: + avg_score = sum(valid_scores) / len(valid_scores) + agent_scores_by_iteration[iter_num] = avg_score + + agent_metrics_by_iteration[iter_num] = aggregated_result.metrics + + if not agent_scores_by_iteration: + continue + + avg_across_iterations = sum(agent_scores_by_iteration.values()) / len(agent_scores_by_iteration) + + row = [f"[bold]{agent_role}[/bold]"] + + for iter_num in sorted(iterations_results.keys()): + if iter_num in agent_scores_by_iteration: + score = agent_scores_by_iteration[iter_num] + if score >= 8.0: + color = "green" + elif score >= 6.0: + color = "cyan" + elif score >= 4.0: + color = "yellow" + else: + color = "red" + row.append(f"[bold {color}]{score:.1f}[/]") + else: + row.append("-") + + if avg_across_iterations >= 8.0: + color = "green" + elif avg_across_iterations >= 6.0: + color = "cyan" + elif avg_across_iterations >= 4.0: + color = "yellow" + else: + color = "red" + row.append(f"[bold {color}]{avg_across_iterations:.1f}[/]") + + table.add_row(*row) + + all_metrics = set() + for metrics in agent_metrics_by_iteration.values(): + all_metrics.update(metrics.keys()) + + for metric in sorted(all_metrics, key=lambda x: x.value): + metric_scores = [] + + row = [f" - {metric.title()}"] + + for iter_num in sorted(iterations_results.keys()): + if (iter_num in agent_metrics_by_iteration and + metric in agent_metrics_by_iteration[iter_num]): + score = agent_metrics_by_iteration[iter_num][metric].score + if score is not None: + metric_scores.append(score) + if score >= 8.0: + color = "green" + elif score >= 6.0: + color = "cyan" + elif score >= 4.0: + color = "yellow" + else: + color = "red" + row.append(f"[{color}]{score:.1f}[/]") + else: + row.append("[dim]N/A[/dim]") + else: + row.append("-") + + if metric_scores: + avg = sum(metric_scores) / len(metric_scores) + if avg >= 8.0: + color = "green" + elif avg >= 6.0: + color = "cyan" + elif avg >= 4.0: + color = "yellow" + else: + color = "red" + row.append(f"[{color}]{avg:.1f}[/]") + else: + row.append("-") + + table.add_row(*row) + + table.add_row(*[""] * (len(sorted(iterations_results.keys())) + 2)) + + self.console_formatter.print(table) + self.console_formatter.print("\n") + + def _aggregate_agent_results( + self, + agent_id: str, + agent_role: str, + results: List[Any], + strategy: AggregationStrategy = AggregationStrategy.SIMPLE_AVERAGE, + ) -> AgentAggregatedEvaluationResult: + metrics_by_category = {} + + for result in results: + for metric_name, evaluation_score in result.metrics.items(): + if metric_name not in metrics_by_category: + metrics_by_category[metric_name] = [] + 
metrics_by_category[metric_name].append(evaluation_score) + + aggregated_metrics = {} + for category, scores in metrics_by_category.items(): + valid_scores = [s for s in scores if s.score is not None] + + avg_score = sum(s.score for s in valid_scores) / len(valid_scores) if valid_scores else None + + # Extract all feedback text from scores + feedbacks = [s.feedback for s in scores if s.feedback] + + # Process feedback based on number of entries + feedback_summary = None + if feedbacks: + if len(feedbacks) > 1: + # Use the summarization method for multiple feedbacks + feedback_summary = self._summarize_feedbacks( + agent_role=agent_role, + metric=category, + feedbacks=feedbacks, + scores=[s.score for s in scores], + strategy=strategy + ) + else: + feedback_summary = feedbacks[0] + + aggregated_metrics[category] = EvaluationScore( + score=avg_score, + feedback=feedback_summary + ) + + overall_score = None + if aggregated_metrics: + scores = [m.score for m in aggregated_metrics.values() if m.score is not None] + if scores: + overall_score = sum(scores) / len(scores) + + return AgentAggregatedEvaluationResult( + agent_id=agent_id, + agent_role=agent_role, + metrics=aggregated_metrics, + overall_score=overall_score, + task_count=len(results), + aggregation_strategy=strategy + ) + + def _summarize_feedbacks( + self, + agent_role: str, + metric: str, + feedbacks: List[str], + scores: List[float], + strategy: AggregationStrategy + ) -> str: + if len(feedbacks) <= 2 and all(len(fb) < 200 for fb in feedbacks): + return "\n\n".join([f"Feedback {i+1}: {fb}" for i, fb in enumerate(feedbacks)]) + + try: + llm = create_llm() + + formatted_feedbacks = [] + for i, (feedback, score) in enumerate(zip(feedbacks, scores)): + if len(feedback) > 500: + feedback = feedback[:500] + "..." + score_text = f"{score:.1f}" if score is not None else "N/A" + formatted_feedbacks.append(f"Feedback #{i+1} (Score: {score_text}):\n{feedback}") + + all_feedbacks = "\n\n" + "\n\n---\n\n".join(formatted_feedbacks) + + strategy_guidance = "" + if strategy == AggregationStrategy.BEST_PERFORMANCE: + strategy_guidance = "Focus on the highest-scoring aspects and strengths demonstrated." + elif strategy == AggregationStrategy.WORST_PERFORMANCE: + strategy_guidance = "Focus on areas that need improvement and common issues across tasks." + else: # Default/average strategies + strategy_guidance = "Provide a balanced analysis of strengths and weaknesses across all tasks." + + prompt = [ + {"role": "system", "content": f"""You are an expert evaluator creating a comprehensive summary of agent performance feedback. + Your job is to synthesize multiple feedback points about the same metric across different tasks. + + Create a concise, insightful summary that captures the key patterns and themes from all feedback. + {strategy_guidance} + + Your summary should be: + 1. Specific and concrete (not vague or general) + 2. Focused on actionable insights + 3. Highlighting patterns across tasks + 4. 150-250 words in length + + The summary should be directly usable as final feedback for the agent's performance on this metric."""}, + {"role": "user", "content": f"""I need a synthesized summary of the following feedback for: + + Agent Role: {agent_role} + Metric: {metric.title()} + + {all_feedbacks} + """} + ] + + response = llm.call(prompt) + + return response + + except Exception as e: + return "Synthesized from multiple tasks: " + "\n\n".join([f"- {fb[:500]}..." 
for fb in feedbacks]) diff --git a/src/crewai/evaluation/evaluation_listener.py b/src/crewai/evaluation/evaluation_listener.py new file mode 100644 index 000000000..91a79c051 --- /dev/null +++ b/src/crewai/evaluation/evaluation_listener.py @@ -0,0 +1,188 @@ +from datetime import datetime +from typing import Any, Dict, List, Optional, Union + +from crewai.agent import Agent +from crewai.task import Task +from crewai.utilities.events.base_event_listener import BaseEventListener +from crewai.utilities.events.crewai_event_bus import CrewAIEventsBus +from crewai.utilities.events.agent_events import ( + AgentExecutionStartedEvent, + AgentExecutionCompletedEvent +) +from crewai.utilities.events.tool_usage_events import ( + ToolUsageFinishedEvent, + ToolUsageErrorEvent, + ToolExecutionErrorEvent, + ToolSelectionErrorEvent, + ToolValidateInputErrorEvent +) +from crewai.utilities.events.llm_events import ( + LLMCallStartedEvent, + LLMCallCompletedEvent +) + +class EvaluationTraceCallback(BaseEventListener): + """Event listener for collecting execution traces for evaluation. + + This listener attaches to the event bus to collect detailed information + about the execution process, including agent steps, tool uses, knowledge + retrievals, and final output - all for use in agent evaluation. + """ + + _instance = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__(self): + if not hasattr(self, "_initialized") or not self._initialized: + super().__init__() + self.traces: Dict[str, Dict[str, Any]] = {} + self.current_agent_id = None + self.current_task_id = None + self._initialized = True + + def setup_listeners(self, event_bus: CrewAIEventsBus): + @event_bus.on(AgentExecutionStartedEvent) + def on_agent_started(source, event: AgentExecutionStartedEvent): + self.on_agent_start(event.agent, event.task) + + @event_bus.on(AgentExecutionCompletedEvent) + def on_agent_completed(source, event: AgentExecutionCompletedEvent): + self.on_agent_finish(event.agent, event.task, event.output) + + @event_bus.on(ToolUsageFinishedEvent) + def on_tool_completed(source, event: ToolUsageFinishedEvent): + self.on_tool_use(event.tool_name, event.tool_args, event.output, success=True) + + @event_bus.on(ToolUsageErrorEvent) + def on_tool_usage_error(source, event: ToolUsageErrorEvent): + self.on_tool_use(event.tool_name, event.tool_args, event.error, + success=False, error_type="usage_error") + + @event_bus.on(ToolExecutionErrorEvent) + def on_tool_execution_error(source, event: ToolExecutionErrorEvent): + self.on_tool_use(event.tool_name, event.tool_args, event.error, + success=False, error_type="execution_error") + + @event_bus.on(ToolSelectionErrorEvent) + def on_tool_selection_error(source, event: ToolSelectionErrorEvent): + self.on_tool_use(event.tool_name, event.tool_args, event.error, + success=False, error_type="selection_error") + + @event_bus.on(ToolValidateInputErrorEvent) + def on_tool_validate_input_error(source, event: ToolValidateInputErrorEvent): + self.on_tool_use(event.tool_name, event.tool_args, event.error, + success=False, error_type="validation_error") + + @event_bus.on(LLMCallStartedEvent) + def on_llm_call_started(source, event: LLMCallStartedEvent): + self.on_llm_call_start(event.messages, event.tools) + + @event_bus.on(LLMCallCompletedEvent) + def on_llm_call_completed(source, event: LLMCallCompletedEvent): + self.on_llm_call_end(event.messages, event.response) + + def 
on_agent_start(self, agent: Agent, task: Task): + self.current_agent_id = agent.id + self.current_task_id = task.id + + trace_key = f"{agent.id}_{task.id}" + self.traces[trace_key] = { + "agent_id": agent.id, + "task_id": task.id, + "tool_uses": [], + "llm_calls": [], + "start_time": datetime.now(), + "final_output": None + } + + def on_agent_finish(self, agent: Agent, task: Task, output: Any): + trace_key = f"{agent.id}_{task.id}" + if trace_key in self.traces: + self.traces[trace_key]["final_output"] = output + self.traces[trace_key]["end_time"] = datetime.now() + + self.current_agent_id = None + self.current_task_id = None + + def on_tool_use(self, tool_name: str, tool_args: Dict[str, Any], result: Any, + success: bool = True, error_type: Optional[str] = None): + if not self.current_agent_id or not self.current_task_id: + return + + trace_key = f"{self.current_agent_id}_{self.current_task_id}" + if trace_key in self.traces: + tool_use = { + "tool": tool_name, + "args": tool_args, + "result": result, + "success": success, + "timestamp": datetime.now() + } + + # Add error information if applicable + if not success and error_type: + tool_use["error"] = True + tool_use["error_type"] = error_type + + self.traces[trace_key]["tool_uses"].append(tool_use) + + def on_llm_call_start(self, messages: Union[str, List[Dict[str, Any]]], tools: Optional[List[Dict]] = None): + if not self.current_agent_id or not self.current_task_id: + return + + trace_key = f"{self.current_agent_id}_{self.current_task_id}" + if trace_key not in self.traces: + return + + self.current_llm_call = { + "messages": messages, + "tools": tools, + "start_time": datetime.now(), + "response": None, + "end_time": None + } + + def on_llm_call_end(self, messages: Union[str, List[Dict[str, Any]]], response: Any): + if not self.current_agent_id or not self.current_task_id: + return + + trace_key = f"{self.current_agent_id}_{self.current_task_id}" + if trace_key not in self.traces: + return + + total_tokens = 0 + if hasattr(response, "usage") and hasattr(response.usage, "total_tokens"): + total_tokens = response.usage.total_tokens + + current_time = datetime.now() + start_time = None + if hasattr(self, "current_llm_call") and self.current_llm_call: + start_time = self.current_llm_call.get("start_time") + + if not start_time: + start_time = current_time + llm_call = { + "messages": messages, + "response": response, + "start_time": start_time, + "end_time": current_time, + "total_tokens": total_tokens + } + + self.traces[trace_key]["llm_calls"].append(llm_call) + + if hasattr(self, "current_llm_call"): + self.current_llm_call = None + + def get_trace(self, agent_id: str, task_id: str) -> Optional[Dict[str, Any]]: + trace_key = f"{agent_id}_{task_id}" + return self.traces.get(trace_key) + + +def create_evaluation_callbacks() -> EvaluationTraceCallback: + return EvaluationTraceCallback() \ No newline at end of file diff --git a/src/crewai/evaluation/json_parser.py b/src/crewai/evaluation/json_parser.py new file mode 100644 index 000000000..a36084903 --- /dev/null +++ b/src/crewai/evaluation/json_parser.py @@ -0,0 +1,30 @@ +"""Robust JSON parsing utilities for evaluation responses.""" + +import json +import re +from typing import Dict, Any + + +def extract_json_from_llm_response(text: str) -> Dict[str, Any]: + try: + return json.loads(text) + except json.JSONDecodeError: + pass + + json_patterns = [ + # Standard markdown code blocks with json + r'```json\s*([\s\S]*?)\s*```', + # Code blocks without language specifier + 
r'```\s*([\s\S]*?)\s*```', + # Inline code with JSON + r'`([{\\[].*[}\]])`', + ] + + for pattern in json_patterns: + matches = re.findall(pattern, text, re.IGNORECASE | re.DOTALL) + for match in matches: + try: + return json.loads(match.strip()) + except json.JSONDecodeError: + continue + return text