Compare commits

...

2 Commits

Author SHA1 Message Date
Devin AI
0a22307962 fix: reuse parent run_id in nested streaming and fix test context isolation
Co-Authored-By: João <joao@crewai.com>
2026-04-09 06:06:12 +00:00
Devin AI
5511f0a089 fix: add run_id scoping to streaming handlers to prevent cross-run chunk contamination (#5376)
The singleton event bus fans out LLMStreamChunkEvent to all registered
handlers. When multiple streaming runs execute concurrently, each run's
handler receives chunks from all other runs, causing cross-run chunk
contamination.

Fix:
- Add run_id field to LLMStreamChunkEvent
- Use contextvars.ContextVar to track the current streaming run_id
- create_streaming_state() generates a unique run_id per run and sets it
  in the context var
- LLM emit paths (base_llm.py, llm.py) stamp run_id on emitted events
- Stream handler filters events by matching run_id

Tests:
- Handler rejects events from different run_id
- Two concurrent streaming states receive only their own events
- Two concurrent threads with isolated contextvars receive only their
  own chunks
- run_id field defaults to None for backward compatibility

Co-Authored-By: João <joao@crewai.com>
2026-04-09 05:56:47 +00:00
5 changed files with 302 additions and 1 deletion

View File

@@ -87,6 +87,7 @@ class LLMStreamChunkEvent(LLMEventBase):
tool_call: ToolCall | None = None
call_type: LLMCallType | None = None
response_id: str | None = None
run_id: str | None = None
class LLMThinkingChunkEvent(LLMEventBase):

View File

@@ -38,6 +38,7 @@ from crewai.llms.base_llm import (
get_current_call_id,
llm_call_context,
)
from crewai.utilities.streaming import get_current_stream_run_id
from crewai.llms.constants import (
ANTHROPIC_MODELS,
AZURE_MODELS,
@@ -790,6 +791,7 @@ class LLM(BaseLLM):
call_type=LLMCallType.LLM_CALL,
response_id=response_id,
call_id=get_current_call_id(),
run_id=get_current_stream_run_id(),
),
)
# --- 4) Fallback to non-streaming if no content received
@@ -1003,6 +1005,7 @@ class LLM(BaseLLM):
call_type=LLMCallType.TOOL_CALL,
response_id=response_id,
call_id=get_current_call_id(),
run_id=get_current_stream_run_id(),
),
)
@@ -1456,6 +1459,7 @@ class LLM(BaseLLM):
from_agent=from_agent,
response_id=response_id,
call_id=get_current_call_id(),
run_id=get_current_stream_run_id(),
),
)

View File

@@ -36,6 +36,7 @@ from crewai.events.types.llm_events import (
LLMStreamChunkEvent,
LLMThinkingChunkEvent,
)
from crewai.utilities.streaming import get_current_stream_run_id
from crewai.events.types.tool_usage_events import (
ToolUsageErrorEvent,
ToolUsageFinishedEvent,
@@ -527,6 +528,7 @@ class BaseLLM(BaseModel, ABC):
call_type=call_type,
response_id=response_id,
call_id=get_current_call_id(),
run_id=get_current_stream_run_id(),
),
)

View File

@@ -7,6 +7,7 @@ import logging
import queue
import threading
from typing import Any, NamedTuple
import uuid
from typing_extensions import TypedDict
@@ -25,6 +26,17 @@ from crewai.utilities.string_utils import sanitize_tool_name
logger = logging.getLogger(__name__)
# ContextVar that tracks the current streaming run_id.
# Set by create_streaming_state() so that LLM emit paths can stamp events.
# Defaults to None, meaning "no streaming run is active in this context".
_current_stream_run_id: contextvars.ContextVar[str | None] = contextvars.ContextVar(
"_current_stream_run_id", default=None
)
def get_current_stream_run_id() -> str | None:
    """Return the streaming run_id bound to the current execution context.

    Returns:
        The run_id placed in the context var by ``create_streaming_state()``,
        or ``None`` when no streaming run is active in this context.
    """
    active_run_id: str | None = _current_stream_run_id.get()
    return active_run_id
class TaskInfo(TypedDict):
"""Task context information for streaming."""
@@ -106,6 +118,7 @@ def _create_stream_handler(
sync_queue: queue.Queue[StreamChunk | None | Exception],
async_queue: asyncio.Queue[StreamChunk | None | Exception] | None = None,
loop: asyncio.AbstractEventLoop | None = None,
run_id: str | None = None,
) -> Callable[[Any, BaseEvent], None]:
"""Create a stream handler function.
@@ -114,6 +127,9 @@ def _create_stream_handler(
sync_queue: Synchronous queue for chunks.
async_queue: Optional async queue for chunks.
loop: Optional event loop for async operations.
run_id: Unique identifier for this streaming run. When set, the handler
only accepts events whose ``run_id`` matches, preventing cross-run
chunk contamination in concurrent streaming scenarios.
Returns:
Handler function that can be registered with the event bus.
@@ -129,6 +145,10 @@ def _create_stream_handler(
if not isinstance(event, LLMStreamChunkEvent):
return
# Filter: only accept events belonging to this streaming run.
if run_id is not None and event.run_id is not None and event.run_id != run_id:
return
chunk = _create_stream_chunk(event, current_task_info)
if async_queue is not None and loop is not None:
@@ -187,6 +207,16 @@ def create_streaming_state(
) -> StreamingState:
"""Create and register streaming state.
Each call assigns a ``run_id`` that is:
* stored in a ``contextvars.ContextVar`` so that downstream LLM emit
paths can stamp ``LLMStreamChunkEvent.run_id`` automatically, and
* passed to the stream handler so it only accepts events with a
matching ``run_id``, preventing cross-run chunk contamination.
If the current context already carries a ``run_id`` (e.g. a parent
flow already created a streaming state), the existing value is reused
so that nested streaming (flow → crew) shares the same scope.
Args:
current_task_info: Task context info.
result_holder: List to hold the final result.
@@ -195,6 +225,9 @@ def create_streaming_state(
Returns:
Initialized StreamingState with registered handler.
"""
run_id = _current_stream_run_id.get() or str(uuid.uuid4())
_current_stream_run_id.set(run_id)
sync_queue: queue.Queue[StreamChunk | None | Exception] = queue.Queue()
async_queue: asyncio.Queue[StreamChunk | None | Exception] | None = None
loop: asyncio.AbstractEventLoop | None = None
@@ -203,7 +236,9 @@ def create_streaming_state(
async_queue = asyncio.Queue()
loop = asyncio.get_event_loop()
handler = _create_stream_handler(current_task_info, sync_queue, async_queue, loop)
handler = _create_stream_handler(
current_task_info, sync_queue, async_queue, loop, run_id=run_id
)
crewai_event_bus.register_handler(LLMStreamChunkEvent, handler)
return StreamingState(

View File

@@ -861,6 +861,265 @@ class TestStreamingCancellation:
assert not streaming.is_cancelled
class TestStreamingRunIsolation:
"""Tests for concurrent streaming run isolation (issue #5376).
The singleton event bus fans out events to all registered handlers.
Without run_id scoping, concurrent streaming runs receive each other's
chunks. These tests verify that the run_id filtering prevents
cross-run chunk contamination.
"""
def test_handler_ignores_events_from_different_run(self) -> None:
"""A handler with run_id must reject events carrying a different run_id."""
# Local imports keep the private-helper usage scoped to this test.
import queue as _queue
from crewai.utilities.streaming import _create_stream_handler, TaskInfo
task_info: TaskInfo = {
"index": 0,
"name": "task-a",
"id": "tid-a",
"agent_role": "Agent",
"agent_id": "aid-a",
}
q: _queue.Queue[StreamChunk | None | Exception] = _queue.Queue()
# NOTE(review): "run-A"/"run-B" are not UUID-shaped; that is fine here
# because the handler filters by plain equality on run_id.
handler = _create_stream_handler(task_info, q, run_id="run-A")
# Event from a *different* run must be silently dropped.
foreign_event = LLMStreamChunkEvent(
chunk="foreign-chunk",
call_id="cid",
run_id="run-B",
)
handler(None, foreign_event)
assert q.empty(), "Handler must not enqueue events from a different run_id"
# Event from the *same* run must be enqueued.
own_event = LLMStreamChunkEvent(
chunk="own-chunk",
call_id="cid",
run_id="run-A",
)
handler(None, own_event)
assert not q.empty(), "Handler must enqueue events with matching run_id"
item = q.get_nowait()
assert item.content == "own-chunk"
def test_concurrent_streaming_states_do_not_cross_contaminate(self) -> None:
"""Two streaming states created in separate contexts (simulating
concurrent runs) must each receive only their own events, even
though both handlers are registered on the same global event bus.
"""
import contextvars
from crewai.utilities.streaming import (
create_streaming_state,
_current_stream_run_id,
TaskInfo,
_unregister_handler,
)
task_a: TaskInfo = {
"index": 0,
"name": "task-a",
"id": "tid-a",
"agent_role": "Agent-A",
"agent_id": "aid-a",
}
task_b: TaskInfo = {
"index": 1,
"name": "task-b",
"id": "tid-b",
"agent_role": "Agent-B",
"agent_id": "aid-b",
}
def _create_in_fresh_context(
task_info: TaskInfo,
) -> "StreamingState":
"""Reset the run_id contextvar and create streaming state."""
# Clearing the var forces create_streaming_state() to mint a new uuid
# instead of reusing a parent run_id.
_current_stream_run_id.set(None)
return create_streaming_state(task_info, [])
# Create each streaming state in a *separate* context so they get
# distinct run_ids (simulates truly concurrent runs).
state_a = contextvars.copy_context().run(_create_in_fresh_context, task_a)
state_b = contextvars.copy_context().run(_create_in_fresh_context, task_b)
# Extract run_ids from handler closures.
def _get_run_id_from_handler(handler: Any) -> str | None:
"""Extract the run_id captured in the handler closure."""
fn = handler
if hasattr(fn, "__wrapped__"):
fn = fn.__wrapped__
# NOTE(review): heuristic — relies on run_id being a canonical
# 36-char / 4-dash uuid4 string; breaks if the run_id format changes.
for cell in (fn.__closure__ or []):
try:
val = cell.cell_contents
if isinstance(val, str) and len(val) == 36 and val.count("-") == 4:
return val
except ValueError:
# Empty cells raise ValueError on cell_contents access.
continue
return None
rid_a = _get_run_id_from_handler(state_a.handler)
rid_b = _get_run_id_from_handler(state_b.handler)
assert rid_a is not None and rid_b is not None
assert rid_a != rid_b, "Each streaming state must have a unique run_id"
# Emit events for run A.
for i in range(3):
crewai_event_bus.emit(
self,
LLMStreamChunkEvent(
chunk=f"A-{i}",
call_id="cid-a",
run_id=rid_a,
),
)
# Emit events for run B.
for i in range(3):
crewai_event_bus.emit(
self,
LLMStreamChunkEvent(
chunk=f"B-{i}",
call_id="cid-b",
run_id=rid_b,
),
)
# Drain queues.
chunks_a = []
while not state_a.sync_queue.empty():
chunks_a.append(state_a.sync_queue.get_nowait())
chunks_b = []
while not state_b.sync_queue.empty():
chunks_b.append(state_b.sync_queue.get_nowait())
# Verify isolation.
contents_a = [c.content for c in chunks_a]
contents_b = [c.content for c in chunks_b]
assert contents_a == ["A-0", "A-1", "A-2"], (
f"State A must only contain its own chunks, got {contents_a}"
)
assert contents_b == ["B-0", "B-1", "B-2"], (
f"State B must only contain its own chunks, got {contents_b}"
)
# No cross-contamination.
for c in contents_a:
assert not c.startswith("B-"), f"Run A received run B chunk: {c}"
for c in contents_b:
assert not c.startswith("A-"), f"Run B received run A chunk: {c}"
# Cleanup.
_unregister_handler(state_a.handler)
_unregister_handler(state_b.handler)
def test_concurrent_threads_isolated(self) -> None:
"""Simulate two concurrent streaming runs in separate threads and
verify that each collects only its own chunks.
"""
import contextvars
import threading
import time
from crewai.utilities.streaming import (
create_streaming_state,
get_current_stream_run_id,
TaskInfo,
_unregister_handler,
)
results: dict[str, list[str]] = {"A": [], "B": []}
errors: list[Exception] = []
# NOTE(review): each new Thread runs with a fresh contextvars context, so
# each create_streaming_state() call should mint its own run_id — confirm
# against the documented threading/contextvars semantics.
def run_streaming(label: str, task_info: TaskInfo) -> None:
try:
state = create_streaming_state(task_info, [])
run_id = get_current_stream_run_id()
assert run_id is not None
# Simulate LLM emitting chunks stamped with this run's id.
for i in range(5):
crewai_event_bus.emit(
None,
LLMStreamChunkEvent(
chunk=f"{label}-{i}",
call_id=f"cid-{label}",
run_id=run_id,
),
)
# presumably to interleave emissions between the two threads — confirm intent
time.sleep(0.005)
# Drain the queue.
while not state.sync_queue.empty():
item = state.sync_queue.get_nowait()
results[label].append(item.content)
_unregister_handler(state.handler)
except Exception as exc:
errors.append(exc)
task_a: TaskInfo = {
"index": 0,
"name": "task-a",
"id": "tid-a",
"agent_role": "Agent-A",
"agent_id": "aid-a",
}
task_b: TaskInfo = {
"index": 1,
"name": "task-b",
"id": "tid-b",
"agent_role": "Agent-B",
"agent_id": "aid-b",
}
t_a = threading.Thread(target=run_streaming, args=("A", task_a))
t_b = threading.Thread(target=run_streaming, args=("B", task_b))
t_a.start()
t_b.start()
# NOTE(review): if a join times out the thread is still running and the
# count assertions below will report a partial result.
t_a.join(timeout=10)
t_b.join(timeout=10)
assert not errors, f"Threads raised errors: {errors}"
# Each thread must see only its own chunks.
for c in results["A"]:
assert c.startswith("A-"), f"Run A received foreign chunk: {c}"
for c in results["B"]:
assert c.startswith("B-"), f"Run B received foreign chunk: {c}"
assert len(results["A"]) == 5, (
f"Run A expected 5 chunks, got {len(results['A'])}: {results['A']}"
)
assert len(results["B"]) == 5, (
f"Run B expected 5 chunks, got {len(results['B'])}: {results['B']}"
)
def test_run_id_stamped_on_llm_stream_chunk_event(self) -> None:
"""Verify that LLMStreamChunkEvent accepts and stores run_id."""
event = LLMStreamChunkEvent(
chunk="test",
call_id="cid",
run_id="my-run-id",
)
assert event.run_id == "my-run-id"
def test_run_id_defaults_to_none(self) -> None:
"""Verify that run_id defaults to None when not provided."""
event = LLMStreamChunkEvent(
chunk="test",
call_id="cid",
)
assert event.run_id is None
class TestStreamingImports:
"""Tests for correct imports of streaming types."""