ensure flow started events only emiited once

This commit is contained in:
lorenzejay
2026-06-04 10:11:37 -07:00
parent 880355648f
commit b9be5586b2
2 changed files with 75 additions and 28 deletions

View File

@@ -907,6 +907,7 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
_pending_user_message: str | dict[str, Any] | None = PrivateAttr(default=None)
_pending_intents: Sequence[str] | None = PrivateAttr(default=None)
_pending_intent_llm: str | "BaseLLM" | None = PrivateAttr(default=None)
_deferred_flow_started_event_id: str | None = PrivateAttr(default=None)
def __class_getitem__(cls: type[Flow[T]], item: type[T]) -> type[Flow[T]]: # type: ignore[override]
class _FlowGeneric(cls): # type: ignore[valid-type,misc]
@@ -2206,38 +2207,51 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
):
self._apply_pending_conversational_turn()
if get_current_parent_id() is None:
defer_trace_finalization = self._should_defer_trace_finalization()
deferred_started_event_id = self._deferred_flow_started_event_id
should_emit_flow_started = not (
defer_trace_finalization and deferred_started_event_id
)
if (
defer_trace_finalization
and deferred_started_event_id
and get_current_parent_id() is None
):
restore_event_scope(((deferred_started_event_id, "flow_started"),))
elif get_current_parent_id() is None:
reset_emission_counter()
reset_last_event_id()
# ``FlowStartedEvent`` always fires — ``suppress_flow_events``
# only hides the Rich console panel (and the textual log line
# below), it doesn't gate observability events. Tracing /
# downstream listeners still need to see flow_started.
started_event = FlowStartedEvent(
type="flow_started",
flow_name=self.name or self.__class__.__name__,
inputs=inputs,
)
future = crewai_event_bus.emit(self, started_event)
if future:
try:
await asyncio.wrap_future(future)
except Exception:
logger.warning("FlowStartedEvent handler failed", exc_info=True)
# Stash the started event id so a deferred
# ``finalize_session_traces()`` can restore the event scope
# before emitting ``FlowFinishedEvent`` (otherwise the bus
# warns "Ending event 'flow_finished' emitted with empty
# scope stack").
if self._should_defer_trace_finalization():
object.__setattr__(
self, "_deferred_flow_started_event_id", started_event.event_id
)
if not self.suppress_flow_events:
self._log_flow_event(
f"Flow started with ID: {self.flow_id}", color="bold magenta"
if should_emit_flow_started:
# In normal flows, each kickoff owns its own flow lifecycle.
# Deferred conversational sessions are different: the first
# turn opens the flow scope and later turns reuse it until
# ``finalize_session_traces()`` emits the single finish event.
started_event = FlowStartedEvent(
type="flow_started",
flow_name=self.name or self.__class__.__name__,
inputs=inputs,
)
future = crewai_event_bus.emit(self, started_event)
if future:
try:
await asyncio.wrap_future(future)
except Exception:
logger.warning("FlowStartedEvent handler failed", exc_info=True)
# Stash the started event id so a deferred
# ``finalize_session_traces()`` can restore the event scope
# before emitting ``FlowFinishedEvent`` (otherwise the bus
# warns "Ending event 'flow_finished' emitted with empty
# scope stack").
if defer_trace_finalization:
object.__setattr__(
self, "_deferred_flow_started_event_id", started_event.event_id
)
if not self.suppress_flow_events:
self._log_flow_event(
f"Flow started with ID: {self.flow_id}", color="bold magenta"
)
# After FlowStarted: env events must not pre-empt trace batch init
# with implicit "crew" execution_type.

View File

@@ -941,6 +941,39 @@ class TestConversationalFlow:
"defer_trace_finalization=True must skip per-turn finalize"
)
def test_deferred_conversation_emits_one_flow_started(self) -> None:
"""Deferred conversational sessions emit one flow_started for the session."""
from crewai.events.types.flow_events import FlowStartedEvent
@ConversationConfig(defer_trace_finalization=True)
class DeferredFlow(ConversationalFlow):
def route_turn(self, context: dict[str, Any]) -> str | None:
return "work"
@listen("work")
def do_work(self) -> str:
self.append_assistant_message("worked")
return "worked"
flow = DeferredFlow()
started_events: list[FlowStartedEvent] = []
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(FlowStartedEvent)
def capture(_: Any, event: FlowStartedEvent) -> None:
started_events.append(event)
flow.handle_turn("turn 1")
flow.handle_turn("turn 2")
flow.handle_turn("turn 3")
crewai_event_bus.flush()
assert len(started_events) == 1, (
"deferred conversational traces should emit one session-level "
"flow_started event, not one per turn"
)
def test_finalize_session_traces_emits_finished_and_finalizes_batch(self) -> None:
"""``finalize_session_traces()`` emits one ``FlowFinishedEvent`` + one ``finalize_batch``.