Move flow trace collection and batch finalisation to event listener

This commit is contained in:
Heitor Sammuel Carvalho
2025-12-18 11:03:10 -03:00
parent b4b6434480
commit 0f28d14e61
2 changed files with 6 additions and 8 deletions

View File

@@ -194,6 +194,12 @@ class TraceCollectionListener(BaseEventListener):
@event_bus.on(FlowFinishedEvent)
def on_flow_finished(source: Any, event: FlowFinishedEvent) -> None:
self._handle_trace_event("flow_finished", source, event)
if self.batch_manager.batch_owner_type == "flow":
if self.first_time_handler.is_first_time:
self.first_time_handler.mark_events_collected()
self.first_time_handler.handle_execution_completion()
else:
self.batch_manager.finalize_batch()
@event_bus.on(FlowPlotEvent)
def on_flow_plot(source: Any, event: FlowPlotEvent) -> None:

View File

@@ -1020,14 +1020,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
)
self._event_futures.clear()
trace_listener = TraceCollectionListener()
if trace_listener.batch_manager.batch_owner_type == "flow":
if trace_listener.first_time_handler.is_first_time:
trace_listener.first_time_handler.mark_events_collected()
trace_listener.first_time_handler.handle_execution_completion()
else:
trace_listener.batch_manager.finalize_batch()
return final_output
finally:
detach(flow_token)