refactor: improve tracing listener typing and sorting

This commit is contained in:
Greyson LaLonde
2026-01-20 01:07:56 -05:00
parent f7e1bdb64e
commit ae253b4156
2 changed files with 22 additions and 22 deletions

View File

@@ -267,9 +267,12 @@ class TraceBatchManager:
sorted_events = sorted( sorted_events = sorted(
self.event_buffer, self.event_buffer,
key=lambda e: e.timestamp key=lambda e: (
if hasattr(e, "timestamp") and e.timestamp e.emission_sequence
else "", if e.emission_sequence is not None
else float("inf"),
e.timestamp if hasattr(e, "timestamp") and e.timestamp else "",
),
) )
self.current_batch.events = sorted_events self.current_batch.events = sorted_events

View File

@@ -9,6 +9,7 @@ from typing_extensions import Self
from crewai.cli.authentication.token import AuthError, get_auth_token from crewai.cli.authentication.token import AuthError, get_auth_token
from crewai.cli.version import get_crewai_version from crewai.cli.version import get_crewai_version
from crewai.events.base_event_listener import BaseEventListener from crewai.events.base_event_listener import BaseEventListener
from crewai.events.base_events import BaseEvent
from crewai.events.event_bus import CrewAIEventsBus from crewai.events.event_bus import CrewAIEventsBus
from crewai.events.listeners.tracing.first_time_trace_handler import ( from crewai.events.listeners.tracing.first_time_trace_handler import (
FirstTimeTraceHandler, FirstTimeTraceHandler,
@@ -616,7 +617,7 @@ class TraceCollectionListener(BaseEventListener):
if self.batch_manager.is_batch_initialized(): if self.batch_manager.is_batch_initialized():
self.batch_manager.finalize_batch() self.batch_manager.finalize_batch()
def _initialize_crew_batch(self, source: Any, event: Any) -> None: def _initialize_crew_batch(self, source: Any, event: BaseEvent) -> None:
"""Initialize trace batch. """Initialize trace batch.
Args: Args:
@@ -626,7 +627,7 @@ class TraceCollectionListener(BaseEventListener):
user_context = self._get_user_context() user_context = self._get_user_context()
execution_metadata = { execution_metadata = {
"crew_name": getattr(event, "crew_name", "Unknown Crew"), "crew_name": getattr(event, "crew_name", "Unknown Crew"),
"execution_start": event.timestamp if hasattr(event, "timestamp") else None, "execution_start": event.timestamp,
"crewai_version": get_crewai_version(), "crewai_version": get_crewai_version(),
} }
@@ -635,7 +636,7 @@ class TraceCollectionListener(BaseEventListener):
self._initialize_batch(user_context, execution_metadata) self._initialize_batch(user_context, execution_metadata)
def _initialize_flow_batch(self, source: Any, event: Any) -> None: def _initialize_flow_batch(self, source: Any, event: BaseEvent) -> None:
"""Initialize trace batch for Flow execution. """Initialize trace batch for Flow execution.
Args: Args:
@@ -645,7 +646,7 @@ class TraceCollectionListener(BaseEventListener):
user_context = self._get_user_context() user_context = self._get_user_context()
execution_metadata = { execution_metadata = {
"flow_name": getattr(event, "flow_name", "Unknown Flow"), "flow_name": getattr(event, "flow_name", "Unknown Flow"),
"execution_start": event.timestamp if hasattr(event, "timestamp") else None, "execution_start": event.timestamp,
"crewai_version": get_crewai_version(), "crewai_version": get_crewai_version(),
"execution_type": "flow", "execution_type": "flow",
} }
@@ -714,18 +715,16 @@ class TraceCollectionListener(BaseEventListener):
self.batch_manager.end_event_processing() self.batch_manager.end_event_processing()
def _create_trace_event( def _create_trace_event(
self, event_type: str, source: Any, event: Any self, event_type: str, source: Any, event: BaseEvent
) -> TraceEvent: ) -> TraceEvent:
"""Create a trace event""" """Create a trace event with ordering information."""
if hasattr(event, "timestamp") and event.timestamp: trace_event = TraceEvent(
trace_event = TraceEvent( type=event_type,
type=event_type, timestamp=event.timestamp.isoformat() if event.timestamp else "",
timestamp=event.timestamp.isoformat(), event_id=event.event_id,
) emission_sequence=event.emission_sequence,
else: parent_event_id=event.parent_event_id,
trace_event = TraceEvent( )
type=event_type,
)
trace_event.event_data = self._build_event_data(event_type, event, source) trace_event.event_data = self._build_event_data(event_type, event, source)
@@ -778,10 +777,8 @@ class TraceCollectionListener(BaseEventListener):
} }
if event_type == "llm_call_started": if event_type == "llm_call_started":
event_data = safe_serialize_to_dict(event) event_data = safe_serialize_to_dict(event)
event_data["task_name"] = ( event_data["task_name"] = event.task_name or getattr(
event.task_name or event.task_description event, "task_description", None
if hasattr(event, "task_name") and event.task_name
else None
) )
return event_data return event_data
if event_type == "llm_call_completed": if event_type == "llm_call_completed":