feat: add conversation message and route selection events

- Introduced `ConversationMessageAddedEvent` and `ConversationRouteSelectedEvent` to enhance conversational flow tracking.
- Updated event listeners to emit these events during message handling and routing decisions.
- Enhanced the `_ConversationalMixin` class to emit events for user and assistant messages, as well as selected routes.
- Added tests to verify the correct emission of these events during conversational turns.
This commit is contained in:
lorenzejay
2026-06-03 15:08:50 -07:00
parent 73d20fb0c3
commit 880355648f
6 changed files with 177 additions and 1 deletions

View File

@@ -61,6 +61,8 @@ if TYPE_CHECKING:
CrewTrainStartedEvent,
)
from crewai.events.types.flow_events import (
ConversationMessageAddedEvent,
ConversationRouteSelectedEvent,
FlowCreatedEvent,
FlowEvent,
FlowFinishedEvent,
@@ -176,6 +178,8 @@ _LAZY_EVENT_MAPPING: dict[str, str] = {
"CrewTrainCompletedEvent": "crewai.events.types.crew_events",
"CrewTrainFailedEvent": "crewai.events.types.crew_events",
"CrewTrainStartedEvent": "crewai.events.types.crew_events",
"ConversationMessageAddedEvent": "crewai.events.types.flow_events",
"ConversationRouteSelectedEvent": "crewai.events.types.flow_events",
"FlowCreatedEvent": "crewai.events.types.flow_events",
"FlowEvent": "crewai.events.types.flow_events",
"FlowFinishedEvent": "crewai.events.types.flow_events",
@@ -291,6 +295,8 @@ __all__ = [
"CheckpointRestoreStartedEvent",
"CheckpointStartedEvent",
"CircularDependencyError",
"ConversationMessageAddedEvent",
"ConversationRouteSelectedEvent",
"CrewKickoffCompletedEvent",
"CrewKickoffFailedEvent",
"CrewKickoffStartedEvent",

View File

@@ -53,6 +53,8 @@ from crewai.events.types.crew_events import (
CrewTrainStartedEvent,
)
from crewai.events.types.flow_events import (
ConversationMessageAddedEvent,
ConversationRouteSelectedEvent,
FlowFinishedEvent,
FlowStartedEvent,
MethodExecutionFailedEvent,
@@ -154,6 +156,8 @@ EventTypes = (
| TaskStartedEvent
| TaskCompletedEvent
| TaskFailedEvent
| ConversationMessageAddedEvent
| ConversationRouteSelectedEvent
| FlowStartedEvent
| FlowFinishedEvent
| MethodExecutionStartedEvent

View File

@@ -62,6 +62,8 @@ from crewai.events.types.crew_events import (
CrewKickoffStartedEvent,
)
from crewai.events.types.flow_events import (
ConversationMessageAddedEvent,
ConversationRouteSelectedEvent,
FlowCreatedEvent,
FlowFinishedEvent,
FlowPlotEvent,
@@ -255,6 +257,18 @@ class TraceCollectionListener(BaseEventListener):
def on_method_failed(source: Any, event: MethodExecutionFailedEvent) -> None:
self._handle_trace_event("method_execution_failed", source, event)
@event_bus.on(ConversationMessageAddedEvent)
def on_conversation_message_added(
source: Any, event: ConversationMessageAddedEvent
) -> None:
self._handle_trace_event("conversation_message_added", source, event)
@event_bus.on(ConversationRouteSelectedEvent)
def on_conversation_route_selected(
source: Any, event: ConversationRouteSelectedEvent
) -> None:
self._handle_trace_event("conversation_route_selected", source, event)
@event_bus.on(FlowFinishedEvent)
def on_flow_finished(source: Any, event: FlowFinishedEvent) -> None:
self._handle_trace_event("flow_finished", source, event)

View File

@@ -166,6 +166,31 @@ class FlowInputReceivedEvent(FlowEvent):
type: Literal["flow_input_received"] = "flow_input_received"
class ConversationMessageAddedEvent(FlowEvent):
"""Event emitted when a conversational Flow records a message.
This gives trace consumers a first-class transcript signal instead of
requiring them to inspect the full method state payload.
"""
session_id: str
role: Literal["user", "assistant", "system", "tool"]
content: Any
message_index: int
type: Literal["conversation_message_added"] = "conversation_message_added"
class ConversationRouteSelectedEvent(FlowEvent):
"""Event emitted when a conversational Flow selects a route for a turn."""
session_id: str
route: str
user_message: str | None = None
message_index: int | None = None
previous_intent: str | None = None
type: Literal["conversation_route_selected"] = "conversation_route_selected"
class HumanFeedbackRequestedEvent(FlowEvent):
"""Event emitted when human feedback is requested.

View File

@@ -24,6 +24,11 @@ from typing import TYPE_CHECKING, Any, ClassVar, Literal, cast
from pydantic import BaseModel, Field, create_model
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.flow_events import (
ConversationMessageAddedEvent,
ConversationRouteSelectedEvent,
)
from crewai.experimental.conversational import (
AgentMessage,
ConversationConfig,
@@ -122,19 +127,36 @@ class _ConversationalMixin:
"""Route the current turn to a listener label."""
state = cast(ConversationState, self.state)
context = self.build_router_context()
previous_intent = state.last_intent
configured_route = self.route_turn(context)
if configured_route:
state.last_intent = configured_route
self._emit_conversation_route_selected(
configured_route,
previous_intent=previous_intent,
)
return configured_route
if state.last_intent:
self._emit_conversation_route_selected(
state.last_intent,
previous_intent=previous_intent,
)
return state.last_intent
if self.can_answer_from_history(context):
state.last_intent = "answer_from_history"
self._emit_conversation_route_selected(
"answer_from_history",
previous_intent=previous_intent,
)
return "answer_from_history"
state.last_intent = "converse"
self._emit_conversation_route_selected(
"converse",
previous_intent=previous_intent,
)
return "converse"
@listen("converse")
@@ -353,13 +375,61 @@ class _ConversationalMixin:
metadata: dict[str, Any] | None = None,
) -> None:
"""Append a final user-visible assistant message."""
cast(ConversationState, self.state).messages.append(
state = cast(ConversationState, self.state)
state.messages.append(
ConversationMessage(
role="assistant",
content=content,
metadata=metadata or {},
)
)
self._emit_conversation_message_added(
role="assistant",
content=content,
message_index=len(state.messages) - 1,
)
def _emit_conversation_message_added(
self,
*,
role: Literal["user", "assistant", "system", "tool"],
content: Any,
message_index: int,
) -> None:
"""Emit a compact transcript event for conversational trace views."""
state = cast(ConversationState, self.state)
crewai_event_bus.emit(
self,
ConversationMessageAddedEvent(
type="conversation_message_added",
flow_name=self.name or self.__class__.__name__,
session_id=state.id,
role=role,
content=content,
message_index=message_index,
),
)
def _emit_conversation_route_selected(
self,
route: str,
*,
previous_intent: str | None = None,
) -> None:
"""Emit the conversational routing decision for the current turn."""
state = cast(ConversationState, self.state)
crewai_event_bus.emit(
self,
ConversationRouteSelectedEvent(
type="conversation_route_selected",
flow_name=self.name or self.__class__.__name__,
session_id=state.id,
route=route,
user_message=state.current_user_message,
message_index=max(len(state.messages) - 1, 0),
previous_intent=previous_intent,
),
)
def append_message(
self,
@@ -394,6 +464,11 @@ class _ConversationalMixin:
if self.conversational:
state = cast(ConversationState, self.state)
state.messages.append(ConversationMessage(role="user", content=text))
self._emit_conversation_message_added(
role="user",
content=text,
message_index=len(state.messages) - 1,
)
state.current_user_message = text
state.last_user_message = text
if outcomes and llm is not None:

View File

@@ -11,6 +11,8 @@ from pydantic import BaseModel
from crewai.events.event_bus import crewai_event_bus
from crewai.events.listeners.tracing.trace_listener import TraceCollectionListener
from crewai.events.types.flow_events import (
ConversationMessageAddedEvent,
ConversationRouteSelectedEvent,
FlowStartedEvent,
MethodExecutionFinishedEvent,
MethodExecutionStartedEvent,
@@ -421,6 +423,56 @@ class TestConversationalFlow:
assert any(message["content"] == "prior findings" for message in messages)
assert any(message["content"] == "summarize findings" for message in messages)
def test_conversational_turn_emits_message_and_route_events(self) -> None:
class ResearchRoute(BaseModel):
intent: Literal["research", "converse", "end"]
router_llm = MagicMock()
router_llm.call.return_value = ResearchRoute(intent="converse")
chat_llm = MagicMock()
chat_llm.call.return_value = "hello back"
@ConversationConfig(
llm=chat_llm,
router=RouterConfig(
response_format=ResearchRoute,
llm=router_llm,
routes=["research"],
),
)
class RoutedFlow(ConversationalFlow):
@listen("research")
def run_research(self) -> str:
self.append_assistant_message("researched")
return "researched"
messages: list[ConversationMessageAddedEvent] = []
routes: list[ConversationRouteSelectedEvent] = []
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(ConversationMessageAddedEvent)
def capture_message(_: Any, event: ConversationMessageAddedEvent) -> None:
messages.append(event)
@crewai_event_bus.on(ConversationRouteSelectedEvent)
def capture_route(_: Any, event: ConversationRouteSelectedEvent) -> None:
routes.append(event)
flow = RoutedFlow()
flow.handle_turn("just chat")
crewai_event_bus.flush()
assert [(event.role, event.content) for event in messages] == [
("user", "just chat"),
("assistant", "hello back"),
]
assert [event.message_index for event in messages] == [0, 1]
assert len(routes) == 1
assert routes[0].route == "converse"
assert routes[0].user_message == "just chat"
assert routes[0].session_id == messages[0].session_id
def test_builtin_end_marks_conversation_ended(self) -> None:
class ResearchRoute(BaseModel):
intent: Literal["research", "converse", "end"]