fix: add run_id scoping to streaming handlers to prevent cross-run chunk contamination (#5376)

The singleton event bus fans out LLMStreamChunkEvent to all registered
handlers. When multiple streaming runs execute concurrently, each run's
handler receives chunks from all other runs, causing cross-run chunk
contamination.

Fix:
- Add run_id field to LLMStreamChunkEvent
- Use contextvars.ContextVar to track the current streaming run_id
- create_streaming_state() generates a unique run_id per run and sets it
  in the context var
- LLM emit paths (base_llm.py, llm.py) stamp run_id on emitted events
- Stream handler filters events by matching run_id

Tests:
- Handler rejects events from different run_id
- Two concurrent streaming states receive only their own events
- Two concurrent threads with isolated contextvars receive only their
  own chunks
- run_id field defaults to None for backward compatibility

Co-Authored-By: João <joao@crewai.com>
This commit is contained in:
Devin AI
2026-04-09 05:56:47 +00:00
parent 06fe163611
commit 5511f0a089
5 changed files with 299 additions and 1 deletions

View File

@@ -87,6 +87,7 @@ class LLMStreamChunkEvent(LLMEventBase):
tool_call: ToolCall | None = None
call_type: LLMCallType | None = None
response_id: str | None = None
run_id: str | None = None
class LLMThinkingChunkEvent(LLMEventBase):

View File

@@ -38,6 +38,7 @@ from crewai.llms.base_llm import (
get_current_call_id,
llm_call_context,
)
from crewai.utilities.streaming import get_current_stream_run_id
from crewai.llms.constants import (
ANTHROPIC_MODELS,
AZURE_MODELS,
@@ -790,6 +791,7 @@ class LLM(BaseLLM):
call_type=LLMCallType.LLM_CALL,
response_id=response_id,
call_id=get_current_call_id(),
run_id=get_current_stream_run_id(),
),
)
# --- 4) Fallback to non-streaming if no content received
@@ -1003,6 +1005,7 @@ class LLM(BaseLLM):
call_type=LLMCallType.TOOL_CALL,
response_id=response_id,
call_id=get_current_call_id(),
run_id=get_current_stream_run_id(),
),
)
@@ -1456,6 +1459,7 @@ class LLM(BaseLLM):
from_agent=from_agent,
response_id=response_id,
call_id=get_current_call_id(),
run_id=get_current_stream_run_id(),
),
)

View File

@@ -36,6 +36,7 @@ from crewai.events.types.llm_events import (
LLMStreamChunkEvent,
LLMThinkingChunkEvent,
)
from crewai.utilities.streaming import get_current_stream_run_id
from crewai.events.types.tool_usage_events import (
ToolUsageErrorEvent,
ToolUsageFinishedEvent,
@@ -527,6 +528,7 @@ class BaseLLM(BaseModel, ABC):
call_type=call_type,
response_id=response_id,
call_id=get_current_call_id(),
run_id=get_current_stream_run_id(),
),
)

View File

@@ -7,6 +7,7 @@ import logging
import queue
import threading
from typing import Any, NamedTuple
import uuid
from typing_extensions import TypedDict
@@ -25,6 +26,17 @@ from crewai.utilities.string_utils import sanitize_tool_name
logger = logging.getLogger(__name__)
# ContextVar that tracks the current streaming run_id.
# Set by create_streaming_state() so that LLM emit paths can stamp events.
_current_stream_run_id: contextvars.ContextVar[str | None] = contextvars.ContextVar(
"_current_stream_run_id", default=None
)
def get_current_stream_run_id() -> str | None:
"""Return the active streaming run_id for the current context, if any."""
return _current_stream_run_id.get()
class TaskInfo(TypedDict):
"""Task context information for streaming."""
@@ -106,6 +118,7 @@ def _create_stream_handler(
sync_queue: queue.Queue[StreamChunk | None | Exception],
async_queue: asyncio.Queue[StreamChunk | None | Exception] | None = None,
loop: asyncio.AbstractEventLoop | None = None,
run_id: str | None = None,
) -> Callable[[Any, BaseEvent], None]:
"""Create a stream handler function.
@@ -114,6 +127,9 @@ def _create_stream_handler(
sync_queue: Synchronous queue for chunks.
async_queue: Optional async queue for chunks.
loop: Optional event loop for async operations.
run_id: Unique identifier for this streaming run. When set, the handler
only accepts events whose ``run_id`` matches, preventing cross-run
chunk contamination in concurrent streaming scenarios.
Returns:
Handler function that can be registered with the event bus.
@@ -129,6 +145,10 @@ def _create_stream_handler(
if not isinstance(event, LLMStreamChunkEvent):
return
# Filter: only accept events belonging to this streaming run.
if run_id is not None and event.run_id is not None and event.run_id != run_id:
return
chunk = _create_stream_chunk(event, current_task_info)
if async_queue is not None and loop is not None:
@@ -187,6 +207,12 @@ def create_streaming_state(
) -> StreamingState:
"""Create and register streaming state.
Each call generates a unique ``run_id`` that is:
* stored in a ``contextvars.ContextVar`` so that downstream LLM emit
paths can stamp ``LLMStreamChunkEvent.run_id`` automatically, and
* passed to the stream handler so it only accepts events with a
matching ``run_id``, preventing cross-run chunk contamination.
Args:
current_task_info: Task context info.
result_holder: List to hold the final result.
@@ -195,6 +221,9 @@ def create_streaming_state(
Returns:
Initialized StreamingState with registered handler.
"""
run_id = str(uuid.uuid4())
_current_stream_run_id.set(run_id)
sync_queue: queue.Queue[StreamChunk | None | Exception] = queue.Queue()
async_queue: asyncio.Queue[StreamChunk | None | Exception] | None = None
loop: asyncio.AbstractEventLoop | None = None
@@ -203,7 +232,9 @@ def create_streaming_state(
async_queue = asyncio.Queue()
loop = asyncio.get_event_loop()
handler = _create_stream_handler(current_task_info, sync_queue, async_queue, loop)
handler = _create_stream_handler(
current_task_info, sync_queue, async_queue, loop, run_id=run_id
)
crewai_event_bus.register_handler(LLMStreamChunkEvent, handler)
return StreamingState(

View File

@@ -861,6 +861,266 @@ class TestStreamingCancellation:
assert not streaming.is_cancelled
class TestStreamingRunIsolation:
"""Tests for concurrent streaming run isolation (issue #5376).
The singleton event bus fans out events to all registered handlers.
Without run_id scoping, concurrent streaming runs receive each other's
chunks. These tests verify that the run_id filtering prevents
cross-run chunk contamination.
"""
def test_handler_ignores_events_from_different_run(self) -> None:
"""A handler with run_id must reject events carrying a different run_id."""
import queue as _queue
from crewai.utilities.streaming import _create_stream_handler, TaskInfo
task_info: TaskInfo = {
"index": 0,
"name": "task-a",
"id": "tid-a",
"agent_role": "Agent",
"agent_id": "aid-a",
}
q: _queue.Queue[StreamChunk | None | Exception] = _queue.Queue()
handler = _create_stream_handler(task_info, q, run_id="run-A")
# Event from a *different* run must be silently dropped.
foreign_event = LLMStreamChunkEvent(
chunk="foreign-chunk",
call_id="cid",
run_id="run-B",
)
handler(None, foreign_event)
assert q.empty(), "Handler must not enqueue events from a different run_id"
# Event from the *same* run must be enqueued.
own_event = LLMStreamChunkEvent(
chunk="own-chunk",
call_id="cid",
run_id="run-A",
)
handler(None, own_event)
assert not q.empty(), "Handler must enqueue events with matching run_id"
item = q.get_nowait()
assert item.content == "own-chunk"
def test_concurrent_streaming_states_do_not_cross_contaminate(self) -> None:
"""Two streaming states created concurrently must each receive only
their own events, even though both handlers are registered on the
same global event bus.
"""
import threading
from crewai.utilities.streaming import (
create_streaming_state,
TaskInfo,
_unregister_handler,
)
task_a: TaskInfo = {
"index": 0,
"name": "task-a",
"id": "tid-a",
"agent_role": "Agent-A",
"agent_id": "aid-a",
}
task_b: TaskInfo = {
"index": 1,
"name": "task-b",
"id": "tid-b",
"agent_role": "Agent-B",
"agent_id": "aid-b",
}
state_a = create_streaming_state(task_a, [])
run_id_a = state_a.sync_queue # we'll read from this queue
state_b = create_streaming_state(task_b, [])
run_id_b = state_b.sync_queue
# We need the run_ids that were generated. They were set on the
# contextvar but we can infer them by emitting known events.
# Instead, peek at the handler closure or simply emit tagged events
# directly and check which queue gets them.
# Emit event tagged for state_a's run.
# We need the run_id. Retrieve it by inspecting the handler's closure.
import types
def _get_run_id_from_handler(handler: Any) -> str | None:
"""Extract the run_id captured in the handler closure."""
fn = handler
if hasattr(fn, "__wrapped__"):
fn = fn.__wrapped__
for cell in (fn.__closure__ or []):
try:
val = cell.cell_contents
if isinstance(val, str) and len(val) == 36 and val.count("-") == 4:
return val
except ValueError:
continue
return None
rid_a = _get_run_id_from_handler(state_a.handler)
rid_b = _get_run_id_from_handler(state_b.handler)
assert rid_a is not None and rid_b is not None
assert rid_a != rid_b, "Each streaming state must have a unique run_id"
# Emit events for run A.
for i in range(3):
crewai_event_bus.emit(
self,
LLMStreamChunkEvent(
chunk=f"A-{i}",
call_id="cid-a",
run_id=rid_a,
),
)
# Emit events for run B.
for i in range(3):
crewai_event_bus.emit(
self,
LLMStreamChunkEvent(
chunk=f"B-{i}",
call_id="cid-b",
run_id=rid_b,
),
)
# Drain queues.
chunks_a = []
while not state_a.sync_queue.empty():
chunks_a.append(state_a.sync_queue.get_nowait())
chunks_b = []
while not state_b.sync_queue.empty():
chunks_b.append(state_b.sync_queue.get_nowait())
# Verify isolation.
contents_a = [c.content for c in chunks_a]
contents_b = [c.content for c in chunks_b]
assert contents_a == ["A-0", "A-1", "A-2"], (
f"State A must only contain its own chunks, got {contents_a}"
)
assert contents_b == ["B-0", "B-1", "B-2"], (
f"State B must only contain its own chunks, got {contents_b}"
)
# No cross-contamination.
for c in contents_a:
assert not c.startswith("B-"), f"Run A received run B chunk: {c}"
for c in contents_b:
assert not c.startswith("A-"), f"Run B received run A chunk: {c}"
# Cleanup.
_unregister_handler(state_a.handler)
_unregister_handler(state_b.handler)
def test_concurrent_threads_isolated(self) -> None:
"""Simulate two concurrent streaming runs in separate threads and
verify that each collects only its own chunks.
"""
import contextvars
import threading
import time
from crewai.utilities.streaming import (
create_streaming_state,
get_current_stream_run_id,
TaskInfo,
_unregister_handler,
)
results: dict[str, list[str]] = {"A": [], "B": []}
errors: list[Exception] = []
def run_streaming(label: str, task_info: TaskInfo) -> None:
try:
state = create_streaming_state(task_info, [])
run_id = get_current_stream_run_id()
assert run_id is not None
# Simulate LLM emitting chunks stamped with this run's id.
for i in range(5):
crewai_event_bus.emit(
None,
LLMStreamChunkEvent(
chunk=f"{label}-{i}",
call_id=f"cid-{label}",
run_id=run_id,
),
)
time.sleep(0.005)
# Drain the queue.
while not state.sync_queue.empty():
item = state.sync_queue.get_nowait()
results[label].append(item.content)
_unregister_handler(state.handler)
except Exception as exc:
errors.append(exc)
task_a: TaskInfo = {
"index": 0,
"name": "task-a",
"id": "tid-a",
"agent_role": "Agent-A",
"agent_id": "aid-a",
}
task_b: TaskInfo = {
"index": 1,
"name": "task-b",
"id": "tid-b",
"agent_role": "Agent-B",
"agent_id": "aid-b",
}
t_a = threading.Thread(target=run_streaming, args=("A", task_a))
t_b = threading.Thread(target=run_streaming, args=("B", task_b))
t_a.start()
t_b.start()
t_a.join(timeout=10)
t_b.join(timeout=10)
assert not errors, f"Threads raised errors: {errors}"
# Each thread must see only its own chunks.
for c in results["A"]:
assert c.startswith("A-"), f"Run A received foreign chunk: {c}"
for c in results["B"]:
assert c.startswith("B-"), f"Run B received foreign chunk: {c}"
assert len(results["A"]) == 5, (
f"Run A expected 5 chunks, got {len(results['A'])}: {results['A']}"
)
assert len(results["B"]) == 5, (
f"Run B expected 5 chunks, got {len(results['B'])}: {results['B']}"
)
def test_run_id_stamped_on_llm_stream_chunk_event(self) -> None:
"""Verify that LLMStreamChunkEvent accepts and stores run_id."""
event = LLMStreamChunkEvent(
chunk="test",
call_id="cid",
run_id="my-run-id",
)
assert event.run_id == "my-run-id"
def test_run_id_defaults_to_none(self) -> None:
"""Verify that run_id defaults to None when not provided."""
event = LLMStreamChunkEvent(
chunk="test",
call_id="cid",
)
assert event.run_id is None
class TestStreamingImports:
"""Tests for correct imports of streaming types."""