From 8cd51fc67ea339f578384737187871a3012eba96 Mon Sep 17 00:00:00 2001 From: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com> Date: Fri, 5 Jun 2026 14:10:19 -0700 Subject: [PATCH] Lorenze/imp/conversational flow traces (#6044) * feat: add conversation message and route selection events - Introduced `ConversationMessageAddedEvent` and `ConversationRouteSelectedEvent` to enhance conversational flow tracking. - Updated event listeners to emit these events during message handling and routing decisions. - Enhanced the `_ConversationalMixin` class to emit events for user and assistant messages, as well as selected routes. - Added tests to verify the correct emission of these events during conversational turns. * ensure flow started events only emiited once * refactor(tracing): rename trace event handler methods to action event handlers Updated the class to replace with for and events, improving clarity in event handling. Additionally, adjusted comments in the class to clarify the application of pending user messages in relation to state restoration and flow scope initialization. * fix(conversational_mixin): handle empty message index in route events Updated the message index handling in the class to return when there are no messages. Added tests to ensure that route events do not reference index zero when the transcript is empty, and verified the correct emission of conversation message events during flow handling. --- lib/crewai/src/crewai/events/__init__.py | 6 + lib/crewai/src/crewai/events/event_types.py | 4 + .../listeners/tracing/trace_listener.py | 14 +++ .../src/crewai/events/types/flow_events.py | 25 ++++ .../experimental/conversational_mixin.py | 77 +++++++++++- lib/crewai/src/crewai/flow/runtime.py | 91 ++++++++------ lib/crewai/tests/test_flow_conversation.py | 118 ++++++++++++++++++ 7 files changed, 296 insertions(+), 39 deletions(-) diff --git a/lib/crewai/src/crewai/events/__init__.py b/lib/crewai/src/crewai/events/__init__.py index 4d9d836c3..b026c451d 100644 --- a/lib/crewai/src/crewai/events/__init__.py +++ b/lib/crewai/src/crewai/events/__init__.py @@ -61,6 +61,8 @@ if TYPE_CHECKING: CrewTrainStartedEvent, ) from crewai.events.types.flow_events import ( + ConversationMessageAddedEvent, + ConversationRouteSelectedEvent, FlowCreatedEvent, FlowEvent, FlowFinishedEvent, @@ -176,6 +178,8 @@ _LAZY_EVENT_MAPPING: dict[str, str] = { "CrewTrainCompletedEvent": "crewai.events.types.crew_events", "CrewTrainFailedEvent": "crewai.events.types.crew_events", "CrewTrainStartedEvent": "crewai.events.types.crew_events", + "ConversationMessageAddedEvent": "crewai.events.types.flow_events", + "ConversationRouteSelectedEvent": "crewai.events.types.flow_events", "FlowCreatedEvent": "crewai.events.types.flow_events", "FlowEvent": "crewai.events.types.flow_events", "FlowFinishedEvent": "crewai.events.types.flow_events", @@ -291,6 +295,8 @@ __all__ = [ "CheckpointRestoreStartedEvent", "CheckpointStartedEvent", "CircularDependencyError", + "ConversationMessageAddedEvent", + "ConversationRouteSelectedEvent", "CrewKickoffCompletedEvent", "CrewKickoffFailedEvent", "CrewKickoffStartedEvent", diff --git a/lib/crewai/src/crewai/events/event_types.py b/lib/crewai/src/crewai/events/event_types.py index f336ce75a..dcf31cb03 100644 --- a/lib/crewai/src/crewai/events/event_types.py +++ b/lib/crewai/src/crewai/events/event_types.py @@ -53,6 +53,8 @@ from crewai.events.types.crew_events import ( CrewTrainStartedEvent, ) from crewai.events.types.flow_events import ( + ConversationMessageAddedEvent, + ConversationRouteSelectedEvent, FlowFinishedEvent, FlowStartedEvent, MethodExecutionFailedEvent, @@ -154,6 +156,8 @@ EventTypes = ( | TaskStartedEvent | TaskCompletedEvent | TaskFailedEvent + | ConversationMessageAddedEvent + | ConversationRouteSelectedEvent | FlowStartedEvent | FlowFinishedEvent | MethodExecutionStartedEvent diff --git a/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py b/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py index c7901fed3..c85e6202a 100644 --- a/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py +++ b/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py @@ -62,6 +62,8 @@ from crewai.events.types.crew_events import ( CrewKickoffStartedEvent, ) from crewai.events.types.flow_events import ( + ConversationMessageAddedEvent, + ConversationRouteSelectedEvent, FlowCreatedEvent, FlowFinishedEvent, FlowPlotEvent, @@ -255,6 +257,18 @@ class TraceCollectionListener(BaseEventListener): def on_method_failed(source: Any, event: MethodExecutionFailedEvent) -> None: self._handle_trace_event("method_execution_failed", source, event) + @event_bus.on(ConversationMessageAddedEvent) + def on_conversation_message_added( + source: Any, event: ConversationMessageAddedEvent + ) -> None: + self._handle_action_event("conversation_message_added", source, event) + + @event_bus.on(ConversationRouteSelectedEvent) + def on_conversation_route_selected( + source: Any, event: ConversationRouteSelectedEvent + ) -> None: + self._handle_action_event("conversation_route_selected", source, event) + @event_bus.on(FlowFinishedEvent) def on_flow_finished(source: Any, event: FlowFinishedEvent) -> None: self._handle_trace_event("flow_finished", source, event) diff --git a/lib/crewai/src/crewai/events/types/flow_events.py b/lib/crewai/src/crewai/events/types/flow_events.py index c2c1e2912..5ff4d4038 100644 --- a/lib/crewai/src/crewai/events/types/flow_events.py +++ b/lib/crewai/src/crewai/events/types/flow_events.py @@ -166,6 +166,31 @@ class FlowInputReceivedEvent(FlowEvent): type: Literal["flow_input_received"] = "flow_input_received" +class ConversationMessageAddedEvent(FlowEvent): + """Event emitted when a conversational Flow records a message. + + This gives trace consumers a first-class transcript signal instead of + requiring them to inspect the full method state payload. + """ + + session_id: str + role: Literal["user", "assistant", "system", "tool"] + content: Any + message_index: int + type: Literal["conversation_message_added"] = "conversation_message_added" + + +class ConversationRouteSelectedEvent(FlowEvent): + """Event emitted when a conversational Flow selects a route for a turn.""" + + session_id: str + route: str + user_message: str | None = None + message_index: int | None = None + previous_intent: str | None = None + type: Literal["conversation_route_selected"] = "conversation_route_selected" + + class HumanFeedbackRequestedEvent(FlowEvent): """Event emitted when human feedback is requested. diff --git a/lib/crewai/src/crewai/experimental/conversational_mixin.py b/lib/crewai/src/crewai/experimental/conversational_mixin.py index 3d3a8d05d..3801d0570 100644 --- a/lib/crewai/src/crewai/experimental/conversational_mixin.py +++ b/lib/crewai/src/crewai/experimental/conversational_mixin.py @@ -24,6 +24,11 @@ from typing import TYPE_CHECKING, Any, ClassVar, Literal, cast from pydantic import BaseModel, Field, create_model +from crewai.events.event_bus import crewai_event_bus +from crewai.events.types.flow_events import ( + ConversationMessageAddedEvent, + ConversationRouteSelectedEvent, +) from crewai.experimental.conversational import ( AgentMessage, ConversationConfig, @@ -122,19 +127,36 @@ class _ConversationalMixin: """Route the current turn to a listener label.""" state = cast(ConversationState, self.state) context = self.build_router_context() + previous_intent = state.last_intent configured_route = self.route_turn(context) if configured_route: state.last_intent = configured_route + self._emit_conversation_route_selected( + configured_route, + previous_intent=previous_intent, + ) return configured_route if state.last_intent: + self._emit_conversation_route_selected( + state.last_intent, + previous_intent=previous_intent, + ) return state.last_intent if self.can_answer_from_history(context): state.last_intent = "answer_from_history" + self._emit_conversation_route_selected( + "answer_from_history", + previous_intent=previous_intent, + ) return "answer_from_history" state.last_intent = "converse" + self._emit_conversation_route_selected( + "converse", + previous_intent=previous_intent, + ) return "converse" @listen("converse") @@ -406,13 +428,61 @@ class _ConversationalMixin: metadata: dict[str, Any] | None = None, ) -> None: """Append a final user-visible assistant message.""" - cast(ConversationState, self.state).messages.append( + state = cast(ConversationState, self.state) + state.messages.append( ConversationMessage( role="assistant", content=content, metadata=metadata or {}, ) ) + self._emit_conversation_message_added( + role="assistant", + content=content, + message_index=len(state.messages) - 1, + ) + + def _emit_conversation_message_added( + self, + *, + role: Literal["user", "assistant", "system", "tool"], + content: Any, + message_index: int, + ) -> None: + """Emit a compact transcript event for conversational trace views.""" + state = cast(ConversationState, self.state) + crewai_event_bus.emit( + self, + ConversationMessageAddedEvent( + type="conversation_message_added", + flow_name=self.name or self.__class__.__name__, + session_id=state.id, + role=role, + content=content, + message_index=message_index, + ), + ) + + def _emit_conversation_route_selected( + self, + route: str, + *, + previous_intent: str | None = None, + ) -> None: + """Emit the conversational routing decision for the current turn.""" + state = cast(ConversationState, self.state) + crewai_event_bus.emit( + self, + ConversationRouteSelectedEvent( + type="conversation_route_selected", + flow_name=self.name or self.__class__.__name__, + session_id=state.id, + route=route, + user_message=state.current_user_message, + message_index=(len(state.messages) - 1) if state.messages else None, + previous_intent=previous_intent, + ), + ) def append_message( self, @@ -447,6 +517,11 @@ class _ConversationalMixin: if self.conversational: state = cast(ConversationState, self.state) state.messages.append(ConversationMessage(role="user", content=text)) + self._emit_conversation_message_added( + role="user", + content=text, + message_index=len(state.messages) - 1, + ) state.current_user_message = text state.last_user_message = text if outcomes and llm is not None: diff --git a/lib/crewai/src/crewai/flow/runtime.py b/lib/crewai/src/crewai/flow/runtime.py index 80b7a84da..7dfefd3d8 100644 --- a/lib/crewai/src/crewai/flow/runtime.py +++ b/lib/crewai/src/crewai/flow/runtime.py @@ -912,6 +912,7 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta): _pending_user_message: str | dict[str, Any] | None = PrivateAttr(default=None) _pending_intents: Sequence[str] | None = PrivateAttr(default=None) _pending_intent_llm: str | "BaseLLM" | None = PrivateAttr(default=None) + _deferred_flow_started_event_id: str | None = PrivateAttr(default=None) def __class_getitem__(cls: type[Flow[T]], item: type[T]) -> type[Flow[T]]: # type: ignore[override] class _FlowGeneric(cls): # type: ignore[valid-type,misc] @@ -2201,8 +2202,59 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta): if filtered_inputs: self._initialize_state(filtered_inputs) + defer_trace_finalization = self._should_defer_trace_finalization() + deferred_started_event_id = self._deferred_flow_started_event_id + should_emit_flow_started = not ( + defer_trace_finalization and deferred_started_event_id + ) + + if ( + defer_trace_finalization + and deferred_started_event_id + and get_current_parent_id() is None + ): + restore_event_scope(((deferred_started_event_id, "flow_started"),)) + elif get_current_parent_id() is None: + reset_emission_counter() + reset_last_event_id() + + if should_emit_flow_started: + # In normal flows, each kickoff owns its own flow lifecycle. + # Deferred conversational sessions are different: the first + # turn opens the flow scope and later turns reuse it until + # ``finalize_session_traces()`` emits the single finish event. + started_event = FlowStartedEvent( + type="flow_started", + flow_name=self.name or self.__class__.__name__, + inputs=inputs, + ) + future = crewai_event_bus.emit(self, started_event) + if future: + try: + await asyncio.wrap_future(future) + except Exception: + logger.warning("FlowStartedEvent handler failed", exc_info=True) + # Stash the started event id so a deferred + # ``finalize_session_traces()`` can restore the event scope + # before emitting ``FlowFinishedEvent`` (otherwise the bus + # warns "Ending event 'flow_finished' emitted with empty + # scope stack"). + if defer_trace_finalization: + object.__setattr__( + self, "_deferred_flow_started_event_id", started_event.event_id + ) + if not self.suppress_flow_events: + self._log_flow_event( + f"Flow started with ID: {self.flow_id}", color="bold magenta" + ) + + # After FlowStarted: env events must not pre-empt trace batch init + # with implicit "crew" execution_type. + get_env_context() + # Conversational hook: apply the pending user message AFTER state - # restore so it survives ``self.persistence.load_state(...)``. + # restore and AFTER flow scope initialization, so transcript events + # are parented under the current conversation trace. # ``handle_turn`` stashes the message on ``self._pending_user_message`` # before calling ``kickoff``; this drains it. if ( @@ -2211,43 +2263,6 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta): ): self._apply_pending_conversational_turn() - if get_current_parent_id() is None: - reset_emission_counter() - reset_last_event_id() - - # ``FlowStartedEvent`` always fires — ``suppress_flow_events`` - # only hides the Rich console panel (and the textual log line - # below), it doesn't gate observability events. Tracing / - # downstream listeners still need to see flow_started. - started_event = FlowStartedEvent( - type="flow_started", - flow_name=self.name or self.__class__.__name__, - inputs=inputs, - ) - future = crewai_event_bus.emit(self, started_event) - if future: - try: - await asyncio.wrap_future(future) - except Exception: - logger.warning("FlowStartedEvent handler failed", exc_info=True) - # Stash the started event id so a deferred - # ``finalize_session_traces()`` can restore the event scope - # before emitting ``FlowFinishedEvent`` (otherwise the bus - # warns "Ending event 'flow_finished' emitted with empty - # scope stack"). - if self._should_defer_trace_finalization(): - object.__setattr__( - self, "_deferred_flow_started_event_id", started_event.event_id - ) - if not self.suppress_flow_events: - self._log_flow_event( - f"Flow started with ID: {self.flow_id}", color="bold magenta" - ) - - # After FlowStarted: env events must not pre-empt trace batch init - # with implicit "crew" execution_type. - get_env_context() - if inputs is not None and "id" not in inputs: self._initialize_state(inputs) diff --git a/lib/crewai/tests/test_flow_conversation.py b/lib/crewai/tests/test_flow_conversation.py index 4ed61394e..fab03ad29 100644 --- a/lib/crewai/tests/test_flow_conversation.py +++ b/lib/crewai/tests/test_flow_conversation.py @@ -11,6 +11,8 @@ from pydantic import BaseModel from crewai.events.event_bus import crewai_event_bus from crewai.events.listeners.tracing.trace_listener import TraceCollectionListener from crewai.events.types.flow_events import ( + ConversationMessageAddedEvent, + ConversationRouteSelectedEvent, FlowStartedEvent, MethodExecutionFinishedEvent, MethodExecutionStartedEvent, @@ -421,6 +423,56 @@ class TestConversationalFlow: assert any(message["content"] == "prior findings" for message in messages) assert any(message["content"] == "summarize findings" for message in messages) + def test_conversational_turn_emits_message_and_route_events(self) -> None: + class ResearchRoute(BaseModel): + intent: Literal["research", "converse", "end"] + + router_llm = MagicMock() + router_llm.call.return_value = ResearchRoute(intent="converse") + chat_llm = MagicMock() + chat_llm.call.return_value = "hello back" + + @ConversationConfig( + llm=chat_llm, + router=RouterConfig( + response_format=ResearchRoute, + llm=router_llm, + routes=["research"], + ), + ) + class RoutedFlow(ConversationalFlow): + @listen("research") + def run_research(self) -> str: + self.append_assistant_message("researched") + return "researched" + + messages: list[ConversationMessageAddedEvent] = [] + routes: list[ConversationRouteSelectedEvent] = [] + + with crewai_event_bus.scoped_handlers(): + + @crewai_event_bus.on(ConversationMessageAddedEvent) + def capture_message(_: Any, event: ConversationMessageAddedEvent) -> None: + messages.append(event) + + @crewai_event_bus.on(ConversationRouteSelectedEvent) + def capture_route(_: Any, event: ConversationRouteSelectedEvent) -> None: + routes.append(event) + + flow = RoutedFlow() + flow.handle_turn("just chat") + crewai_event_bus.flush() + + assert [(event.role, event.content) for event in messages] == [ + ("user", "just chat"), + ("assistant", "hello back"), + ] + assert [event.message_index for event in messages] == [0, 1] + assert len(routes) == 1 + assert routes[0].route == "converse" + assert routes[0].user_message == "just chat" + assert routes[0].session_id == messages[0].session_id + def test_builtin_end_marks_conversation_ended(self) -> None: class ResearchRoute(BaseModel): intent: Literal["research", "converse", "end"] @@ -969,6 +1021,72 @@ class TestConversationalFlow: "defer_trace_finalization=True must skip per-turn finalize" ) + def test_deferred_conversation_emits_one_flow_started(self) -> None: + """Deferred conversational sessions emit one flow_started for the session.""" + from crewai.events.types.flow_events import FlowStartedEvent + + @ConversationConfig(defer_trace_finalization=True) + class DeferredFlow(ConversationalFlow): + def route_turn(self, context: dict[str, Any]) -> str | None: + return "work" + + @listen("work") + def do_work(self) -> str: + self.append_assistant_message("worked") + return "worked" + + flow = DeferredFlow() + observed_events: list[str] = [] + started_events: list[FlowStartedEvent] = [] + + with crewai_event_bus.scoped_handlers(): + + @crewai_event_bus.on(FlowStartedEvent) + def capture(_: Any, event: FlowStartedEvent) -> None: + observed_events.append(event.type) + started_events.append(event) + + @crewai_event_bus.on(ConversationMessageAddedEvent) + def capture_message( + _: Any, event: ConversationMessageAddedEvent + ) -> None: + if event.role == "user": + observed_events.append(event.type) + + flow.handle_turn("turn 1") + flow.handle_turn("turn 2") + flow.handle_turn("turn 3") + crewai_event_bus.flush() + + assert len(started_events) == 1, ( + "deferred conversational traces should emit one session-level " + "flow_started event, not one per turn" + ) + assert observed_events[0] == "flow_started" + assert observed_events[1] == "conversation_message_added" + + def test_route_event_uses_no_message_index_for_empty_transcript(self) -> None: + """Route events do not reference index zero when no message exists.""" + + @ConversationConfig() + class DeferredFlow(ConversationalFlow): + pass + + flow = DeferredFlow() + route_events: list[ConversationRouteSelectedEvent] = [] + + with crewai_event_bus.scoped_handlers(): + + @crewai_event_bus.on(ConversationRouteSelectedEvent) + def capture(_: Any, event: ConversationRouteSelectedEvent) -> None: + route_events.append(event) + + flow._emit_conversation_route_selected("converse") + crewai_event_bus.flush() + + assert len(route_events) == 1 + assert route_events[0].message_index is None + def test_finalize_session_traces_emits_finished_and_finalizes_batch(self) -> None: """``finalize_session_traces()`` emits one ``FlowFinishedEvent`` + one ``finalize_batch``.