Track conversational flow turn usage in telemetry

This commit is contained in:
lorenzejay
2026-06-24 12:02:23 -07:00
parent f2a074e35b
commit e183edd60c
6 changed files with 89 additions and 0 deletions

View File

@@ -63,6 +63,7 @@ if TYPE_CHECKING:
from crewai.events.types.flow_events import (
ConversationMessageAddedEvent,
ConversationRouteSelectedEvent,
ConversationTurnStartedEvent,
FlowCreatedEvent,
FlowEvent,
FlowFinishedEvent,
@@ -185,6 +186,7 @@ _LAZY_EVENT_MAPPING: dict[str, str] = {
"CrewTrainStartedEvent": "crewai.events.types.crew_events",
"ConversationMessageAddedEvent": "crewai.events.types.flow_events",
"ConversationRouteSelectedEvent": "crewai.events.types.flow_events",
"ConversationTurnStartedEvent": "crewai.events.types.flow_events",
"FlowCreatedEvent": "crewai.events.types.flow_events",
"FlowEvent": "crewai.events.types.flow_events",
"FlowFinishedEvent": "crewai.events.types.flow_events",
@@ -305,6 +307,7 @@ __all__ = [
"CircularDependencyError",
"ConversationMessageAddedEvent",
"ConversationRouteSelectedEvent",
"ConversationTurnStartedEvent",
"CrewKickoffCompletedEvent",
"CrewKickoffFailedEvent",
"CrewKickoffStartedEvent",

View File

@@ -41,6 +41,7 @@ from crewai.events.types.env_events import (
DefaultEnvEvent,
)
from crewai.events.types.flow_events import (
ConversationTurnStartedEvent,
FlowCreatedEvent,
FlowFinishedEvent,
FlowPausedEvent,
@@ -317,6 +318,12 @@ class EventListener(BaseEventListener):
source.flow_id,
)
@crewai_event_bus.on(ConversationTurnStartedEvent)
def on_conversation_turn_started(
_: Any, event: ConversationTurnStartedEvent
) -> None:
self._telemetry.feature_usage_span("flow:conversation")
@crewai_event_bus.on(MethodExecutionStartedEvent)
def on_method_execution_started(
source: Any, event: MethodExecutionStartedEvent

View File

@@ -55,6 +55,7 @@ from crewai.events.types.crew_events import (
from crewai.events.types.flow_events import (
ConversationMessageAddedEvent,
ConversationRouteSelectedEvent,
ConversationTurnStartedEvent,
FlowFinishedEvent,
FlowStartedEvent,
MethodExecutionFailedEvent,
@@ -162,6 +163,7 @@ EventTypes = (
| TaskFailedEvent
| ConversationMessageAddedEvent
| ConversationRouteSelectedEvent
| ConversationTurnStartedEvent
| FlowStartedEvent
| FlowFinishedEvent
| MethodExecutionStartedEvent

View File

@@ -184,6 +184,13 @@ class ConversationMessageAddedEvent(FlowEvent):
type: Literal["conversation_message_added"] = "conversation_message_added"
class ConversationTurnStartedEvent(FlowEvent):
"""Event emitted when a conversational Flow starts a user turn."""
session_id: str
type: Literal["conversation_turn_started"] = "conversation_turn_started"
class ConversationRouteSelectedEvent(FlowEvent):
"""Event emitted when a conversational Flow selects a route for a turn."""

View File

@@ -30,6 +30,7 @@ from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.flow_events import (
ConversationMessageAddedEvent,
ConversationRouteSelectedEvent,
ConversationTurnStartedEvent,
)
from crewai.experimental.conversational import (
AgentMessage,
@@ -280,6 +281,14 @@ class _ConversationalMixin:
"""
state = cast(ConversationState, self.state)
sid = session_id or state.id
crewai_event_bus.emit(
self,
ConversationTurnStartedEvent(
type="conversation_turn_started",
flow_name=self.name or self.__class__.__name__,
session_id=sid,
),
)
# Stash the pending turn so the kickoff extension hook picks it up
# after persist restore.

View File

@@ -14,6 +14,7 @@ from crewai.events.listeners.tracing.trace_listener import TraceCollectionListen
from crewai.events.types.flow_events import (
ConversationMessageAddedEvent,
ConversationRouteSelectedEvent,
ConversationTurnStartedEvent,
FlowStartedEvent,
MethodExecutionFinishedEvent,
MethodExecutionStartedEvent,
@@ -1125,6 +1126,66 @@ class TestConversationalFlow:
assert observed_events[0] == "flow_started"
assert observed_events[1] == "conversation_message_added"
def test_handle_turn_emits_turn_started_for_each_conversational_turn(
self,
) -> None:
"""Each ``handle_turn()`` emits a usage-friendly turn start event."""
@ConversationConfig(defer_trace_finalization=True)
class DeferredFlow(ConversationalFlow):
def route_turn(self, context: dict[str, Any]) -> str | None:
return "work"
@listen("work")
def do_work(self) -> str:
self.append_assistant_message("worked")
return "worked"
flow = DeferredFlow()
default_session_id = flow.state.id
turn_events: list[ConversationTurnStartedEvent] = []
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(ConversationTurnStartedEvent)
def capture(_: Any, event: ConversationTurnStartedEvent) -> None:
turn_events.append(event)
flow.handle_turn("turn 1")
flow.handle_turn("turn 2", session_id="custom-session")
crewai_event_bus.flush()
assert [event.type for event in turn_events] == [
"conversation_turn_started",
"conversation_turn_started",
]
assert turn_events[0].session_id == default_session_id
assert turn_events[1].session_id == "custom-session"
def test_conversation_turn_started_tracks_feature_usage(self) -> None:
"""Conversation turn events count conversational Flow usage."""
from crewai.events.event_listener import event_listener
with (
crewai_event_bus.scoped_handlers(),
patch.object(
event_listener._telemetry,
"feature_usage_span",
) as feature_usage_span,
):
event_listener.setup_listeners(crewai_event_bus)
crewai_event_bus.emit(
self,
ConversationTurnStartedEvent(
type="conversation_turn_started",
flow_name="ChatFlow",
session_id="session-1",
),
)
crewai_event_bus.flush()
feature_usage_span.assert_any_call("flow:conversation")
def test_route_event_uses_no_message_index_for_empty_transcript(self) -> None:
"""Route events do not reference index zero when no message exists."""