From ae253b41563bf987a88fe1de9cbb56ef0c6d3962 Mon Sep 17 00:00:00 2001 From: Greyson LaLonde Date: Tue, 20 Jan 2026 01:07:56 -0500 Subject: [PATCH] refactor: improve tracing listener typing and sorting --- .../listeners/tracing/trace_batch_manager.py | 9 +++-- .../listeners/tracing/trace_listener.py | 35 +++++++++---------- 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/lib/crewai/src/crewai/events/listeners/tracing/trace_batch_manager.py b/lib/crewai/src/crewai/events/listeners/tracing/trace_batch_manager.py index 3c5bf0aa2..6c45f63ef 100644 --- a/lib/crewai/src/crewai/events/listeners/tracing/trace_batch_manager.py +++ b/lib/crewai/src/crewai/events/listeners/tracing/trace_batch_manager.py @@ -267,9 +267,12 @@ class TraceBatchManager: sorted_events = sorted( self.event_buffer, - key=lambda e: e.timestamp - if hasattr(e, "timestamp") and e.timestamp - else "", + key=lambda e: ( + e.emission_sequence + if e.emission_sequence is not None + else float("inf"), + e.timestamp if hasattr(e, "timestamp") and e.timestamp else "", + ), ) self.current_batch.events = sorted_events diff --git a/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py b/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py index a4d4cbd31..32fa40606 100644 --- a/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py +++ b/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py @@ -9,6 +9,7 @@ from typing_extensions import Self from crewai.cli.authentication.token import AuthError, get_auth_token from crewai.cli.version import get_crewai_version from crewai.events.base_event_listener import BaseEventListener +from crewai.events.base_events import BaseEvent from crewai.events.event_bus import CrewAIEventsBus from crewai.events.listeners.tracing.first_time_trace_handler import ( FirstTimeTraceHandler, @@ -616,7 +617,7 @@ class TraceCollectionListener(BaseEventListener): if self.batch_manager.is_batch_initialized(): self.batch_manager.finalize_batch() - def _initialize_crew_batch(self, source: Any, event: Any) -> None: + def _initialize_crew_batch(self, source: Any, event: BaseEvent) -> None: """Initialize trace batch. Args: @@ -626,7 +627,7 @@ class TraceCollectionListener(BaseEventListener): user_context = self._get_user_context() execution_metadata = { "crew_name": getattr(event, "crew_name", "Unknown Crew"), - "execution_start": event.timestamp if hasattr(event, "timestamp") else None, + "execution_start": event.timestamp, "crewai_version": get_crewai_version(), } @@ -635,7 +636,7 @@ class TraceCollectionListener(BaseEventListener): self._initialize_batch(user_context, execution_metadata) - def _initialize_flow_batch(self, source: Any, event: Any) -> None: + def _initialize_flow_batch(self, source: Any, event: BaseEvent) -> None: """Initialize trace batch for Flow execution. Args: @@ -645,7 +646,7 @@ class TraceCollectionListener(BaseEventListener): user_context = self._get_user_context() execution_metadata = { "flow_name": getattr(event, "flow_name", "Unknown Flow"), - "execution_start": event.timestamp if hasattr(event, "timestamp") else None, + "execution_start": event.timestamp, "crewai_version": get_crewai_version(), "execution_type": "flow", } @@ -714,18 +715,16 @@ class TraceCollectionListener(BaseEventListener): self.batch_manager.end_event_processing() def _create_trace_event( - self, event_type: str, source: Any, event: Any + self, event_type: str, source: Any, event: BaseEvent ) -> TraceEvent: - """Create a trace event""" - if hasattr(event, "timestamp") and event.timestamp: - trace_event = TraceEvent( - type=event_type, - timestamp=event.timestamp.isoformat(), - ) - else: - trace_event = TraceEvent( - type=event_type, - ) + """Create a trace event with ordering information.""" + trace_event = TraceEvent( + type=event_type, + timestamp=event.timestamp.isoformat() if event.timestamp else "", + event_id=event.event_id, + emission_sequence=event.emission_sequence, + parent_event_id=event.parent_event_id, + ) trace_event.event_data = self._build_event_data(event_type, event, source) @@ -778,10 +777,8 @@ class TraceCollectionListener(BaseEventListener): } if event_type == "llm_call_started": event_data = safe_serialize_to_dict(event) - event_data["task_name"] = ( - event.task_name or event.task_description - if hasattr(event, "task_name") and event.task_name - else None + event_data["task_name"] = event.task_name or getattr( + event, "task_description", None ) return event_data if event_type == "llm_call_completed":