Refactor tracing logic to consolidate conditions for enabling tracing… (#3347)

* Refactor tracing logic to consolidate conditions for enabling tracing in Crew class and update TraceBatchManager to handle ephemeral batches more effectively. Added tests for trace listener handling of both ephemeral and authenticated user batches.

* drop print

* linted

* refactor: streamline ephemeral handling in TraceBatchManager

This commit removes the ephemeral parameter from the _send_events_to_backend and _finalize_backend_batch methods, replacing it with internal logic that checks the current batch's ephemeral status. This change simplifies the method signatures and enhances the clarity of the code by directly using the is_current_batch_ephemeral attribute for conditional logic.
This commit is contained in:
Lorenze Jay
2025-08-18 14:16:51 -07:00
committed by GitHub
parent 80b3d9689a
commit a1b3edd79c
6 changed files with 336 additions and 13 deletions

View File

@@ -286,10 +286,12 @@ class Crew(FlowTrackable, BaseModel):
self._cache_handler = CacheHandler()
event_listener = EventListener()
if on_first_execution_tracing_confirmation():
self.tracing = True
if is_tracing_enabled() or self.tracing:
if (
on_first_execution_tracing_confirmation()
or is_tracing_enabled()
or self.tracing
):
trace_listener = TraceCollectionListener()
trace_listener.setup_listeners(crewai_event_bus)
event_listener.verbose = self.verbose

View File

@@ -40,6 +40,8 @@ class TraceBatch:
class TraceBatchManager:
"""Single responsibility: Manage batches and event buffering"""
is_current_batch_ephemeral: bool = False
def __init__(self):
try:
self.plus_api = PlusAPI(api_key=get_auth_token())
@@ -62,6 +64,7 @@ class TraceBatchManager:
user_context=user_context, execution_metadata=execution_metadata
)
self.event_buffer.clear()
self.is_current_batch_ephemeral = use_ephemeral
self.record_start_time("execution")
self._initialize_backend_batch(user_context, execution_metadata, use_ephemeral)
@@ -136,7 +139,7 @@ class TraceBatchManager:
"""Add event to buffer"""
self.event_buffer.append(trace_event)
def _send_events_to_backend(self, ephemeral: bool = True):
def _send_events_to_backend(self):
"""Send buffered events to backend"""
if not self.plus_api or not self.trace_batch_id or not self.event_buffer:
return
@@ -156,7 +159,7 @@ class TraceBatchManager:
response = (
self.plus_api.send_ephemeral_trace_events(self.trace_batch_id, payload)
if ephemeral
if self.is_current_batch_ephemeral
else self.plus_api.send_trace_events(self.trace_batch_id, payload)
)
@@ -170,15 +173,14 @@ class TraceBatchManager:
except Exception as e:
logger.error(f"❌ Error sending events to backend: {str(e)}")
def finalize_batch(self, ephemeral: bool = True) -> Optional[TraceBatch]:
def finalize_batch(self) -> Optional[TraceBatch]:
"""Finalize batch and return it for sending"""
if not self.current_batch:
return None
if self.event_buffer:
self._send_events_to_backend(ephemeral)
self._finalize_backend_batch(ephemeral)
self._send_events_to_backend()
self._finalize_backend_batch()
self.current_batch.events = self.event_buffer.copy()
@@ -187,12 +189,13 @@ class TraceBatchManager:
self.current_batch = None
self.event_buffer.clear()
self.trace_batch_id = None
self.is_current_batch_ephemeral = False
self._cleanup_batch_data()
return finalized_batch
def _finalize_backend_batch(self, ephemeral: bool = True):
def _finalize_backend_batch(self):
"""Send batch finalization to backend"""
if not self.plus_api or not self.trace_batch_id:
return
@@ -210,7 +213,7 @@ class TraceBatchManager:
self.plus_api.finalize_ephemeral_trace_batch(
self.trace_batch_id, payload
)
if ephemeral
if self.is_current_batch_ephemeral
else self.plus_api.finalize_trace_batch(self.trace_batch_id, payload)
)
@@ -219,7 +222,7 @@ class TraceBatchManager:
console = Console()
return_link = (
f"{CREWAI_BASE_URL}/crewai_plus/trace_batches/{self.trace_batch_id}"
if not ephemeral and access_code
if not self.is_current_batch_ephemeral and access_code is None
else f"{CREWAI_BASE_URL}/crewai_plus/ephemeral_trace_batches/{self.trace_batch_id}?access_code={access_code}"
)
panel = Panel(

View File

@@ -166,7 +166,7 @@ class TraceCollectionListener(BaseEventListener):
@event_bus.on(CrewKickoffCompletedEvent)
def on_crew_completed(source, event):
self._handle_trace_event("crew_kickoff_completed", source, event)
self.batch_manager.finalize_batch(ephemeral=True)
self.batch_manager.finalize_batch()
@event_bus.on(CrewKickoffFailedEvent)
def on_crew_failed(source, event):