Lorenze/imp/conversational flow traces (#6044)

* feat: add conversation message and route selection events

- Introduced `ConversationMessageAddedEvent` and `ConversationRouteSelectedEvent` to enhance conversational flow tracking.
- Updated event listeners to emit these events during message handling and routing decisions.
- Enhanced the `_ConversationalMixin` class to emit events for user and assistant messages, as well as selected routes.
- Added tests to verify the correct emission of these events during conversational turns.

* ensure flow started events only emiited once

* refactor(tracing): rename trace event handler methods to action event handlers

Updated the  class to replace  with  for  and  events, improving clarity in event handling.

Additionally, adjusted comments in the  class to clarify the application of pending user messages in relation to state restoration and flow scope initialization.

* fix(conversational_mixin): handle empty message index in route events

Updated the message index handling in the  class to return  when there are no messages. Added tests to ensure that route events do not reference index zero when the transcript is empty, and verified the correct emission of conversation message events during flow handling.
This commit is contained in:
Lorenze Jay
2026-06-05 14:10:19 -07:00
committed by GitHub
parent 3723f0db76
commit 8cd51fc67e
7 changed files with 296 additions and 39 deletions

View File

@@ -61,6 +61,8 @@ if TYPE_CHECKING:
CrewTrainStartedEvent,
)
from crewai.events.types.flow_events import (
ConversationMessageAddedEvent,
ConversationRouteSelectedEvent,
FlowCreatedEvent,
FlowEvent,
FlowFinishedEvent,
@@ -176,6 +178,8 @@ _LAZY_EVENT_MAPPING: dict[str, str] = {
"CrewTrainCompletedEvent": "crewai.events.types.crew_events",
"CrewTrainFailedEvent": "crewai.events.types.crew_events",
"CrewTrainStartedEvent": "crewai.events.types.crew_events",
"ConversationMessageAddedEvent": "crewai.events.types.flow_events",
"ConversationRouteSelectedEvent": "crewai.events.types.flow_events",
"FlowCreatedEvent": "crewai.events.types.flow_events",
"FlowEvent": "crewai.events.types.flow_events",
"FlowFinishedEvent": "crewai.events.types.flow_events",
@@ -291,6 +295,8 @@ __all__ = [
"CheckpointRestoreStartedEvent",
"CheckpointStartedEvent",
"CircularDependencyError",
"ConversationMessageAddedEvent",
"ConversationRouteSelectedEvent",
"CrewKickoffCompletedEvent",
"CrewKickoffFailedEvent",
"CrewKickoffStartedEvent",

View File

@@ -53,6 +53,8 @@ from crewai.events.types.crew_events import (
CrewTrainStartedEvent,
)
from crewai.events.types.flow_events import (
ConversationMessageAddedEvent,
ConversationRouteSelectedEvent,
FlowFinishedEvent,
FlowStartedEvent,
MethodExecutionFailedEvent,
@@ -154,6 +156,8 @@ EventTypes = (
| TaskStartedEvent
| TaskCompletedEvent
| TaskFailedEvent
| ConversationMessageAddedEvent
| ConversationRouteSelectedEvent
| FlowStartedEvent
| FlowFinishedEvent
| MethodExecutionStartedEvent

View File

@@ -62,6 +62,8 @@ from crewai.events.types.crew_events import (
CrewKickoffStartedEvent,
)
from crewai.events.types.flow_events import (
ConversationMessageAddedEvent,
ConversationRouteSelectedEvent,
FlowCreatedEvent,
FlowFinishedEvent,
FlowPlotEvent,
@@ -255,6 +257,18 @@ class TraceCollectionListener(BaseEventListener):
def on_method_failed(source: Any, event: MethodExecutionFailedEvent) -> None:
self._handle_trace_event("method_execution_failed", source, event)
@event_bus.on(ConversationMessageAddedEvent)
def on_conversation_message_added(
source: Any, event: ConversationMessageAddedEvent
) -> None:
self._handle_action_event("conversation_message_added", source, event)
@event_bus.on(ConversationRouteSelectedEvent)
def on_conversation_route_selected(
source: Any, event: ConversationRouteSelectedEvent
) -> None:
self._handle_action_event("conversation_route_selected", source, event)
@event_bus.on(FlowFinishedEvent)
def on_flow_finished(source: Any, event: FlowFinishedEvent) -> None:
self._handle_trace_event("flow_finished", source, event)

View File

@@ -166,6 +166,31 @@ class FlowInputReceivedEvent(FlowEvent):
type: Literal["flow_input_received"] = "flow_input_received"
class ConversationMessageAddedEvent(FlowEvent):
"""Event emitted when a conversational Flow records a message.
This gives trace consumers a first-class transcript signal instead of
requiring them to inspect the full method state payload.
"""
session_id: str
role: Literal["user", "assistant", "system", "tool"]
content: Any
message_index: int
type: Literal["conversation_message_added"] = "conversation_message_added"
class ConversationRouteSelectedEvent(FlowEvent):
"""Event emitted when a conversational Flow selects a route for a turn."""
session_id: str
route: str
user_message: str | None = None
message_index: int | None = None
previous_intent: str | None = None
type: Literal["conversation_route_selected"] = "conversation_route_selected"
class HumanFeedbackRequestedEvent(FlowEvent):
"""Event emitted when human feedback is requested.

View File

@@ -24,6 +24,11 @@ from typing import TYPE_CHECKING, Any, ClassVar, Literal, cast
from pydantic import BaseModel, Field, create_model
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.flow_events import (
ConversationMessageAddedEvent,
ConversationRouteSelectedEvent,
)
from crewai.experimental.conversational import (
AgentMessage,
ConversationConfig,
@@ -122,19 +127,36 @@ class _ConversationalMixin:
"""Route the current turn to a listener label."""
state = cast(ConversationState, self.state)
context = self.build_router_context()
previous_intent = state.last_intent
configured_route = self.route_turn(context)
if configured_route:
state.last_intent = configured_route
self._emit_conversation_route_selected(
configured_route,
previous_intent=previous_intent,
)
return configured_route
if state.last_intent:
self._emit_conversation_route_selected(
state.last_intent,
previous_intent=previous_intent,
)
return state.last_intent
if self.can_answer_from_history(context):
state.last_intent = "answer_from_history"
self._emit_conversation_route_selected(
"answer_from_history",
previous_intent=previous_intent,
)
return "answer_from_history"
state.last_intent = "converse"
self._emit_conversation_route_selected(
"converse",
previous_intent=previous_intent,
)
return "converse"
@listen("converse")
@@ -406,13 +428,61 @@ class _ConversationalMixin:
metadata: dict[str, Any] | None = None,
) -> None:
"""Append a final user-visible assistant message."""
cast(ConversationState, self.state).messages.append(
state = cast(ConversationState, self.state)
state.messages.append(
ConversationMessage(
role="assistant",
content=content,
metadata=metadata or {},
)
)
self._emit_conversation_message_added(
role="assistant",
content=content,
message_index=len(state.messages) - 1,
)
def _emit_conversation_message_added(
self,
*,
role: Literal["user", "assistant", "system", "tool"],
content: Any,
message_index: int,
) -> None:
"""Emit a compact transcript event for conversational trace views."""
state = cast(ConversationState, self.state)
crewai_event_bus.emit(
self,
ConversationMessageAddedEvent(
type="conversation_message_added",
flow_name=self.name or self.__class__.__name__,
session_id=state.id,
role=role,
content=content,
message_index=message_index,
),
)
def _emit_conversation_route_selected(
self,
route: str,
*,
previous_intent: str | None = None,
) -> None:
"""Emit the conversational routing decision for the current turn."""
state = cast(ConversationState, self.state)
crewai_event_bus.emit(
self,
ConversationRouteSelectedEvent(
type="conversation_route_selected",
flow_name=self.name or self.__class__.__name__,
session_id=state.id,
route=route,
user_message=state.current_user_message,
message_index=(len(state.messages) - 1) if state.messages else None,
previous_intent=previous_intent,
),
)
def append_message(
self,
@@ -447,6 +517,11 @@ class _ConversationalMixin:
if self.conversational:
state = cast(ConversationState, self.state)
state.messages.append(ConversationMessage(role="user", content=text))
self._emit_conversation_message_added(
role="user",
content=text,
message_index=len(state.messages) - 1,
)
state.current_user_message = text
state.last_user_message = text
if outcomes and llm is not None:

View File

@@ -912,6 +912,7 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
_pending_user_message: str | dict[str, Any] | None = PrivateAttr(default=None)
_pending_intents: Sequence[str] | None = PrivateAttr(default=None)
_pending_intent_llm: str | "BaseLLM" | None = PrivateAttr(default=None)
_deferred_flow_started_event_id: str | None = PrivateAttr(default=None)
def __class_getitem__(cls: type[Flow[T]], item: type[T]) -> type[Flow[T]]: # type: ignore[override]
class _FlowGeneric(cls): # type: ignore[valid-type,misc]
@@ -2201,8 +2202,59 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
if filtered_inputs:
self._initialize_state(filtered_inputs)
defer_trace_finalization = self._should_defer_trace_finalization()
deferred_started_event_id = self._deferred_flow_started_event_id
should_emit_flow_started = not (
defer_trace_finalization and deferred_started_event_id
)
if (
defer_trace_finalization
and deferred_started_event_id
and get_current_parent_id() is None
):
restore_event_scope(((deferred_started_event_id, "flow_started"),))
elif get_current_parent_id() is None:
reset_emission_counter()
reset_last_event_id()
if should_emit_flow_started:
# In normal flows, each kickoff owns its own flow lifecycle.
# Deferred conversational sessions are different: the first
# turn opens the flow scope and later turns reuse it until
# ``finalize_session_traces()`` emits the single finish event.
started_event = FlowStartedEvent(
type="flow_started",
flow_name=self.name or self.__class__.__name__,
inputs=inputs,
)
future = crewai_event_bus.emit(self, started_event)
if future:
try:
await asyncio.wrap_future(future)
except Exception:
logger.warning("FlowStartedEvent handler failed", exc_info=True)
# Stash the started event id so a deferred
# ``finalize_session_traces()`` can restore the event scope
# before emitting ``FlowFinishedEvent`` (otherwise the bus
# warns "Ending event 'flow_finished' emitted with empty
# scope stack").
if defer_trace_finalization:
object.__setattr__(
self, "_deferred_flow_started_event_id", started_event.event_id
)
if not self.suppress_flow_events:
self._log_flow_event(
f"Flow started with ID: {self.flow_id}", color="bold magenta"
)
# After FlowStarted: env events must not pre-empt trace batch init
# with implicit "crew" execution_type.
get_env_context()
# Conversational hook: apply the pending user message AFTER state
# restore so it survives ``self.persistence.load_state(...)``.
# restore and AFTER flow scope initialization, so transcript events
# are parented under the current conversation trace.
# ``handle_turn`` stashes the message on ``self._pending_user_message``
# before calling ``kickoff``; this drains it.
if (
@@ -2211,43 +2263,6 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
):
self._apply_pending_conversational_turn()
if get_current_parent_id() is None:
reset_emission_counter()
reset_last_event_id()
# ``FlowStartedEvent`` always fires — ``suppress_flow_events``
# only hides the Rich console panel (and the textual log line
# below), it doesn't gate observability events. Tracing /
# downstream listeners still need to see flow_started.
started_event = FlowStartedEvent(
type="flow_started",
flow_name=self.name or self.__class__.__name__,
inputs=inputs,
)
future = crewai_event_bus.emit(self, started_event)
if future:
try:
await asyncio.wrap_future(future)
except Exception:
logger.warning("FlowStartedEvent handler failed", exc_info=True)
# Stash the started event id so a deferred
# ``finalize_session_traces()`` can restore the event scope
# before emitting ``FlowFinishedEvent`` (otherwise the bus
# warns "Ending event 'flow_finished' emitted with empty
# scope stack").
if self._should_defer_trace_finalization():
object.__setattr__(
self, "_deferred_flow_started_event_id", started_event.event_id
)
if not self.suppress_flow_events:
self._log_flow_event(
f"Flow started with ID: {self.flow_id}", color="bold magenta"
)
# After FlowStarted: env events must not pre-empt trace batch init
# with implicit "crew" execution_type.
get_env_context()
if inputs is not None and "id" not in inputs:
self._initialize_state(inputs)

View File

@@ -11,6 +11,8 @@ from pydantic import BaseModel
from crewai.events.event_bus import crewai_event_bus
from crewai.events.listeners.tracing.trace_listener import TraceCollectionListener
from crewai.events.types.flow_events import (
ConversationMessageAddedEvent,
ConversationRouteSelectedEvent,
FlowStartedEvent,
MethodExecutionFinishedEvent,
MethodExecutionStartedEvent,
@@ -421,6 +423,56 @@ class TestConversationalFlow:
assert any(message["content"] == "prior findings" for message in messages)
assert any(message["content"] == "summarize findings" for message in messages)
def test_conversational_turn_emits_message_and_route_events(self) -> None:
class ResearchRoute(BaseModel):
intent: Literal["research", "converse", "end"]
router_llm = MagicMock()
router_llm.call.return_value = ResearchRoute(intent="converse")
chat_llm = MagicMock()
chat_llm.call.return_value = "hello back"
@ConversationConfig(
llm=chat_llm,
router=RouterConfig(
response_format=ResearchRoute,
llm=router_llm,
routes=["research"],
),
)
class RoutedFlow(ConversationalFlow):
@listen("research")
def run_research(self) -> str:
self.append_assistant_message("researched")
return "researched"
messages: list[ConversationMessageAddedEvent] = []
routes: list[ConversationRouteSelectedEvent] = []
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(ConversationMessageAddedEvent)
def capture_message(_: Any, event: ConversationMessageAddedEvent) -> None:
messages.append(event)
@crewai_event_bus.on(ConversationRouteSelectedEvent)
def capture_route(_: Any, event: ConversationRouteSelectedEvent) -> None:
routes.append(event)
flow = RoutedFlow()
flow.handle_turn("just chat")
crewai_event_bus.flush()
assert [(event.role, event.content) for event in messages] == [
("user", "just chat"),
("assistant", "hello back"),
]
assert [event.message_index for event in messages] == [0, 1]
assert len(routes) == 1
assert routes[0].route == "converse"
assert routes[0].user_message == "just chat"
assert routes[0].session_id == messages[0].session_id
def test_builtin_end_marks_conversation_ended(self) -> None:
class ResearchRoute(BaseModel):
intent: Literal["research", "converse", "end"]
@@ -969,6 +1021,72 @@ class TestConversationalFlow:
"defer_trace_finalization=True must skip per-turn finalize"
)
def test_deferred_conversation_emits_one_flow_started(self) -> None:
"""Deferred conversational sessions emit one flow_started for the session."""
from crewai.events.types.flow_events import FlowStartedEvent
@ConversationConfig(defer_trace_finalization=True)
class DeferredFlow(ConversationalFlow):
def route_turn(self, context: dict[str, Any]) -> str | None:
return "work"
@listen("work")
def do_work(self) -> str:
self.append_assistant_message("worked")
return "worked"
flow = DeferredFlow()
observed_events: list[str] = []
started_events: list[FlowStartedEvent] = []
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(FlowStartedEvent)
def capture(_: Any, event: FlowStartedEvent) -> None:
observed_events.append(event.type)
started_events.append(event)
@crewai_event_bus.on(ConversationMessageAddedEvent)
def capture_message(
_: Any, event: ConversationMessageAddedEvent
) -> None:
if event.role == "user":
observed_events.append(event.type)
flow.handle_turn("turn 1")
flow.handle_turn("turn 2")
flow.handle_turn("turn 3")
crewai_event_bus.flush()
assert len(started_events) == 1, (
"deferred conversational traces should emit one session-level "
"flow_started event, not one per turn"
)
assert observed_events[0] == "flow_started"
assert observed_events[1] == "conversation_message_added"
def test_route_event_uses_no_message_index_for_empty_transcript(self) -> None:
"""Route events do not reference index zero when no message exists."""
@ConversationConfig()
class DeferredFlow(ConversationalFlow):
pass
flow = DeferredFlow()
route_events: list[ConversationRouteSelectedEvent] = []
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(ConversationRouteSelectedEvent)
def capture(_: Any, event: ConversationRouteSelectedEvent) -> None:
route_events.append(event)
flow._emit_conversation_route_selected("converse")
crewai_event_bus.flush()
assert len(route_events) == 1
assert route_events[0].message_index is None
def test_finalize_session_traces_emits_finished_and_finalizes_batch(self) -> None:
"""``finalize_session_traces()`` emits one ``FlowFinishedEvent`` + one ``finalize_batch``.