Supporting eval single Agent/LiteAgent (#3167)

* refactor: rely on task completion event to evaluate agents

* feat: remove Crew dependency to evaluate agent

* feat: drop execution_context in AgentEvaluator

* chore: drop experimental Agent Eval feature from stable crew.test

* feat: support eval LiteAgent

* resolve linter issues
Authored by Lucas Gomide on 2025-07-15 10:22:41 -03:00, committed by GitHub
parent 53f674be60 · commit 6ebb6c9b63
16 changed files with 1313 additions and 148 deletions


@@ -3,32 +3,32 @@ from crewai.agent import Agent
from crewai.task import Task
from crewai.experimental.evaluation.evaluation_display import EvaluationDisplayFormatter
from typing import Any, Dict
from collections import defaultdict
from typing import Any
from crewai.experimental.evaluation import BaseEvaluator, create_evaluation_callbacks
from collections.abc import Sequence
from crewai.crew import Crew
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.utils.console_formatter import ConsoleFormatter
from crewai.experimental.evaluation.evaluation_display import AgentAggregatedEvaluationResult
from contextlib import contextmanager
from crewai.utilities.events.task_events import TaskCompletedEvent
from crewai.utilities.events.agent_events import LiteAgentExecutionCompletedEvent
from crewai.experimental.evaluation.base_evaluator import AgentAggregatedEvaluationResult
import threading
class ExecutionState:
def __init__(self):
self.traces: dict[str, Any] = {}
self.current_agent_id: str | None = None
self.current_task_id: str | None = None
self.iteration: int = 1
self.iterations_results: dict[int, dict[str, list[AgentEvaluationResult]]] = {}
self.traces = {}
self.current_agent_id = None
self.current_task_id = None
self.iteration = 1
self.iterations_results = {}
self.agent_evaluators = {}
class AgentEvaluator:
def __init__(
self,
agents: list[Agent],
evaluators: Sequence[BaseEvaluator] | None = None,
crew: Crew | None = None,
):
self.crew: Crew | None = crew
self.agents: list[Agent] = agents
self.evaluators: Sequence[BaseEvaluator] | None = evaluators
self.callback = create_evaluation_callbacks()
@@ -37,19 +37,10 @@ class AgentEvaluator:
self._thread_local: threading.local = threading.local()
self.agent_evaluators: dict[str, Sequence[BaseEvaluator] | None] = {}
if crew is not None:
assert crew and crew.agents is not None
for agent in crew.agents:
self.agent_evaluators[str(agent.id)] = self.evaluators
for agent in self.agents:
self._execution_state.agent_evaluators[str(agent.id)] = self.evaluators
@contextmanager
def execution_context(self):
state = ExecutionState()
try:
yield state
finally:
pass
self._subscribe_to_events()
@property
def _execution_state(self) -> ExecutionState:
@@ -57,81 +48,95 @@ class AgentEvaluator:
self._thread_local.execution_state = ExecutionState()
return self._thread_local.execution_state
def _subscribe_to_events(self) -> None:
crewai_event_bus.register_handler(TaskCompletedEvent, self._handle_task_completed)
crewai_event_bus.register_handler(LiteAgentExecutionCompletedEvent, self._handle_lite_agent_completed)
def _handle_task_completed(self, source: Any, event: TaskCompletedEvent) -> None:
assert event.task is not None
agent = event.task.agent
if agent and str(getattr(agent, 'id', 'unknown')) in self._execution_state.agent_evaluators:
state = ExecutionState()
state.current_agent_id = str(agent.id)
state.current_task_id = str(event.task.id)
trace = self.callback.get_trace(state.current_agent_id, state.current_task_id)
if not trace:
return
result = self.evaluate(
agent=agent,
task=event.task,
execution_trace=trace,
final_output=event.output,
state=state
)
current_iteration = self._execution_state.iteration
if current_iteration not in self._execution_state.iterations_results:
self._execution_state.iterations_results[current_iteration] = {}
if agent.role not in self._execution_state.iterations_results[current_iteration]:
self._execution_state.iterations_results[current_iteration][agent.role] = []
self._execution_state.iterations_results[current_iteration][agent.role].append(result)
def _handle_lite_agent_completed(self, source: object, event: LiteAgentExecutionCompletedEvent) -> None:
agent_info = event.agent_info
agent_id = str(agent_info["id"])
if agent_id in self._execution_state.agent_evaluators:
state = ExecutionState()
state.current_agent_id = agent_id
state.current_task_id = "lite_task"
target_agent = None
for agent in self.agents:
if str(agent.id) == agent_id:
target_agent = agent
break
if not target_agent:
return
trace = self.callback.get_trace(state.current_agent_id, state.current_task_id)
if not trace:
return
result = self.evaluate(
agent=target_agent,
execution_trace=trace,
final_output=event.output,
state=state
)
current_iteration = self._execution_state.iteration
if current_iteration not in self._execution_state.iterations_results:
self._execution_state.iterations_results[current_iteration] = {}
agent_role = target_agent.role
if agent_role not in self._execution_state.iterations_results[current_iteration]:
self._execution_state.iterations_results[current_iteration][agent_role] = []
self._execution_state.iterations_results[current_iteration][agent_role].append(result)
def set_iteration(self, iteration: int) -> None:
self._execution_state.iteration = iteration
def reset_iterations_results(self) -> None:
self._execution_state.iterations_results = {}
def evaluate_current_iteration(self) -> dict[str, list[AgentEvaluationResult]]:
if not self.crew:
raise ValueError("Cannot evaluate: no crew was provided to the evaluator.")
if not self.callback:
raise ValueError("Cannot evaluate: no callback was set. Use set_callback() method first.")
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn
evaluation_results: defaultdict[str, list[AgentEvaluationResult]] = defaultdict(list)
total_evals = 0
for agent in self.crew.agents:
for task in self.crew.tasks:
if task.agent and task.agent.id == agent.id and self.agent_evaluators.get(str(agent.id)):
total_evals += 1
with Progress(
SpinnerColumn(),
TextColumn("[bold blue]{task.description}[/bold blue]"),
BarColumn(),
TextColumn("{task.percentage:.0f}% completed"),
console=self.console_formatter.console
) as progress:
eval_task = progress.add_task(f"Evaluating agents (iteration {self._execution_state.iteration})...", total=total_evals)
with self.execution_context() as state:
state.iteration = self._execution_state.iteration
for agent in self.crew.agents:
evaluator = self.agent_evaluators.get(str(agent.id))
if not evaluator:
continue
for task in self.crew.tasks:
if task.agent and str(task.agent.id) != str(agent.id):
continue
trace = self.callback.get_trace(str(agent.id), str(task.id))
if not trace:
self.console_formatter.print(f"[yellow]Warning: No trace found for agent {agent.role} on task {task.description[:30]}...[/yellow]")
progress.update(eval_task, advance=1)
continue
state.current_agent_id = str(agent.id)
state.current_task_id = str(task.id)
with crewai_event_bus.scoped_handlers():
result = self.evaluate(
agent=agent,
task=task,
execution_trace=trace,
final_output=task.output,
state=state
)
evaluation_results[agent.role].append(result)
progress.update(eval_task, advance=1)
self._execution_state.iterations_results[self._execution_state.iteration] = evaluation_results
return evaluation_results
def get_evaluation_results(self) -> dict[str, list[AgentEvaluationResult]]:
if self._execution_state.iteration in self._execution_state.iterations_results:
if self._execution_state.iterations_results and self._execution_state.iteration in self._execution_state.iterations_results:
return self._execution_state.iterations_results[self._execution_state.iteration]
return self.evaluate_current_iteration()
return {}
def display_results_with_iterations(self) -> None:
self.display_formatter.display_summary_results(self._execution_state.iterations_results)
def get_agent_evaluation(self, strategy: AggregationStrategy = AggregationStrategy.SIMPLE_AVERAGE, include_evaluation_feedback: bool = False) -> Dict[str, AgentAggregatedEvaluationResult]:
def get_agent_evaluation(self, strategy: AggregationStrategy = AggregationStrategy.SIMPLE_AVERAGE, include_evaluation_feedback: bool = True) -> dict[str, AgentAggregatedEvaluationResult]:
agent_results = {}
with crewai_event_bus.scoped_handlers():
task_results = self.get_evaluation_results()
@@ -165,14 +170,14 @@ class AgentEvaluator:
def evaluate(
self,
agent: Agent,
task: Task,
execution_trace: dict[str, Any],
final_output: Any,
state: ExecutionState
state: ExecutionState,
task: Task | None = None,
) -> AgentEvaluationResult:
result = AgentEvaluationResult(
agent_id=state.current_agent_id or str(agent.id),
task_id=state.current_task_id or str(task.id)
task_id=state.current_task_id or (str(task.id) if task else "unknown_task")
)
assert self.evaluators is not None
@@ -190,7 +195,7 @@ class AgentEvaluator:
return result
def create_default_evaluator(crew, llm=None):
def create_default_evaluator(agents: list[Agent], llm: None = None):
from crewai.experimental.evaluation import (
GoalAlignmentEvaluator,
SemanticQualityEvaluator,
@@ -209,4 +214,4 @@ def create_default_evaluator(crew, llm=None):
ReasoningEfficiencyEvaluator(llm=llm),
]
return AgentEvaluator(evaluators=evaluators, crew=crew)
return AgentEvaluator(evaluators=evaluators, agents=agents)
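
Taken together with the new constructor, the evaluator can now be built from a plain list of agents with no Crew at all. A minimal sketch of the intended usage, assuming the import path shown elsewhere in this PR; the agent definition and the kickoff argument are illustrative:

from crewai import Agent
from crewai.experimental.evaluation import create_default_evaluator

# Illustrative agent; any Agent/LiteAgent you already run works the same way.
researcher = Agent(role="Researcher", goal="Summarize findings", backstory="Example agent")

# No Crew needed: the constructor subscribes to TaskCompletedEvent and
# LiteAgentExecutionCompletedEvent, so finished work is evaluated automatically.
evaluator = create_default_evaluator(agents=[researcher])

# Run the agent as usual; LiteAgent executions emit the completion event handled above.
researcher.kickoff("Summarize the latest findings on topic X")

# Aggregate per-agent scores collected by the event handlers.
summary = evaluator.get_agent_evaluation()
evaluator.display_results_with_iterations()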


@@ -57,9 +57,9 @@ class BaseEvaluator(abc.ABC):
def evaluate(
self,
agent: Agent,
task: Task,
execution_trace: Dict[str, Any],
final_output: Any,
task: Task | None = None,
) -> EvaluationScore:
pass
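
Because task is now optional, custom evaluators should handle the LiteAgent case where no Task exists. A sketch of a subclass following the same pattern as the built-in evaluators; the class, its scoring logic, the EvaluationScore fields (inferred from the 'score'/'feedback' JSON format used elsewhere in this PR), and the EvaluationScore import path are assumptions, and the base class may require additional members not visible in this diff:

from typing import Any

from crewai.agent import Agent
from crewai.task import Task
from crewai.experimental.evaluation import BaseEvaluator
from crewai.experimental.evaluation.base_evaluator import EvaluationScore  # path assumed


class OutputPresenceEvaluator(BaseEvaluator):  # illustrative, not part of this PR
    def evaluate(
        self,
        agent: Agent,
        execution_trace: dict[str, Any],
        final_output: Any,
        task: Task | None = None,
    ) -> EvaluationScore:
        # Task context is only available for crew/task runs, not LiteAgent runs.
        task_context = f"Task description: {task.description}" if task else ""
        score = 10.0 if final_output else 0.0
        return EvaluationScore(score=score, feedback=f"{task_context} Output present: {bool(final_output)}".strip())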


@@ -9,7 +9,9 @@ from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events.crewai_event_bus import CrewAIEventsBus
from crewai.utilities.events.agent_events import (
AgentExecutionStartedEvent,
AgentExecutionCompletedEvent
AgentExecutionCompletedEvent,
LiteAgentExecutionStartedEvent,
LiteAgentExecutionCompletedEvent
)
from crewai.utilities.events.tool_usage_events import (
ToolUsageFinishedEvent,
@@ -52,10 +54,18 @@ class EvaluationTraceCallback(BaseEventListener):
def on_agent_started(source, event: AgentExecutionStartedEvent):
self.on_agent_start(event.agent, event.task)
@event_bus.on(LiteAgentExecutionStartedEvent)
def on_lite_agent_started(source, event: LiteAgentExecutionStartedEvent):
self.on_lite_agent_start(event.agent_info)
@event_bus.on(AgentExecutionCompletedEvent)
def on_agent_completed(source, event: AgentExecutionCompletedEvent):
self.on_agent_finish(event.agent, event.task, event.output)
@event_bus.on(LiteAgentExecutionCompletedEvent)
def on_lite_agent_completed(source, event: LiteAgentExecutionCompletedEvent):
self.on_lite_agent_finish(event.output)
@event_bus.on(ToolUsageFinishedEvent)
def on_tool_completed(source, event: ToolUsageFinishedEvent):
self.on_tool_use(event.tool_name, event.tool_args, event.output, success=True)
@@ -88,19 +98,38 @@ class EvaluationTraceCallback(BaseEventListener):
def on_llm_call_completed(source, event: LLMCallCompletedEvent):
self.on_llm_call_end(event.messages, event.response)
def on_lite_agent_start(self, agent_info: dict[str, Any]):
self.current_agent_id = agent_info['id']
self.current_task_id = "lite_task"
trace_key = f"{self.current_agent_id}_{self.current_task_id}"
self._init_trace(
trace_key=trace_key,
agent_id=self.current_agent_id,
task_id=self.current_task_id,
tool_uses=[],
llm_calls=[],
start_time=datetime.now(),
final_output=None
)
def _init_trace(self, trace_key: str, **kwargs: Any):
self.traces[trace_key] = kwargs
def on_agent_start(self, agent: Agent, task: Task):
self.current_agent_id = agent.id
self.current_task_id = task.id
trace_key = f"{agent.id}_{task.id}"
self.traces[trace_key] = {
"agent_id": agent.id,
"task_id": task.id,
"tool_uses": [],
"llm_calls": [],
"start_time": datetime.now(),
"final_output": None
}
self._init_trace(
trace_key=trace_key,
agent_id=agent.id,
task_id=task.id,
tool_uses=[],
llm_calls=[],
start_time=datetime.now(),
final_output=None
)
def on_agent_finish(self, agent: Agent, task: Task, output: Any):
trace_key = f"{agent.id}_{task.id}"
@@ -108,9 +137,20 @@ class EvaluationTraceCallback(BaseEventListener):
self.traces[trace_key]["final_output"] = output
self.traces[trace_key]["end_time"] = datetime.now()
self._reset_current()
def _reset_current(self):
self.current_agent_id = None
self.current_task_id = None
def on_lite_agent_finish(self, output: Any):
trace_key = f"{self.current_agent_id}_lite_task"
if trace_key in self.traces:
self.traces[trace_key]["final_output"] = output
self.traces[trace_key]["end_time"] = datetime.now()
self._reset_current()
def on_tool_use(self, tool_name: str, tool_args: dict[str, Any] | str, result: Any,
success: bool = True, error_type: str | None = None):
if not self.current_agent_id or not self.current_task_id:
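
For LiteAgent executions the trace is keyed on "<agent_id>_lite_task" and initialized with the same fields as agent/task traces. The resulting entry looks roughly like this (agent identifier and timestamps are illustrative):

from datetime import datetime
from typing import Any

# Illustrative snapshot of self.traces right after on_lite_agent_start for agent id "9f1c"
traces: dict[str, dict[str, Any]] = {
    "9f1c_lite_task": {
        "agent_id": "9f1c",
        "task_id": "lite_task",
        "tool_uses": [],        # appended by on_tool_use
        "llm_calls": [],        # appended by the LLM call handlers
        "start_time": datetime.now(),
        "final_output": None,   # set by on_lite_agent_finish, along with "end_time"
    }
}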


@@ -2,7 +2,7 @@ from collections import defaultdict
from hashlib import md5
from typing import Any
from crewai import Crew
from crewai import Crew, Agent
from crewai.experimental.evaluation import AgentEvaluator, create_default_evaluator
from crewai.experimental.evaluation.experiment.result_display import ExperimentResultsDisplay
from crewai.experimental.evaluation.experiment.result import ExperimentResults, ExperimentResult
@@ -14,14 +14,18 @@ class ExperimentRunner:
self.evaluator: AgentEvaluator | None = None
self.display = ExperimentResultsDisplay()
def run(self, crew: Crew, print_summary: bool = False) -> ExperimentResults:
self.evaluator = create_default_evaluator(crew=crew)
def run(self, crew: Crew | None = None, agents: list[Agent] | None = None, print_summary: bool = False) -> ExperimentResults:
if crew and not agents:
agents = crew.agents
assert agents is not None
self.evaluator = create_default_evaluator(agents=agents)
results = []
for test_case in self.dataset:
self.evaluator.reset_iterations_results()
result = self._run_test_case(test_case, crew)
result = self._run_test_case(test_case=test_case, crew=crew, agents=agents)
results.append(result)
experiment_results = ExperimentResults(results)
@@ -31,7 +35,7 @@ class ExperimentRunner:
return experiment_results
def _run_test_case(self, test_case: dict[str, Any], crew: Crew) -> ExperimentResult:
def _run_test_case(self, test_case: dict[str, Any], agents: list[Agent], crew: Crew | None = None) -> ExperimentResult:
inputs = test_case["inputs"]
expected_score = test_case["expected_score"]
identifier = test_case.get("identifier") or md5(str(test_case).encode(), usedforsecurity=False).hexdigest()
@@ -39,7 +43,11 @@ class ExperimentRunner:
try:
self.display.console.print(f"[dim]Running crew with input: {str(inputs)[:50]}...[/dim]")
self.display.console.print("\n")
crew.kickoff(inputs=inputs)
if crew:
crew.kickoff(inputs=inputs)
else:
for agent in agents:
agent.kickoff(**inputs)
assert self.evaluator is not None
agent_evaluations = self.evaluator.get_agent_evaluation()
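
With the updated signature, an experiment can be driven by bare agents. A minimal sketch below; the dataset contents, score format, and agent are illustrative, and note that when no crew is given each case's "inputs" dict is splatted into agent.kickoff, so its keys must match kickoff's parameters:

from crewai import Agent
from crewai.experimental.evaluation.experiment import ExperimentRunner

# Illustrative dataset: each case needs "inputs" and "expected_score";
# "identifier" is optional (an md5 of the case is used otherwise).
# "messages" is assumed to be a valid kickoff keyword argument.
dataset = [
    {"inputs": {"messages": "Draft a launch plan"}, "expected_score": 7, "identifier": "launch-plan"},
]

planner = Agent(role="Planner", goal="Plan product launches", backstory="Example agent")

runner = ExperimentRunner(dataset=dataset)
experiment_results = runner.run(agents=[planner], print_summary=True)  # no Crew required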


@@ -14,10 +14,14 @@ class GoalAlignmentEvaluator(BaseEvaluator):
def evaluate(
self,
agent: Agent,
task: Task,
execution_trace: Dict[str, Any],
final_output: Any,
task: Task | None = None,
) -> EvaluationScore:
task_context = ""
if task is not None:
task_context = f"Task description: {task.description}\nExpected output: {task.expected_output}\n"
prompt = [
{"role": "system", "content": """You are an expert evaluator assessing how well an AI agent's output aligns with its assigned task goal.
@@ -37,8 +41,7 @@ Return your evaluation as JSON with fields 'score' (number) and 'feedback' (stri
{"role": "user", "content": f"""
Agent role: {agent.role}
Agent goal: {agent.goal}
Task description: {task.description}
Expected output: {task.expected_output}
{task_context}
Agent's final output:
{final_output}


@@ -36,10 +36,14 @@ class ReasoningEfficiencyEvaluator(BaseEvaluator):
def evaluate(
self,
agent: Agent,
task: Task,
execution_trace: Dict[str, Any],
final_output: TaskOutput,
final_output: TaskOutput | str,
task: Task | None = None,
) -> EvaluationScore:
task_context = ""
if task is not None:
task_context = f"Task description: {task.description}\nExpected output: {task.expected_output}\n"
llm_calls = execution_trace.get("llm_calls", [])
if not llm_calls or len(llm_calls) < 2:
@@ -83,6 +87,8 @@ class ReasoningEfficiencyEvaluator(BaseEvaluator):
call_samples = self._get_call_samples(llm_calls)
final_output = final_output.raw if isinstance(final_output, TaskOutput) else final_output
prompt = [
{"role": "system", "content": """You are an expert evaluator assessing the reasoning efficiency of an AI agent's thought process.
@@ -117,7 +123,7 @@ Return your evaluation as JSON with the following structure:
}"""},
{"role": "user", "content": f"""
Agent role: {agent.role}
Task description: {task.description}
{task_context}
Reasoning efficiency metrics:
- Total LLM calls: {efficiency_metrics["total_llm_calls"]}
@@ -130,7 +136,7 @@ Sample of agent reasoning flow (chronological sequence):
{call_samples}
Agent's final output:
{final_output.raw[:500]}... (truncated)
{final_output[:500]}... (truncated)
Evaluate the reasoning efficiency of this agent based on these interaction patterns.
Identify any inefficient reasoning patterns and provide specific suggestions for optimization.


@@ -14,10 +14,13 @@ class SemanticQualityEvaluator(BaseEvaluator):
def evaluate(
self,
agent: Agent,
task: Task,
execution_trace: Dict[str, Any],
final_output: Any,
task: Task | None = None,
) -> EvaluationScore:
task_context = ""
if task is not None:
task_context = f"Task description: {task.description}"
prompt = [
{"role": "system", "content": """You are an expert evaluator assessing the semantic quality of an AI agent's output.
@@ -37,7 +40,7 @@ Return your evaluation as JSON with fields 'score' (number) and 'feedback' (stri
"""},
{"role": "user", "content": f"""
Agent role: {agent.role}
Task description: {task.description}
{task_context}
Agent's final output:
{final_output}


@@ -16,10 +16,14 @@ class ToolSelectionEvaluator(BaseEvaluator):
def evaluate(
self,
agent: Agent,
task: Task,
execution_trace: Dict[str, Any],
final_output: str,
task: Task | None = None,
) -> EvaluationScore:
task_context = ""
if task is not None:
task_context = f"Task description: {task.description}"
tool_uses = execution_trace.get("tool_uses", [])
tool_count = len(tool_uses)
unique_tool_types = set([tool.get("tool", "Unknown tool") for tool in tool_uses])
@@ -72,7 +76,7 @@ Return your evaluation as JSON with these fields:
"""},
{"role": "user", "content": f"""
Agent role: {agent.role}
Task description: {task.description}
{task_context}
Available tools for this agent:
{available_tools_info}
@@ -128,10 +132,13 @@ class ParameterExtractionEvaluator(BaseEvaluator):
def evaluate(
self,
agent: Agent,
task: Task,
execution_trace: Dict[str, Any],
final_output: str,
task: Task | None = None,
) -> EvaluationScore:
task_context = ""
if task is not None:
task_context = f"Task description: {task.description}"
tool_uses = execution_trace.get("tool_uses", [])
tool_count = len(tool_uses)
@@ -212,7 +219,7 @@ Return your evaluation as JSON with these fields:
"""},
{"role": "user", "content": f"""
Agent role: {agent.role}
Task description: {task.description}
{task_context}
Parameter extraction examples:
{param_samples_text}
@@ -267,10 +274,13 @@ class ToolInvocationEvaluator(BaseEvaluator):
def evaluate(
self,
agent: Agent,
task: Task,
execution_trace: Dict[str, Any],
final_output: str,
task: Task | None = None,
) -> EvaluationScore:
task_context = ""
if task is not None:
task_context = f"Task description: {task.description}"
tool_uses = execution_trace.get("tool_uses", [])
tool_errors = []
tool_count = len(tool_uses)
@@ -352,7 +362,7 @@ Return your evaluation as JSON with these fields:
"""},
{"role": "user", "content": f"""
Agent role: {agent.role}
Task description: {task.description}
{task_context}
Tool invocation examples:
{invocation_samples_text}


@@ -3,7 +3,7 @@ import inspect
from typing_extensions import Any
import warnings
from crewai.experimental.evaluation.experiment import ExperimentResults, ExperimentRunner
from crewai import Crew
from crewai import Crew, Agent
def assert_experiment_successfully(experiment_results: ExperimentResults, baseline_filepath: str | None = None) -> None:
failed_tests = [result for result in experiment_results.results if not result.passed]
@@ -35,10 +35,10 @@ def assert_experiment_no_regression(comparison_result: dict[str, list[str]]) ->
UserWarning
)
def run_experiment(dataset: list[dict[str, Any]], crew: Crew, verbose: bool = False) -> ExperimentResults:
def run_experiment(dataset: list[dict[str, Any]], crew: Crew | None = None, agents: list[Agent] | None = None, verbose: bool = False) -> ExperimentResults:
runner = ExperimentRunner(dataset=dataset)
return runner.run(crew=crew, print_summary=verbose)
return runner.run(agents=agents, crew=crew, print_summary=verbose)
def _get_baseline_filepath_fallback() -> str:
test_func_name = "experiment_fallback"
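
In a test, the run_experiment wrapper now accepts agents in place of a crew and pairs naturally with assert_experiment_successfully. A short sketch with an illustrative dataset and agent; this module's import path is not shown in the diff, so it is assumed here:

from crewai import Agent
# Adjust this import to wherever run_experiment/assert_experiment_successfully are exposed;
# the path is not visible in this diff.
from crewai.experimental.evaluation.experiment import run_experiment, assert_experiment_successfully

dataset = [{"inputs": {"messages": "Draft a launch plan"}, "expected_score": 7}]  # illustrative
planner = Agent(role="Planner", goal="Plan product launches", backstory="Example agent")  # illustrative

results = run_experiment(dataset=dataset, agents=[planner], verbose=True)
assert_experiment_successfully(results)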