diff --git a/lib/crewai/src/crewai/utilities/streaming.py b/lib/crewai/src/crewai/utilities/streaming.py index 78c5c0313..ff3f1bc87 100644 --- a/lib/crewai/src/crewai/utilities/streaming.py +++ b/lib/crewai/src/crewai/utilities/streaming.py @@ -207,12 +207,16 @@ def create_streaming_state( ) -> StreamingState: """Create and register streaming state. - Each call generates a unique ``run_id`` that is: + Each call assigns a ``run_id`` that is: * stored in a ``contextvars.ContextVar`` so that downstream LLM emit paths can stamp ``LLMStreamChunkEvent.run_id`` automatically, and * passed to the stream handler so it only accepts events with a matching ``run_id``, preventing cross-run chunk contamination. + If the current context already carries a ``run_id`` (e.g. a parent + flow already created a streaming state), the existing value is reused + so that nested streaming (flow → crew) shares the same scope. + Args: current_task_info: Task context info. result_holder: List to hold the final result. @@ -221,7 +225,7 @@ def create_streaming_state( Returns: Initialized StreamingState with registered handler. """ - run_id = str(uuid.uuid4()) + run_id = _current_stream_run_id.get() or str(uuid.uuid4()) _current_stream_run_id.set(run_id) sync_queue: queue.Queue[StreamChunk | None | Exception] = queue.Queue() diff --git a/lib/crewai/tests/test_streaming.py b/lib/crewai/tests/test_streaming.py index 382f1cb36..8a0cc28d1 100644 --- a/lib/crewai/tests/test_streaming.py +++ b/lib/crewai/tests/test_streaming.py @@ -907,14 +907,15 @@ class TestStreamingRunIsolation: assert item.content == "own-chunk" def test_concurrent_streaming_states_do_not_cross_contaminate(self) -> None: - """Two streaming states created concurrently must each receive only - their own events, even though both handlers are registered on the - same global event bus. + """Two streaming states created in separate contexts (simulating + concurrent runs) must each receive only their own events, even + though both handlers are registered on the same global event bus. """ - import threading + import contextvars from crewai.utilities.streaming import ( create_streaming_state, + _current_stream_run_id, TaskInfo, _unregister_handler, ) @@ -934,21 +935,19 @@ class TestStreamingRunIsolation: "agent_id": "aid-b", } - state_a = create_streaming_state(task_a, []) - run_id_a = state_a.sync_queue # we'll read from this queue + def _create_in_fresh_context( + task_info: TaskInfo, + ) -> "StreamingState": + """Reset the run_id contextvar and create streaming state.""" + _current_stream_run_id.set(None) + return create_streaming_state(task_info, []) - state_b = create_streaming_state(task_b, []) - run_id_b = state_b.sync_queue - - # We need the run_ids that were generated. They were set on the - # contextvar but we can infer them by emitting known events. - # Instead, peek at the handler closure – or simply emit tagged events - # directly and check which queue gets them. - - # Emit event tagged for state_a's run. - # We need the run_id. Retrieve it by inspecting the handler's closure. - import types + # Create each streaming state in a *separate* context so they get + # distinct run_ids (simulates truly concurrent runs). + state_a = contextvars.copy_context().run(_create_in_fresh_context, task_a) + state_b = contextvars.copy_context().run(_create_in_fresh_context, task_b) + # Extract run_ids from handler closures. def _get_run_id_from_handler(handler: Any) -> str | None: """Extract the run_id captured in the handler closure.""" fn = handler