diff --git a/lib/crewai/src/crewai/flow/runtime.py b/lib/crewai/src/crewai/flow/runtime.py index 0a2a84a95..bb2387cb5 100644 --- a/lib/crewai/src/crewai/flow/runtime.py +++ b/lib/crewai/src/crewai/flow/runtime.py @@ -907,6 +907,7 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta): _pending_user_message: str | dict[str, Any] | None = PrivateAttr(default=None) _pending_intents: Sequence[str] | None = PrivateAttr(default=None) _pending_intent_llm: str | "BaseLLM" | None = PrivateAttr(default=None) + _deferred_flow_started_event_id: str | None = PrivateAttr(default=None) def __class_getitem__(cls: type[Flow[T]], item: type[T]) -> type[Flow[T]]: # type: ignore[override] class _FlowGeneric(cls): # type: ignore[valid-type,misc] @@ -2206,38 +2207,51 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta): ): self._apply_pending_conversational_turn() - if get_current_parent_id() is None: + defer_trace_finalization = self._should_defer_trace_finalization() + deferred_started_event_id = self._deferred_flow_started_event_id + should_emit_flow_started = not ( + defer_trace_finalization and deferred_started_event_id + ) + + if ( + defer_trace_finalization + and deferred_started_event_id + and get_current_parent_id() is None + ): + restore_event_scope(((deferred_started_event_id, "flow_started"),)) + elif get_current_parent_id() is None: reset_emission_counter() reset_last_event_id() - # ``FlowStartedEvent`` always fires — ``suppress_flow_events`` - # only hides the Rich console panel (and the textual log line - # below), it doesn't gate observability events. Tracing / - # downstream listeners still need to see flow_started. - started_event = FlowStartedEvent( - type="flow_started", - flow_name=self.name or self.__class__.__name__, - inputs=inputs, - ) - future = crewai_event_bus.emit(self, started_event) - if future: - try: - await asyncio.wrap_future(future) - except Exception: - logger.warning("FlowStartedEvent handler failed", exc_info=True) - # Stash the started event id so a deferred - # ``finalize_session_traces()`` can restore the event scope - # before emitting ``FlowFinishedEvent`` (otherwise the bus - # warns "Ending event 'flow_finished' emitted with empty - # scope stack"). - if self._should_defer_trace_finalization(): - object.__setattr__( - self, "_deferred_flow_started_event_id", started_event.event_id - ) - if not self.suppress_flow_events: - self._log_flow_event( - f"Flow started with ID: {self.flow_id}", color="bold magenta" + if should_emit_flow_started: + # In normal flows, each kickoff owns its own flow lifecycle. + # Deferred conversational sessions are different: the first + # turn opens the flow scope and later turns reuse it until + # ``finalize_session_traces()`` emits the single finish event. + started_event = FlowStartedEvent( + type="flow_started", + flow_name=self.name or self.__class__.__name__, + inputs=inputs, ) + future = crewai_event_bus.emit(self, started_event) + if future: + try: + await asyncio.wrap_future(future) + except Exception: + logger.warning("FlowStartedEvent handler failed", exc_info=True) + # Stash the started event id so a deferred + # ``finalize_session_traces()`` can restore the event scope + # before emitting ``FlowFinishedEvent`` (otherwise the bus + # warns "Ending event 'flow_finished' emitted with empty + # scope stack"). + if defer_trace_finalization: + object.__setattr__( + self, "_deferred_flow_started_event_id", started_event.event_id + ) + if not self.suppress_flow_events: + self._log_flow_event( + f"Flow started with ID: {self.flow_id}", color="bold magenta" + ) # After FlowStarted: env events must not pre-empt trace batch init # with implicit "crew" execution_type. diff --git a/lib/crewai/tests/test_flow_conversation.py b/lib/crewai/tests/test_flow_conversation.py index 0525ac439..bd14d0cf0 100644 --- a/lib/crewai/tests/test_flow_conversation.py +++ b/lib/crewai/tests/test_flow_conversation.py @@ -941,6 +941,39 @@ class TestConversationalFlow: "defer_trace_finalization=True must skip per-turn finalize" ) + def test_deferred_conversation_emits_one_flow_started(self) -> None: + """Deferred conversational sessions emit one flow_started for the session.""" + from crewai.events.types.flow_events import FlowStartedEvent + + @ConversationConfig(defer_trace_finalization=True) + class DeferredFlow(ConversationalFlow): + def route_turn(self, context: dict[str, Any]) -> str | None: + return "work" + + @listen("work") + def do_work(self) -> str: + self.append_assistant_message("worked") + return "worked" + + flow = DeferredFlow() + started_events: list[FlowStartedEvent] = [] + + with crewai_event_bus.scoped_handlers(): + + @crewai_event_bus.on(FlowStartedEvent) + def capture(_: Any, event: FlowStartedEvent) -> None: + started_events.append(event) + + flow.handle_turn("turn 1") + flow.handle_turn("turn 2") + flow.handle_turn("turn 3") + crewai_event_bus.flush() + + assert len(started_events) == 1, ( + "deferred conversational traces should emit one session-level " + "flow_started event, not one per turn" + ) + def test_finalize_session_traces_emits_finished_and_finalizes_batch(self) -> None: """``finalize_session_traces()`` emits one ``FlowFinishedEvent`` + one ``finalize_batch``.