From e183edd60c9ed764cf06b88e6c242e9942a4699c Mon Sep 17 00:00:00 2001 From: lorenzejay Date: Wed, 24 Jun 2026 12:02:23 -0700 Subject: [PATCH] Track conversational flow turn usage in telemetry --- lib/crewai/src/crewai/events/__init__.py | 3 + .../src/crewai/events/event_listener.py | 7 +++ lib/crewai/src/crewai/events/event_types.py | 2 + .../src/crewai/events/types/flow_events.py | 7 +++ .../experimental/conversational_mixin.py | 9 +++ lib/crewai/tests/test_flow_conversation.py | 61 +++++++++++++++++++ 6 files changed, 89 insertions(+) diff --git a/lib/crewai/src/crewai/events/__init__.py b/lib/crewai/src/crewai/events/__init__.py index ce4a01a22..0b7b944ad 100644 --- a/lib/crewai/src/crewai/events/__init__.py +++ b/lib/crewai/src/crewai/events/__init__.py @@ -63,6 +63,7 @@ if TYPE_CHECKING: from crewai.events.types.flow_events import ( ConversationMessageAddedEvent, ConversationRouteSelectedEvent, + ConversationTurnStartedEvent, FlowCreatedEvent, FlowEvent, FlowFinishedEvent, @@ -185,6 +186,7 @@ _LAZY_EVENT_MAPPING: dict[str, str] = { "CrewTrainStartedEvent": "crewai.events.types.crew_events", "ConversationMessageAddedEvent": "crewai.events.types.flow_events", "ConversationRouteSelectedEvent": "crewai.events.types.flow_events", + "ConversationTurnStartedEvent": "crewai.events.types.flow_events", "FlowCreatedEvent": "crewai.events.types.flow_events", "FlowEvent": "crewai.events.types.flow_events", "FlowFinishedEvent": "crewai.events.types.flow_events", @@ -305,6 +307,7 @@ __all__ = [ "CircularDependencyError", "ConversationMessageAddedEvent", "ConversationRouteSelectedEvent", + "ConversationTurnStartedEvent", "CrewKickoffCompletedEvent", "CrewKickoffFailedEvent", "CrewKickoffStartedEvent", diff --git a/lib/crewai/src/crewai/events/event_listener.py b/lib/crewai/src/crewai/events/event_listener.py index 883147478..6af504a9f 100644 --- a/lib/crewai/src/crewai/events/event_listener.py +++ b/lib/crewai/src/crewai/events/event_listener.py @@ -41,6 +41,7 @@ from crewai.events.types.env_events import ( DefaultEnvEvent, ) from crewai.events.types.flow_events import ( + ConversationTurnStartedEvent, FlowCreatedEvent, FlowFinishedEvent, FlowPausedEvent, @@ -317,6 +318,12 @@ class EventListener(BaseEventListener): source.flow_id, ) + @crewai_event_bus.on(ConversationTurnStartedEvent) + def on_conversation_turn_started( + _: Any, event: ConversationTurnStartedEvent + ) -> None: + self._telemetry.feature_usage_span("flow:conversation") + @crewai_event_bus.on(MethodExecutionStartedEvent) def on_method_execution_started( source: Any, event: MethodExecutionStartedEvent diff --git a/lib/crewai/src/crewai/events/event_types.py b/lib/crewai/src/crewai/events/event_types.py index f78278d50..2b5d92d7e 100644 --- a/lib/crewai/src/crewai/events/event_types.py +++ b/lib/crewai/src/crewai/events/event_types.py @@ -55,6 +55,7 @@ from crewai.events.types.crew_events import ( from crewai.events.types.flow_events import ( ConversationMessageAddedEvent, ConversationRouteSelectedEvent, + ConversationTurnStartedEvent, FlowFinishedEvent, FlowStartedEvent, MethodExecutionFailedEvent, @@ -162,6 +163,7 @@ EventTypes = ( | TaskFailedEvent | ConversationMessageAddedEvent | ConversationRouteSelectedEvent + | ConversationTurnStartedEvent | FlowStartedEvent | FlowFinishedEvent | MethodExecutionStartedEvent diff --git a/lib/crewai/src/crewai/events/types/flow_events.py b/lib/crewai/src/crewai/events/types/flow_events.py index 2af20c979..0a151649a 100644 --- a/lib/crewai/src/crewai/events/types/flow_events.py +++ b/lib/crewai/src/crewai/events/types/flow_events.py @@ -184,6 +184,13 @@ class ConversationMessageAddedEvent(FlowEvent): type: Literal["conversation_message_added"] = "conversation_message_added" +class ConversationTurnStartedEvent(FlowEvent): + """Event emitted when a conversational Flow starts a user turn.""" + + session_id: str + type: Literal["conversation_turn_started"] = "conversation_turn_started" + + class ConversationRouteSelectedEvent(FlowEvent): """Event emitted when a conversational Flow selects a route for a turn.""" diff --git a/lib/crewai/src/crewai/experimental/conversational_mixin.py b/lib/crewai/src/crewai/experimental/conversational_mixin.py index 4f39565c0..c3349d678 100644 --- a/lib/crewai/src/crewai/experimental/conversational_mixin.py +++ b/lib/crewai/src/crewai/experimental/conversational_mixin.py @@ -30,6 +30,7 @@ from crewai.events.event_bus import crewai_event_bus from crewai.events.types.flow_events import ( ConversationMessageAddedEvent, ConversationRouteSelectedEvent, + ConversationTurnStartedEvent, ) from crewai.experimental.conversational import ( AgentMessage, @@ -280,6 +281,14 @@ class _ConversationalMixin: """ state = cast(ConversationState, self.state) sid = session_id or state.id + crewai_event_bus.emit( + self, + ConversationTurnStartedEvent( + type="conversation_turn_started", + flow_name=self.name or self.__class__.__name__, + session_id=sid, + ), + ) # Stash the pending turn so the kickoff extension hook picks it up # after persist restore. diff --git a/lib/crewai/tests/test_flow_conversation.py b/lib/crewai/tests/test_flow_conversation.py index 3fea6b471..08a339806 100644 --- a/lib/crewai/tests/test_flow_conversation.py +++ b/lib/crewai/tests/test_flow_conversation.py @@ -14,6 +14,7 @@ from crewai.events.listeners.tracing.trace_listener import TraceCollectionListen from crewai.events.types.flow_events import ( ConversationMessageAddedEvent, ConversationRouteSelectedEvent, + ConversationTurnStartedEvent, FlowStartedEvent, MethodExecutionFinishedEvent, MethodExecutionStartedEvent, @@ -1125,6 +1126,66 @@ class TestConversationalFlow: assert observed_events[0] == "flow_started" assert observed_events[1] == "conversation_message_added" + def test_handle_turn_emits_turn_started_for_each_conversational_turn( + self, + ) -> None: + """Each ``handle_turn()`` emits a usage-friendly turn start event.""" + + @ConversationConfig(defer_trace_finalization=True) + class DeferredFlow(ConversationalFlow): + def route_turn(self, context: dict[str, Any]) -> str | None: + return "work" + + @listen("work") + def do_work(self) -> str: + self.append_assistant_message("worked") + return "worked" + + flow = DeferredFlow() + default_session_id = flow.state.id + turn_events: list[ConversationTurnStartedEvent] = [] + + with crewai_event_bus.scoped_handlers(): + + @crewai_event_bus.on(ConversationTurnStartedEvent) + def capture(_: Any, event: ConversationTurnStartedEvent) -> None: + turn_events.append(event) + + flow.handle_turn("turn 1") + flow.handle_turn("turn 2", session_id="custom-session") + crewai_event_bus.flush() + + assert [event.type for event in turn_events] == [ + "conversation_turn_started", + "conversation_turn_started", + ] + assert turn_events[0].session_id == default_session_id + assert turn_events[1].session_id == "custom-session" + + def test_conversation_turn_started_tracks_feature_usage(self) -> None: + """Conversation turn events count conversational Flow usage.""" + from crewai.events.event_listener import event_listener + + with ( + crewai_event_bus.scoped_handlers(), + patch.object( + event_listener._telemetry, + "feature_usage_span", + ) as feature_usage_span, + ): + event_listener.setup_listeners(crewai_event_bus) + crewai_event_bus.emit( + self, + ConversationTurnStartedEvent( + type="conversation_turn_started", + flow_name="ChatFlow", + session_id="session-1", + ), + ) + crewai_event_bus.flush() + + feature_usage_span.assert_any_call("flow:conversation") + def test_route_event_uses_no_message_index_for_empty_transcript(self) -> None: """Route events do not reference index zero when no message exists."""