refactor: Improve telemetry span tracking in EventListener

- Remove `execution_span` from Task class
- Add `execution_spans` dictionary to EventListener to track spans
- Update task event handlers to use new span tracking mechanism
- Simplify span management across task lifecycle events
This commit is contained in:
Lorenze Jay
2025-02-24 12:17:56 -08:00
parent 84c809eee2
commit 7460906712
2 changed files with 15 additions and 15 deletions

View File

@@ -144,9 +144,6 @@ class Task(BaseModel):
end_time: Optional[datetime.datetime] = Field(
default=None, description="End time of the task execution"
)
execution_span: Optional[Any] = Field(
default=None, description="Open Telemetry Span of the task execution"
)
@field_validator("guardrail")
@classmethod

View File

@@ -1,5 +1,7 @@
from pydantic import PrivateAttr
from typing import Any, Dict
from pydantic import Field, PrivateAttr
from crewai.task import Task
from crewai.telemetry.telemetry import Telemetry
from crewai.utilities import Logger
from crewai.utilities.constants import EMITTER_COLOR
@@ -37,6 +39,7 @@ class EventListener(BaseEventListener):
_instance = None
_telemetry: Telemetry = PrivateAttr(default_factory=lambda: Telemetry())
logger = Logger(verbose=True, default_color=EMITTER_COLOR)
execution_spans: Dict[Task, Any] = Field(default_factory=dict)
def __new__(cls):
if cls._instance is None:
@@ -49,6 +52,7 @@ class EventListener(BaseEventListener):
super().__init__()
self._telemetry = Telemetry()
self._telemetry.set_tracer()
self.execution_spans = {}
self._initialized = True
# ----------- CREW EVENTS -----------
@@ -132,7 +136,8 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(TaskStartedEvent)
def on_task_started(source, event: TaskStartedEvent):
span = self._telemetry.task_started(crew=source.agent.crew, task=source)
source.execution_span = span
self.execution_spans[source] = span
self.logger.log(
f"📋 Task started: {source.description}",
event.timestamp,
@@ -140,24 +145,22 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(TaskCompletedEvent)
def on_task_completed(source, event: TaskCompletedEvent):
if source.execution_span:
self._telemetry.task_ended(
source.execution_span, source, source.agent.crew
)
span = self.execution_spans.get(source)
if span:
self._telemetry.task_ended(span, source, source.agent.crew)
self.logger.log(
f"✅ Task completed: {source.description}",
event.timestamp,
)
source.execution_span = None
self.execution_spans[source] = None
@crewai_event_bus.on(TaskFailedEvent)
def on_task_failed(source, event: TaskFailedEvent):
if source.execution_span:
span = self.execution_spans.get(source)
if span:
if source.agent and source.agent.crew:
self._telemetry.task_ended(
source.execution_span, source, source.agent.crew
)
source.execution_span = None
self._telemetry.task_ended(span, source, source.agent.crew)
self.execution_spans[source] = None
self.logger.log(
f"❌ Task failed: {source.description}",
event.timestamp,