From 6d153284d4de4d3c4a5f458a91e36761e2074190 Mon Sep 17 00:00:00 2001 From: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com> Date: Tue, 21 Apr 2026 10:12:24 -0700 Subject: [PATCH] =?UTF-8?q?fix:=20merge=20execution=20metadata=20on=20dupl?= =?UTF-8?q?icate=20batch=20initialization=20in=20Tr=E2=80=A6=20(#5573)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: merge execution metadata on duplicate batch initialization in TraceBatchManager - Updated TraceBatchManager to merge execution metadata when a batch is initialized multiple times. - Enhanced logging to reflect the merging of metadata during duplicate initialization. - Added a test case to verify that execution metadata is correctly merged when initializing a batch after a lazy action. * drop env events emitting from traces listener --- .../listeners/tracing/trace_batch_manager.py | 5 ++- .../listeners/tracing/trace_listener.py | 30 +------------- lib/crewai/src/crewai/flow/flow.py | 7 +++- lib/crewai/tests/tracing/test_tracing.py | 40 +++++++++++++++++++ 4 files changed, 52 insertions(+), 30 deletions(-) diff --git a/lib/crewai/src/crewai/events/listeners/tracing/trace_batch_manager.py b/lib/crewai/src/crewai/events/listeners/tracing/trace_batch_manager.py index d2a0912f6..e35fe66e1 100644 --- a/lib/crewai/src/crewai/events/listeners/tracing/trace_batch_manager.py +++ b/lib/crewai/src/crewai/events/listeners/tracing/trace_batch_manager.py @@ -81,8 +81,11 @@ class TraceBatchManager: """Initialize a new trace batch (thread-safe)""" with self._batch_ready_cv: if self.current_batch is not None: + # Lazy init (e.g. DefaultEnvEvent) may have created the batch without + # execution_type; merge metadata from a later flow/crew initializer. + self.current_batch.execution_metadata.update(execution_metadata) logger.debug( - "Batch already initialized, skipping duplicate initialization" + "Batch already initialized, merged execution metadata and skipped duplicate initialization" ) return self.current_batch diff --git a/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py b/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py index c4cc6cb71..046bc0f1a 100644 --- a/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py +++ b/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py @@ -60,12 +60,6 @@ from crewai.events.types.crew_events import ( CrewKickoffFailedEvent, CrewKickoffStartedEvent, ) -from crewai.events.types.env_events import ( - CCEnvEvent, - CodexEnvEvent, - CursorEnvEvent, - DefaultEnvEvent, -) from crewai.events.types.flow_events import ( FlowCreatedEvent, FlowFinishedEvent, @@ -212,7 +206,6 @@ class TraceCollectionListener(BaseEventListener): self._listeners_setup = True return - self._register_env_event_handlers(crewai_event_bus) self._register_flow_event_handlers(crewai_event_bus) self._register_context_event_handlers(crewai_event_bus) self._register_action_event_handlers(crewai_event_bus) @@ -221,25 +214,6 @@ class TraceCollectionListener(BaseEventListener): self._listeners_setup = True - def _register_env_event_handlers(self, event_bus: CrewAIEventsBus) -> None: - """Register handlers for environment context events.""" - - @event_bus.on(CCEnvEvent) - def on_cc_env(source: Any, event: CCEnvEvent) -> None: - self._handle_action_event("cc_env", source, event) - - @event_bus.on(CodexEnvEvent) - def on_codex_env(source: Any, event: CodexEnvEvent) -> None: - self._handle_action_event("codex_env", source, event) - - @event_bus.on(CursorEnvEvent) - def on_cursor_env(source: Any, event: CursorEnvEvent) -> None: - self._handle_action_event("cursor_env", source, event) - - @event_bus.on(DefaultEnvEvent) - def on_default_env(source: Any, event: DefaultEnvEvent) -> None: - self._handle_action_event("default_env", source, event) - def _register_flow_event_handlers(self, event_bus: CrewAIEventsBus) -> None: """Register handlers for flow events.""" @@ -286,8 +260,8 @@ class TraceCollectionListener(BaseEventListener): if self.batch_manager.batch_owner_type != "flow": # Always call _initialize_crew_batch to claim ownership. # If batch was already initialized by a concurrent action event - # (race condition with DefaultEnvEvent), initialize_batch() returns - # early but batch_owner_type is still correctly set to "crew". + # (e.g. LLM/tool before crew_kickoff_started), initialize_batch() + # returns early but batch_owner_type is still correctly set to "crew". # Skip only when a parent flow already owns the batch. self._initialize_crew_batch(source, event) self._handle_trace_event("crew_kickoff_started", source, event) diff --git a/lib/crewai/src/crewai/flow/flow.py b/lib/crewai/src/crewai/flow/flow.py index 88457f7aa..b363ebc71 100644 --- a/lib/crewai/src/crewai/flow/flow.py +++ b/lib/crewai/src/crewai/flow/flow.py @@ -1503,6 +1503,8 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): except Exception: logger.warning("FlowStartedEvent handler failed", exc_info=True) + get_env_context() + context = self._pending_feedback_context emit = context.emit default_outcome = context.default_outcome @@ -2004,7 +2006,6 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): restored = apply_checkpoint(self, from_checkpoint) if restored is not None: return restored.kickoff(inputs=inputs, input_files=input_files) - get_env_context() if self.stream: result_holder: list[Any] = [] current_task_info: TaskInfo = { @@ -2206,6 +2207,10 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): f"Flow started with ID: {self.flow_id}", color="bold magenta" ) + # After FlowStarted (when not suppressed): env events must not pre-empt + # trace batch init with implicit "crew" execution_type. + get_env_context() + if inputs is not None and "id" not in inputs: self._initialize_state(inputs) diff --git a/lib/crewai/tests/tracing/test_tracing.py b/lib/crewai/tests/tracing/test_tracing.py index 640aca832..38bb060bd 100644 --- a/lib/crewai/tests/tracing/test_tracing.py +++ b/lib/crewai/tests/tracing/test_tracing.py @@ -1640,3 +1640,43 @@ class TestBackendInitializedGatedOnSuccess: assert bm.backend_initialized is False assert bm.trace_batch_id is None + + +class TestTraceBatchManagerDuplicateInitMerge: + """Second initialize_batch call merges execution_metadata (flow after lazy action).""" + + def test_duplicate_initialize_merges_execution_metadata(self): + with ( + patch( + "crewai.events.listeners.tracing.trace_batch_manager.should_auto_collect_first_time_traces", + return_value=True, + ), + patch( + "crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context", + return_value=True, + ), + ): + bm = TraceBatchManager() + bm.initialize_batch( + user_context={"privacy_level": "standard"}, + execution_metadata={ + "crew_name": "Unknown Crew", + "crewai_version": "9.9.9", + }, + ) + first_batch_id = bm.current_batch.batch_id + bm.initialize_batch( + user_context={"privacy_level": "standard"}, + execution_metadata={ + "flow_name": "ResearchFlow", + "execution_type": "flow", + "crewai_version": "9.9.9", + "execution_start": "2026-01-01T00:00:00+00:00", + }, + ) + + assert bm.current_batch.batch_id == first_batch_id + meta = bm.current_batch.execution_metadata + assert meta.get("execution_type") == "flow" + assert meta.get("flow_name") == "ResearchFlow" + assert meta.get("crew_name") == "Unknown Crew"