diff --git a/lib/crewai/src/crewai/events/__init__.py b/lib/crewai/src/crewai/events/__init__.py index 4d9d836c3..b026c451d 100644 --- a/lib/crewai/src/crewai/events/__init__.py +++ b/lib/crewai/src/crewai/events/__init__.py @@ -61,6 +61,8 @@ if TYPE_CHECKING: CrewTrainStartedEvent, ) from crewai.events.types.flow_events import ( + ConversationMessageAddedEvent, + ConversationRouteSelectedEvent, FlowCreatedEvent, FlowEvent, FlowFinishedEvent, @@ -176,6 +178,8 @@ _LAZY_EVENT_MAPPING: dict[str, str] = { "CrewTrainCompletedEvent": "crewai.events.types.crew_events", "CrewTrainFailedEvent": "crewai.events.types.crew_events", "CrewTrainStartedEvent": "crewai.events.types.crew_events", + "ConversationMessageAddedEvent": "crewai.events.types.flow_events", + "ConversationRouteSelectedEvent": "crewai.events.types.flow_events", "FlowCreatedEvent": "crewai.events.types.flow_events", "FlowEvent": "crewai.events.types.flow_events", "FlowFinishedEvent": "crewai.events.types.flow_events", @@ -291,6 +295,8 @@ __all__ = [ "CheckpointRestoreStartedEvent", "CheckpointStartedEvent", "CircularDependencyError", + "ConversationMessageAddedEvent", + "ConversationRouteSelectedEvent", "CrewKickoffCompletedEvent", "CrewKickoffFailedEvent", "CrewKickoffStartedEvent", diff --git a/lib/crewai/src/crewai/events/event_types.py b/lib/crewai/src/crewai/events/event_types.py index f336ce75a..dcf31cb03 100644 --- a/lib/crewai/src/crewai/events/event_types.py +++ b/lib/crewai/src/crewai/events/event_types.py @@ -53,6 +53,8 @@ from crewai.events.types.crew_events import ( CrewTrainStartedEvent, ) from crewai.events.types.flow_events import ( + ConversationMessageAddedEvent, + ConversationRouteSelectedEvent, FlowFinishedEvent, FlowStartedEvent, MethodExecutionFailedEvent, @@ -154,6 +156,8 @@ EventTypes = ( | TaskStartedEvent | TaskCompletedEvent | TaskFailedEvent + | ConversationMessageAddedEvent + | ConversationRouteSelectedEvent | FlowStartedEvent | FlowFinishedEvent | MethodExecutionStartedEvent diff --git a/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py b/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py index c7901fed3..5bcafb427 100644 --- a/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py +++ b/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py @@ -62,6 +62,8 @@ from crewai.events.types.crew_events import ( CrewKickoffStartedEvent, ) from crewai.events.types.flow_events import ( + ConversationMessageAddedEvent, + ConversationRouteSelectedEvent, FlowCreatedEvent, FlowFinishedEvent, FlowPlotEvent, @@ -255,6 +257,18 @@ class TraceCollectionListener(BaseEventListener): def on_method_failed(source: Any, event: MethodExecutionFailedEvent) -> None: self._handle_trace_event("method_execution_failed", source, event) + @event_bus.on(ConversationMessageAddedEvent) + def on_conversation_message_added( + source: Any, event: ConversationMessageAddedEvent + ) -> None: + self._handle_trace_event("conversation_message_added", source, event) + + @event_bus.on(ConversationRouteSelectedEvent) + def on_conversation_route_selected( + source: Any, event: ConversationRouteSelectedEvent + ) -> None: + self._handle_trace_event("conversation_route_selected", source, event) + @event_bus.on(FlowFinishedEvent) def on_flow_finished(source: Any, event: FlowFinishedEvent) -> None: self._handle_trace_event("flow_finished", source, event) diff --git a/lib/crewai/src/crewai/events/types/flow_events.py b/lib/crewai/src/crewai/events/types/flow_events.py index c2c1e2912..5ff4d4038 100644 --- a/lib/crewai/src/crewai/events/types/flow_events.py +++ b/lib/crewai/src/crewai/events/types/flow_events.py @@ -166,6 +166,31 @@ class FlowInputReceivedEvent(FlowEvent): type: Literal["flow_input_received"] = "flow_input_received" +class ConversationMessageAddedEvent(FlowEvent): + """Event emitted when a conversational Flow records a message. + + This gives trace consumers a first-class transcript signal instead of + requiring them to inspect the full method state payload. + """ + + session_id: str + role: Literal["user", "assistant", "system", "tool"] + content: Any + message_index: int + type: Literal["conversation_message_added"] = "conversation_message_added" + + +class ConversationRouteSelectedEvent(FlowEvent): + """Event emitted when a conversational Flow selects a route for a turn.""" + + session_id: str + route: str + user_message: str | None = None + message_index: int | None = None + previous_intent: str | None = None + type: Literal["conversation_route_selected"] = "conversation_route_selected" + + class HumanFeedbackRequestedEvent(FlowEvent): """Event emitted when human feedback is requested. diff --git a/lib/crewai/src/crewai/experimental/conversational_mixin.py b/lib/crewai/src/crewai/experimental/conversational_mixin.py index a66c5bc68..0b46a96d1 100644 --- a/lib/crewai/src/crewai/experimental/conversational_mixin.py +++ b/lib/crewai/src/crewai/experimental/conversational_mixin.py @@ -24,6 +24,11 @@ from typing import TYPE_CHECKING, Any, ClassVar, Literal, cast from pydantic import BaseModel, Field, create_model +from crewai.events.event_bus import crewai_event_bus +from crewai.events.types.flow_events import ( + ConversationMessageAddedEvent, + ConversationRouteSelectedEvent, +) from crewai.experimental.conversational import ( AgentMessage, ConversationConfig, @@ -122,19 +127,36 @@ class _ConversationalMixin: """Route the current turn to a listener label.""" state = cast(ConversationState, self.state) context = self.build_router_context() + previous_intent = state.last_intent configured_route = self.route_turn(context) if configured_route: state.last_intent = configured_route + self._emit_conversation_route_selected( + configured_route, + previous_intent=previous_intent, + ) return configured_route if state.last_intent: + self._emit_conversation_route_selected( + state.last_intent, + previous_intent=previous_intent, + ) return state.last_intent if self.can_answer_from_history(context): state.last_intent = "answer_from_history" + self._emit_conversation_route_selected( + "answer_from_history", + previous_intent=previous_intent, + ) return "answer_from_history" state.last_intent = "converse" + self._emit_conversation_route_selected( + "converse", + previous_intent=previous_intent, + ) return "converse" @listen("converse") @@ -353,13 +375,61 @@ class _ConversationalMixin: metadata: dict[str, Any] | None = None, ) -> None: """Append a final user-visible assistant message.""" - cast(ConversationState, self.state).messages.append( + state = cast(ConversationState, self.state) + state.messages.append( ConversationMessage( role="assistant", content=content, metadata=metadata or {}, ) ) + self._emit_conversation_message_added( + role="assistant", + content=content, + message_index=len(state.messages) - 1, + ) + + def _emit_conversation_message_added( + self, + *, + role: Literal["user", "assistant", "system", "tool"], + content: Any, + message_index: int, + ) -> None: + """Emit a compact transcript event for conversational trace views.""" + state = cast(ConversationState, self.state) + crewai_event_bus.emit( + self, + ConversationMessageAddedEvent( + type="conversation_message_added", + flow_name=self.name or self.__class__.__name__, + session_id=state.id, + role=role, + content=content, + message_index=message_index, + ), + ) + + def _emit_conversation_route_selected( + self, + route: str, + *, + previous_intent: str | None = None, + ) -> None: + """Emit the conversational routing decision for the current turn.""" + state = cast(ConversationState, self.state) + crewai_event_bus.emit( + self, + ConversationRouteSelectedEvent( + type="conversation_route_selected", + flow_name=self.name or self.__class__.__name__, + session_id=state.id, + route=route, + user_message=state.current_user_message, + message_index=max(len(state.messages) - 1, 0), + previous_intent=previous_intent, + ), + ) def append_message( self, @@ -394,6 +464,11 @@ class _ConversationalMixin: if self.conversational: state = cast(ConversationState, self.state) state.messages.append(ConversationMessage(role="user", content=text)) + self._emit_conversation_message_added( + role="user", + content=text, + message_index=len(state.messages) - 1, + ) state.current_user_message = text state.last_user_message = text if outcomes and llm is not None: diff --git a/lib/crewai/tests/test_flow_conversation.py b/lib/crewai/tests/test_flow_conversation.py index 77567fe5d..0525ac439 100644 --- a/lib/crewai/tests/test_flow_conversation.py +++ b/lib/crewai/tests/test_flow_conversation.py @@ -11,6 +11,8 @@ from pydantic import BaseModel from crewai.events.event_bus import crewai_event_bus from crewai.events.listeners.tracing.trace_listener import TraceCollectionListener from crewai.events.types.flow_events import ( + ConversationMessageAddedEvent, + ConversationRouteSelectedEvent, FlowStartedEvent, MethodExecutionFinishedEvent, MethodExecutionStartedEvent, @@ -421,6 +423,56 @@ class TestConversationalFlow: assert any(message["content"] == "prior findings" for message in messages) assert any(message["content"] == "summarize findings" for message in messages) + def test_conversational_turn_emits_message_and_route_events(self) -> None: + class ResearchRoute(BaseModel): + intent: Literal["research", "converse", "end"] + + router_llm = MagicMock() + router_llm.call.return_value = ResearchRoute(intent="converse") + chat_llm = MagicMock() + chat_llm.call.return_value = "hello back" + + @ConversationConfig( + llm=chat_llm, + router=RouterConfig( + response_format=ResearchRoute, + llm=router_llm, + routes=["research"], + ), + ) + class RoutedFlow(ConversationalFlow): + @listen("research") + def run_research(self) -> str: + self.append_assistant_message("researched") + return "researched" + + messages: list[ConversationMessageAddedEvent] = [] + routes: list[ConversationRouteSelectedEvent] = [] + + with crewai_event_bus.scoped_handlers(): + + @crewai_event_bus.on(ConversationMessageAddedEvent) + def capture_message(_: Any, event: ConversationMessageAddedEvent) -> None: + messages.append(event) + + @crewai_event_bus.on(ConversationRouteSelectedEvent) + def capture_route(_: Any, event: ConversationRouteSelectedEvent) -> None: + routes.append(event) + + flow = RoutedFlow() + flow.handle_turn("just chat") + crewai_event_bus.flush() + + assert [(event.role, event.content) for event in messages] == [ + ("user", "just chat"), + ("assistant", "hello back"), + ] + assert [event.message_index for event in messages] == [0, 1] + assert len(routes) == 1 + assert routes[0].route == "converse" + assert routes[0].user_message == "just chat" + assert routes[0].session_id == messages[0].session_id + def test_builtin_end_marks_conversation_ended(self) -> None: class ResearchRoute(BaseModel): intent: Literal["research", "converse", "end"]