From f1c5ea3642c45f8c62e196902a78fe4a9f41b5c2 Mon Sep 17 00:00:00 2001 From: lorenzejay Date: Thu, 21 May 2026 13:07:31 -0700 Subject: [PATCH] feat: enhance flow event tracing and session management - Updated TraceCollectionListener to handle nested flows without re-claiming parent session batches. - Ensured that method execution events are always emitted for tracing, regardless of flow event suppression. - Improved finalization logic for flow trace batches to respect session deferral flags. - Added tests to verify that method execution events are emitted correctly when flow events are suppressed and that deferred session finalization is respected in nested flows. --- .../listeners/tracing/trace_listener.py | 20 +++- lib/crewai/src/crewai/flow/flow.py | 99 ++++++++++--------- lib/crewai/tests/test_flow_conversation.py | 62 ++++++++++-- 3 files changed, 121 insertions(+), 60 deletions(-) diff --git a/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py b/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py index 663764bcd..021b75a52 100644 --- a/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py +++ b/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py @@ -231,7 +231,15 @@ class TraceCollectionListener(BaseEventListener): @event_bus.on(FlowStartedEvent) def on_flow_started(source: Any, event: FlowStartedEvent) -> None: - # Always call _initialize_flow_batch to claim ownership. + # Nested flows (e.g. AgentExecutor inside a conversational Flow) must + # not re-claim an open session batch owned by the parent kickoff. + if ( + self.batch_manager.defer_session_finalization + and self.batch_manager.is_batch_initialized() + and self.batch_manager.batch_owner_type == "flow" + ): + self._handle_trace_event("flow_started", source, event) + return # If batch was already initialized by a concurrent action event # (race condition), initialize_batch() returns early but # batch_owner_type is still correctly set to "flow". @@ -275,6 +283,8 @@ class TraceCollectionListener(BaseEventListener): @event_bus.on(CrewKickoffCompletedEvent) def on_crew_completed(source: Any, event: CrewKickoffCompletedEvent) -> None: self._handle_trace_event("crew_kickoff_completed", source, event) + if self.batch_manager.defer_session_finalization: + return if self._nested_in_flow_execution(): return if self.batch_manager.batch_owner_type == "crew": @@ -287,6 +297,8 @@ class TraceCollectionListener(BaseEventListener): @event_bus.on(CrewKickoffFailedEvent) def on_crew_failed(source: Any, event: CrewKickoffFailedEvent) -> None: self._handle_trace_event("crew_kickoff_failed", source, event) + if self.batch_manager.defer_session_finalization: + return if self._nested_in_flow_execution(): return if self.first_time_handler.is_first_time: @@ -760,8 +772,10 @@ class TraceCollectionListener(BaseEventListener): def _try_initialize_flow_batch_from_context(self, event: Any) -> bool: """Claim a flow trace batch when an action event fires inside kickoff. - Flows with ``suppress_flow_events=True`` skip ``FlowStartedEvent``, so - LLM/tool events must not fall back to implicit crew batches. + When ``suppress_flow_events=True``, console panels are hidden but + ``FlowStartedEvent`` and method lifecycle events still emit; if no + batch exists yet, LLM/tool events must not fall back to implicit crew + batches. """ from crewai.flow.flow_context import current_flow_id, current_flow_name diff --git a/lib/crewai/src/crewai/flow/flow.py b/lib/crewai/src/crewai/flow/flow.py index af342d9b1..8199d5a60 100644 --- a/lib/crewai/src/crewai/flow/flow.py +++ b/lib/crewai/src/crewai/flow/flow.py @@ -2908,27 +2908,26 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): Returns: A tuple of (result, finished_event_id) where finished_event_id is - the event_id of the MethodExecutionFinishedEvent, or None if events - are suppressed. + the event_id of the MethodExecutionFinishedEvent. """ try: dumped_params = {f"_{i}": arg for i, arg in enumerate(args)} | ( kwargs or {} ) - if not self.suppress_flow_events: - future = crewai_event_bus.emit( - self, - MethodExecutionStartedEvent( - type="method_execution_started", - method_name=method_name, - flow_name=self.name or self.__class__.__name__, - params=dumped_params, - state=self._copy_and_serialize_state(), - ), - ) - if future: - self._event_futures.append(future) + # Always emit for tracing; suppress_flow_events only hides console panels. + future = crewai_event_bus.emit( + self, + MethodExecutionStartedEvent( + type="method_execution_started", + method_name=method_name, + flow_name=self.name or self.__class__.__name__, + params=dumped_params, + state=self._copy_and_serialize_state(), + ), + ) + if future: + self._event_futures.append(future) # Set method name in context so ask() can read it without # stack inspection. Must happen before copy_context() so the @@ -2970,18 +2969,17 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): self._completed_methods.add(method_name) finished_event_id: str | None = None - if not self.suppress_flow_events: - finished_event = MethodExecutionFinishedEvent( - type="method_execution_finished", - method_name=method_name, - flow_name=self.name or self.__class__.__name__, - state=self._copy_and_serialize_state(), - result=result, - ) - finished_event_id = finished_event.event_id - future = crewai_event_bus.emit(self, finished_event) - if future: - self._event_futures.append(future) + finished_event = MethodExecutionFinishedEvent( + type="method_execution_finished", + method_name=method_name, + flow_name=self.name or self.__class__.__name__, + state=self._copy_and_serialize_state(), + result=result, + ) + finished_event_id = finished_event.event_id + future = crewai_event_bus.emit(self, finished_event) + if future: + self._event_futures.append(future) return result, finished_event_id except Exception as e: @@ -2996,24 +2994,23 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): self.persistence = SQLiteFlowPersistence() - # Emit paused event (not failed) - if not self.suppress_flow_events: - future = crewai_event_bus.emit( - self, - MethodExecutionPausedEvent( - type="method_execution_paused", - method_name=method_name, - flow_name=self.name or self.__class__.__name__, - state=self._copy_and_serialize_state(), - flow_id=e.context.flow_id, - message=e.context.message, - emit=e.context.emit, - ), - ) - if future: - self._event_futures.append(future) - elif not self.suppress_flow_events: - # Regular failure - emit failed event + # Emit paused event (not failed); always emit for tracing. + future = crewai_event_bus.emit( + self, + MethodExecutionPausedEvent( + type="method_execution_paused", + method_name=method_name, + flow_name=self.name or self.__class__.__name__, + state=self._copy_and_serialize_state(), + flow_id=e.context.flow_id, + message=e.context.message, + emit=e.context.emit, + ), + ) + if future: + self._event_futures.append(future) + else: + # Regular failure - always emit for tracing. future = crewai_event_bus.emit( self, MethodExecutionFailedEvent( @@ -3911,15 +3908,19 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): def _finalize_flow_trace_batch(self, *, force: bool = False) -> None: """Finalize the active trace batch when this flow owns it.""" - if not force and self._should_defer_trace_finalization(): - return - from crewai.events.listeners.tracing.trace_listener import ( TraceCollectionListener, ) trace_listener = TraceCollectionListener() - if trace_listener.batch_manager.batch_owner_type != "flow": + batch_manager = trace_listener.batch_manager + if not force and ( + self._should_defer_trace_finalization() + or batch_manager.defer_session_finalization + ): + return + + if batch_manager.batch_owner_type != "flow": return if trace_listener.first_time_handler.is_first_time: trace_listener.first_time_handler.mark_events_collected() diff --git a/lib/crewai/tests/test_flow_conversation.py b/lib/crewai/tests/test_flow_conversation.py index b613fc096..9d9616b65 100644 --- a/lib/crewai/tests/test_flow_conversation.py +++ b/lib/crewai/tests/test_flow_conversation.py @@ -11,7 +11,11 @@ from pydantic import BaseModel, Field from crewai.events.event_bus import crewai_event_bus from crewai.events.listeners.tracing.trace_listener import TraceCollectionListener -from crewai.events.types.flow_events import FlowStartedEvent +from crewai.events.types.flow_events import ( + FlowStartedEvent, + MethodExecutionFinishedEvent, + MethodExecutionStartedEvent, +) from crewai.events.types.llm_events import LLMCallStartedEvent from crewai.flow import Flow, ChatState, listen, start from crewai.flow.flow_context import current_flow_id, current_flow_name @@ -213,6 +217,31 @@ class TestFlowTracingWhenSuppressed: assert started == ["QuietFlow"] + def test_method_execution_emitted_when_panel_events_suppressed(self) -> None: + class QuietFlow(Flow[ChatState]): + suppress_flow_events = True + + @start() + def begin(self) -> str: + return "ok" + + started: list[str] = [] + finished: list[str] = [] + original_emit = crewai_event_bus.emit + + def track_emit(source: Any, event: Any, *args: Any, **kwargs: Any) -> Any: + if isinstance(event, MethodExecutionStartedEvent): + started.append(event.method_name) + if isinstance(event, MethodExecutionFinishedEvent): + finished.append(event.method_name) + return original_emit(source, event, *args, **kwargs) + + with patch.object(crewai_event_bus, "emit", side_effect=track_emit): + QuietFlow().kickoff() + + assert started == ["begin"] + assert finished == ["begin"] + def test_llm_action_inside_flow_claims_flow_trace_batch(self) -> None: listener = TraceCollectionListener() listener.batch_manager.current_batch = None @@ -267,17 +296,16 @@ class TestDeferTraceFinalization: flow = SimpleChatFlow() flow.defer_trace_finalization = True - with patch( - "crewai.events.listeners.tracing.trace_listener.TraceCollectionListener" - ) as mock_listener_cls: - mock_listener_cls.return_value.batch_manager.batch_owner_type = "flow" - mock_listener_cls.return_value.first_time_handler.is_first_time = False + listener = TraceCollectionListener() + listener.batch_manager.batch_owner_type = "flow" + listener.first_time_handler.is_first_time = False + with patch.object(listener.batch_manager, "finalize_batch") as mock_finalize: flow._finalize_flow_trace_batch() - mock_listener_cls.assert_not_called() + mock_finalize.assert_not_called() flow._finalize_flow_trace_batch(force=True) - mock_listener_cls.assert_called_once() + mock_finalize.assert_called_once() class TestDeferredFlowLifecycleEvents: @@ -460,6 +488,24 @@ class TestNestedCrewTracing: finally: current_flow_id.reset(token) + def test_finalize_flow_trace_batch_respects_defer_session_flag(self) -> None: + """Nested Flow kickoffs (e.g. AgentExecutor) must not finalize a deferred session batch.""" + + class InnerFlow(Flow[ChatState]): + @start() + def begin(self) -> str: + return "ok" + + listener = TraceCollectionListener() + listener.batch_manager.batch_owner_type = "flow" + listener.batch_manager.defer_session_finalization = True + listener.first_time_handler.is_first_time = False + + inner = InnerFlow() + with patch.object(listener.batch_manager, "finalize_batch") as mock_finalize: + inner._finalize_flow_trace_batch() + mock_finalize.assert_not_called() + def test_flow_owned_batch_skips_finalize_without_flow_context(self) -> None: from crewai.events.listeners.tracing.trace_listener import ( TraceCollectionListener,