diff --git a/lib/crewai/src/crewai/events/__init__.py b/lib/crewai/src/crewai/events/__init__.py index 4d9d836c3..b026c451d 100644 --- a/lib/crewai/src/crewai/events/__init__.py +++ b/lib/crewai/src/crewai/events/__init__.py @@ -61,6 +61,8 @@ if TYPE_CHECKING: CrewTrainStartedEvent, ) from crewai.events.types.flow_events import ( + ConversationMessageAddedEvent, + ConversationRouteSelectedEvent, FlowCreatedEvent, FlowEvent, FlowFinishedEvent, @@ -176,6 +178,8 @@ _LAZY_EVENT_MAPPING: dict[str, str] = { "CrewTrainCompletedEvent": "crewai.events.types.crew_events", "CrewTrainFailedEvent": "crewai.events.types.crew_events", "CrewTrainStartedEvent": "crewai.events.types.crew_events", + "ConversationMessageAddedEvent": "crewai.events.types.flow_events", + "ConversationRouteSelectedEvent": "crewai.events.types.flow_events", "FlowCreatedEvent": "crewai.events.types.flow_events", "FlowEvent": "crewai.events.types.flow_events", "FlowFinishedEvent": "crewai.events.types.flow_events", @@ -291,6 +295,8 @@ __all__ = [ "CheckpointRestoreStartedEvent", "CheckpointStartedEvent", "CircularDependencyError", + "ConversationMessageAddedEvent", + "ConversationRouteSelectedEvent", "CrewKickoffCompletedEvent", "CrewKickoffFailedEvent", "CrewKickoffStartedEvent", diff --git a/lib/crewai/src/crewai/events/event_types.py b/lib/crewai/src/crewai/events/event_types.py index f336ce75a..dcf31cb03 100644 --- a/lib/crewai/src/crewai/events/event_types.py +++ b/lib/crewai/src/crewai/events/event_types.py @@ -53,6 +53,8 @@ from crewai.events.types.crew_events import ( CrewTrainStartedEvent, ) from crewai.events.types.flow_events import ( + ConversationMessageAddedEvent, + ConversationRouteSelectedEvent, FlowFinishedEvent, FlowStartedEvent, MethodExecutionFailedEvent, @@ -154,6 +156,8 @@ EventTypes = ( | TaskStartedEvent | TaskCompletedEvent | TaskFailedEvent + | ConversationMessageAddedEvent + | ConversationRouteSelectedEvent | FlowStartedEvent | FlowFinishedEvent | MethodExecutionStartedEvent diff --git a/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py b/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py index c7901fed3..c85e6202a 100644 --- a/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py +++ b/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py @@ -62,6 +62,8 @@ from crewai.events.types.crew_events import ( CrewKickoffStartedEvent, ) from crewai.events.types.flow_events import ( + ConversationMessageAddedEvent, + ConversationRouteSelectedEvent, FlowCreatedEvent, FlowFinishedEvent, FlowPlotEvent, @@ -255,6 +257,18 @@ class TraceCollectionListener(BaseEventListener): def on_method_failed(source: Any, event: MethodExecutionFailedEvent) -> None: self._handle_trace_event("method_execution_failed", source, event) + @event_bus.on(ConversationMessageAddedEvent) + def on_conversation_message_added( + source: Any, event: ConversationMessageAddedEvent + ) -> None: + self._handle_action_event("conversation_message_added", source, event) + + @event_bus.on(ConversationRouteSelectedEvent) + def on_conversation_route_selected( + source: Any, event: ConversationRouteSelectedEvent + ) -> None: + self._handle_action_event("conversation_route_selected", source, event) + @event_bus.on(FlowFinishedEvent) def on_flow_finished(source: Any, event: FlowFinishedEvent) -> None: self._handle_trace_event("flow_finished", source, event) diff --git a/lib/crewai/src/crewai/events/types/flow_events.py b/lib/crewai/src/crewai/events/types/flow_events.py index c2c1e2912..5ff4d4038 100644 --- a/lib/crewai/src/crewai/events/types/flow_events.py +++ b/lib/crewai/src/crewai/events/types/flow_events.py @@ -166,6 +166,31 @@ class FlowInputReceivedEvent(FlowEvent): type: Literal["flow_input_received"] = "flow_input_received" +class ConversationMessageAddedEvent(FlowEvent): + """Event emitted when a conversational Flow records a message. + + This gives trace consumers a first-class transcript signal instead of + requiring them to inspect the full method state payload. + """ + + session_id: str + role: Literal["user", "assistant", "system", "tool"] + content: Any + message_index: int + type: Literal["conversation_message_added"] = "conversation_message_added" + + +class ConversationRouteSelectedEvent(FlowEvent): + """Event emitted when a conversational Flow selects a route for a turn.""" + + session_id: str + route: str + user_message: str | None = None + message_index: int | None = None + previous_intent: str | None = None + type: Literal["conversation_route_selected"] = "conversation_route_selected" + + class HumanFeedbackRequestedEvent(FlowEvent): """Event emitted when human feedback is requested. diff --git a/lib/crewai/src/crewai/experimental/conversational_mixin.py b/lib/crewai/src/crewai/experimental/conversational_mixin.py index 3d3a8d05d..3801d0570 100644 --- a/lib/crewai/src/crewai/experimental/conversational_mixin.py +++ b/lib/crewai/src/crewai/experimental/conversational_mixin.py @@ -24,6 +24,11 @@ from typing import TYPE_CHECKING, Any, ClassVar, Literal, cast from pydantic import BaseModel, Field, create_model +from crewai.events.event_bus import crewai_event_bus +from crewai.events.types.flow_events import ( + ConversationMessageAddedEvent, + ConversationRouteSelectedEvent, +) from crewai.experimental.conversational import ( AgentMessage, ConversationConfig, @@ -122,19 +127,36 @@ class _ConversationalMixin: """Route the current turn to a listener label.""" state = cast(ConversationState, self.state) context = self.build_router_context() + previous_intent = state.last_intent configured_route = self.route_turn(context) if configured_route: state.last_intent = configured_route + self._emit_conversation_route_selected( + configured_route, + previous_intent=previous_intent, + ) return configured_route if state.last_intent: + self._emit_conversation_route_selected( + state.last_intent, + previous_intent=previous_intent, + ) return state.last_intent if self.can_answer_from_history(context): state.last_intent = "answer_from_history" + self._emit_conversation_route_selected( + "answer_from_history", + previous_intent=previous_intent, + ) return "answer_from_history" state.last_intent = "converse" + self._emit_conversation_route_selected( + "converse", + previous_intent=previous_intent, + ) return "converse" @listen("converse") @@ -406,13 +428,61 @@ class _ConversationalMixin: metadata: dict[str, Any] | None = None, ) -> None: """Append a final user-visible assistant message.""" - cast(ConversationState, self.state).messages.append( + state = cast(ConversationState, self.state) + state.messages.append( ConversationMessage( role="assistant", content=content, metadata=metadata or {}, ) ) + self._emit_conversation_message_added( + role="assistant", + content=content, + message_index=len(state.messages) - 1, + ) + + def _emit_conversation_message_added( + self, + *, + role: Literal["user", "assistant", "system", "tool"], + content: Any, + message_index: int, + ) -> None: + """Emit a compact transcript event for conversational trace views.""" + state = cast(ConversationState, self.state) + crewai_event_bus.emit( + self, + ConversationMessageAddedEvent( + type="conversation_message_added", + flow_name=self.name or self.__class__.__name__, + session_id=state.id, + role=role, + content=content, + message_index=message_index, + ), + ) + + def _emit_conversation_route_selected( + self, + route: str, + *, + previous_intent: str | None = None, + ) -> None: + """Emit the conversational routing decision for the current turn.""" + state = cast(ConversationState, self.state) + crewai_event_bus.emit( + self, + ConversationRouteSelectedEvent( + type="conversation_route_selected", + flow_name=self.name or self.__class__.__name__, + session_id=state.id, + route=route, + user_message=state.current_user_message, + message_index=(len(state.messages) - 1) if state.messages else None, + previous_intent=previous_intent, + ), + ) def append_message( self, @@ -447,6 +517,11 @@ class _ConversationalMixin: if self.conversational: state = cast(ConversationState, self.state) state.messages.append(ConversationMessage(role="user", content=text)) + self._emit_conversation_message_added( + role="user", + content=text, + message_index=len(state.messages) - 1, + ) state.current_user_message = text state.last_user_message = text if outcomes and llm is not None: diff --git a/lib/crewai/src/crewai/flow/runtime.py b/lib/crewai/src/crewai/flow/runtime.py index 80b7a84da..7dfefd3d8 100644 --- a/lib/crewai/src/crewai/flow/runtime.py +++ b/lib/crewai/src/crewai/flow/runtime.py @@ -912,6 +912,7 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta): _pending_user_message: str | dict[str, Any] | None = PrivateAttr(default=None) _pending_intents: Sequence[str] | None = PrivateAttr(default=None) _pending_intent_llm: str | "BaseLLM" | None = PrivateAttr(default=None) + _deferred_flow_started_event_id: str | None = PrivateAttr(default=None) def __class_getitem__(cls: type[Flow[T]], item: type[T]) -> type[Flow[T]]: # type: ignore[override] class _FlowGeneric(cls): # type: ignore[valid-type,misc] @@ -2201,8 +2202,59 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta): if filtered_inputs: self._initialize_state(filtered_inputs) + defer_trace_finalization = self._should_defer_trace_finalization() + deferred_started_event_id = self._deferred_flow_started_event_id + should_emit_flow_started = not ( + defer_trace_finalization and deferred_started_event_id + ) + + if ( + defer_trace_finalization + and deferred_started_event_id + and get_current_parent_id() is None + ): + restore_event_scope(((deferred_started_event_id, "flow_started"),)) + elif get_current_parent_id() is None: + reset_emission_counter() + reset_last_event_id() + + if should_emit_flow_started: + # In normal flows, each kickoff owns its own flow lifecycle. + # Deferred conversational sessions are different: the first + # turn opens the flow scope and later turns reuse it until + # ``finalize_session_traces()`` emits the single finish event. + started_event = FlowStartedEvent( + type="flow_started", + flow_name=self.name or self.__class__.__name__, + inputs=inputs, + ) + future = crewai_event_bus.emit(self, started_event) + if future: + try: + await asyncio.wrap_future(future) + except Exception: + logger.warning("FlowStartedEvent handler failed", exc_info=True) + # Stash the started event id so a deferred + # ``finalize_session_traces()`` can restore the event scope + # before emitting ``FlowFinishedEvent`` (otherwise the bus + # warns "Ending event 'flow_finished' emitted with empty + # scope stack"). + if defer_trace_finalization: + object.__setattr__( + self, "_deferred_flow_started_event_id", started_event.event_id + ) + if not self.suppress_flow_events: + self._log_flow_event( + f"Flow started with ID: {self.flow_id}", color="bold magenta" + ) + + # After FlowStarted: env events must not pre-empt trace batch init + # with implicit "crew" execution_type. + get_env_context() + # Conversational hook: apply the pending user message AFTER state - # restore so it survives ``self.persistence.load_state(...)``. + # restore and AFTER flow scope initialization, so transcript events + # are parented under the current conversation trace. # ``handle_turn`` stashes the message on ``self._pending_user_message`` # before calling ``kickoff``; this drains it. if ( @@ -2211,43 +2263,6 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta): ): self._apply_pending_conversational_turn() - if get_current_parent_id() is None: - reset_emission_counter() - reset_last_event_id() - - # ``FlowStartedEvent`` always fires — ``suppress_flow_events`` - # only hides the Rich console panel (and the textual log line - # below), it doesn't gate observability events. Tracing / - # downstream listeners still need to see flow_started. - started_event = FlowStartedEvent( - type="flow_started", - flow_name=self.name or self.__class__.__name__, - inputs=inputs, - ) - future = crewai_event_bus.emit(self, started_event) - if future: - try: - await asyncio.wrap_future(future) - except Exception: - logger.warning("FlowStartedEvent handler failed", exc_info=True) - # Stash the started event id so a deferred - # ``finalize_session_traces()`` can restore the event scope - # before emitting ``FlowFinishedEvent`` (otherwise the bus - # warns "Ending event 'flow_finished' emitted with empty - # scope stack"). - if self._should_defer_trace_finalization(): - object.__setattr__( - self, "_deferred_flow_started_event_id", started_event.event_id - ) - if not self.suppress_flow_events: - self._log_flow_event( - f"Flow started with ID: {self.flow_id}", color="bold magenta" - ) - - # After FlowStarted: env events must not pre-empt trace batch init - # with implicit "crew" execution_type. - get_env_context() - if inputs is not None and "id" not in inputs: self._initialize_state(inputs) diff --git a/lib/crewai/tests/test_flow_conversation.py b/lib/crewai/tests/test_flow_conversation.py index 4ed61394e..fab03ad29 100644 --- a/lib/crewai/tests/test_flow_conversation.py +++ b/lib/crewai/tests/test_flow_conversation.py @@ -11,6 +11,8 @@ from pydantic import BaseModel from crewai.events.event_bus import crewai_event_bus from crewai.events.listeners.tracing.trace_listener import TraceCollectionListener from crewai.events.types.flow_events import ( + ConversationMessageAddedEvent, + ConversationRouteSelectedEvent, FlowStartedEvent, MethodExecutionFinishedEvent, MethodExecutionStartedEvent, @@ -421,6 +423,56 @@ class TestConversationalFlow: assert any(message["content"] == "prior findings" for message in messages) assert any(message["content"] == "summarize findings" for message in messages) + def test_conversational_turn_emits_message_and_route_events(self) -> None: + class ResearchRoute(BaseModel): + intent: Literal["research", "converse", "end"] + + router_llm = MagicMock() + router_llm.call.return_value = ResearchRoute(intent="converse") + chat_llm = MagicMock() + chat_llm.call.return_value = "hello back" + + @ConversationConfig( + llm=chat_llm, + router=RouterConfig( + response_format=ResearchRoute, + llm=router_llm, + routes=["research"], + ), + ) + class RoutedFlow(ConversationalFlow): + @listen("research") + def run_research(self) -> str: + self.append_assistant_message("researched") + return "researched" + + messages: list[ConversationMessageAddedEvent] = [] + routes: list[ConversationRouteSelectedEvent] = [] + + with crewai_event_bus.scoped_handlers(): + + @crewai_event_bus.on(ConversationMessageAddedEvent) + def capture_message(_: Any, event: ConversationMessageAddedEvent) -> None: + messages.append(event) + + @crewai_event_bus.on(ConversationRouteSelectedEvent) + def capture_route(_: Any, event: ConversationRouteSelectedEvent) -> None: + routes.append(event) + + flow = RoutedFlow() + flow.handle_turn("just chat") + crewai_event_bus.flush() + + assert [(event.role, event.content) for event in messages] == [ + ("user", "just chat"), + ("assistant", "hello back"), + ] + assert [event.message_index for event in messages] == [0, 1] + assert len(routes) == 1 + assert routes[0].route == "converse" + assert routes[0].user_message == "just chat" + assert routes[0].session_id == messages[0].session_id + def test_builtin_end_marks_conversation_ended(self) -> None: class ResearchRoute(BaseModel): intent: Literal["research", "converse", "end"] @@ -969,6 +1021,72 @@ class TestConversationalFlow: "defer_trace_finalization=True must skip per-turn finalize" ) + def test_deferred_conversation_emits_one_flow_started(self) -> None: + """Deferred conversational sessions emit one flow_started for the session.""" + from crewai.events.types.flow_events import FlowStartedEvent + + @ConversationConfig(defer_trace_finalization=True) + class DeferredFlow(ConversationalFlow): + def route_turn(self, context: dict[str, Any]) -> str | None: + return "work" + + @listen("work") + def do_work(self) -> str: + self.append_assistant_message("worked") + return "worked" + + flow = DeferredFlow() + observed_events: list[str] = [] + started_events: list[FlowStartedEvent] = [] + + with crewai_event_bus.scoped_handlers(): + + @crewai_event_bus.on(FlowStartedEvent) + def capture(_: Any, event: FlowStartedEvent) -> None: + observed_events.append(event.type) + started_events.append(event) + + @crewai_event_bus.on(ConversationMessageAddedEvent) + def capture_message( + _: Any, event: ConversationMessageAddedEvent + ) -> None: + if event.role == "user": + observed_events.append(event.type) + + flow.handle_turn("turn 1") + flow.handle_turn("turn 2") + flow.handle_turn("turn 3") + crewai_event_bus.flush() + + assert len(started_events) == 1, ( + "deferred conversational traces should emit one session-level " + "flow_started event, not one per turn" + ) + assert observed_events[0] == "flow_started" + assert observed_events[1] == "conversation_message_added" + + def test_route_event_uses_no_message_index_for_empty_transcript(self) -> None: + """Route events do not reference index zero when no message exists.""" + + @ConversationConfig() + class DeferredFlow(ConversationalFlow): + pass + + flow = DeferredFlow() + route_events: list[ConversationRouteSelectedEvent] = [] + + with crewai_event_bus.scoped_handlers(): + + @crewai_event_bus.on(ConversationRouteSelectedEvent) + def capture(_: Any, event: ConversationRouteSelectedEvent) -> None: + route_events.append(event) + + flow._emit_conversation_route_selected("converse") + crewai_event_bus.flush() + + assert len(route_events) == 1 + assert route_events[0].message_index is None + def test_finalize_session_traces_emits_finished_and_finalizes_batch(self) -> None: """``finalize_session_traces()`` emits one ``FlowFinishedEvent`` + one ``finalize_batch``.