Track conversational flow turn usage in telemetry (#6324)
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Vulnerability Scan / pip-audit (push) Has been cancelled
Build uv cache / build-cache (3.10) (push) Has been cancelled
Build uv cache / build-cache (3.11) (push) Has been cancelled
Build uv cache / build-cache (3.12) (push) Has been cancelled
Build uv cache / build-cache (3.13) (push) Has been cancelled

* Track conversational flow turn usage in telemetry

* adjusted name to flow:conversation_turn

* only mark on turn completed event

* ensure tui also emits these events
This commit is contained in:
Lorenze Jay
2026-06-25 11:02:07 -07:00
committed by GitHub
parent 654abcb40d
commit 9b31226494
8 changed files with 286 additions and 12 deletions

View File

@@ -65,8 +65,12 @@ def _load_conversational_flow_from_kickoff_script() -> Any | None:
def _run_conversational_flow_tui(flow: Any) -> Any:
from crewai.events.event_listener import EventListener
from crewai_cli.crew_run_tui import CrewRunApp
EventListener() # ensures we get events from the TUI
app = CrewRunApp(
crew_name=getattr(flow, "name", None) or type(flow).__name__,
conversational=True,

View File

@@ -61,3 +61,40 @@ def test_kickoff_flow_falls_back_to_uv_when_no_conversational_flow(
kickoff_flow.kickoff_flow()
assert calls == [["uv", "run", "kickoff"]]
def test_run_conversational_flow_tui_initializes_event_listener(monkeypatch) -> None:
    """The TUI runner creates the event listener before building and running the app.

    ``EventListener`` and ``CrewRunApp`` are swapped for stubs that log their
    construction / ``run()`` into a shared list, so the test can assert both
    the call order and the value handed back by the runner.
    """
    sequence: list[str] = []

    class StubListener:
        def __init__(self) -> None:
            sequence.append("listener")

    class StubApp:
        def __init__(self, *, crew_name: str, conversational: bool) -> None:
            sequence.append("app")
            self.crew_name = crew_name
            self.conversational = conversational
            # Attributes the stub exposes for the runner; the test expects the
            # runner to hand back "done".
            self._status = "completed"
            self._crew_result = "done"
            self._flow = None

        def run(self) -> None:
            sequence.append("run")

    class DemoFlow:
        name = "Demo"

    # Patch at the source modules: the runner imports both names lazily.
    replacements = (
        ("crewai.events.event_listener.EventListener", StubListener),
        ("crewai_cli.crew_run_tui.CrewRunApp", StubApp),
    )
    for dotted_path, stub in replacements:
        monkeypatch.setattr(dotted_path, stub)

    outcome = kickoff_flow._run_conversational_flow_tui(DemoFlow())

    assert outcome == "done"
    assert sequence == ["listener", "app", "run"]

View File

@@ -63,6 +63,9 @@ if TYPE_CHECKING:
from crewai.events.types.flow_events import (
ConversationMessageAddedEvent,
ConversationRouteSelectedEvent,
ConversationTurnCompletedEvent,
ConversationTurnFailedEvent,
ConversationTurnStartedEvent,
FlowCreatedEvent,
FlowEvent,
FlowFinishedEvent,
@@ -185,6 +188,9 @@ _LAZY_EVENT_MAPPING: dict[str, str] = {
"CrewTrainStartedEvent": "crewai.events.types.crew_events",
"ConversationMessageAddedEvent": "crewai.events.types.flow_events",
"ConversationRouteSelectedEvent": "crewai.events.types.flow_events",
"ConversationTurnCompletedEvent": "crewai.events.types.flow_events",
"ConversationTurnFailedEvent": "crewai.events.types.flow_events",
"ConversationTurnStartedEvent": "crewai.events.types.flow_events",
"FlowCreatedEvent": "crewai.events.types.flow_events",
"FlowEvent": "crewai.events.types.flow_events",
"FlowFinishedEvent": "crewai.events.types.flow_events",
@@ -305,6 +311,9 @@ __all__ = [
"CircularDependencyError",
"ConversationMessageAddedEvent",
"ConversationRouteSelectedEvent",
"ConversationTurnCompletedEvent",
"ConversationTurnFailedEvent",
"ConversationTurnStartedEvent",
"CrewKickoffCompletedEvent",
"CrewKickoffFailedEvent",
"CrewKickoffStartedEvent",

View File

@@ -41,6 +41,7 @@ from crewai.events.types.env_events import (
DefaultEnvEvent,
)
from crewai.events.types.flow_events import (
ConversationTurnCompletedEvent,
FlowCreatedEvent,
FlowFinishedEvent,
FlowPausedEvent,
@@ -317,6 +318,12 @@ class EventListener(BaseEventListener):
source.flow_id,
)
@crewai_event_bus.on(ConversationTurnCompletedEvent)
def on_conversation_turn_completed(
    _: Any, event: ConversationTurnCompletedEvent
) -> None:
    """Record a ``flow:conversation_turn`` feature-usage span for a completed turn.

    Neither the source nor the event payload is inspected; only the fact
    that a turn completed is reported.
    """
    # ``self`` is captured from the enclosing scope (not visible in this hunk).
    telemetry = self._telemetry
    telemetry.feature_usage_span("flow:conversation_turn")
@crewai_event_bus.on(MethodExecutionStartedEvent)
def on_method_execution_started(
source: Any, event: MethodExecutionStartedEvent

View File

@@ -55,6 +55,9 @@ from crewai.events.types.crew_events import (
from crewai.events.types.flow_events import (
ConversationMessageAddedEvent,
ConversationRouteSelectedEvent,
ConversationTurnCompletedEvent,
ConversationTurnFailedEvent,
ConversationTurnStartedEvent,
FlowFinishedEvent,
FlowStartedEvent,
MethodExecutionFailedEvent,
@@ -162,6 +165,9 @@ EventTypes = (
| TaskFailedEvent
| ConversationMessageAddedEvent
| ConversationRouteSelectedEvent
| ConversationTurnCompletedEvent
| ConversationTurnFailedEvent
| ConversationTurnStartedEvent
| FlowStartedEvent
| FlowFinishedEvent
| MethodExecutionStartedEvent

View File

@@ -184,6 +184,34 @@ class ConversationMessageAddedEvent(FlowEvent):
type: Literal["conversation_message_added"] = "conversation_message_added"
class ConversationTurnStartedEvent(FlowEvent):
    """Lifecycle event emitted when a conversational Flow begins a user turn.

    Attributes:
        session_id: Session identifier supplied by the emitter for this turn.
        type: Event discriminator, fixed to ``"conversation_turn_started"``.
    """

    session_id: str
    type: Literal["conversation_turn_started"] = "conversation_turn_started"
class ConversationTurnCompletedEvent(FlowEvent):
    """Lifecycle event emitted when a conversational Flow finishes a user turn.

    Attributes:
        session_id: Session identifier supplied by the emitter for this turn.
        type: Event discriminator, fixed to ``"conversation_turn_completed"``.
    """

    session_id: str
    type: Literal["conversation_turn_completed"] = "conversation_turn_completed"
class ConversationTurnFailedEvent(FlowEvent):
    """Lifecycle event emitted when a conversational Flow turn fails.

    Attributes:
        session_id: Session identifier supplied by the emitter for this turn.
        error: The exception associated with the failed turn.
        type: Event discriminator, fixed to ``"conversation_turn_failed"``.
    """

    # ``Exception`` is not a pydantic-native type, so arbitrary types are allowed.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    session_id: str
    error: Exception
    type: Literal["conversation_turn_failed"] = "conversation_turn_failed"

    @field_serializer("error")
    def _serialize_error(self, error: Exception) -> str:
        """Serialize the exception as its ``str()`` form."""
        return str(error)
class ConversationRouteSelectedEvent(FlowEvent):
"""Event emitted when a conversational Flow selects a route for a turn."""

View File

@@ -30,6 +30,9 @@ from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.flow_events import (
ConversationMessageAddedEvent,
ConversationRouteSelectedEvent,
ConversationTurnCompletedEvent,
ConversationTurnFailedEvent,
ConversationTurnStartedEvent,
)
from crewai.experimental.conversational import (
AgentMessage,
@@ -280,6 +283,14 @@ class _ConversationalMixin:
"""
state = cast(ConversationState, self.state)
sid = session_id or state.id
crewai_event_bus.emit(
self,
ConversationTurnStartedEvent(
type="conversation_turn_started",
flow_name=self.name or self.__class__.__name__,
session_id=sid,
),
)
# Stash the pending turn so the kickoff extension hook picks it up
# after persist restore.
@@ -287,27 +298,62 @@ class _ConversationalMixin:
self._pending_intents = list(intents) if intents else None
self._pending_intent_llm = intent_llm
# Each turn is a fresh execution; clear graph tracking so the second
# turn re-runs instead of being treated as a checkpoint restore.
if "from_checkpoint" not in kickoff_kwargs:
self._reset_turn_execution_state()
assistant_count = self._assistant_message_count()
failed_event: ConversationTurnFailedEvent | None = None
try:
# Each turn is a fresh execution; clear graph tracking so the second
# turn re-runs instead of being treated as a checkpoint restore.
if "from_checkpoint" not in kickoff_kwargs:
self._reset_turn_execution_state()
assistant_count = self._assistant_message_count()
result = self.kickoff(inputs={"id": sid}, **kickoff_kwargs)
if (
result is not None
and self._assistant_message_count() == assistant_count
and self._is_public_turn_result(result)
):
self.append_assistant_message(self._stringify_result(result))
except Exception as exc:
failed_event = ConversationTurnFailedEvent(
type="conversation_turn_failed",
flow_name=self.name or self.__class__.__name__,
session_id=sid,
error=exc,
)
raise
finally:
self._pending_user_message = None
self._pending_intents = None
self._pending_intent_llm = None
if failed_event is not None:
self._emit_terminal_conversation_turn_event(failed_event)
if (
result is not None
and self._assistant_message_count() == assistant_count
and self._is_public_turn_result(result)
):
self.append_assistant_message(self._stringify_result(result))
self._emit_terminal_conversation_turn_event(
ConversationTurnCompletedEvent(
type="conversation_turn_completed",
flow_name=self.name or self.__class__.__name__,
session_id=sid,
),
)
return result
def _emit_terminal_conversation_turn_event(
    self,
    event: ConversationTurnCompletedEvent | ConversationTurnFailedEvent,
) -> None:
    """Emit a terminal turn event and wait for its handlers to settle.

    The event is emitted on ``crewai_event_bus`` with this flow as the
    source. When the bus hands back a future-like object, this method blocks
    on it for at most 30 seconds. Any exception raised while waiting is
    logged as a warning with traceback and is not propagated to the caller.

    Args:
        event: The completed or failed turn event to emit.
    """
    pending = crewai_event_bus.emit(self, event)
    # ``emit`` may return ``None``; then there is nothing to wait on.
    if pending is not None:
        try:
            pending.result(timeout=30)
        except Exception:
            logger.warning(
                "%s handler failed or timed out",
                event.__class__.__name__,
                exc_info=True,
            )
def chat(
self,
*,

View File

@@ -14,6 +14,9 @@ from crewai.events.listeners.tracing.trace_listener import TraceCollectionListen
from crewai.events.types.flow_events import (
ConversationMessageAddedEvent,
ConversationRouteSelectedEvent,
ConversationTurnCompletedEvent,
ConversationTurnFailedEvent,
ConversationTurnStartedEvent,
FlowStartedEvent,
MethodExecutionFinishedEvent,
MethodExecutionStartedEvent,
@@ -1123,6 +1126,140 @@ class TestConversationalFlow:
assert observed_events[0] == "flow_started"
assert observed_events[1] == "conversation_message_added"
def test_handle_turn_emits_started_and_completed_for_each_conversational_turn(
self,
) -> None:
"""Each ``handle_turn()`` emits paired turn lifecycle events.

Two turns are run, the second with an explicit ``session_id``. Each turn
must produce a started event followed by a completed event, and both
events of a turn must carry that turn's session id.
"""
# Minimal flow: every turn is routed to "work", whose listener appends one
# assistant message and returns.
@ConversationConfig(defer_trace_finalization=True)
class DeferredFlow(ConversationalFlow):
def route_turn(self, context: dict[str, Any]) -> str | None:
return "work"
@listen("work")
def do_work(self) -> str:
self.append_assistant_message("worked")
return "worked"
flow = DeferredFlow()
# Captured before any turn runs: the first turn passes no session_id and is
# expected to report this id.
default_session_id = flow.state.id
# Turn started/completed events, in the order they were passed to emit.
turn_events: list[
ConversationTurnStartedEvent | ConversationTurnCompletedEvent
] = []
# Keep the real emit so the spy below can delegate to it.
original_emit = crewai_event_bus.emit
# Spy: record only turn started/completed events, then forward to the real bus.
def capture_emit(source: Any, event: Any) -> Any:
if isinstance(
event, (ConversationTurnStartedEvent, ConversationTurnCompletedEvent)
):
turn_events.append(event)
return original_emit(source, event)
with patch.object(crewai_event_bus, "emit", side_effect=capture_emit):
flow.handle_turn("turn 1")
flow.handle_turn("turn 2", session_id="custom-session")
# NOTE(review): flush() presumably waits for pending handlers before the
# assertions run — confirm against the event bus API.
crewai_event_bus.flush()
assert [event.type for event in turn_events] == [
"conversation_turn_started",
"conversation_turn_completed",
"conversation_turn_started",
"conversation_turn_completed",
]
# First turn falls back to the flow's own state id; second uses the explicit one.
assert turn_events[0].session_id == default_session_id
assert turn_events[1].session_id == default_session_id
assert turn_events[2].session_id == "custom-session"
assert turn_events[3].session_id == "custom-session"
def test_handle_turn_emits_failed_instead_of_completed_when_turn_raises(
self,
) -> None:
"""Failed turns emit a terminal failure event without completion.

The flow's only listener raises ``RuntimeError("turn exploded")``. The test
expects ``handle_turn()`` to propagate that error and the recorded turn
events to be exactly started followed by failed, with the failed event
also delivered to a registered handler.
"""
# Flow whose routed listener always raises, forcing the failure path.
@ConversationConfig(defer_trace_finalization=True)
class FailingFlow(ConversationalFlow):
def route_turn(self, context: dict[str, Any]) -> str | None:
return "work"
@listen("work")
def do_work(self) -> str:
raise RuntimeError("turn exploded")
flow = FailingFlow()
# Every turn lifecycle event passed to emit, in emission order.
turn_events: list[
ConversationTurnStartedEvent
| ConversationTurnCompletedEvent
| ConversationTurnFailedEvent
] = []
# Failure events actually received by a registered handler (as opposed to
# merely being passed to emit).
handled_failed_events: list[ConversationTurnFailedEvent] = []
# Keep the real emit so the spy below can delegate to it.
original_emit = crewai_event_bus.emit
# Spy: record the three turn lifecycle event types, then forward to the real bus.
def capture_emit(source: Any, event: Any) -> Any:
if isinstance(
event,
(
ConversationTurnStartedEvent,
ConversationTurnCompletedEvent,
ConversationTurnFailedEvent,
),
):
turn_events.append(event)
return original_emit(source, event)
# NOTE(review): scoped_handlers() presumably confines the handler registered
# below to this block — confirm against the event bus API.
with (
crewai_event_bus.scoped_handlers(),
patch.object(crewai_event_bus, "emit", side_effect=capture_emit),
):
@crewai_event_bus.on(ConversationTurnFailedEvent)
def capture_failed(
_: Any, event: ConversationTurnFailedEvent
) -> None:
handled_failed_events.append(event)
# The original exception must propagate to the caller unchanged.
with pytest.raises(RuntimeError, match="turn exploded"):
flow.handle_turn("turn 1")
# No "conversation_turn_completed" entry may appear for a failed turn.
assert [event.type for event in turn_events] == [
"conversation_turn_started",
"conversation_turn_failed",
]
assert turn_events[0].session_id == flow.state.id
failed_event = turn_events[1]
assert isinstance(failed_event, ConversationTurnFailedEvent)
assert failed_event.session_id == flow.state.id
assert str(failed_event.error) == "turn exploded"
# The handler must have received an event equal to the one that was emitted.
assert handled_failed_events == [failed_event]
def test_conversation_turn_completed_tracks_feature_usage(self) -> None:
"""Completed conversation turns count conversational Flow usage.

Runs one successful turn with the listener's telemetry
``feature_usage_span`` patched out, and checks it was called with
``"flow:conversation_turn"``.
"""
# Imported inside the test; this is the ``event_listener`` object whose
# ``_telemetry`` attribute is patched below.
from crewai.events.event_listener import event_listener
# Minimal flow: every turn is routed to "work", whose listener appends one
# assistant message and returns.
@ConversationConfig(defer_trace_finalization=True)
class DeferredFlow(ConversationalFlow):
def route_turn(self, context: dict[str, Any]) -> str | None:
return "work"
@listen("work")
def do_work(self) -> str:
self.append_assistant_message("worked")
return "worked"
flow = DeferredFlow()
with (
crewai_event_bus.scoped_handlers(),
patch.object(
event_listener._telemetry,
"feature_usage_span",
) as feature_usage_span,
):
# Register the listener's handlers on the bus before running the turn.
# NOTE(review): presumably required because scoped_handlers() starts from
# an isolated handler set — confirm against the event bus API.
event_listener.setup_listeners(crewai_event_bus)
flow.handle_turn("turn 1")
# assert_any_call: other feature_usage_span calls during the turn are tolerated.
feature_usage_span.assert_any_call("flow:conversation_turn")
def test_route_event_uses_no_message_index_for_empty_transcript(self) -> None:
"""Route events do not reference index zero when no message exists."""