Emit events about Agent eval (#3168)

* feat: emit events about Agent Eval

We now emit events when an agent evaluation has started, completed, or failed (a short subscription sketch follows the file summary below).

* style: fix type checking issues
Lucas Gomide, 2025-07-16 14:18:59 -03:00, committed by GitHub
parent 6ebb6c9b63
commit 9b67e5a15f
6 changed files with 339 additions and 39 deletions
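
As a usage illustration (not part of the diff itself), downstream code could subscribe to the new events through the event bus. This is a minimal sketch: the handler names and print statements are illustrative, while the import paths, event classes, and the crewai_event_bus.on decorator are the ones introduced or used in this change.

from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.agent_events import (
    AgentEvaluationStartedEvent,
    AgentEvaluationCompletedEvent,
    AgentEvaluationFailedEvent,
)

@crewai_event_bus.on(AgentEvaluationStartedEvent)
def on_eval_started(source, event):
    # Emitted when an evaluation run starts for an agent
    print(f"eval started: {event.agent_role} (iteration {event.iteration})")

@crewai_event_bus.on(AgentEvaluationCompletedEvent)
def on_eval_completed(source, event):
    # Carries the metric category and the score produced by the evaluator
    print(f"eval completed: {event.metric_category} -> {event.score}")

@crewai_event_bus.on(AgentEvaluationFailedEvent)
def on_eval_failed(source, event):
    # Carries the error message raised inside the evaluator
    print(f"eval failed: {event.agent_role}: {event.error}")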

View File

@@ -1,23 +1,24 @@
+import threading
+from typing import Any
 from crewai.experimental.evaluation.base_evaluator import AgentEvaluationResult, AggregationStrategy
 from crewai.agent import Agent
 from crewai.task import Task
 from crewai.experimental.evaluation.evaluation_display import EvaluationDisplayFormatter
-from typing import Any
+from crewai.utilities.events.agent_events import AgentEvaluationStartedEvent, AgentEvaluationCompletedEvent, AgentEvaluationFailedEvent
 from crewai.experimental.evaluation import BaseEvaluator, create_evaluation_callbacks
 from collections.abc import Sequence
 from crewai.utilities.events.crewai_event_bus import crewai_event_bus
 from crewai.utilities.events.utils.console_formatter import ConsoleFormatter
 from crewai.utilities.events.task_events import TaskCompletedEvent
 from crewai.utilities.events.agent_events import LiteAgentExecutionCompletedEvent
-from crewai.experimental.evaluation.base_evaluator import AgentAggregatedEvaluationResult
-import threading
+from crewai.experimental.evaluation.base_evaluator import AgentAggregatedEvaluationResult, EvaluationScore, MetricCategory

 class ExecutionState:
     def __init__(self):
         self.traces = {}
-        self.current_agent_id = None
-        self.current_task_id = None
+        self.current_agent_id: str | None = None
+        self.current_task_id: str | None = None
         self.iteration = 1
         self.iterations_results = {}
         self.agent_evaluators = {}
@@ -49,17 +50,21 @@ class AgentEvaluator:
         return self._thread_local.execution_state

     def _subscribe_to_events(self) -> None:
-        crewai_event_bus.register_handler(TaskCompletedEvent, self._handle_task_completed)
-        crewai_event_bus.register_handler(LiteAgentExecutionCompletedEvent, self._handle_lite_agent_completed)
+        from typing import cast
+        crewai_event_bus.register_handler(TaskCompletedEvent, cast(Any, self._handle_task_completed))
+        crewai_event_bus.register_handler(LiteAgentExecutionCompletedEvent, cast(Any, self._handle_lite_agent_completed))

     def _handle_task_completed(self, source: Any, event: TaskCompletedEvent) -> None:
         assert event.task is not None
         agent = event.task.agent
         if agent and str(getattr(agent, 'id', 'unknown')) in self._execution_state.agent_evaluators:
+            self.emit_evaluation_started_event(agent_role=agent.role, agent_id=str(agent.id), task_id=str(event.task.id))
             state = ExecutionState()
             state.current_agent_id = str(agent.id)
             state.current_task_id = str(event.task.id)
+            assert state.current_agent_id is not None and state.current_task_id is not None

             trace = self.callback.get_trace(state.current_agent_id, state.current_task_id)
             if not trace:
@@ -100,6 +105,7 @@ class AgentEvaluator:
         if not target_agent:
             return

+        assert state.current_agent_id is not None and state.current_task_id is not None
         trace = self.callback.get_trace(state.current_agent_id, state.current_task_id)
         if not trace:
@@ -181,8 +187,10 @@ class AgentEvaluator:
         )

         assert self.evaluators is not None
+        task_id = str(task.id) if task else None
         for evaluator in self.evaluators:
             try:
+                self.emit_evaluation_started_event(agent_role=agent.role, agent_id=str(agent.id), task_id=task_id)
                 score = evaluator.evaluate(
                     agent=agent,
                     task=task,
@@ -190,11 +198,31 @@ class AgentEvaluator:
                     final_output=final_output
                 )
                 result.metrics[evaluator.metric_category] = score
+                self.emit_evaluation_completed_event(agent_role=agent.role, agent_id=str(agent.id), task_id=task_id, metric_category=evaluator.metric_category, score=score)
             except Exception as e:
+                self.emit_evaluation_failed_event(agent_role=agent.role, agent_id=str(agent.id), task_id=task_id, error=str(e))
                 self.console_formatter.print(f"Error in {evaluator.metric_category.value} evaluator: {str(e)}")

         return result

+    def emit_evaluation_started_event(self, agent_role: str, agent_id: str, task_id: str | None = None):
+        crewai_event_bus.emit(
+            self,
+            AgentEvaluationStartedEvent(agent_role=agent_role, agent_id=agent_id, task_id=task_id, iteration=self._execution_state.iteration)
+        )
+
+    def emit_evaluation_completed_event(self, agent_role: str, agent_id: str, task_id: str | None = None, metric_category: MetricCategory | None = None, score: EvaluationScore | None = None):
+        crewai_event_bus.emit(
+            self,
+            AgentEvaluationCompletedEvent(agent_role=agent_role, agent_id=agent_id, task_id=task_id, iteration=self._execution_state.iteration, metric_category=metric_category, score=score)
+        )
+
+    def emit_evaluation_failed_event(self, agent_role: str, agent_id: str, error: str, task_id: str | None = None):
+        crewai_event_bus.emit(
+            self,
+            AgentEvaluationFailedEvent(agent_role=agent_role, agent_id=agent_id, task_id=task_id, iteration=self._execution_state.iteration, error=error)
+        )

 def create_default_evaluator(agents: list[Agent], llm: None = None):
     from crewai.experimental.evaluation import (
         GoalAlignmentEvaluator,

View File

@@ -227,4 +227,8 @@ class EvaluationTraceCallback(BaseEventListener):

 def create_evaluation_callbacks() -> EvaluationTraceCallback:
-    return EvaluationTraceCallback()
+    from crewai.utilities.events.crewai_event_bus import crewai_event_bus
+
+    callback = EvaluationTraceCallback()
+    callback.setup_listeners(crewai_event_bus)
+    return callback

View File

@@ -17,6 +17,9 @@ from .agent_events import (
     AgentExecutionStartedEvent,
     AgentExecutionCompletedEvent,
     AgentExecutionErrorEvent,
+    AgentEvaluationStartedEvent,
+    AgentEvaluationCompletedEvent,
+    AgentEvaluationFailedEvent,
 )
 from .task_events import (
     TaskStartedEvent,
@@ -74,6 +77,9 @@ __all__ = [
     "AgentExecutionStartedEvent",
     "AgentExecutionCompletedEvent",
     "AgentExecutionErrorEvent",
+    "AgentEvaluationStartedEvent",
+    "AgentEvaluationCompletedEvent",
+    "AgentEvaluationFailedEvent",
     "TaskStartedEvent",
     "TaskCompletedEvent",
     "TaskFailedEvent",

View File

@@ -123,3 +123,28 @@ class AgentLogsExecutionEvent(BaseEvent):
     type: str = "agent_logs_execution"

     model_config = {"arbitrary_types_allowed": True}
+
+
+# Agent Eval events
+class AgentEvaluationStartedEvent(BaseEvent):
+    agent_id: str
+    agent_role: str
+    task_id: str | None = None
+    iteration: int
+    type: str = "agent_evaluation_started"
+
+
+class AgentEvaluationCompletedEvent(BaseEvent):
+    agent_id: str
+    agent_role: str
+    task_id: str | None = None
+    iteration: int
+    metric_category: Any
+    score: Any
+    type: str = "agent_evaluation_completed"
+
+
+class AgentEvaluationFailedEvent(BaseEvent):
+    agent_id: str
+    agent_role: str
+    task_id: str | None = None
+    iteration: int
+    error: str
+    type: str = "agent_evaluation_failed"

View File

@@ -0,0 +1,123 @@
interactions:
- request:
body: '{"messages": [{"role": "system", "content": "You are Test Agent. An agent
created for testing purposes\nYour personal goal is: Complete test tasks successfully\nTo
give my best complete final answer to the task respond using the exact following
format:\n\nThought: I now can give a great answer\nFinal Answer: Your final
answer must be the great and the most complete as possible, it must be outcome
described.\n\nI MUST use these formats, my job depends on it!"}, {"role": "user",
"content": "\nCurrent Task: Test task description\n\nThis is the expected criteria
for your final answer: Expected test output\nyou MUST return the actual complete
content as the final answer, not a summary.\n\nBegin! This is VERY important
to you, use the tools available and give your best Final Answer, your job depends
on it!\n\nThought:"}], "model": "gpt-4o-mini", "stop": ["\nObservation:"]}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate, zstd
connection:
- keep-alive
content-length:
- '879'
content-type:
- application/json
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.93.0
x-stainless-arch:
- arm64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- MacOS
x-stainless-package-version:
- 1.93.0
x-stainless-raw-response:
- 'true'
x-stainless-read-timeout:
- '600.0'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.11.12
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: !!binary |
H4sIAAAAAAAAAwAAAP//jFTBbhtHDL3rK4g5rwRbtaNYt9RoEaNoUaBODm0DgZnh7jKe5WyHXDmO
4X8vZiRLcupDLwvsPPLxPQ45jzMAx8GtwfkezQ9jnP9oeLv98N5+vfl9+4v89Mf76+XV7XDz8Yc/
r39T15SM9PkLeXvOWvg0jJGMk+xgnwmNCuv56nJ5+XZ1tbqswJACxZLWjTa/SPOBhefLs+XF/Gw1
P3+7z+4Te1K3hr9mAACP9Vt0SqCvbg1nzfPJQKrYkVsfggBcTrGcOFRlNRRzzRH0SYykSr8BSffg
UaDjLQFCV2QDit5TBvhbfmbBCO/q/xpue1ZgBesJ6OtI3iiAkRqkycbJGrjv2ffgk5S6CqkFhECG
HClAIPWZx9Kkgtz3aJVq37vChXoH2qcpBogp3UHkO1rAbU/QViW7Os8hLD5OgQBjBCFfOpEfgKVN
ecBSpoFAQxK1jMbSgY+Y2R6aWjJTT6K8JSHVBlACYOgpk3gCS4DyADqS55YpQDdxoMhCuoCbgwKf
tpSB0PeAJdaKseKpOsn0z8SZBhJrgESnXERY8S0JRsxWulkoilkKkDJ0JJQx8jcKi13DX3pWyuWm
FPDQN8jU7mW3KRfdSaj2r5ZLMEmgXOYg7K5OlcQYI1Cs4vSFavSVmLWnsDgdnEztpFiGV6YYTwAU
SVYbXkf20x55OgxpTN2Y02f9LtW1LKz9JhNqkjKQaml0FX2aAXyqyzC9mG835jSMtrF0R7Xc+Zvz
HZ877uARvXqzBy0ZxuP58nLVvMK32Q2rnqyT8+h7CsfU4+7hFDidALMT1/9V8xr3zjlL93/oj4D3
NBqFzZgpsH/p+BiW6Utd0dfDDl2ugl2ZK/a0MaZcbiJQi1PcPRxOH9Ro2LQsHeUxc309yk3Onmb/
AgAA//8DAAbYfvVABQAA
headers:
CF-RAY:
- 95f9c7ffa8331b11-GRU
Connection:
- keep-alive
Content-Encoding:
- gzip
Content-Type:
- application/json
Date:
- Tue, 15 Jul 2025 13:59:38 GMT
Server:
- cloudflare
Set-Cookie:
- __cf_bm=J_xe1AP.B5P6D2GVMCesyioeS5E9DnYT34rbwQUefFc-1752587978-1.0.1.1-5Dflk5cAj6YCsOSVbCFWWSpXpw_mXsczIdzWzs2h2OwDL01HQbduE5LAToy67sfjFjHeeO4xRrqPLUQpySy2QqyHXbI_fzX4UAt3.UdwHxU;
path=/; expires=Tue, 15-Jul-25 14:29:38 GMT; domain=.api.openai.com; HttpOnly;
Secure; SameSite=None
- _cfuvid=0rTD8RMpxBQQy42jzmum16_eoRtWNfaZMG_TJkhGS7I-1752587978437-0.0.1.1-604800000;
path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- nosniff
access-control-expose-headers:
- X-Request-ID
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- crewai-iuxna1
openai-processing-ms:
- '2623'
openai-version:
- '2020-10-01'
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
x-envoy-upstream-service-time:
- '2626'
x-ratelimit-limit-requests:
- '30000'
x-ratelimit-limit-tokens:
- '150000000'
x-ratelimit-remaining-requests:
- '29999'
x-ratelimit-remaining-tokens:
- '149999813'
x-ratelimit-reset-requests:
- 2ms
x-ratelimit-reset-tokens:
- 0s
x-request-id:
- req_ccc347e91010713379c920aa0efd1f4f
status:
code: 200
message: OK
version: 1

View File

@@ -11,9 +11,13 @@ from crewai.experimental.evaluation import (
     ToolSelectionEvaluator,
     ParameterExtractionEvaluator,
     ToolInvocationEvaluator,
-    ReasoningEfficiencyEvaluator
+    ReasoningEfficiencyEvaluator,
+    MetricCategory,
+    EvaluationScore
 )
+from crewai.utilities.events.agent_events import AgentEvaluationStartedEvent, AgentEvaluationCompletedEvent, AgentEvaluationFailedEvent
+from crewai.utilities.events.crewai_event_bus import crewai_event_bus
 from crewai.experimental.evaluation import create_default_evaluator

 class TestAgentEvaluator:
@@ -102,28 +106,57 @@ class TestAgentEvaluator:
             goal="Complete test tasks successfully",
             backstory="An agent created for testing purposes",
         )
-        agent_evaluator = AgentEvaluator(agents=[agent], evaluators=[GoalAlignmentEvaluator()])
-
-        agent.kickoff(messages="Complete this task successfully")
-
-        results = agent_evaluator.get_evaluation_results()
-
-        assert isinstance(results, dict)
-
-        result, = results[agent.role]
-        assert isinstance(result, AgentEvaluationResult)
-        assert result.agent_id == str(agent.id)
-        assert result.task_id == "lite_task"
-
-        goal_alignment, = result.metrics.values()
-        assert goal_alignment.score == 2.0
-
-        expected_feedback = "The agent did not demonstrate a clear understanding of the task goal, which is to complete test tasks successfully"
-        assert expected_feedback in goal_alignment.feedback
-
-        assert goal_alignment.raw_response is not None
-        assert '"score": 2' in goal_alignment.raw_response
+        with crewai_event_bus.scoped_handlers():
+            events = {}
+
+            @crewai_event_bus.on(AgentEvaluationStartedEvent)
+            def capture_started(source, event):
+                events["started"] = event
+
+            @crewai_event_bus.on(AgentEvaluationCompletedEvent)
+            def capture_completed(source, event):
+                events["completed"] = event
+
+            @crewai_event_bus.on(AgentEvaluationFailedEvent)
+            def capture_failed(source, event):
+                events["failed"] = event
+
+            agent_evaluator = AgentEvaluator(agents=[agent], evaluators=[GoalAlignmentEvaluator()])
+
+            agent.kickoff(messages="Complete this task successfully")
+
+            assert events.keys() == {"started", "completed"}
+            assert events["started"].agent_id == str(agent.id)
+            assert events["started"].agent_role == agent.role
+            assert events["started"].task_id is None
+            assert events["started"].iteration == 1
+
+            assert events["completed"].agent_id == str(agent.id)
+            assert events["completed"].agent_role == agent.role
+            assert events["completed"].task_id is None
+            assert events["completed"].iteration == 1
+            assert events["completed"].metric_category == MetricCategory.GOAL_ALIGNMENT
+            assert isinstance(events["completed"].score, EvaluationScore)
+            assert events["completed"].score.score == 2.0
+
+            results = agent_evaluator.get_evaluation_results()
+
+            assert isinstance(results, dict)
+
+            result, = results[agent.role]
+            assert isinstance(result, AgentEvaluationResult)
+            assert result.agent_id == str(agent.id)
+            assert result.task_id == "lite_task"
+
+            goal_alignment, = result.metrics.values()
+            assert goal_alignment.score == 2.0
+
+            expected_feedback = "The agent did not demonstrate a clear understanding of the task goal, which is to complete test tasks successfully"
+            assert expected_feedback in goal_alignment.feedback
+
+            assert goal_alignment.raw_response is not None
+            assert '"score": 2' in goal_alignment.raw_response

     @pytest.mark.vcr(filter_headers=["authorization"])
     def test_eval_specific_agents_from_crew(self, mock_crew):
@@ -140,25 +173,106 @@ class TestAgentEvaluator:
         mock_crew.agents.append(agent)
         mock_crew.tasks.append(task)

-        agent_evaluator = AgentEvaluator(agents=[agent], evaluators=[GoalAlignmentEvaluator()])
-
-        mock_crew.kickoff()
-
-        results = agent_evaluator.get_evaluation_results()
-
-        assert isinstance(results, dict)
-        assert len(results.keys()) == 1
-        result, = results[agent.role]
-        assert isinstance(result, AgentEvaluationResult)
-        assert result.agent_id == str(agent.id)
-        assert result.task_id == str(task.id)
-
-        goal_alignment, = result.metrics.values()
-        assert goal_alignment.score == 5.0
-
-        expected_feedback = "The agent provided a thorough guide on how to conduct a test task but failed to produce specific expected output"
-        assert expected_feedback in goal_alignment.feedback
-        assert goal_alignment.raw_response is not None
-        assert '"score": 5' in goal_alignment.raw_response
+        with crewai_event_bus.scoped_handlers():
+            events = {}
+
+            @crewai_event_bus.on(AgentEvaluationStartedEvent)
+            def capture_started(source, event):
+                events["started"] = event
+
+            @crewai_event_bus.on(AgentEvaluationCompletedEvent)
+            def capture_completed(source, event):
+                events["completed"] = event
+
+            @crewai_event_bus.on(AgentEvaluationFailedEvent)
+            def capture_failed(source, event):
+                events["failed"] = event
+
+            agent_evaluator = AgentEvaluator(agents=[agent], evaluators=[GoalAlignmentEvaluator()])
+
+            mock_crew.kickoff()
+
+            assert events.keys() == {"started", "completed"}
+            assert events["started"].agent_id == str(agent.id)
+            assert events["started"].agent_role == agent.role
+            assert events["started"].task_id == str(task.id)
+            assert events["started"].iteration == 1
+
+            assert events["completed"].agent_id == str(agent.id)
+            assert events["completed"].agent_role == agent.role
+            assert events["completed"].task_id == str(task.id)
+            assert events["completed"].iteration == 1
+            assert events["completed"].metric_category == MetricCategory.GOAL_ALIGNMENT
+            assert isinstance(events["completed"].score, EvaluationScore)
+            assert events["completed"].score.score == 5.0
+
+            results = agent_evaluator.get_evaluation_results()
+
+            assert isinstance(results, dict)
+            assert len(results.keys()) == 1
+            result, = results[agent.role]
+            assert isinstance(result, AgentEvaluationResult)
+            assert result.agent_id == str(agent.id)
+            assert result.task_id == str(task.id)
+
+            goal_alignment, = result.metrics.values()
+            assert goal_alignment.score == 5.0
+
+            expected_feedback = "The agent provided a thorough guide on how to conduct a test task but failed to produce specific expected output"
+            assert expected_feedback in goal_alignment.feedback
+            assert goal_alignment.raw_response is not None
+            assert '"score": 5' in goal_alignment.raw_response
+
+    @pytest.mark.vcr(filter_headers=["authorization"])
+    def test_failed_evaluation(self, mock_crew):
+        agent, = mock_crew.agents
+        task, = mock_crew.tasks
+
+        with crewai_event_bus.scoped_handlers():
+            events = {}
+
+            @crewai_event_bus.on(AgentEvaluationStartedEvent)
+            def capture_started(source, event):
+                events["started"] = event
+
+            @crewai_event_bus.on(AgentEvaluationCompletedEvent)
+            def capture_completed(source, event):
+                events["completed"] = event
+
+            @crewai_event_bus.on(AgentEvaluationFailedEvent)
+            def capture_failed(source, event):
+                events["failed"] = event
+
+            # Create a mock evaluator that will raise an exception
+            from crewai.experimental.evaluation.base_evaluator import BaseEvaluator
+            from crewai.experimental.evaluation import MetricCategory
+
+            class FailingEvaluator(BaseEvaluator):
+                metric_category = MetricCategory.GOAL_ALIGNMENT
+
+                def evaluate(self, agent, task, execution_trace, final_output):
+                    raise ValueError("Forced evaluation failure")
+
+            agent_evaluator = AgentEvaluator(agents=[agent], evaluators=[FailingEvaluator()])
+
+            mock_crew.kickoff()
+
+            assert events.keys() == {"started", "failed"}
+            assert events["started"].agent_id == str(agent.id)
+            assert events["started"].agent_role == agent.role
+            assert events["started"].task_id == str(task.id)
+            assert events["started"].iteration == 1
+
+            assert events["failed"].agent_id == str(agent.id)
+            assert events["failed"].agent_role == agent.role
+            assert events["failed"].task_id == str(task.id)
+            assert events["failed"].iteration == 1
+            assert events["failed"].error == "Forced evaluation failure"
+
+            results = agent_evaluator.get_evaluation_results()
+            result, = results[agent.role]
+            assert isinstance(result, AgentEvaluationResult)
+            assert result.agent_id == str(agent.id)
+            assert result.task_id == str(task.id)
+            assert result.metrics == {}