chore: resolve all ruff and mypy issues in experimental module

resolve linting, typing, and import issues; update Okta test
Author: Greyson LaLonde
Date: 2025-09-22 12:56:28 -04:00
Committed by: GitHub
Parent: aa8dc9d77f
Commit: 0e370593f1
17 changed files with 595 additions and 402 deletions
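
Two patterns recur across the hunks below: bare asserts are replaced with explicit raise ValueError(...) checks, and typing.Dict/List/Optional annotations become builtin generics with X | None unions (alongside import and __all__ sorting). A minimal sketch of both patterns, with illustrative names only and not code from this repository:

from typing import Any


class _Evaluator:
    """Illustrative stand-in; not part of the crewai codebase."""

    def score(self, case: dict[str, Any]) -> float:
        return float(len(case))


class _Runner:
    def __init__(self) -> None:
        # Before: `self.evaluator: Optional[_Evaluator] = None`
        self.evaluator: _Evaluator | None = None

    def run(self, cases: list[dict[str, Any]]) -> list[float]:
        # Before: `assert self.evaluator is not None`. Asserts vanish under
        # `python -O` and are commonly flagged by linters; an explicit raise
        # keeps the same mypy narrowing without either drawback.
        if self.evaluator is None:
            raise ValueError("Evaluator must be initialized")
        return [self.evaluator.score(case) for case in cases]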

View File

@@ -1,40 +1,39 @@
from crewai.experimental.evaluation import (
AgentEvaluationResult,
AgentEvaluator,
BaseEvaluator,
EvaluationScore,
MetricCategory,
AgentEvaluationResult,
SemanticQualityEvaluator,
GoalAlignmentEvaluator,
ReasoningEfficiencyEvaluator,
ToolSelectionEvaluator,
ParameterExtractionEvaluator,
ToolInvocationEvaluator,
EvaluationTraceCallback,
create_evaluation_callbacks,
AgentEvaluator,
create_default_evaluator,
ExperimentRunner,
ExperimentResults,
ExperimentResult,
ExperimentResults,
ExperimentRunner,
GoalAlignmentEvaluator,
MetricCategory,
ParameterExtractionEvaluator,
ReasoningEfficiencyEvaluator,
SemanticQualityEvaluator,
ToolInvocationEvaluator,
ToolSelectionEvaluator,
create_default_evaluator,
create_evaluation_callbacks,
)
__all__ = [
"AgentEvaluationResult",
"AgentEvaluator",
"BaseEvaluator",
"EvaluationScore",
"MetricCategory",
"AgentEvaluationResult",
"SemanticQualityEvaluator",
"GoalAlignmentEvaluator",
"ReasoningEfficiencyEvaluator",
"ToolSelectionEvaluator",
"ParameterExtractionEvaluator",
"ToolInvocationEvaluator",
"EvaluationTraceCallback",
"create_evaluation_callbacks",
"AgentEvaluator",
"create_default_evaluator",
"ExperimentRunner",
"ExperimentResult",
"ExperimentResults",
"ExperimentResult"
]
"ExperimentRunner",
"GoalAlignmentEvaluator",
"MetricCategory",
"ParameterExtractionEvaluator",
"ReasoningEfficiencyEvaluator",
"SemanticQualityEvaluator",
"ToolInvocationEvaluator",
"ToolSelectionEvaluator",
"create_default_evaluator",
"create_evaluation_callbacks",
]
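
A minimal usage sketch of the re-exported API, assuming the signatures shown later in this commit (create_default_evaluator(agents=...), AgentEvaluator(agents, evaluators=None)); the Agent constructor fields are an assumption about the public crewai API, not something shown in this diff:

from crewai import Agent
from crewai.experimental.evaluation import AgentEvaluator, create_default_evaluator

# role/goal/backstory are assumed public Agent fields; adjust to your setup.
analyst = Agent(
    role="Research Analyst",
    goal="Summarize a topic accurately",
    backstory="Used only for this example.",
)

# Wires up the six metric evaluators re-exported above.
evaluator = create_default_evaluator(agents=[analyst])

# Or construct directly; `evaluators` is an optional Sequence[BaseEvaluator].
manual = AgentEvaluator(agents=[analyst], evaluators=None)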

View File

@@ -1,51 +1,47 @@
from crewai.experimental.evaluation.agent_evaluator import (
AgentEvaluator,
create_default_evaluator,
)
from crewai.experimental.evaluation.base_evaluator import (
AgentEvaluationResult,
BaseEvaluator,
EvaluationScore,
MetricCategory,
AgentEvaluationResult
)
from crewai.experimental.evaluation.metrics import (
SemanticQualityEvaluator,
GoalAlignmentEvaluator,
ReasoningEfficiencyEvaluator,
ToolSelectionEvaluator,
ParameterExtractionEvaluator,
ToolInvocationEvaluator
)
from crewai.experimental.evaluation.evaluation_listener import (
EvaluationTraceCallback,
create_evaluation_callbacks
create_evaluation_callbacks,
)
from crewai.experimental.evaluation.agent_evaluator import (
AgentEvaluator,
create_default_evaluator
)
from crewai.experimental.evaluation.experiment import (
ExperimentRunner,
ExperimentResult,
ExperimentResults,
ExperimentResult
ExperimentRunner,
)
from crewai.experimental.evaluation.metrics import (
GoalAlignmentEvaluator,
ParameterExtractionEvaluator,
ReasoningEfficiencyEvaluator,
SemanticQualityEvaluator,
ToolInvocationEvaluator,
ToolSelectionEvaluator,
)
__all__ = [
"AgentEvaluationResult",
"AgentEvaluator",
"BaseEvaluator",
"EvaluationScore",
"MetricCategory",
"AgentEvaluationResult",
"SemanticQualityEvaluator",
"GoalAlignmentEvaluator",
"ReasoningEfficiencyEvaluator",
"ToolSelectionEvaluator",
"ParameterExtractionEvaluator",
"ToolInvocationEvaluator",
"EvaluationTraceCallback",
"create_evaluation_callbacks",
"AgentEvaluator",
"create_default_evaluator",
"ExperimentRunner",
"ExperimentResult",
"ExperimentResults",
"ExperimentResult"
"ExperimentRunner",
"GoalAlignmentEvaluator",
"MetricCategory",
"ParameterExtractionEvaluator",
"ReasoningEfficiencyEvaluator",
"SemanticQualityEvaluator",
"ToolInvocationEvaluator",
"ToolSelectionEvaluator",
"create_default_evaluator",
"create_evaluation_callbacks",
]

View File

@@ -1,34 +1,36 @@
import threading
from typing import Any, Optional
from collections.abc import Sequence
from typing import Any
from crewai.experimental.evaluation.base_evaluator import (
AgentEvaluationResult,
AggregationStrategy,
)
from crewai.agent import Agent
from crewai.task import Task
from crewai.experimental.evaluation.evaluation_display import EvaluationDisplayFormatter
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.agent_events import (
AgentEvaluationStartedEvent,
AgentEvaluationCompletedEvent,
AgentEvaluationFailedEvent,
AgentEvaluationStartedEvent,
LiteAgentExecutionCompletedEvent,
)
from crewai.experimental.evaluation import BaseEvaluator, create_evaluation_callbacks
from collections.abc import Sequence
from crewai.events.event_bus import crewai_event_bus
from crewai.events.utils.console_formatter import ConsoleFormatter
from crewai.events.types.task_events import TaskCompletedEvent
from crewai.events.types.agent_events import LiteAgentExecutionCompletedEvent
from crewai.events.utils.console_formatter import ConsoleFormatter
from crewai.experimental.evaluation.base_evaluator import (
AgentAggregatedEvaluationResult,
AgentEvaluationResult,
AggregationStrategy,
BaseEvaluator,
EvaluationScore,
MetricCategory,
)
from crewai.experimental.evaluation.evaluation_display import EvaluationDisplayFormatter
from crewai.experimental.evaluation.evaluation_listener import (
create_evaluation_callbacks,
)
from crewai.task import Task
class ExecutionState:
current_agent_id: Optional[str] = None
current_task_id: Optional[str] = None
current_agent_id: str | None = None
current_task_id: str | None = None
def __init__(self):
self.traces = {}
@@ -40,10 +42,10 @@ class ExecutionState:
class AgentEvaluator:
def __init__(
self,
agents: list[Agent],
agents: list[Agent] | list[BaseAgent],
evaluators: Sequence[BaseEvaluator] | None = None,
):
self.agents: list[Agent] = agents
self.agents: list[Agent] | list[BaseAgent] = agents
self.evaluators: Sequence[BaseEvaluator] | None = evaluators
self.callback = create_evaluation_callbacks()
@@ -75,7 +77,8 @@ class AgentEvaluator:
)
def _handle_task_completed(self, source: Any, event: TaskCompletedEvent) -> None:
assert event.task is not None
if event.task is None:
raise ValueError("TaskCompletedEvent must have a task")
agent = event.task.agent
if (
agent
@@ -92,9 +95,8 @@ class AgentEvaluator:
state.current_agent_id = str(agent.id)
state.current_task_id = str(event.task.id)
assert (
state.current_agent_id is not None and state.current_task_id is not None
)
if state.current_agent_id is None or state.current_task_id is None:
raise ValueError("Agent ID and Task ID must not be None")
trace = self.callback.get_trace(
state.current_agent_id, state.current_task_id
)
@@ -146,9 +148,8 @@ class AgentEvaluator:
if not target_agent:
return
assert (
state.current_agent_id is not None and state.current_task_id is not None
)
if state.current_agent_id is None or state.current_task_id is None:
raise ValueError("Agent ID and Task ID must not be None")
trace = self.callback.get_trace(
state.current_agent_id, state.current_task_id
)
@@ -244,7 +245,7 @@ class AgentEvaluator:
def evaluate(
self,
agent: Agent,
agent: Agent | BaseAgent,
execution_trace: dict[str, Any],
final_output: Any,
state: ExecutionState,
@@ -255,7 +256,8 @@ class AgentEvaluator:
task_id=state.current_task_id or (str(task.id) if task else "unknown_task"),
)
assert self.evaluators is not None
if self.evaluators is None:
raise ValueError("Evaluators must be initialized")
task_id = str(task.id) if task else None
for evaluator in self.evaluators:
try:
@@ -276,7 +278,7 @@ class AgentEvaluator:
metric_category=evaluator.metric_category,
score=score,
)
except Exception as e:
except Exception as e: # noqa: PERF203
self.emit_evaluation_failed_event(
agent_role=agent.role,
agent_id=str(agent.id),
@@ -284,7 +286,7 @@ class AgentEvaluator:
error=str(e),
)
self.console_formatter.print(
f"Error in {evaluator.metric_category.value} evaluator: {str(e)}"
f"Error in {evaluator.metric_category.value} evaluator: {e!s}"
)
return result
@@ -337,14 +339,14 @@ class AgentEvaluator:
)
def create_default_evaluator(agents: list[Agent], llm: None = None):
def create_default_evaluator(agents: list[Agent] | list[BaseAgent], llm: None = None):
from crewai.experimental.evaluation import (
GoalAlignmentEvaluator,
SemanticQualityEvaluator,
ToolSelectionEvaluator,
ParameterExtractionEvaluator,
ToolInvocationEvaluator,
ReasoningEfficiencyEvaluator,
SemanticQualityEvaluator,
ToolInvocationEvaluator,
ToolSelectionEvaluator,
)
evaluators = [

View File

@@ -1,15 +1,17 @@
import abc
import enum
from enum import Enum
from typing import Any, Dict, List, Optional
from typing import Any
from pydantic import BaseModel, Field
from crewai.agent import Agent
from crewai.task import Task
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.llm import BaseLLM
from crewai.task import Task
from crewai.utilities.llm_utils import create_llm
class MetricCategory(enum.Enum):
GOAL_ALIGNMENT = "goal_alignment"
SEMANTIC_QUALITY = "semantic_quality"
@@ -19,7 +21,7 @@ class MetricCategory(enum.Enum):
TOOL_INVOCATION = "tool_invocation"
def title(self):
return self.value.replace('_', ' ').title()
return self.value.replace("_", " ").title()
class EvaluationScore(BaseModel):
@@ -27,15 +29,13 @@ class EvaluationScore(BaseModel):
default=5.0,
description="Numeric score from 0-10 where 0 is worst and 10 is best, None if not applicable",
ge=0.0,
le=10.0
le=10.0,
)
feedback: str = Field(
default="",
description="Detailed feedback explaining the evaluation score"
default="", description="Detailed feedback explaining the evaluation score"
)
raw_response: str | None = Field(
default=None,
description="Raw response from the evaluator (e.g., LLM)"
default=None, description="Raw response from the evaluator (e.g., LLM)"
)
def __str__(self) -> str:
@@ -56,8 +56,8 @@ class BaseEvaluator(abc.ABC):
@abc.abstractmethod
def evaluate(
self,
agent: Agent,
execution_trace: Dict[str, Any],
agent: Agent | BaseAgent,
execution_trace: dict[str, Any],
final_output: Any,
task: Task | None = None,
) -> EvaluationScore:
@@ -67,9 +67,8 @@ class BaseEvaluator(abc.ABC):
class AgentEvaluationResult(BaseModel):
agent_id: str = Field(description="ID of the evaluated agent")
task_id: str = Field(description="ID of the task that was executed")
metrics: Dict[MetricCategory, EvaluationScore] = Field(
default_factory=dict,
description="Evaluation scores for each metric category"
metrics: dict[MetricCategory, EvaluationScore] = Field(
default_factory=dict, description="Evaluation scores for each metric category"
)
@@ -81,33 +80,23 @@ class AggregationStrategy(Enum):
class AgentAggregatedEvaluationResult(BaseModel):
agent_id: str = Field(
default="",
description="ID of the agent"
)
agent_role: str = Field(
default="",
description="Role of the agent"
)
agent_id: str = Field(default="", description="ID of the agent")
agent_role: str = Field(default="", description="Role of the agent")
task_count: int = Field(
default=0,
description="Number of tasks included in this aggregation"
default=0, description="Number of tasks included in this aggregation"
)
aggregation_strategy: AggregationStrategy = Field(
default=AggregationStrategy.SIMPLE_AVERAGE,
description="Strategy used for aggregation"
description="Strategy used for aggregation",
)
metrics: Dict[MetricCategory, EvaluationScore] = Field(
default_factory=dict,
description="Aggregated metrics across all tasks"
metrics: dict[MetricCategory, EvaluationScore] = Field(
default_factory=dict, description="Aggregated metrics across all tasks"
)
task_results: List[str] = Field(
default_factory=list,
description="IDs of tasks included in this aggregation"
task_results: list[str] = Field(
default_factory=list, description="IDs of tasks included in this aggregation"
)
overall_score: Optional[float] = Field(
default=None,
description="Overall score for this agent"
overall_score: float | None = Field(
default=None, description="Overall score for this agent"
)
def __str__(self) -> str:
@@ -119,7 +108,7 @@ class AgentAggregatedEvaluationResult(BaseModel):
result += f"\n\n- {category.value.upper()}: {score.score}/10\n"
if score.feedback:
detailed_feedback = "\n ".join(score.feedback.split('\n'))
detailed_feedback = "\n ".join(score.feedback.split("\n"))
result += f" {detailed_feedback}\n"
return result
return result
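
A short sketch of the models above, using only fields visible in this hunk (EvaluationScore.score/feedback/raw_response, AgentEvaluationResult.agent_id/task_id/metrics, and MetricCategory.title()):

from crewai.experimental.evaluation import (
    AgentEvaluationResult,
    EvaluationScore,
    MetricCategory,
)

score = EvaluationScore(
    score=7.5,  # constrained to the 0-10 range (ge=0.0, le=10.0)
    feedback="Covers the task goal but omits one requested deliverable.",
    raw_response='{"score": 7.5, "feedback": "..."}',
)

result = AgentEvaluationResult(
    agent_id="agent-1",
    task_id="task-1",
    metrics={MetricCategory.GOAL_ALIGNMENT: score},
)

print(MetricCategory.GOAL_ALIGNMENT.title())  # "Goal Alignment"
print(str(score))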

View File

@@ -1,16 +1,18 @@
from collections import defaultdict
from typing import Dict, Any, List
from rich.table import Table
from rich.box import HEAVY_EDGE, ROUNDED
from collections.abc import Sequence
from typing import Any
from rich.box import HEAVY_EDGE, ROUNDED
from rich.table import Table
from crewai.events.utils.console_formatter import ConsoleFormatter
from crewai.experimental.evaluation.base_evaluator import (
AgentAggregatedEvaluationResult,
AggregationStrategy,
AgentEvaluationResult,
AggregationStrategy,
EvaluationScore,
MetricCategory,
)
from crewai.experimental.evaluation import EvaluationScore
from crewai.events.utils.console_formatter import ConsoleFormatter
from crewai.utilities.llm_utils import create_llm
@@ -19,7 +21,7 @@ class EvaluationDisplayFormatter:
self.console_formatter = ConsoleFormatter()
def display_evaluation_with_feedback(
self, iterations_results: Dict[int, Dict[str, List[Any]]]
self, iterations_results: dict[int, dict[str, list[Any]]]
):
if not iterations_results:
self.console_formatter.print(
@@ -99,7 +101,7 @@ class EvaluationDisplayFormatter:
def display_summary_results(
self,
iterations_results: Dict[int, Dict[str, List[AgentAggregatedEvaluationResult]]],
iterations_results: dict[int, dict[str, list[AgentEvaluationResult]]],
):
if not iterations_results:
self.console_formatter.print(
@@ -280,7 +282,7 @@ class EvaluationDisplayFormatter:
feedback_summary = feedbacks[0]
aggregated_metrics[category] = EvaluationScore(
score=avg_score, feedback=feedback_summary
score=avg_score, feedback=feedback_summary or ""
)
overall_score = None
@@ -304,25 +306,25 @@ class EvaluationDisplayFormatter:
self,
agent_role: str,
metric: str,
feedbacks: List[str],
scores: List[float | None],
feedbacks: list[str],
scores: list[float | None],
strategy: AggregationStrategy,
) -> str:
if len(feedbacks) <= 2 and all(len(fb) < 200 for fb in feedbacks):
return "\n\n".join(
[f"Feedback {i+1}: {fb}" for i, fb in enumerate(feedbacks)]
[f"Feedback {i + 1}: {fb}" for i, fb in enumerate(feedbacks)]
)
try:
llm = create_llm()
formatted_feedbacks = []
for i, (feedback, score) in enumerate(zip(feedbacks, scores)):
for i, (feedback, score) in enumerate(zip(feedbacks, scores, strict=False)):
if len(feedback) > 500:
feedback = feedback[:500] + "..."
score_text = f"{score:.1f}" if score is not None else "N/A"
formatted_feedbacks.append(
f"Feedback #{i+1} (Score: {score_text}):\n{feedback}"
f"Feedback #{i + 1} (Score: {score_text}):\n{feedback}"
)
all_feedbacks = "\n\n" + "\n\n---\n\n".join(formatted_feedbacks)
@@ -365,10 +367,9 @@ class EvaluationDisplayFormatter:
""",
},
]
assert llm is not None
response = llm.call(prompt)
return response
if llm is None:
raise ValueError("LLM must be initialized")
return llm.call(prompt)
except Exception:
return "Synthesized from multiple tasks: " + "\n\n".join(

View File

@@ -1,26 +1,25 @@
from datetime import datetime
from typing import Any, Dict, Optional
from collections.abc import Sequence
from datetime import datetime
from typing import Any
from crewai.agent import Agent
from crewai.task import Task
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.events.base_event_listener import BaseEventListener
from crewai.events.event_bus import CrewAIEventsBus
from crewai.events.types.agent_events import (
AgentExecutionStartedEvent,
AgentExecutionCompletedEvent,
LiteAgentExecutionStartedEvent,
AgentExecutionStartedEvent,
LiteAgentExecutionCompletedEvent,
LiteAgentExecutionStartedEvent,
)
from crewai.events.types.llm_events import LLMCallCompletedEvent, LLMCallStartedEvent
from crewai.events.types.tool_usage_events import (
ToolUsageFinishedEvent,
ToolUsageErrorEvent,
ToolExecutionErrorEvent,
ToolSelectionErrorEvent,
ToolUsageErrorEvent,
ToolUsageFinishedEvent,
ToolValidateInputErrorEvent,
)
from crewai.events.types.llm_events import LLMCallStartedEvent, LLMCallCompletedEvent
from crewai.task import Task
class EvaluationTraceCallback(BaseEventListener):
@@ -136,7 +135,7 @@ class EvaluationTraceCallback(BaseEventListener):
def _init_trace(self, trace_key: str, **kwargs: Any):
self.traces[trace_key] = kwargs
def on_agent_start(self, agent: Agent, task: Task):
def on_agent_start(self, agent: BaseAgent, task: Task):
self.current_agent_id = agent.id
self.current_task_id = task.id
@@ -151,7 +150,7 @@ class EvaluationTraceCallback(BaseEventListener):
final_output=None,
)
def on_agent_finish(self, agent: Agent, task: Task, output: Any):
def on_agent_finish(self, agent: BaseAgent, task: Task, output: Any):
trace_key = f"{agent.id}_{task.id}"
if trace_key in self.traces:
self.traces[trace_key]["final_output"] = output
@@ -253,7 +252,7 @@ class EvaluationTraceCallback(BaseEventListener):
if hasattr(self, "current_llm_call"):
self.current_llm_call = {}
def get_trace(self, agent_id: str, task_id: str) -> Optional[Dict[str, Any]]:
def get_trace(self, agent_id: str, task_id: str) -> dict[str, Any] | None:
trace_key = f"{agent_id}_{task_id}"
return self.traces.get(trace_key)

View File

@@ -1,8 +1,7 @@
from crewai.experimental.evaluation.experiment.result import (
ExperimentResult,
ExperimentResults,
)
from crewai.experimental.evaluation.experiment.runner import ExperimentRunner
from crewai.experimental.evaluation.experiment.result import ExperimentResults, ExperimentResult
__all__ = [
"ExperimentRunner",
"ExperimentResults",
"ExperimentResult"
]
__all__ = ["ExperimentResult", "ExperimentResults", "ExperimentRunner"]

View File

@@ -2,45 +2,60 @@ import json
import os
from datetime import datetime, timezone
from typing import Any
from pydantic import BaseModel
class ExperimentResult(BaseModel):
identifier: str
inputs: dict[str, Any]
score: int | dict[str, int | float]
expected_score: int | dict[str, int | float]
score: float | dict[str, float]
expected_score: float | dict[str, float]
passed: bool
agent_evaluations: dict[str, Any] | None = None
class ExperimentResults:
def __init__(self, results: list[ExperimentResult], metadata: dict[str, Any] | None = None):
def __init__(
self, results: list[ExperimentResult], metadata: dict[str, Any] | None = None
):
self.results = results
self.metadata = metadata or {}
self.timestamp = datetime.now(timezone.utc)
from crewai.experimental.evaluation.experiment.result_display import ExperimentResultsDisplay
from crewai.experimental.evaluation.experiment.result_display import (
ExperimentResultsDisplay,
)
self.display = ExperimentResultsDisplay()
def to_json(self, filepath: str | None = None) -> dict[str, Any]:
data = {
"timestamp": self.timestamp.isoformat(),
"metadata": self.metadata,
"results": [r.model_dump(exclude={"agent_evaluations"}) for r in self.results]
"results": [
r.model_dump(exclude={"agent_evaluations"}) for r in self.results
],
}
if filepath:
with open(filepath, 'w') as f:
with open(filepath, "w") as f:
json.dump(data, f, indent=2)
self.display.console.print(f"[green]Results saved to {filepath}[/green]")
return data
def compare_with_baseline(self, baseline_filepath: str, save_current: bool = True, print_summary: bool = False) -> dict[str, Any]:
def compare_with_baseline(
self,
baseline_filepath: str,
save_current: bool = True,
print_summary: bool = False,
) -> dict[str, Any]:
baseline_runs = []
if os.path.exists(baseline_filepath) and os.path.getsize(baseline_filepath) > 0:
try:
with open(baseline_filepath, 'r') as f:
with open(baseline_filepath, "r") as f:
baseline_data = json.load(f)
if isinstance(baseline_data, dict) and "timestamp" in baseline_data:
@@ -48,14 +63,18 @@ class ExperimentResults:
elif isinstance(baseline_data, list):
baseline_runs = baseline_data
except (json.JSONDecodeError, FileNotFoundError) as e:
self.display.console.print(f"[yellow]Warning: Could not load baseline file: {str(e)}[/yellow]")
self.display.console.print(
f"[yellow]Warning: Could not load baseline file: {e!s}[/yellow]"
)
if not baseline_runs:
if save_current:
current_data = self.to_json()
with open(baseline_filepath, 'w') as f:
with open(baseline_filepath, "w") as f:
json.dump([current_data], f, indent=2)
self.display.console.print(f"[green]Saved current results as new baseline to {baseline_filepath}[/green]")
self.display.console.print(
f"[green]Saved current results as new baseline to {baseline_filepath}[/green]"
)
return {"is_baseline": True, "changes": {}}
baseline_runs.sort(key=lambda x: x.get("timestamp", ""), reverse=True)
@@ -69,9 +88,11 @@ class ExperimentResults:
if save_current:
current_data = self.to_json()
baseline_runs.append(current_data)
with open(baseline_filepath, 'w') as f:
with open(baseline_filepath, "w") as f:
json.dump(baseline_runs, f, indent=2)
self.display.console.print(f"[green]Added current results to baseline file {baseline_filepath}[/green]")
self.display.console.print(
f"[green]Added current results to baseline file {baseline_filepath}[/green]"
)
return comparison
@@ -118,5 +139,5 @@ class ExperimentResults:
"new_tests": new_tests,
"missing_tests": missing_tests,
"total_compared": len(improved) + len(regressed) + len(unchanged),
"baseline_timestamp": baseline_run.get("timestamp", "unknown")
"baseline_timestamp": baseline_run.get("timestamp", "unknown"),
}
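
A hedged sketch of the result types above after the score: float | dict[str, float] change; note that to_json(filepath) and compare_with_baseline(...) write files as a side effect:

from crewai.experimental.evaluation.experiment import ExperimentResult, ExperimentResults

results = ExperimentResults(
    results=[
        ExperimentResult(
            identifier="case-1",
            inputs={"topic": "pricing"},
            score={"goal_alignment": 8.0},           # float | dict[str, float]
            expected_score={"goal_alignment": 7.0},
            passed=True,
        )
    ],
    metadata={"suite": "smoke"},
)

data = results.to_json("run.json")                   # returns the dict and writes run.json
comparison = results.compare_with_baseline("baseline.json", save_current=True)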

View File

@@ -1,9 +1,12 @@
from typing import Dict, Any
from typing import Any
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
from rich.table import Table
from crewai.experimental.evaluation.experiment.result import ExperimentResults
class ExperimentResultsDisplay:
def __init__(self):
self.console = Console()
@@ -19,13 +22,19 @@ class ExperimentResultsDisplay:
table.add_row("Total Test Cases", str(total))
table.add_row("Passed", str(passed))
table.add_row("Failed", str(total - passed))
table.add_row("Success Rate", f"{(passed / total * 100):.1f}%" if total > 0 else "N/A")
table.add_row(
"Success Rate", f"{(passed / total * 100):.1f}%" if total > 0 else "N/A"
)
self.console.print(table)
def comparison_summary(self, comparison: Dict[str, Any], baseline_timestamp: str):
self.console.print(Panel(f"[bold]Comparison with baseline run from {baseline_timestamp}[/bold]",
expand=False))
def comparison_summary(self, comparison: dict[str, Any], baseline_timestamp: str):
self.console.print(
Panel(
f"[bold]Comparison with baseline run from {baseline_timestamp}[/bold]",
expand=False,
)
)
table = Table(title="Results Comparison")
table.add_column("Metric", style="cyan")
@@ -34,7 +43,9 @@ class ExperimentResultsDisplay:
improved = comparison.get("improved", [])
if improved:
details = ", ".join([f"{test_identifier}" for test_identifier in improved[:3]])
details = ", ".join(
[f"{test_identifier}" for test_identifier in improved[:3]]
)
if len(improved) > 3:
details += f" and {len(improved) - 3} more"
table.add_row("✅ Improved", str(len(improved)), details)
@@ -43,7 +54,9 @@ class ExperimentResultsDisplay:
regressed = comparison.get("regressed", [])
if regressed:
details = ", ".join([f"{test_identifier}" for test_identifier in regressed[:3]])
details = ", ".join(
[f"{test_identifier}" for test_identifier in regressed[:3]]
)
if len(regressed) > 3:
details += f" and {len(regressed) - 3} more"
table.add_row("❌ Regressed", str(len(regressed)), details, style="red")
@@ -58,13 +71,13 @@ class ExperimentResultsDisplay:
details = ", ".join(new_tests[:3])
if len(new_tests) > 3:
details += f" and {len(new_tests) - 3} more"
table.add_row(" New Tests", str(len(new_tests)), details)
table.add_row("+ New Tests", str(len(new_tests)), details)
missing_tests = comparison.get("missing_tests", [])
if missing_tests:
details = ", ".join(missing_tests[:3])
if len(missing_tests) > 3:
details += f" and {len(missing_tests) - 3} more"
table.add_row(" Missing Tests", str(len(missing_tests)), details)
table.add_row("- Missing Tests", str(len(missing_tests)), details)
self.console.print(table)

View File

@@ -2,11 +2,20 @@ from collections import defaultdict
from hashlib import md5
from typing import Any
from crewai import Crew, Agent
from crewai import Agent, Crew
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.experimental.evaluation import AgentEvaluator, create_default_evaluator
from crewai.experimental.evaluation.experiment.result_display import ExperimentResultsDisplay
from crewai.experimental.evaluation.experiment.result import ExperimentResults, ExperimentResult
from crewai.experimental.evaluation.evaluation_display import AgentAggregatedEvaluationResult
from crewai.experimental.evaluation.evaluation_display import (
AgentAggregatedEvaluationResult,
)
from crewai.experimental.evaluation.experiment.result import (
ExperimentResult,
ExperimentResults,
)
from crewai.experimental.evaluation.experiment.result_display import (
ExperimentResultsDisplay,
)
class ExperimentRunner:
def __init__(self, dataset: list[dict[str, Any]]):
@@ -14,11 +23,17 @@ class ExperimentRunner:
self.evaluator: AgentEvaluator | None = None
self.display = ExperimentResultsDisplay()
def run(self, crew: Crew | None = None, agents: list[Agent] | None = None, print_summary: bool = False) -> ExperimentResults:
def run(
self,
crew: Crew | None = None,
agents: list[Agent] | list[BaseAgent] | None = None,
print_summary: bool = False,
) -> ExperimentResults:
if crew and not agents:
agents = crew.agents
assert agents is not None
if agents is None:
raise ValueError("Agents must be provided either directly or via a crew")
self.evaluator = create_default_evaluator(agents=agents)
results = []
@@ -35,21 +50,37 @@ class ExperimentRunner:
return experiment_results
def _run_test_case(self, test_case: dict[str, Any], agents: list[Agent], crew: Crew | None = None) -> ExperimentResult:
def _run_test_case(
self,
test_case: dict[str, Any],
agents: list[Agent] | list[BaseAgent],
crew: Crew | None = None,
) -> ExperimentResult:
inputs = test_case["inputs"]
expected_score = test_case["expected_score"]
identifier = test_case.get("identifier") or md5(str(test_case).encode(), usedforsecurity=False).hexdigest()
identifier = (
test_case.get("identifier")
or md5(str(test_case).encode(), usedforsecurity=False).hexdigest()
)
try:
self.display.console.print(f"[dim]Running crew with input: {str(inputs)[:50]}...[/dim]")
self.display.console.print(
f"[dim]Running crew with input: {str(inputs)[:50]}...[/dim]"
)
self.display.console.print("\n")
if crew:
crew.kickoff(inputs=inputs)
else:
for agent in agents:
agent.kickoff(**inputs)
if isinstance(agent, Agent):
agent.kickoff(**inputs)
else:
raise TypeError(
f"Agent {agent} is not an instance of Agent and cannot be kicked off directly"
)
assert self.evaluator is not None
if self.evaluator is None:
raise ValueError("Evaluator must be initialized")
agent_evaluations = self.evaluator.get_agent_evaluation()
actual_score = self._extract_scores(agent_evaluations)
@@ -61,35 +92,38 @@ class ExperimentRunner:
score=actual_score,
expected_score=expected_score,
passed=passed,
agent_evaluations=agent_evaluations
agent_evaluations=agent_evaluations,
)
except Exception as e:
self.display.console.print(f"[red]Error running test case: {str(e)}[/red]")
self.display.console.print(f"[red]Error running test case: {e!s}[/red]")
return ExperimentResult(
identifier=identifier,
inputs=inputs,
score=0,
score=0.0,
expected_score=expected_score,
passed=False
passed=False,
)
def _extract_scores(self, agent_evaluations: dict[str, AgentAggregatedEvaluationResult]) -> float | dict[str, float]:
def _extract_scores(
self, agent_evaluations: dict[str, AgentAggregatedEvaluationResult]
) -> float | dict[str, float]:
all_scores: dict[str, list[float]] = defaultdict(list)
for evaluation in agent_evaluations.values():
for metric_name, score in evaluation.metrics.items():
if score.score is not None:
all_scores[metric_name.value].append(score.score)
avg_scores = {m: sum(s)/len(s) for m, s in all_scores.items()}
avg_scores = {m: sum(s) / len(s) for m, s in all_scores.items()}
if len(avg_scores) == 1:
return list(avg_scores.values())[0]
return next(iter(avg_scores.values()))
return avg_scores
def _assert_scores(self, expected: float | dict[str, float],
actual: float | dict[str, float]) -> bool:
def _assert_scores(
self, expected: float | dict[str, float], actual: float | dict[str, float]
) -> bool:
"""
Compare expected and actual scores, and return whether the test case passed.
@@ -122,4 +156,4 @@ class ExperimentRunner:
# All matching keys must have actual >= expected
return all(actual[key] >= expected[key] for key in matching_keys)
return False
return False
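
A sketch of the dataset shape ExperimentRunner consumes, inferred from _run_test_case above (inputs and expected_score required, identifier optional); the commented-out run(...) call and my_crew are placeholders:

from crewai.experimental.evaluation import ExperimentRunner

dataset = [
    {
        "identifier": "pricing-summary",              # optional; md5 of the case otherwise
        "inputs": {"topic": "pricing changes"},
        "expected_score": {"goal_alignment": 7.0, "semantic_quality": 7.0},
    }
]

runner = ExperimentRunner(dataset=dataset)
# run() needs a crew or a list of agents; passing neither now raises ValueError
# instead of failing an assert.
# results = runner.run(crew=my_crew, print_summary=True)   # my_crew: placeholder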

View File

@@ -13,11 +13,11 @@ def extract_json_from_llm_response(text: str) -> dict[str, Any]:
json_patterns = [
# Standard markdown code blocks with json
r'```json\s*([\s\S]*?)\s*```',
r"```json\s*([\s\S]*?)\s*```",
# Code blocks without language specifier
r'```\s*([\s\S]*?)\s*```',
r"```\s*([\s\S]*?)\s*```",
# Inline code with JSON
r'`([{\\[].*[}\]])`',
r"`([{\\[].*[}\]])`",
]
for pattern in json_patterns:
@@ -25,6 +25,6 @@ def extract_json_from_llm_response(text: str) -> dict[str, Any]:
for match in matches:
try:
return json.loads(match.strip())
except json.JSONDecodeError:
except json.JSONDecodeError: # noqa: PERF203
continue
raise ValueError("No valid JSON found in the response")
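
Usage sketch for extract_json_from_llm_response: it pulls the first parseable JSON object out of fenced or inline code blocks and, per the final line above, raises ValueError when nothing parses:

from crewai.experimental.evaluation.json_parser import extract_json_from_llm_response

response = 'Evaluation:\n```json\n{"score": 8, "feedback": "On goal."}\n```'
data = extract_json_from_llm_response(response)
print(data["score"], data["feedback"])  # 8 On goal.

try:
    extract_json_from_llm_response("no structured output here")
except ValueError as exc:
    print(exc)  # "No valid JSON found in the response"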

View File

@@ -1,26 +1,21 @@
from crewai.experimental.evaluation.metrics.goal_metrics import GoalAlignmentEvaluator
from crewai.experimental.evaluation.metrics.reasoning_metrics import (
ReasoningEfficiencyEvaluator
ReasoningEfficiencyEvaluator,
)
from crewai.experimental.evaluation.metrics.tools_metrics import (
ToolSelectionEvaluator,
ParameterExtractionEvaluator,
ToolInvocationEvaluator
)
from crewai.experimental.evaluation.metrics.goal_metrics import (
GoalAlignmentEvaluator
)
from crewai.experimental.evaluation.metrics.semantic_quality_metrics import (
SemanticQualityEvaluator
SemanticQualityEvaluator,
)
from crewai.experimental.evaluation.metrics.tools_metrics import (
ParameterExtractionEvaluator,
ToolInvocationEvaluator,
ToolSelectionEvaluator,
)
__all__ = [
"ReasoningEfficiencyEvaluator",
"ToolSelectionEvaluator",
"ParameterExtractionEvaluator",
"ToolInvocationEvaluator",
"GoalAlignmentEvaluator",
"SemanticQualityEvaluator"
]
"ParameterExtractionEvaluator",
"ReasoningEfficiencyEvaluator",
"SemanticQualityEvaluator",
"ToolInvocationEvaluator",
"ToolSelectionEvaluator",
]

View File

@@ -1,10 +1,15 @@
from typing import Any, Dict
from typing import Any
from crewai.agent import Agent
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.experimental.evaluation.base_evaluator import (
BaseEvaluator,
EvaluationScore,
MetricCategory,
)
from crewai.experimental.evaluation.json_parser import extract_json_from_llm_response
from crewai.task import Task
from crewai.experimental.evaluation.base_evaluator import BaseEvaluator, EvaluationScore, MetricCategory
from crewai.experimental.evaluation.json_parser import extract_json_from_llm_response
class GoalAlignmentEvaluator(BaseEvaluator):
@property
@@ -13,8 +18,8 @@ class GoalAlignmentEvaluator(BaseEvaluator):
def evaluate(
self,
agent: Agent,
execution_trace: Dict[str, Any],
agent: Agent | BaseAgent,
execution_trace: dict[str, Any],
final_output: Any,
task: Task | None = None,
) -> EvaluationScore:
@@ -23,7 +28,9 @@ class GoalAlignmentEvaluator(BaseEvaluator):
task_context = f"Task description: {task.description}\nExpected output: {task.expected_output}\n"
prompt = [
{"role": "system", "content": """You are an expert evaluator assessing how well an AI agent's output aligns with its assigned task goal.
{
"role": "system",
"content": """You are an expert evaluator assessing how well an AI agent's output aligns with its assigned task goal.
Score the agent's goal alignment on a scale from 0-10 where:
- 0: Complete misalignment, agent did not understand or attempt the task goal
@@ -37,8 +44,11 @@ Consider:
4. Did the agent provide all requested information or deliverables?
Return your evaluation as JSON with fields 'score' (number) and 'feedback' (string).
"""},
{"role": "user", "content": f"""
""",
},
{
"role": "user",
"content": f"""
Agent role: {agent.role}
Agent goal: {agent.goal}
{task_context}
@@ -47,23 +57,26 @@ Agent's final output:
{final_output}
Evaluate how well the agent's output aligns with the assigned task goal.
"""}
""",
},
]
assert self.llm is not None
if self.llm is None:
raise ValueError("LLM must be initialized")
response = self.llm.call(prompt)
try:
evaluation_data: dict[str, Any] = extract_json_from_llm_response(response)
assert evaluation_data is not None
if evaluation_data is None:
raise ValueError("Failed to extract evaluation data from LLM response")
return EvaluationScore(
score=evaluation_data.get("score", 0),
feedback=evaluation_data.get("feedback", response),
raw_response=response
raw_response=response,
)
except Exception:
return EvaluationScore(
score=None,
feedback=f"Failed to parse evaluation. Raw response: {response}",
raw_response=response
raw_response=response,
)

View File

@@ -8,18 +8,24 @@ This module provides evaluator implementations for:
import logging
import re
from enum import Enum
from typing import Any, Dict, List, Tuple
import numpy as np
from collections.abc import Sequence
from enum import Enum
from typing import Any
import numpy as np
from crewai.agent import Agent
from crewai.task import Task
from crewai.experimental.evaluation.base_evaluator import BaseEvaluator, EvaluationScore, MetricCategory
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.experimental.evaluation.base_evaluator import (
BaseEvaluator,
EvaluationScore,
MetricCategory,
)
from crewai.experimental.evaluation.json_parser import extract_json_from_llm_response
from crewai.task import Task
from crewai.tasks.task_output import TaskOutput
class ReasoningPatternType(Enum):
EFFICIENT = "efficient" # Good reasoning flow
LOOP = "loop" # Agent is stuck in a loop
@@ -35,8 +41,8 @@ class ReasoningEfficiencyEvaluator(BaseEvaluator):
def evaluate(
self,
agent: Agent,
execution_trace: Dict[str, Any],
agent: Agent | BaseAgent,
execution_trace: dict[str, Any],
final_output: TaskOutput | str,
task: Task | None = None,
) -> EvaluationScore:
@@ -49,7 +55,7 @@ class ReasoningEfficiencyEvaluator(BaseEvaluator):
if not llm_calls or len(llm_calls) < 2:
return EvaluationScore(
score=None,
feedback="Insufficient LLM calls to evaluate reasoning efficiency."
feedback="Insufficient LLM calls to evaluate reasoning efficiency.",
)
total_calls = len(llm_calls)
@@ -58,12 +64,16 @@ class ReasoningEfficiencyEvaluator(BaseEvaluator):
time_intervals = []
has_reliable_timing = True
for i in range(1, len(llm_calls)):
start_time = llm_calls[i-1].get("end_time")
start_time = llm_calls[i - 1].get("end_time")
end_time = llm_calls[i].get("start_time")
if start_time and end_time and start_time != end_time:
try:
interval = end_time - start_time
time_intervals.append(interval.total_seconds() if hasattr(interval, 'total_seconds') else 0)
time_intervals.append(
interval.total_seconds()
if hasattr(interval, "total_seconds")
else 0
)
except Exception:
has_reliable_timing = False
else:
@@ -83,14 +93,22 @@ class ReasoningEfficiencyEvaluator(BaseEvaluator):
if has_reliable_timing and time_intervals:
efficiency_metrics["avg_time_between_calls"] = np.mean(time_intervals)
loop_info = f"Detected {len(loop_details)} potential reasoning loops." if loop_detected else "No significant reasoning loops detected."
loop_info = (
f"Detected {len(loop_details)} potential reasoning loops."
if loop_detected
else "No significant reasoning loops detected."
)
call_samples = self._get_call_samples(llm_calls)
final_output = final_output.raw if isinstance(final_output, TaskOutput) else final_output
final_output = (
final_output.raw if isinstance(final_output, TaskOutput) else final_output
)
prompt = [
{"role": "system", "content": """You are an expert evaluator assessing the reasoning efficiency of an AI agent's thought process.
{
"role": "system",
"content": """You are an expert evaluator assessing the reasoning efficiency of an AI agent's thought process.
Evaluate the agent's reasoning efficiency across these five key subcategories:
@@ -120,8 +138,11 @@ Return your evaluation as JSON with the following structure:
"feedback": string (general feedback about overall reasoning efficiency),
"optimization_suggestions": string (concrete suggestions for improving reasoning efficiency),
"detected_patterns": string (describe any inefficient reasoning patterns you observe)
}"""},
{"role": "user", "content": f"""
}""",
},
{
"role": "user",
"content": f"""
Agent role: {agent.role}
{task_context}
@@ -140,10 +161,12 @@ Agent's final output:
Evaluate the reasoning efficiency of this agent based on these interaction patterns.
Identify any inefficient reasoning patterns and provide specific suggestions for optimization.
"""}
""",
},
]
assert self.llm is not None
if self.llm is None:
raise ValueError("LLM must be initialized")
response = self.llm.call(prompt)
try:
@@ -156,34 +179,46 @@ Identify any inefficient reasoning patterns and provide specific suggestions for
conciseness = scores.get("conciseness", 5.0)
loop_avoidance = scores.get("loop_avoidance", 5.0)
overall_score = evaluation_data.get("overall_score", evaluation_data.get("score", 5.0))
overall_score = evaluation_data.get(
"overall_score", evaluation_data.get("score", 5.0)
)
feedback = evaluation_data.get("feedback", "No detailed feedback provided.")
optimization_suggestions = evaluation_data.get("optimization_suggestions", "No specific suggestions provided.")
optimization_suggestions = evaluation_data.get(
"optimization_suggestions", "No specific suggestions provided."
)
detailed_feedback = "Reasoning Efficiency Evaluation:\n"
detailed_feedback += f"• Focus: {focus}/10 - Staying on topic without tangents\n"
detailed_feedback += f"• Progression: {progression}/10 - Building on previous thinking\n"
detailed_feedback += (
f"• Focus: {focus}/10 - Staying on topic without tangents\n"
)
detailed_feedback += (
f"• Progression: {progression}/10 - Building on previous thinking\n"
)
detailed_feedback += f"• Decision Quality: {decision_quality}/10 - Making appropriate decisions\n"
detailed_feedback += f"• Conciseness: {conciseness}/10 - Communicating efficiently\n"
detailed_feedback += (
f"• Conciseness: {conciseness}/10 - Communicating efficiently\n"
)
detailed_feedback += f"• Loop Avoidance: {loop_avoidance}/10 - Avoiding repetitive patterns\n\n"
detailed_feedback += f"Feedback:\n{feedback}\n\n"
detailed_feedback += f"Optimization Suggestions:\n{optimization_suggestions}"
detailed_feedback += (
f"Optimization Suggestions:\n{optimization_suggestions}"
)
return EvaluationScore(
score=float(overall_score),
feedback=detailed_feedback,
raw_response=response
raw_response=response,
)
except Exception as e:
logging.warning(f"Failed to parse reasoning efficiency evaluation: {e}")
return EvaluationScore(
score=None,
feedback=f"Failed to parse reasoning efficiency evaluation. Raw response: {response[:200]}...",
raw_response=response
raw_response=response,
)
def _detect_loops(self, llm_calls: List[Dict]) -> Tuple[bool, List[Dict]]:
def _detect_loops(self, llm_calls: list[dict]) -> tuple[bool, list[dict]]:
loop_details = []
messages = []
@@ -193,9 +228,11 @@ Identify any inefficient reasoning patterns and provide specific suggestions for
messages.append(content)
elif isinstance(content, list) and len(content) > 0:
# Handle message list format
for msg in content:
if isinstance(msg, dict) and "content" in msg:
messages.append(msg["content"])
messages.extend(
msg["content"]
for msg in content
if isinstance(msg, dict) and "content" in msg
)
# Simple n-gram based similarity detection
# For a more robust implementation, consider using embedding-based similarity
@@ -205,18 +242,20 @@ Identify any inefficient reasoning patterns and provide specific suggestions for
# A more sophisticated approach would use semantic similarity
similarity = self._calculate_text_similarity(messages[i], messages[j])
if similarity > 0.7: # Arbitrary threshold
loop_details.append({
"first_occurrence": i,
"second_occurrence": j,
"similarity": similarity,
"snippet": messages[i][:100] + "..."
})
loop_details.append(
{
"first_occurrence": i,
"second_occurrence": j,
"similarity": similarity,
"snippet": messages[i][:100] + "...",
}
)
return len(loop_details) > 0, loop_details
def _calculate_text_similarity(self, text1: str, text2: str) -> float:
text1 = re.sub(r'\s+', ' ', text1.lower()).strip()
text2 = re.sub(r'\s+', ' ', text2.lower()).strip()
text1 = re.sub(r"\s+", " ", text1.lower()).strip()
text2 = re.sub(r"\s+", " ", text2.lower()).strip()
# Simple Jaccard similarity on word sets
words1 = set(text1.split())
@@ -227,7 +266,7 @@ Identify any inefficient reasoning patterns and provide specific suggestions for
return intersection / union if union > 0 else 0.0
def _analyze_reasoning_patterns(self, llm_calls: List[Dict]) -> Dict[str, Any]:
def _analyze_reasoning_patterns(self, llm_calls: list[dict]) -> dict[str, Any]:
call_lengths = []
response_times = []
@@ -248,8 +287,8 @@ Identify any inefficient reasoning patterns and provide specific suggestions for
if start_time and end_time:
try:
response_times.append(end_time - start_time)
except Exception:
pass
except Exception as e:
logging.debug(f"Failed to calculate response time: {e}")
avg_length = np.mean(call_lengths) if call_lengths else 0
std_length = np.std(call_lengths) if call_lengths else 0
@@ -267,7 +306,9 @@ Identify any inefficient reasoning patterns and provide specific suggestions for
details = "Agent is consistently verbose across interactions."
elif len(llm_calls) > 10 and length_trend > 0.5:
primary_pattern = ReasoningPatternType.INDECISIVE
details = "Agent shows signs of indecisiveness with increasing message lengths."
details = (
"Agent shows signs of indecisiveness with increasing message lengths."
)
elif std_length / avg_length > 0.8:
primary_pattern = ReasoningPatternType.SCATTERED
details = "Agent shows inconsistent reasoning flow with highly variable responses."
@@ -279,8 +320,8 @@ Identify any inefficient reasoning patterns and provide specific suggestions for
"avg_length": avg_length,
"std_length": std_length,
"length_trend": length_trend,
"loop_score": loop_score
}
"loop_score": loop_score,
},
}
def _calculate_trend(self, values: Sequence[float | int]) -> float:
@@ -303,7 +344,9 @@ Identify any inefficient reasoning patterns and provide specific suggestions for
except Exception:
return 0.0
def _calculate_loop_likelihood(self, call_lengths: Sequence[float], response_times: Sequence[float]) -> float:
def _calculate_loop_likelihood(
self, call_lengths: Sequence[float], response_times: Sequence[float]
) -> float:
if not call_lengths or len(call_lengths) < 3:
return 0.0
@@ -312,7 +355,11 @@ Identify any inefficient reasoning patterns and provide specific suggestions for
if len(call_lengths) >= 4:
repeated_lengths = 0
for i in range(len(call_lengths) - 2):
ratio = call_lengths[i] / call_lengths[i + 2] if call_lengths[i + 2] > 0 else 0
ratio = (
call_lengths[i] / call_lengths[i + 2]
if call_lengths[i + 2] > 0
else 0
)
if 0.85 <= ratio <= 1.15:
repeated_lengths += 1
@@ -324,21 +371,27 @@ Identify any inefficient reasoning patterns and provide specific suggestions for
std_time = np.std(response_times)
mean_time = np.mean(response_times)
if mean_time > 0:
time_consistency = 1.0 - (std_time / mean_time)
indicators.append(max(0, time_consistency - 0.3) * 1.5)
except Exception:
pass
time_consistency = 1.0 - (float(std_time) / float(mean_time))
indicators.append(max(0.0, float(time_consistency - 0.3)) * 1.5)
except Exception as e:
logging.debug(f"Time consistency calculation failed: {e}")
return np.mean(indicators) if indicators else 0.0
return float(np.mean(indicators)) if indicators else 0.0
def _get_call_samples(self, llm_calls: List[Dict]) -> str:
def _get_call_samples(self, llm_calls: list[dict]) -> str:
samples = []
if len(llm_calls) <= 6:
sample_indices = list(range(len(llm_calls)))
else:
sample_indices = [0, 1, len(llm_calls) // 2 - 1, len(llm_calls) // 2,
len(llm_calls) - 2, len(llm_calls) - 1]
sample_indices = [
0,
1,
len(llm_calls) // 2 - 1,
len(llm_calls) // 2,
len(llm_calls) - 2,
len(llm_calls) - 1,
]
for idx in sample_indices:
call = llm_calls[idx]
@@ -347,10 +400,11 @@ Identify any inefficient reasoning patterns and provide specific suggestions for
if isinstance(content, str):
sample = content
elif isinstance(content, list) and len(content) > 0:
sample_parts = []
for msg in content:
if isinstance(msg, dict) and "content" in msg:
sample_parts.append(msg["content"])
sample_parts = [
msg["content"]
for msg in content
if isinstance(msg, dict) and "content" in msg
]
sample = "\n".join(sample_parts)
else:
sample = str(content)
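
The loop detection above hinges on a Jaccard word-set similarity with a 0.7 threshold; a standalone restatement of _calculate_text_similarity for reference:

import re


def jaccard_similarity(text1: str, text2: str) -> float:
    # Same steps as _calculate_text_similarity: normalize whitespace and case,
    # then compare word sets (intersection over union).
    text1 = re.sub(r"\s+", " ", text1.lower()).strip()
    text2 = re.sub(r"\s+", " ", text2.lower()).strip()
    words1, words2 = set(text1.split()), set(text2.split())
    union = len(words1 | words2)
    return len(words1 & words2) / union if union > 0 else 0.0


# Two near-identical agent messages score ~0.83, above the 0.7 loop threshold.
print(jaccard_similarity("I will search the docs", "I will search the docs again"))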

View File

@@ -1,10 +1,15 @@
from typing import Any, Dict
from typing import Any
from crewai.agent import Agent
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.experimental.evaluation.base_evaluator import (
BaseEvaluator,
EvaluationScore,
MetricCategory,
)
from crewai.experimental.evaluation.json_parser import extract_json_from_llm_response
from crewai.task import Task
from crewai.experimental.evaluation.base_evaluator import BaseEvaluator, EvaluationScore, MetricCategory
from crewai.experimental.evaluation.json_parser import extract_json_from_llm_response
class SemanticQualityEvaluator(BaseEvaluator):
@property
@@ -13,8 +18,8 @@ class SemanticQualityEvaluator(BaseEvaluator):
def evaluate(
self,
agent: Agent,
execution_trace: Dict[str, Any],
agent: Agent | BaseAgent,
execution_trace: dict[str, Any],
final_output: Any,
task: Task | None = None,
) -> EvaluationScore:
@@ -22,7 +27,9 @@ class SemanticQualityEvaluator(BaseEvaluator):
if task is not None:
task_context = f"Task description: {task.description}"
prompt = [
{"role": "system", "content": """You are an expert evaluator assessing the semantic quality of an AI agent's output.
{
"role": "system",
"content": """You are an expert evaluator assessing the semantic quality of an AI agent's output.
Score the semantic quality on a scale from 0-10 where:
- 0: Completely incoherent, confusing, or logically flawed output
@@ -37,8 +44,11 @@ Consider:
5. Is the output free from contradictions and logical fallacies?
Return your evaluation as JSON with fields 'score' (number) and 'feedback' (string).
"""},
{"role": "user", "content": f"""
""",
},
{
"role": "user",
"content": f"""
Agent role: {agent.role}
{task_context}
@@ -46,23 +56,28 @@ Agent's final output:
{final_output}
Evaluate the semantic quality and reasoning of this output.
"""}
""",
},
]
assert self.llm is not None
if self.llm is None:
raise ValueError("LLM must be initialized")
response = self.llm.call(prompt)
try:
evaluation_data: dict[str, Any] = extract_json_from_llm_response(response)
assert evaluation_data is not None
if evaluation_data is None:
raise ValueError("Failed to extract evaluation data from LLM response")
return EvaluationScore(
score=float(evaluation_data["score"]) if evaluation_data.get("score") is not None else None,
score=float(evaluation_data["score"])
if evaluation_data.get("score") is not None
else None,
feedback=evaluation_data.get("feedback", response),
raw_response=response
raw_response=response,
)
except Exception:
return EvaluationScore(
score=None,
feedback=f"Failed to parse evaluation. Raw response: {response}",
raw_response=response
)
raw_response=response,
)

View File

@@ -1,22 +1,26 @@
import json
from typing import Dict, Any
from typing import Any
from crewai.experimental.evaluation.base_evaluator import BaseEvaluator, EvaluationScore, MetricCategory
from crewai.experimental.evaluation.json_parser import extract_json_from_llm_response
from crewai.agent import Agent
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.experimental.evaluation.base_evaluator import (
BaseEvaluator,
EvaluationScore,
MetricCategory,
)
from crewai.experimental.evaluation.json_parser import extract_json_from_llm_response
from crewai.task import Task
class ToolSelectionEvaluator(BaseEvaluator):
@property
def metric_category(self) -> MetricCategory:
return MetricCategory.TOOL_SELECTION
def evaluate(
self,
agent: Agent,
execution_trace: Dict[str, Any],
agent: Agent | BaseAgent,
execution_trace: dict[str, Any],
final_output: str,
task: Task | None = None,
) -> EvaluationScore:
@@ -26,19 +30,18 @@ class ToolSelectionEvaluator(BaseEvaluator):
tool_uses = execution_trace.get("tool_uses", [])
tool_count = len(tool_uses)
unique_tool_types = set([tool.get("tool", "Unknown tool") for tool in tool_uses])
unique_tool_types = set(
[tool.get("tool", "Unknown tool") for tool in tool_uses]
)
if tool_count == 0:
if not agent.tools:
return EvaluationScore(
score=None,
feedback="Agent had no tools available to use."
)
else:
return EvaluationScore(
score=None,
feedback="Agent had tools available but didn't use any."
score=None, feedback="Agent had no tools available to use."
)
return EvaluationScore(
score=None, feedback="Agent had tools available but didn't use any."
)
available_tools_info = ""
if agent.tools:
@@ -52,7 +55,9 @@ class ToolSelectionEvaluator(BaseEvaluator):
tool_types_summary += f"- {tool_type}\n"
prompt = [
{"role": "system", "content": """You are an expert evaluator assessing if an AI agent selected the most appropriate tools for a given task.
{
"role": "system",
"content": """You are an expert evaluator assessing if an AI agent selected the most appropriate tools for a given task.
You must evaluate based on these 2 criteria:
1. Relevance (0-10): Were the tools chosen directly aligned with the task's goals?
@@ -73,8 +78,11 @@ Return your evaluation as JSON with these fields:
- overall_score: number (average of all scores, 0-10)
- feedback: string (focused ONLY on tool selection decisions from available tools)
- improvement_suggestions: string (ONLY suggest better selection from the AVAILABLE tools list, NOT new tools)
"""},
{"role": "user", "content": f"""
""",
},
{
"role": "user",
"content": f"""
Agent role: {agent.role}
{task_context}
@@ -89,14 +97,17 @@ IMPORTANT:
- ONLY evaluate selection from tools listed as available
- DO NOT suggest new tools that aren't in the available tools list
- DO NOT evaluate tool usage or results
"""}
""",
},
]
assert self.llm is not None
if self.llm is None:
raise ValueError("LLM must be initialized")
response = self.llm.call(prompt)
try:
evaluation_data = extract_json_from_llm_response(response)
assert evaluation_data is not None
if evaluation_data is None:
raise ValueError("Failed to extract evaluation data from LLM response")
scores = evaluation_data.get("scores", {})
relevance = scores.get("relevance", 5.0)
@@ -105,22 +116,24 @@ IMPORTANT:
feedback = "Tool Selection Evaluation:\n"
feedback += f"• Relevance: {relevance}/10 - Selection of appropriate tool types for the task\n"
feedback += f"• Coverage: {coverage}/10 - Selection of all necessary tool types\n"
feedback += (
f"• Coverage: {coverage}/10 - Selection of all necessary tool types\n"
)
if "improvement_suggestions" in evaluation_data:
feedback += f"Improvement Suggestions:\n{evaluation_data['improvement_suggestions']}"
else:
feedback += evaluation_data.get("feedback", "No detailed feedback available.")
feedback += evaluation_data.get(
"feedback", "No detailed feedback available."
)
return EvaluationScore(
score=overall_score,
feedback=feedback,
raw_response=response
score=overall_score, feedback=feedback, raw_response=response
)
except Exception as e:
return EvaluationScore(
score=None,
feedback=f"Error evaluating tool selection: {e}",
raw_response=response
raw_response=response,
)
@@ -131,8 +144,8 @@ class ParameterExtractionEvaluator(BaseEvaluator):
def evaluate(
self,
agent: Agent,
execution_trace: Dict[str, Any],
agent: Agent | BaseAgent,
execution_trace: dict[str, Any],
final_output: str,
task: Task | None = None,
) -> EvaluationScore:
@@ -145,19 +158,23 @@ class ParameterExtractionEvaluator(BaseEvaluator):
if tool_count == 0:
return EvaluationScore(
score=None,
feedback="No tool usage detected. Cannot evaluate parameter extraction."
feedback="No tool usage detected. Cannot evaluate parameter extraction.",
)
validation_errors = []
for tool_use in tool_uses:
if not tool_use.get("success", True) and tool_use.get("error_type") == "validation_error":
validation_errors.append({
"tool": tool_use.get("tool", "Unknown tool"),
"error": tool_use.get("result"),
"args": tool_use.get("args", {})
})
validation_errors = [
{
"tool": tool_use.get("tool", "Unknown tool"),
"error": tool_use.get("result"),
"args": tool_use.get("args", {}),
}
for tool_use in tool_uses
if not tool_use.get("success", True)
and tool_use.get("error_type") == "validation_error"
]
validation_error_rate = len(validation_errors) / tool_count if tool_count > 0 else 0
validation_error_rate = (
len(validation_errors) / tool_count if tool_count > 0 else 0
)
param_samples = []
for i, tool_use in enumerate(tool_uses[:5]):
@@ -168,7 +185,7 @@ class ParameterExtractionEvaluator(BaseEvaluator):
is_validation_error = error_type == "validation_error"
sample = f"Tool use #{i+1} - {tool_name}:\n"
sample = f"Tool use #{i + 1} - {tool_name}:\n"
sample += f"- Parameters: {json.dumps(tool_args, indent=2)}\n"
sample += f"- Success: {'No' if not success else 'Yes'}"
@@ -187,13 +204,17 @@ class ParameterExtractionEvaluator(BaseEvaluator):
tool_name = err.get("tool", "Unknown tool")
error_msg = err.get("error", "Unknown error")
args = err.get("args", {})
validation_errors_info += f"\nValidation Error #{i+1}:\n- Tool: {tool_name}\n- Args: {json.dumps(args, indent=2)}\n- Error: {error_msg}"
validation_errors_info += f"\nValidation Error #{i + 1}:\n- Tool: {tool_name}\n- Args: {json.dumps(args, indent=2)}\n- Error: {error_msg}"
if len(validation_errors) > 3:
validation_errors_info += f"\n...and {len(validation_errors) - 3} more validation errors."
validation_errors_info += (
f"\n...and {len(validation_errors) - 3} more validation errors."
)
param_samples_text = "\n\n".join(param_samples)
prompt = [
{"role": "system", "content": """You are an expert evaluator assessing how well an AI agent extracts and formats PARAMETER VALUES for tool calls.
{
"role": "system",
"content": """You are an expert evaluator assessing how well an AI agent extracts and formats PARAMETER VALUES for tool calls.
Your job is to evaluate ONLY whether the agent used the correct parameter VALUES, not whether the right tools were selected or how the tools were invoked.
@@ -216,8 +237,11 @@ Return your evaluation as JSON with these fields:
- overall_score: number (average of all scores, 0-10)
- feedback: string (focused ONLY on parameter value extraction quality)
- improvement_suggestions: string (concrete suggestions for better parameter VALUE extraction)
"""},
{"role": "user", "content": f"""
""",
},
{
"role": "user",
"content": f"""
Agent role: {agent.role}
{task_context}
@@ -226,15 +250,18 @@ Parameter extraction examples:
{validation_errors_info}
Evaluate the quality of the agent's parameter extraction for this task.
"""}
""",
},
]
assert self.llm is not None
if self.llm is None:
raise ValueError("LLM must be initialized")
response = self.llm.call(prompt)
try:
evaluation_data = extract_json_from_llm_response(response)
assert evaluation_data is not None
if evaluation_data is None:
raise ValueError("Failed to extract evaluation data from LLM response")
scores = evaluation_data.get("scores", {})
accuracy = scores.get("accuracy", 5.0)
@@ -251,18 +278,18 @@ Evaluate the quality of the agent's parameter extraction for this task.
if "improvement_suggestions" in evaluation_data:
feedback += f"Improvement Suggestions:\n{evaluation_data['improvement_suggestions']}"
else:
feedback += evaluation_data.get("feedback", "No detailed feedback available.")
feedback += evaluation_data.get(
"feedback", "No detailed feedback available."
)
return EvaluationScore(
score=overall_score,
feedback=feedback,
raw_response=response
score=overall_score, feedback=feedback, raw_response=response
)
except Exception as e:
return EvaluationScore(
score=None,
feedback=f"Error evaluating parameter extraction: {e}",
raw_response=response
raw_response=response,
)
@@ -273,8 +300,8 @@ class ToolInvocationEvaluator(BaseEvaluator):
def evaluate(
self,
agent: Agent,
execution_trace: Dict[str, Any],
agent: Agent | BaseAgent,
execution_trace: dict[str, Any],
final_output: str,
task: Task | None = None,
) -> EvaluationScore:
@@ -288,7 +315,7 @@ class ToolInvocationEvaluator(BaseEvaluator):
if tool_count == 0:
return EvaluationScore(
score=None,
feedback="No tool usage detected. Cannot evaluate tool invocation."
feedback="No tool usage detected. Cannot evaluate tool invocation.",
)
for tool_use in tool_uses:
@@ -296,7 +323,7 @@ class ToolInvocationEvaluator(BaseEvaluator):
error_info = {
"tool": tool_use.get("tool", "Unknown tool"),
"error": tool_use.get("result"),
"error_type": tool_use.get("error_type", "unknown_error")
"error_type": tool_use.get("error_type", "unknown_error"),
}
tool_errors.append(error_info)
@@ -315,9 +342,11 @@ class ToolInvocationEvaluator(BaseEvaluator):
tool_args = tool_use.get("args", {})
success = tool_use.get("success", True) and not tool_use.get("error", False)
error_type = tool_use.get("error_type", "") if not success else ""
error_msg = tool_use.get("result", "No error") if not success else "No error"
error_msg = (
tool_use.get("result", "No error") if not success else "No error"
)
sample = f"Tool invocation #{i+1}:\n"
sample = f"Tool invocation #{i + 1}:\n"
sample += f"- Tool: {tool_name}\n"
sample += f"- Parameters: {json.dumps(tool_args, indent=2)}\n"
sample += f"- Success: {'No' if not success else 'Yes'}\n"
@@ -330,11 +359,13 @@ class ToolInvocationEvaluator(BaseEvaluator):
if error_types:
error_type_summary = "Error type breakdown:\n"
for error_type, count in error_types.items():
error_type_summary += f"- {error_type}: {count} occurrences ({(count/tool_count):.1%})\n"
error_type_summary += f"- {error_type}: {count} occurrences ({(count / tool_count):.1%})\n"
invocation_samples_text = "\n\n".join(invocation_samples)
prompt = [
{"role": "system", "content": """You are an expert evaluator assessing how correctly an AI agent's tool invocations are STRUCTURED.
{
"role": "system",
"content": """You are an expert evaluator assessing how correctly an AI agent's tool invocations are STRUCTURED.
Your job is to evaluate ONLY the structural and syntactical aspects of how the agent called tools, NOT which tools were selected or what parameter values were used.
@@ -359,8 +390,11 @@ Return your evaluation as JSON with these fields:
- overall_score: number (average of all scores, 0-10)
- feedback: string (focused ONLY on structural aspects of tool invocation)
- improvement_suggestions: string (concrete suggestions for better structuring of tool calls)
"""},
{"role": "user", "content": f"""
""",
},
{
"role": "user",
"content": f"""
Agent role: {agent.role}
{task_context}
@@ -371,15 +405,18 @@ Tool error rate: {error_rate:.2%} ({len(tool_errors)} errors out of {tool_count}
{error_type_summary}
Evaluate the quality of the agent's tool invocation structure during this task.
"""}
""",
},
]
assert self.llm is not None
if self.llm is None:
raise ValueError("LLM must be initialized")
response = self.llm.call(prompt)
try:
evaluation_data = extract_json_from_llm_response(response)
assert evaluation_data is not None
if evaluation_data is None:
raise ValueError("Failed to extract evaluation data from LLM response")
scores = evaluation_data.get("scores", {})
structure = scores.get("structure", 5.0)
error_handling = scores.get("error_handling", 5.0)
@@ -388,23 +425,25 @@ Evaluate the quality of the agent's tool invocation structure during this task.
overall_score = float(evaluation_data.get("overall_score", 5.0))
feedback = "Tool Invocation Evaluation:\n"
feedback += f"• Structure: {structure}/10 - Following proper syntax and format\n"
feedback += (
f"• Structure: {structure}/10 - Following proper syntax and format\n"
)
feedback += f"• Error Handling: {error_handling}/10 - Appropriately handling tool errors\n"
feedback += f"• Invocation Patterns: {invocation_patterns}/10 - Proper sequencing and management of calls\n\n"
if "improvement_suggestions" in evaluation_data:
feedback += f"Improvement Suggestions:\n{evaluation_data['improvement_suggestions']}"
else:
feedback += evaluation_data.get("feedback", "No detailed feedback available.")
feedback += evaluation_data.get(
"feedback", "No detailed feedback available."
)
return EvaluationScore(
score=overall_score,
feedback=feedback,
raw_response=response
score=overall_score, feedback=feedback, raw_response=response
)
except Exception as e:
return EvaluationScore(
score=None,
feedback=f"Error evaluating tool invocation: {e}",
raw_response=response
raw_response=response,
)

View File

@@ -1,12 +1,21 @@
import inspect
import warnings
from typing_extensions import Any
import warnings
from crewai.experimental.evaluation.experiment import ExperimentResults, ExperimentRunner
from crewai import Crew, Agent
def assert_experiment_successfully(experiment_results: ExperimentResults, baseline_filepath: str | None = None) -> None:
failed_tests = [result for result in experiment_results.results if not result.passed]
from crewai import Agent, Crew
from crewai.experimental.evaluation.experiment import (
ExperimentResults,
ExperimentRunner,
)
def assert_experiment_successfully(
experiment_results: ExperimentResults, baseline_filepath: str | None = None
) -> None:
failed_tests = [
result for result in experiment_results.results if not result.passed
]
if failed_tests:
detailed_failures: list[str] = []
@@ -14,39 +23,54 @@ def assert_experiment_successfully(experiment_results: ExperimentResults, baseli
for result in failed_tests:
expected = result.expected_score
actual = result.score
detailed_failures.append(f"- {result.identifier}: expected {expected}, got {actual}")
detailed_failures.append(
f"- {result.identifier}: expected {expected}, got {actual}"
)
failure_details = "\n".join(detailed_failures)
raise AssertionError(f"The following test cases failed:\n{failure_details}")
baseline_filepath = baseline_filepath or _get_baseline_filepath_fallback()
comparison = experiment_results.compare_with_baseline(baseline_filepath=baseline_filepath)
comparison = experiment_results.compare_with_baseline(
baseline_filepath=baseline_filepath
)
assert_experiment_no_regression(comparison)
def assert_experiment_no_regression(comparison_result: dict[str, list[str]]) -> None:
regressed = comparison_result.get("regressed", [])
if regressed:
raise AssertionError(f"Regression detected! The following tests that previously passed now fail: {regressed}")
raise AssertionError(
f"Regression detected! The following tests that previously passed now fail: {regressed}"
)
missing_tests = comparison_result.get("missing_tests", [])
if missing_tests:
warnings.warn(
f"Warning: {len(missing_tests)} tests from the baseline are missing in the current run: {missing_tests}",
UserWarning
UserWarning,
stacklevel=2,
)
def run_experiment(dataset: list[dict[str, Any]], crew: Crew | None = None, agents: list[Agent] | None = None, verbose: bool = False) -> ExperimentResults:
def run_experiment(
dataset: list[dict[str, Any]],
crew: Crew | None = None,
agents: list[Agent] | None = None,
verbose: bool = False,
) -> ExperimentResults:
runner = ExperimentRunner(dataset=dataset)
return runner.run(agents=agents, crew=crew, print_summary=verbose)
def _get_baseline_filepath_fallback() -> str:
test_func_name = "experiment_fallback"
try:
current_frame = inspect.currentframe()
if current_frame is not None:
test_func_name = current_frame.f_back.f_back.f_code.co_name # type: ignore[union-attr]
test_func_name = current_frame.f_back.f_back.f_code.co_name # type: ignore[union-attr]
except Exception:
...
return f"{test_func_name}_results.json"
return f"{test_func_name}_results.json"
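
A hypothetical pytest-style use of the helpers above; the import path tests.experiment_helpers and the research_crew fixture are assumptions, since this diff does not show the helper module's filename:

from tests.experiment_helpers import (  # hypothetical module path
    assert_experiment_successfully,
    run_experiment,
)


def test_research_crew_quality(research_crew):  # research_crew: a pytest fixture providing a Crew
    dataset = [
        {"identifier": "basic", "inputs": {"topic": "linting"}, "expected_score": 7.0},
    ]
    results = run_experiment(dataset, crew=research_crew, verbose=True)
    # Raises AssertionError on any unmet expected_score, then compares against
    # "test_research_crew_quality_results.json" (the frame-name fallback above)
    # and raises again if previously passing cases regressed; baseline cases
    # missing from the current run only emit a UserWarning.
    assert_experiment_successfully(results)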