feat: Enhance event listener and telemetry tracking

- Update event listener to improve telemetry span handling
- Add execution_span field to Task for better tracing
- Modify event handling in EventListener to use new span tracking
- Remove debug print statements
- Improve test coverage for crew and flow events
- Update cassettes to reflect new event tracking behavior
This commit is contained in:
Lorenze Jay
2025-02-24 09:00:06 -08:00
parent ec050e5d33
commit 2d07c8d2e4
6 changed files with 1780 additions and 287 deletions

View File

@@ -338,7 +338,6 @@ class LLM:
# --- 5) Handle the tool call
tool_call = tool_calls[0]
function_name = tool_call.function.name
print("function_name", function_name)
if function_name in available_functions:
try:
function_args = json.loads(tool_call.function.arguments)

View File

@@ -144,6 +144,9 @@ class Task(BaseModel):
end_time: Optional[datetime.datetime] = Field(
default=None, description="End time of the task execution"
)
execution_span: Optional[Any] = Field(
default=None, description="Open Telemetry Span of the task execution"
)
@field_validator("guardrail")
@classmethod

View File

@@ -57,7 +57,7 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(CrewKickoffStartedEvent)
def on_crew_started(source, event: CrewKickoffStartedEvent):
self.logger.log(
f"🚀 Crew '{event.crew_name}' started",
f"🚀 Crew '{event.crew_name}' started, {source.id}",
event.timestamp,
)
self._telemetry.crew_execution_span(source, event.inputs)
@@ -67,28 +67,28 @@ class EventListener(BaseEventListener):
final_string_output = event.output.raw
self._telemetry.end_crew(source, final_string_output)
self.logger.log(
f"✅ Crew '{event.crew_name}' completed",
f"✅ Crew '{event.crew_name}' completed, {source.id}",
event.timestamp,
)
@crewai_event_bus.on(CrewKickoffFailedEvent)
def on_crew_failed(source, event: CrewKickoffFailedEvent):
self.logger.log(
f"❌ Crew '{event.crew_name}' failed",
f"❌ Crew '{event.crew_name}' failed, {source.id}",
event.timestamp,
)
@crewai_event_bus.on(CrewTestStartedEvent)
def on_crew_test_started(source, event: CrewTestStartedEvent):
cloned_crew = source.copy()
cloned_crew._telemetry.test_execution_span(
self._telemetry.test_execution_span(
cloned_crew,
event.n_iterations,
event.inputs,
event.eval_llm,
event.eval_llm or "",
)
self.logger.log(
f"🚀 Crew '{event.crew_name}' started test",
f"🚀 Crew '{event.crew_name}' started test, {source.id}",
event.timestamp,
)
@@ -131,9 +131,8 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(TaskStartedEvent)
def on_task_started(source, event: TaskStartedEvent):
source._execution_span = self._telemetry.task_started(
crew=source.agent.crew, task=source
)
span = self._telemetry.task_started(crew=source.agent.crew, task=source)
source.execution_span = span
self.logger.log(
f"📋 Task started: {source.description}",
event.timestamp,
@@ -141,24 +140,24 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(TaskCompletedEvent)
def on_task_completed(source, event: TaskCompletedEvent):
if source._execution_span:
if source.execution_span:
self._telemetry.task_ended(
source._execution_span, source, source.agent.crew
source.execution_span, source, source.agent.crew
)
self.logger.log(
f"✅ Task completed: {source.description}",
event.timestamp,
)
source._execution_span = None
source.execution_span = None
@crewai_event_bus.on(TaskFailedEvent)
def on_task_failed(source, event: TaskFailedEvent):
if source._execution_span:
if source.execution_span:
if source.agent and source.agent.crew:
self._telemetry.task_ended(
source._execution_span, source, source.agent.crew
source.execution_span, source, source.agent.crew
)
source._execution_span = None
source.execution_span = None
self.logger.log(
f"❌ Task failed: {source.description}",
event.timestamp,
@@ -184,7 +183,7 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(FlowCreatedEvent)
def on_flow_created(source, event: FlowCreatedEvent):
self._telemetry.flow_creation_span(self.__class__.__name__)
self._telemetry.flow_creation_span(event.flow_name)
self.logger.log(
f"🌊 Flow Created: '{event.flow_name}'",
event.timestamp,
@@ -193,17 +192,17 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(FlowStartedEvent)
def on_flow_started(source, event: FlowStartedEvent):
self._telemetry.flow_execution_span(
source.__class__.__name__, list(source._methods.keys())
event.flow_name, list(source._methods.keys())
)
self.logger.log(
f"🤖 Flow Started: '{event.flow_name}'",
f"🤖 Flow Started: '{event.flow_name}', {source.flow_id}",
event.timestamp,
)
@crewai_event_bus.on(FlowFinishedEvent)
def on_flow_finished(source, event: FlowFinishedEvent):
self.logger.log(
f"👍 Flow Finished: '{event.flow_name}'",
f"👍 Flow Finished: '{event.flow_name}', {source.flow_id}",
event.timestamp,
)

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,236 @@
interactions:
- request:
body: '{"messages": [{"role": "system", "content": "You are base_agent. You are
a helpful assistant that just says hi\nYour personal goal is: Just say hi\nTo
give my best complete final answer to the task respond using the exact following
format:\n\nThought: I now can give a great answer\nFinal Answer: Your final
answer must be the great and the most complete as possible, it must be outcome
described.\n\nI MUST use these formats, my job depends on it!"}, {"role": "user",
"content": "\nCurrent Task: Just say hi\n\nThis is the expected criteria for
your final answer: hi\nyou MUST return the actual complete content as the final
answer, not a summary.\n\nBegin! This is VERY important to you, use the tools
available and give your best Final Answer, your job depends on it!\n\nThought:"}],
"model": "gpt-4o-mini", "stop": ["\nObservation:"]}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate
connection:
- keep-alive
content-length:
- '838'
content-type:
- application/json
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.61.0
x-stainless-arch:
- arm64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- MacOS
x-stainless-package-version:
- 1.61.0
x-stainless-raw-response:
- 'true'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.8
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
content: "{\n \"id\": \"chatcmpl-B4VsaBZ4ec4b0ab4pkqWgyxTFVVfc\",\n \"object\":
\"chat.completion\",\n \"created\": 1740415556,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"I now can give a great answer \\nFinal
Answer: hi\",\n \"refusal\": null\n },\n \"logprobs\": null,\n
\ \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
161,\n \"completion_tokens\": 12,\n \"total_tokens\": 173,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_7fcd609668\"\n}\n"
headers:
CF-RAY:
- 9170edc5da6f230e-SJC
Connection:
- keep-alive
Content-Encoding:
- gzip
Content-Type:
- application/json
Date:
- Mon, 24 Feb 2025 16:45:57 GMT
Server:
- cloudflare
Set-Cookie:
- __cf_bm=lvRw4Nyef7N35to64fj2_kHDfbZp0KSFbwgF5chYMRI-1740415557-1.0.1.1-o5BaN1FpBwv5Wq6zIlv0rCB28lk5hVI9wZQWU3pig1jgyAKDkYzTwZ0MlSR6v6TPIX9RfepjrO3.Gk3FEmcVRw;
path=/; expires=Mon, 24-Feb-25 17:15:57 GMT; domain=.api.openai.com; HttpOnly;
Secure; SameSite=None
- _cfuvid=ySaVoTQvAcQyH5QoJQJDj75e5j8HwGFPOlFMAWEvXJk-1740415557302-0.0.1.1-604800000;
path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- nosniff
access-control-expose-headers:
- X-Request-ID
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- crewai-iuxna1
openai-processing-ms:
- '721'
openai-version:
- '2020-10-01'
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
x-ratelimit-limit-requests:
- '30000'
x-ratelimit-limit-tokens:
- '150000000'
x-ratelimit-remaining-requests:
- '29999'
x-ratelimit-remaining-tokens:
- '149999808'
x-ratelimit-reset-requests:
- 2ms
x-ratelimit-reset-tokens:
- 0s
x-request-id:
- req_fc3b3bcd4382cddaa3c04ce7003e4857
http_version: HTTP/1.1
status_code: 200
- request:
body: '{"messages": [{"role": "system", "content": "You are Task Execution Evaluator.
Evaluator agent for crew evaluation with precise capabilities to evaluate the
performance of the agents in the crew based on the tasks they have performed\nYour
personal goal is: Your goal is to evaluate the performance of the agents in
the crew based on the tasks they have performed using score from 1 to 10 evaluating
on completion, quality, and overall performance.\nTo give my best complete final
answer to the task respond using the exact following format:\n\nThought: I now
can give a great answer\nFinal Answer: Your final answer must be the great and
the most complete as possible, it must be outcome described.\n\nI MUST use these
formats, my job depends on it!"}, {"role": "user", "content": "\nCurrent Task:
Based on the task description and the expected output, compare and evaluate
the performance of the agents in the crew based on the Task Output they have
performed using score from 1 to 10 evaluating on completion, quality, and overall
performance.task_description: Just say hi task_expected_output: hi agent: base_agent
agent_goal: Just say hi Task Output: hi\n\nThis is the expected criteria for
your final answer: Evaluation Score from 1 to 10 based on the performance of
the agents on the tasks\nyou MUST return the actual complete content as the
final answer, not a summary.\nEnsure your final answer contains only the content
in the following format: {\n \"quality\": float\n}\n\nEnsure the final output
does not include any code block markers like ```json or ```python.\n\nBegin!
This is VERY important to you, use the tools available and give your best Final
Answer, your job depends on it!\n\nThought:"}], "model": "gpt-4o-mini", "stop":
["\nObservation:"]}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate
connection:
- keep-alive
content-length:
- '1765'
content-type:
- application/json
cookie:
- __cf_bm=lvRw4Nyef7N35to64fj2_kHDfbZp0KSFbwgF5chYMRI-1740415557-1.0.1.1-o5BaN1FpBwv5Wq6zIlv0rCB28lk5hVI9wZQWU3pig1jgyAKDkYzTwZ0MlSR6v6TPIX9RfepjrO3.Gk3FEmcVRw;
_cfuvid=ySaVoTQvAcQyH5QoJQJDj75e5j8HwGFPOlFMAWEvXJk-1740415557302-0.0.1.1-604800000
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.61.0
x-stainless-arch:
- arm64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- MacOS
x-stainless-package-version:
- 1.61.0
x-stainless-raw-response:
- 'true'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.8
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
content: "{\n \"id\": \"chatcmpl-B4Vsbd9AsRaJ2exDtWnHAwC8rIjfi\",\n \"object\":
\"chat.completion\",\n \"created\": 1740415557,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"I now can give a great answer \\nFinal
Answer: { \\n \\\"quality\\\": 10 \\n} \",\n \"refusal\": null\n
\ },\n \"logprobs\": null,\n \"finish_reason\": \"stop\"\n }\n
\ ],\n \"usage\": {\n \"prompt_tokens\": 338,\n \"completion_tokens\":
22,\n \"total_tokens\": 360,\n \"prompt_tokens_details\": {\n \"cached_tokens\":
0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\": {\n
\ \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_7fcd609668\"\n}\n"
headers:
CF-RAY:
- 9170edd15bb5230e-SJC
Connection:
- keep-alive
Content-Encoding:
- gzip
Content-Type:
- application/json
Date:
- Mon, 24 Feb 2025 16:45:58 GMT
Server:
- cloudflare
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- nosniff
access-control-expose-headers:
- X-Request-ID
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- crewai-iuxna1
openai-processing-ms:
- '860'
openai-version:
- '2020-10-01'
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
x-ratelimit-limit-requests:
- '30000'
x-ratelimit-limit-tokens:
- '150000000'
x-ratelimit-remaining-requests:
- '29999'
x-ratelimit-remaining-tokens:
- '149999578'
x-ratelimit-reset-requests:
- 2ms
x-ratelimit-reset-tokens:
- 0s
x-request-id:
- req_fad452c2d10b5fc95809130912b08837
http_version: HTTP/1.1
status_code: 200
version: 1

View File

@@ -1,6 +1,5 @@
import json
from datetime import datetime
from unittest.mock import MagicMock, patch
from unittest.mock import Mock, patch
import pytest
from pydantic import Field
@@ -9,9 +8,9 @@ from crewai.agent import Agent
from crewai.agents.crew_agent_executor import CrewAgentExecutor
from crewai.crew import Crew
from crewai.flow.flow import Flow, listen, start
from crewai.llm import LLM
from crewai.task import Task
from crewai.tools.base_tool import BaseTool
from crewai.tools.tool_usage import ToolUsage
from crewai.utilities.events.agent_events import (
AgentExecutionCompletedEvent,
AgentExecutionErrorEvent,
@@ -21,8 +20,11 @@ from crewai.utilities.events.crew_events import (
CrewKickoffCompletedEvent,
CrewKickoffFailedEvent,
CrewKickoffStartedEvent,
CrewTestCompletedEvent,
CrewTestStartedEvent,
)
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.event_listener import EventListener
from crewai.utilities.events.event_types import ToolUsageFinishedEvent
from crewai.utilities.events.flow_events import (
FlowCreatedEvent,
@@ -52,26 +54,35 @@ base_task = Task(
expected_output="hi",
agent=base_agent,
)
event_listener = EventListener()
@pytest.mark.vcr(filter_headers=["authorization"])
def test_crew_emits_start_kickoff_event():
received_events = []
mock_span = Mock()
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(CrewKickoffStartedEvent)
def handle_crew_start(source, event):
received_events.append(event)
crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew")
@crewai_event_bus.on(CrewKickoffStartedEvent)
def handle_crew_start(source, event):
received_events.append(event)
crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew")
with (
patch.object(
event_listener._telemetry, "crew_execution_span", return_value=mock_span
) as mock_crew_execution_span,
patch.object(
event_listener._telemetry, "end_crew", return_value=mock_span
) as mock_crew_ended,
):
crew.kickoff()
mock_crew_execution_span.assert_called_once_with(crew, None)
mock_crew_ended.assert_called_once_with(crew, "hi")
assert len(received_events) == 1
assert received_events[0].crew_name == "TestCrew"
assert isinstance(received_events[0].timestamp, datetime)
assert received_events[0].type == "crew_kickoff_started"
assert len(received_events) == 1
assert received_events[0].crew_name == "TestCrew"
assert isinstance(received_events[0].timestamp, datetime)
assert received_events[0].type == "crew_kickoff_started"
@pytest.mark.vcr(filter_headers=["authorization"])
@@ -92,6 +103,45 @@ def test_crew_emits_end_kickoff_event():
assert received_events[0].type == "crew_kickoff_completed"
@pytest.mark.vcr(filter_headers=["authorization"])
def test_crew_emits_test_kickoff_type_event():
received_events = []
mock_span = Mock()
@crewai_event_bus.on(CrewTestStartedEvent)
def handle_crew_end(source, event):
received_events.append(event)
@crewai_event_bus.on(CrewTestCompletedEvent)
def handle_crew_test_end(source, event):
received_events.append(event)
eval_llm = LLM(model="gpt-4o-mini")
with (
patch.object(
event_listener._telemetry, "test_execution_span", return_value=mock_span
) as mock_crew_execution_span,
):
crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew")
crew.test(n_iterations=1, eval_llm=eval_llm)
# Verify the call was made with correct argument types and values
assert mock_crew_execution_span.call_count == 1
args = mock_crew_execution_span.call_args[0]
assert isinstance(args[0], Crew)
assert args[1] == 1
assert args[2] is None
assert args[3] == eval_llm
assert len(received_events) == 2
assert received_events[0].crew_name == "TestCrew"
assert isinstance(received_events[0].timestamp, datetime)
assert received_events[0].type == "crew_test_started"
assert received_events[1].crew_name == "TestCrew"
assert isinstance(received_events[1].timestamp, datetime)
assert received_events[1].type == "crew_test_completed"
@pytest.mark.vcr(filter_headers=["authorization"])
def test_crew_emits_kickoff_failed_event():
received_events = []
@@ -142,9 +192,20 @@ def test_crew_emits_end_task_event():
def handle_task_end(source, event):
received_events.append(event)
mock_span = Mock()
crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew")
with (
patch.object(
event_listener._telemetry, "task_started", return_value=mock_span
) as mock_task_started,
patch.object(
event_listener._telemetry, "task_ended", return_value=mock_span
) as mock_task_ended,
):
crew.kickoff()
crew.kickoff()
mock_task_started.assert_called_once_with(crew=crew, task=base_task)
mock_task_ended.assert_called_once_with(mock_span, base_task, crew)
assert len(received_events) == 1
assert isinstance(received_events[0].timestamp, datetime)
@@ -334,24 +395,29 @@ def test_tools_emits_error_events():
def test_flow_emits_start_event():
received_events = []
mock_span = Mock()
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(FlowStartedEvent)
def handle_flow_start(source, event):
received_events.append(event)
@crewai_event_bus.on(FlowStartedEvent)
def handle_flow_start(source, event):
received_events.append(event)
class TestFlow(Flow[dict]):
@start()
def begin(self):
return "started"
class TestFlow(Flow[dict]):
@start()
def begin(self):
return "started"
with (
patch.object(
event_listener._telemetry, "flow_execution_span", return_value=mock_span
) as mock_flow_execution_span,
):
flow = TestFlow()
flow.kickoff()
assert len(received_events) == 1
assert received_events[0].flow_name == "TestFlow"
assert received_events[0].type == "flow_started"
mock_flow_execution_span.assert_called_once_with("TestFlow", ["begin"])
assert len(received_events) == 1
assert received_events[0].flow_name == "TestFlow"
assert received_events[0].type == "flow_started"
def test_flow_emits_finish_event():
@@ -455,6 +521,7 @@ def test_multiple_handlers_for_same_event():
def test_flow_emits_created_event():
received_events = []
mock_span = Mock()
@crewai_event_bus.on(FlowCreatedEvent)
def handle_flow_created(source, event):
@@ -465,8 +532,15 @@ def test_flow_emits_created_event():
def begin(self):
return "started"
flow = TestFlow()
flow.kickoff()
with (
patch.object(
event_listener._telemetry, "flow_creation_span", return_value=mock_span
) as mock_flow_creation_span,
):
flow = TestFlow()
flow.kickoff()
mock_flow_creation_span.assert_called_once_with("TestFlow")
assert len(received_events) == 1
assert received_events[0].flow_name == "TestFlow"