Decoupling telemetry and ensure tests (#2212)

* feat: Enhance event listener and telemetry tracking

- Update event listener to improve telemetry span handling
- Add execution_span field to Task for better tracing
- Modify event handling in EventListener to use new span tracking
- Remove debug print statements
- Improve test coverage for crew and flow events
- Update cassettes to reflect new event tracking behavior

* Remove telemetry references from Crew class

- Remove Telemetry import and initialization from Crew class
- Delete _telemetry attribute from class configuration
- Clean up unused telemetry-related code

* test: Improve crew verbose output test with event log filtering

- Filter out event listener logs in verbose output test
- Ensure no output when verbose is set to False
- Enhance test coverage for crew logging behavior

* dropped comment

* refactor: Improve telemetry span tracking in EventListener

- Remove `execution_span` from Task class
- Add `execution_spans` dictionary to EventListener to track spans
- Update task event handlers to use new span tracking mechanism
- Simplify span management across task lifecycle events

* lint
This commit is contained in:
Lorenze Jay
2025-02-24 12:24:35 -08:00
committed by GitHub
parent c62fb615b1
commit 5235442a5b
6 changed files with 1798 additions and 309 deletions

View File

@@ -35,7 +35,6 @@ from crewai.process import Process
from crewai.task import Task
from crewai.tasks.conditional_task import ConditionalTask
from crewai.tasks.task_output import TaskOutput
from crewai.telemetry import Telemetry
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.tools.base_tool import Tool
from crewai.traces.unified_trace_controller import init_crew_main_trace
@@ -258,8 +257,6 @@ class Crew(BaseModel):
if self.function_calling_llm and not isinstance(self.function_calling_llm, LLM):
self.function_calling_llm = create_llm(self.function_calling_llm)
self._telemetry = Telemetry()
self._telemetry.set_tracer()
return self
@model_validator(mode="after")
@@ -1115,7 +1112,6 @@ class Crew(BaseModel):
"_short_term_memory",
"_long_term_memory",
"_entity_memory",
"_telemetry",
"agents",
"tasks",
"knowledge_sources",

View File

@@ -1,5 +1,8 @@
from pydantic import PrivateAttr
from typing import Any, Dict
from pydantic import Field, PrivateAttr
from crewai.task import Task
from crewai.telemetry.telemetry import Telemetry
from crewai.utilities import Logger
from crewai.utilities.constants import EMITTER_COLOR
@@ -42,6 +45,7 @@ class EventListener(BaseEventListener):
_instance = None
_telemetry: Telemetry = PrivateAttr(default_factory=lambda: Telemetry())
logger = Logger(verbose=True, default_color=EMITTER_COLOR)
execution_spans: Dict[Task, Any] = Field(default_factory=dict)
def __new__(cls):
if cls._instance is None:
@@ -54,6 +58,7 @@ class EventListener(BaseEventListener):
super().__init__()
self._telemetry = Telemetry()
self._telemetry.set_tracer()
self.execution_spans = {}
self._initialized = True
# ----------- CREW EVENTS -----------
@@ -62,7 +67,7 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(CrewKickoffStartedEvent)
def on_crew_started(source, event: CrewKickoffStartedEvent):
self.logger.log(
f"🚀 Crew '{event.crew_name}' started",
f"🚀 Crew '{event.crew_name}' started, {source.id}",
event.timestamp,
)
self._telemetry.crew_execution_span(source, event.inputs)
@@ -72,28 +77,28 @@ class EventListener(BaseEventListener):
final_string_output = event.output.raw
self._telemetry.end_crew(source, final_string_output)
self.logger.log(
f"✅ Crew '{event.crew_name}' completed",
f"✅ Crew '{event.crew_name}' completed, {source.id}",
event.timestamp,
)
@crewai_event_bus.on(CrewKickoffFailedEvent)
def on_crew_failed(source, event: CrewKickoffFailedEvent):
self.logger.log(
f"❌ Crew '{event.crew_name}' failed",
f"❌ Crew '{event.crew_name}' failed, {source.id}",
event.timestamp,
)
@crewai_event_bus.on(CrewTestStartedEvent)
def on_crew_test_started(source, event: CrewTestStartedEvent):
cloned_crew = source.copy()
cloned_crew._telemetry.test_execution_span(
self._telemetry.test_execution_span(
cloned_crew,
event.n_iterations,
event.inputs,
event.eval_llm,
event.eval_llm or "",
)
self.logger.log(
f"🚀 Crew '{event.crew_name}' started test",
f"🚀 Crew '{event.crew_name}' started test, {source.id}",
event.timestamp,
)
@@ -136,9 +141,9 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(TaskStartedEvent)
def on_task_started(source, event: TaskStartedEvent):
source._execution_span = self._telemetry.task_started(
crew=source.agent.crew, task=source
)
span = self._telemetry.task_started(crew=source.agent.crew, task=source)
self.execution_spans[source] = span
self.logger.log(
f"📋 Task started: {source.description}",
event.timestamp,
@@ -146,24 +151,22 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(TaskCompletedEvent)
def on_task_completed(source, event: TaskCompletedEvent):
if source._execution_span:
self._telemetry.task_ended(
source._execution_span, source, source.agent.crew
)
span = self.execution_spans.get(source)
if span:
self._telemetry.task_ended(span, source, source.agent.crew)
self.logger.log(
f"✅ Task completed: {source.description}",
event.timestamp,
)
source._execution_span = None
self.execution_spans[source] = None
@crewai_event_bus.on(TaskFailedEvent)
def on_task_failed(source, event: TaskFailedEvent):
if source._execution_span:
span = self.execution_spans.get(source)
if span:
if source.agent and source.agent.crew:
self._telemetry.task_ended(
source._execution_span, source, source.agent.crew
)
source._execution_span = None
self._telemetry.task_ended(span, source, source.agent.crew)
self.execution_spans[source] = None
self.logger.log(
f"❌ Task failed: {source.description}",
event.timestamp,
@@ -189,7 +192,7 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(FlowCreatedEvent)
def on_flow_created(source, event: FlowCreatedEvent):
self._telemetry.flow_creation_span(self.__class__.__name__)
self._telemetry.flow_creation_span(event.flow_name)
self.logger.log(
f"🌊 Flow Created: '{event.flow_name}'",
event.timestamp,
@@ -198,17 +201,17 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(FlowStartedEvent)
def on_flow_started(source, event: FlowStartedEvent):
self._telemetry.flow_execution_span(
source.__class__.__name__, list(source._methods.keys())
event.flow_name, list(source._methods.keys())
)
self.logger.log(
f"🤖 Flow Started: '{event.flow_name}'",
f"🤖 Flow Started: '{event.flow_name}', {source.flow_id}",
event.timestamp,
)
@crewai_event_bus.on(FlowFinishedEvent)
def on_flow_finished(source, event: FlowFinishedEvent):
self.logger.log(
f"👍 Flow Finished: '{event.flow_name}'",
f"👍 Flow Finished: '{event.flow_name}', {source.flow_id}",
event.timestamp,
)