diff --git a/lib/crewai/src/crewai/flow/flow.py b/lib/crewai/src/crewai/flow/flow.py index 16cc88db2..a057da581 100644 --- a/lib/crewai/src/crewai/flow/flow.py +++ b/lib/crewai/src/crewai/flow/flow.py @@ -1455,6 +1455,25 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): "No pending feedback context. Use from_pending() to restore a paused flow." ) + if get_current_parent_id() is None: + reset_emission_counter() + reset_last_event_id() + + if not self.suppress_flow_events: + future = crewai_event_bus.emit( + self, + FlowStartedEvent( + type="flow_started", + flow_name=self.name or self.__class__.__name__, + inputs=None, + ), + ) + if future and isinstance(future, Future): + try: + await asyncio.wrap_future(future) + except Exception: + logger.warning("FlowStartedEvent handler failed", exc_info=True) + context = self._pending_feedback_context emit = context.emit default_outcome = context.default_outcome @@ -1594,16 +1613,39 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): final_result = self._method_outputs[-1] if self._method_outputs else result - # Emit flow finished - crewai_event_bus.emit( - self, - FlowFinishedEvent( - type="flow_finished", - flow_name=self.name or self.__class__.__name__, - result=final_result, - state=self._state, - ), - ) + if self._event_futures: + await asyncio.gather( + *[ + asyncio.wrap_future(f) + for f in self._event_futures + if isinstance(f, Future) + ] + ) + self._event_futures.clear() + + if not self.suppress_flow_events: + future = crewai_event_bus.emit( + self, + FlowFinishedEvent( + type="flow_finished", + flow_name=self.name or self.__class__.__name__, + result=final_result, + state=self._copy_and_serialize_state(), + ), + ) + if future and isinstance(future, Future): + try: + await asyncio.wrap_future(future) + except Exception: + logger.warning("FlowFinishedEvent handler failed", exc_info=True) + + trace_listener = TraceCollectionListener() + if trace_listener.batch_manager.batch_owner_type == "flow": + if trace_listener.first_time_handler.is_first_time: + trace_listener.first_time_handler.mark_events_collected() + trace_listener.first_time_handler.handle_execution_completion() + else: + trace_listener.batch_manager.finalize_batch() return final_result