feat: enhance flow event tracing and session management

- Updated TraceCollectionListener to handle nested flows without re-claiming parent session batches.
- Ensured that method execution events are always emitted for tracing, regardless of flow event suppression.
- Improved finalization logic for flow trace batches to respect session deferral flags.
- Added tests to verify that method execution events are emitted correctly when flow events are suppressed and that deferred session finalization is respected in nested flows.
This commit is contained in:
lorenzejay
2026-05-21 13:07:31 -07:00
parent eca18b03ab
commit f1c5ea3642
3 changed files with 121 additions and 60 deletions

View File

@@ -231,7 +231,15 @@ class TraceCollectionListener(BaseEventListener):
@event_bus.on(FlowStartedEvent)
def on_flow_started(source: Any, event: FlowStartedEvent) -> None:
# Always call _initialize_flow_batch to claim ownership.
# Nested flows (e.g. AgentExecutor inside a conversational Flow) must
# not re-claim an open session batch owned by the parent kickoff.
if (
self.batch_manager.defer_session_finalization
and self.batch_manager.is_batch_initialized()
and self.batch_manager.batch_owner_type == "flow"
):
self._handle_trace_event("flow_started", source, event)
return
# If batch was already initialized by a concurrent action event
# (race condition), initialize_batch() returns early but
# batch_owner_type is still correctly set to "flow".
@@ -275,6 +283,8 @@ class TraceCollectionListener(BaseEventListener):
@event_bus.on(CrewKickoffCompletedEvent)
def on_crew_completed(source: Any, event: CrewKickoffCompletedEvent) -> None:
self._handle_trace_event("crew_kickoff_completed", source, event)
if self.batch_manager.defer_session_finalization:
return
if self._nested_in_flow_execution():
return
if self.batch_manager.batch_owner_type == "crew":
@@ -287,6 +297,8 @@ class TraceCollectionListener(BaseEventListener):
@event_bus.on(CrewKickoffFailedEvent)
def on_crew_failed(source: Any, event: CrewKickoffFailedEvent) -> None:
self._handle_trace_event("crew_kickoff_failed", source, event)
if self.batch_manager.defer_session_finalization:
return
if self._nested_in_flow_execution():
return
if self.first_time_handler.is_first_time:
@@ -760,8 +772,10 @@ class TraceCollectionListener(BaseEventListener):
def _try_initialize_flow_batch_from_context(self, event: Any) -> bool:
"""Claim a flow trace batch when an action event fires inside kickoff.
Flows with ``suppress_flow_events=True`` skip ``FlowStartedEvent``, so
LLM/tool events must not fall back to implicit crew batches.
When ``suppress_flow_events=True``, console panels are hidden but
``FlowStartedEvent`` and method lifecycle events still emit; if no
batch exists yet, LLM/tool events must not fall back to implicit crew
batches.
"""
from crewai.flow.flow_context import current_flow_id, current_flow_name

View File

@@ -2908,27 +2908,26 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
Returns:
A tuple of (result, finished_event_id) where finished_event_id is
the event_id of the MethodExecutionFinishedEvent, or None if events
are suppressed.
the event_id of the MethodExecutionFinishedEvent.
"""
try:
dumped_params = {f"_{i}": arg for i, arg in enumerate(args)} | (
kwargs or {}
)
if not self.suppress_flow_events:
future = crewai_event_bus.emit(
self,
MethodExecutionStartedEvent(
type="method_execution_started",
method_name=method_name,
flow_name=self.name or self.__class__.__name__,
params=dumped_params,
state=self._copy_and_serialize_state(),
),
)
if future:
self._event_futures.append(future)
# Always emit for tracing; suppress_flow_events only hides console panels.
future = crewai_event_bus.emit(
self,
MethodExecutionStartedEvent(
type="method_execution_started",
method_name=method_name,
flow_name=self.name or self.__class__.__name__,
params=dumped_params,
state=self._copy_and_serialize_state(),
),
)
if future:
self._event_futures.append(future)
# Set method name in context so ask() can read it without
# stack inspection. Must happen before copy_context() so the
@@ -2970,18 +2969,17 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
self._completed_methods.add(method_name)
finished_event_id: str | None = None
if not self.suppress_flow_events:
finished_event = MethodExecutionFinishedEvent(
type="method_execution_finished",
method_name=method_name,
flow_name=self.name or self.__class__.__name__,
state=self._copy_and_serialize_state(),
result=result,
)
finished_event_id = finished_event.event_id
future = crewai_event_bus.emit(self, finished_event)
if future:
self._event_futures.append(future)
finished_event = MethodExecutionFinishedEvent(
type="method_execution_finished",
method_name=method_name,
flow_name=self.name or self.__class__.__name__,
state=self._copy_and_serialize_state(),
result=result,
)
finished_event_id = finished_event.event_id
future = crewai_event_bus.emit(self, finished_event)
if future:
self._event_futures.append(future)
return result, finished_event_id
except Exception as e:
@@ -2996,24 +2994,23 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
self.persistence = SQLiteFlowPersistence()
# Emit paused event (not failed)
if not self.suppress_flow_events:
future = crewai_event_bus.emit(
self,
MethodExecutionPausedEvent(
type="method_execution_paused",
method_name=method_name,
flow_name=self.name or self.__class__.__name__,
state=self._copy_and_serialize_state(),
flow_id=e.context.flow_id,
message=e.context.message,
emit=e.context.emit,
),
)
if future:
self._event_futures.append(future)
elif not self.suppress_flow_events:
# Regular failure - emit failed event
# Emit paused event (not failed); always emit for tracing.
future = crewai_event_bus.emit(
self,
MethodExecutionPausedEvent(
type="method_execution_paused",
method_name=method_name,
flow_name=self.name or self.__class__.__name__,
state=self._copy_and_serialize_state(),
flow_id=e.context.flow_id,
message=e.context.message,
emit=e.context.emit,
),
)
if future:
self._event_futures.append(future)
else:
# Regular failure - always emit for tracing.
future = crewai_event_bus.emit(
self,
MethodExecutionFailedEvent(
@@ -3911,15 +3908,19 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
def _finalize_flow_trace_batch(self, *, force: bool = False) -> None:
"""Finalize the active trace batch when this flow owns it."""
if not force and self._should_defer_trace_finalization():
return
from crewai.events.listeners.tracing.trace_listener import (
TraceCollectionListener,
)
trace_listener = TraceCollectionListener()
if trace_listener.batch_manager.batch_owner_type != "flow":
batch_manager = trace_listener.batch_manager
if not force and (
self._should_defer_trace_finalization()
or batch_manager.defer_session_finalization
):
return
if batch_manager.batch_owner_type != "flow":
return
if trace_listener.first_time_handler.is_first_time:
trace_listener.first_time_handler.mark_events_collected()

View File

@@ -11,7 +11,11 @@ from pydantic import BaseModel, Field
from crewai.events.event_bus import crewai_event_bus
from crewai.events.listeners.tracing.trace_listener import TraceCollectionListener
from crewai.events.types.flow_events import FlowStartedEvent
from crewai.events.types.flow_events import (
FlowStartedEvent,
MethodExecutionFinishedEvent,
MethodExecutionStartedEvent,
)
from crewai.events.types.llm_events import LLMCallStartedEvent
from crewai.flow import Flow, ChatState, listen, start
from crewai.flow.flow_context import current_flow_id, current_flow_name
@@ -213,6 +217,31 @@ class TestFlowTracingWhenSuppressed:
assert started == ["QuietFlow"]
def test_method_execution_emitted_when_panel_events_suppressed(self) -> None:
class QuietFlow(Flow[ChatState]):
suppress_flow_events = True
@start()
def begin(self) -> str:
return "ok"
started: list[str] = []
finished: list[str] = []
original_emit = crewai_event_bus.emit
def track_emit(source: Any, event: Any, *args: Any, **kwargs: Any) -> Any:
if isinstance(event, MethodExecutionStartedEvent):
started.append(event.method_name)
if isinstance(event, MethodExecutionFinishedEvent):
finished.append(event.method_name)
return original_emit(source, event, *args, **kwargs)
with patch.object(crewai_event_bus, "emit", side_effect=track_emit):
QuietFlow().kickoff()
assert started == ["begin"]
assert finished == ["begin"]
def test_llm_action_inside_flow_claims_flow_trace_batch(self) -> None:
listener = TraceCollectionListener()
listener.batch_manager.current_batch = None
@@ -267,17 +296,16 @@ class TestDeferTraceFinalization:
flow = SimpleChatFlow()
flow.defer_trace_finalization = True
with patch(
"crewai.events.listeners.tracing.trace_listener.TraceCollectionListener"
) as mock_listener_cls:
mock_listener_cls.return_value.batch_manager.batch_owner_type = "flow"
mock_listener_cls.return_value.first_time_handler.is_first_time = False
listener = TraceCollectionListener()
listener.batch_manager.batch_owner_type = "flow"
listener.first_time_handler.is_first_time = False
with patch.object(listener.batch_manager, "finalize_batch") as mock_finalize:
flow._finalize_flow_trace_batch()
mock_listener_cls.assert_not_called()
mock_finalize.assert_not_called()
flow._finalize_flow_trace_batch(force=True)
mock_listener_cls.assert_called_once()
mock_finalize.assert_called_once()
class TestDeferredFlowLifecycleEvents:
@@ -460,6 +488,24 @@ class TestNestedCrewTracing:
finally:
current_flow_id.reset(token)
def test_finalize_flow_trace_batch_respects_defer_session_flag(self) -> None:
"""Nested Flow kickoffs (e.g. AgentExecutor) must not finalize a deferred session batch."""
class InnerFlow(Flow[ChatState]):
@start()
def begin(self) -> str:
return "ok"
listener = TraceCollectionListener()
listener.batch_manager.batch_owner_type = "flow"
listener.batch_manager.defer_session_finalization = True
listener.first_time_handler.is_first_time = False
inner = InnerFlow()
with patch.object(listener.batch_manager, "finalize_batch") as mock_finalize:
inner._finalize_flow_trace_batch()
mock_finalize.assert_not_called()
def test_flow_owned_batch_skips_finalize_without_flow_context(self) -> None:
from crewai.events.listeners.tracing.trace_listener import (
TraceCollectionListener,