mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-04-30 23:02:50 +00:00
fix: reuse parent run_id in nested streaming and fix test context isolation
Co-Authored-By: João <joao@crewai.com>
This commit is contained in:
@@ -207,12 +207,16 @@ def create_streaming_state(
|
||||
) -> StreamingState:
|
||||
"""Create and register streaming state.
|
||||
|
||||
Each call generates a unique ``run_id`` that is:
|
||||
Each call assigns a ``run_id`` that is:
|
||||
* stored in a ``contextvars.ContextVar`` so that downstream LLM emit
|
||||
paths can stamp ``LLMStreamChunkEvent.run_id`` automatically, and
|
||||
* passed to the stream handler so it only accepts events with a
|
||||
matching ``run_id``, preventing cross-run chunk contamination.
|
||||
|
||||
If the current context already carries a ``run_id`` (e.g. a parent
|
||||
flow already created a streaming state), the existing value is reused
|
||||
so that nested streaming (flow → crew) shares the same scope.
|
||||
|
||||
Args:
|
||||
current_task_info: Task context info.
|
||||
result_holder: List to hold the final result.
|
||||
@@ -221,7 +225,7 @@ def create_streaming_state(
|
||||
Returns:
|
||||
Initialized StreamingState with registered handler.
|
||||
"""
|
||||
run_id = str(uuid.uuid4())
|
||||
run_id = _current_stream_run_id.get() or str(uuid.uuid4())
|
||||
_current_stream_run_id.set(run_id)
|
||||
|
||||
sync_queue: queue.Queue[StreamChunk | None | Exception] = queue.Queue()
|
||||
|
||||
@@ -907,14 +907,15 @@ class TestStreamingRunIsolation:
|
||||
assert item.content == "own-chunk"
|
||||
|
||||
def test_concurrent_streaming_states_do_not_cross_contaminate(self) -> None:
|
||||
"""Two streaming states created concurrently must each receive only
|
||||
their own events, even though both handlers are registered on the
|
||||
same global event bus.
|
||||
"""Two streaming states created in separate contexts (simulating
|
||||
concurrent runs) must each receive only their own events, even
|
||||
though both handlers are registered on the same global event bus.
|
||||
"""
|
||||
import threading
|
||||
import contextvars
|
||||
|
||||
from crewai.utilities.streaming import (
|
||||
create_streaming_state,
|
||||
_current_stream_run_id,
|
||||
TaskInfo,
|
||||
_unregister_handler,
|
||||
)
|
||||
@@ -934,21 +935,19 @@ class TestStreamingRunIsolation:
|
||||
"agent_id": "aid-b",
|
||||
}
|
||||
|
||||
state_a = create_streaming_state(task_a, [])
|
||||
run_id_a = state_a.sync_queue # we'll read from this queue
|
||||
def _create_in_fresh_context(
|
||||
task_info: TaskInfo,
|
||||
) -> "StreamingState":
|
||||
"""Reset the run_id contextvar and create streaming state."""
|
||||
_current_stream_run_id.set(None)
|
||||
return create_streaming_state(task_info, [])
|
||||
|
||||
state_b = create_streaming_state(task_b, [])
|
||||
run_id_b = state_b.sync_queue
|
||||
|
||||
# We need the run_ids that were generated. They were set on the
|
||||
# contextvar but we can infer them by emitting known events.
|
||||
# Instead, peek at the handler closure – or simply emit tagged events
|
||||
# directly and check which queue gets them.
|
||||
|
||||
# Emit event tagged for state_a's run.
|
||||
# We need the run_id. Retrieve it by inspecting the handler's closure.
|
||||
import types
|
||||
# Create each streaming state in a *separate* context so they get
|
||||
# distinct run_ids (simulates truly concurrent runs).
|
||||
state_a = contextvars.copy_context().run(_create_in_fresh_context, task_a)
|
||||
state_b = contextvars.copy_context().run(_create_in_fresh_context, task_b)
|
||||
|
||||
# Extract run_ids from handler closures.
|
||||
def _get_run_id_from_handler(handler: Any) -> str | None:
|
||||
"""Extract the run_id captured in the handler closure."""
|
||||
fn = handler
|
||||
|
||||
Reference in New Issue
Block a user