diff --git a/lib/crewai/src/crewai/events/types/llm_events.py b/lib/crewai/src/crewai/events/types/llm_events.py index b138f908c..0e5c22fe7 100644 --- a/lib/crewai/src/crewai/events/types/llm_events.py +++ b/lib/crewai/src/crewai/events/types/llm_events.py @@ -87,6 +87,7 @@ class LLMStreamChunkEvent(LLMEventBase): tool_call: ToolCall | None = None call_type: LLMCallType | None = None response_id: str | None = None + run_id: str | None = None class LLMThinkingChunkEvent(LLMEventBase): diff --git a/lib/crewai/src/crewai/llm.py b/lib/crewai/src/crewai/llm.py index e6f5cc68b..886ffbf05 100644 --- a/lib/crewai/src/crewai/llm.py +++ b/lib/crewai/src/crewai/llm.py @@ -38,6 +38,7 @@ from crewai.llms.base_llm import ( get_current_call_id, llm_call_context, ) +from crewai.utilities.streaming import get_current_stream_run_id from crewai.llms.constants import ( ANTHROPIC_MODELS, AZURE_MODELS, @@ -790,6 +791,7 @@ class LLM(BaseLLM): call_type=LLMCallType.LLM_CALL, response_id=response_id, call_id=get_current_call_id(), + run_id=get_current_stream_run_id(), ), ) # --- 4) Fallback to non-streaming if no content received @@ -1003,6 +1005,7 @@ class LLM(BaseLLM): call_type=LLMCallType.TOOL_CALL, response_id=response_id, call_id=get_current_call_id(), + run_id=get_current_stream_run_id(), ), ) @@ -1456,6 +1459,7 @@ class LLM(BaseLLM): from_agent=from_agent, response_id=response_id, call_id=get_current_call_id(), + run_id=get_current_stream_run_id(), ), ) diff --git a/lib/crewai/src/crewai/llms/base_llm.py b/lib/crewai/src/crewai/llms/base_llm.py index 41ce1d2cd..19de9ae47 100644 --- a/lib/crewai/src/crewai/llms/base_llm.py +++ b/lib/crewai/src/crewai/llms/base_llm.py @@ -36,6 +36,7 @@ from crewai.events.types.llm_events import ( LLMStreamChunkEvent, LLMThinkingChunkEvent, ) +from crewai.utilities.streaming import get_current_stream_run_id from crewai.events.types.tool_usage_events import ( ToolUsageErrorEvent, ToolUsageFinishedEvent, @@ -527,6 +528,7 @@ class BaseLLM(BaseModel, ABC): call_type=call_type, response_id=response_id, call_id=get_current_call_id(), + run_id=get_current_stream_run_id(), ), ) diff --git a/lib/crewai/src/crewai/utilities/streaming.py b/lib/crewai/src/crewai/utilities/streaming.py index 008144bff..78c5c0313 100644 --- a/lib/crewai/src/crewai/utilities/streaming.py +++ b/lib/crewai/src/crewai/utilities/streaming.py @@ -7,6 +7,7 @@ import logging import queue import threading from typing import Any, NamedTuple +import uuid from typing_extensions import TypedDict @@ -25,6 +26,17 @@ from crewai.utilities.string_utils import sanitize_tool_name logger = logging.getLogger(__name__) +# ContextVar that tracks the current streaming run_id. +# Set by create_streaming_state() so that LLM emit paths can stamp events. +_current_stream_run_id: contextvars.ContextVar[str | None] = contextvars.ContextVar( + "_current_stream_run_id", default=None +) + + +def get_current_stream_run_id() -> str | None: + """Return the active streaming run_id for the current context, if any.""" + return _current_stream_run_id.get() + class TaskInfo(TypedDict): """Task context information for streaming.""" @@ -106,6 +118,7 @@ def _create_stream_handler( sync_queue: queue.Queue[StreamChunk | None | Exception], async_queue: asyncio.Queue[StreamChunk | None | Exception] | None = None, loop: asyncio.AbstractEventLoop | None = None, + run_id: str | None = None, ) -> Callable[[Any, BaseEvent], None]: """Create a stream handler function. @@ -114,6 +127,9 @@ def _create_stream_handler( sync_queue: Synchronous queue for chunks. async_queue: Optional async queue for chunks. loop: Optional event loop for async operations. + run_id: Unique identifier for this streaming run. When set, the handler + only accepts events whose ``run_id`` matches, preventing cross-run + chunk contamination in concurrent streaming scenarios. Returns: Handler function that can be registered with the event bus. @@ -129,6 +145,10 @@ def _create_stream_handler( if not isinstance(event, LLMStreamChunkEvent): return + # Filter: only accept events belonging to this streaming run. + if run_id is not None and event.run_id is not None and event.run_id != run_id: + return + chunk = _create_stream_chunk(event, current_task_info) if async_queue is not None and loop is not None: @@ -187,6 +207,12 @@ def create_streaming_state( ) -> StreamingState: """Create and register streaming state. + Each call generates a unique ``run_id`` that is: + * stored in a ``contextvars.ContextVar`` so that downstream LLM emit + paths can stamp ``LLMStreamChunkEvent.run_id`` automatically, and + * passed to the stream handler so it only accepts events with a + matching ``run_id``, preventing cross-run chunk contamination. + Args: current_task_info: Task context info. result_holder: List to hold the final result. @@ -195,6 +221,9 @@ def create_streaming_state( Returns: Initialized StreamingState with registered handler. """ + run_id = str(uuid.uuid4()) + _current_stream_run_id.set(run_id) + sync_queue: queue.Queue[StreamChunk | None | Exception] = queue.Queue() async_queue: asyncio.Queue[StreamChunk | None | Exception] | None = None loop: asyncio.AbstractEventLoop | None = None @@ -203,7 +232,9 @@ def create_streaming_state( async_queue = asyncio.Queue() loop = asyncio.get_event_loop() - handler = _create_stream_handler(current_task_info, sync_queue, async_queue, loop) + handler = _create_stream_handler( + current_task_info, sync_queue, async_queue, loop, run_id=run_id + ) crewai_event_bus.register_handler(LLMStreamChunkEvent, handler) return StreamingState( diff --git a/lib/crewai/tests/test_streaming.py b/lib/crewai/tests/test_streaming.py index 7b1c8e1ba..382f1cb36 100644 --- a/lib/crewai/tests/test_streaming.py +++ b/lib/crewai/tests/test_streaming.py @@ -861,6 +861,266 @@ class TestStreamingCancellation: assert not streaming.is_cancelled +class TestStreamingRunIsolation: + """Tests for concurrent streaming run isolation (issue #5376). + + The singleton event bus fans out events to all registered handlers. + Without run_id scoping, concurrent streaming runs receive each other's + chunks. These tests verify that the run_id filtering prevents + cross-run chunk contamination. + """ + + def test_handler_ignores_events_from_different_run(self) -> None: + """A handler with run_id must reject events carrying a different run_id.""" + import queue as _queue + + from crewai.utilities.streaming import _create_stream_handler, TaskInfo + + task_info: TaskInfo = { + "index": 0, + "name": "task-a", + "id": "tid-a", + "agent_role": "Agent", + "agent_id": "aid-a", + } + q: _queue.Queue[StreamChunk | None | Exception] = _queue.Queue() + handler = _create_stream_handler(task_info, q, run_id="run-A") + + # Event from a *different* run – must be silently dropped. + foreign_event = LLMStreamChunkEvent( + chunk="foreign-chunk", + call_id="cid", + run_id="run-B", + ) + handler(None, foreign_event) + assert q.empty(), "Handler must not enqueue events from a different run_id" + + # Event from the *same* run – must be enqueued. + own_event = LLMStreamChunkEvent( + chunk="own-chunk", + call_id="cid", + run_id="run-A", + ) + handler(None, own_event) + assert not q.empty(), "Handler must enqueue events with matching run_id" + item = q.get_nowait() + assert item.content == "own-chunk" + + def test_concurrent_streaming_states_do_not_cross_contaminate(self) -> None: + """Two streaming states created concurrently must each receive only + their own events, even though both handlers are registered on the + same global event bus. + """ + import threading + + from crewai.utilities.streaming import ( + create_streaming_state, + TaskInfo, + _unregister_handler, + ) + + task_a: TaskInfo = { + "index": 0, + "name": "task-a", + "id": "tid-a", + "agent_role": "Agent-A", + "agent_id": "aid-a", + } + task_b: TaskInfo = { + "index": 1, + "name": "task-b", + "id": "tid-b", + "agent_role": "Agent-B", + "agent_id": "aid-b", + } + + state_a = create_streaming_state(task_a, []) + run_id_a = state_a.sync_queue # we'll read from this queue + + state_b = create_streaming_state(task_b, []) + run_id_b = state_b.sync_queue + + # We need the run_ids that were generated. They were set on the + # contextvar but we can infer them by emitting known events. + # Instead, peek at the handler closure – or simply emit tagged events + # directly and check which queue gets them. + + # Emit event tagged for state_a's run. + # We need the run_id. Retrieve it by inspecting the handler's closure. + import types + + def _get_run_id_from_handler(handler: Any) -> str | None: + """Extract the run_id captured in the handler closure.""" + fn = handler + if hasattr(fn, "__wrapped__"): + fn = fn.__wrapped__ + for cell in (fn.__closure__ or []): + try: + val = cell.cell_contents + if isinstance(val, str) and len(val) == 36 and val.count("-") == 4: + return val + except ValueError: + continue + return None + + rid_a = _get_run_id_from_handler(state_a.handler) + rid_b = _get_run_id_from_handler(state_b.handler) + assert rid_a is not None and rid_b is not None + assert rid_a != rid_b, "Each streaming state must have a unique run_id" + + # Emit events for run A. + for i in range(3): + crewai_event_bus.emit( + self, + LLMStreamChunkEvent( + chunk=f"A-{i}", + call_id="cid-a", + run_id=rid_a, + ), + ) + + # Emit events for run B. + for i in range(3): + crewai_event_bus.emit( + self, + LLMStreamChunkEvent( + chunk=f"B-{i}", + call_id="cid-b", + run_id=rid_b, + ), + ) + + # Drain queues. + chunks_a = [] + while not state_a.sync_queue.empty(): + chunks_a.append(state_a.sync_queue.get_nowait()) + + chunks_b = [] + while not state_b.sync_queue.empty(): + chunks_b.append(state_b.sync_queue.get_nowait()) + + # Verify isolation. + contents_a = [c.content for c in chunks_a] + contents_b = [c.content for c in chunks_b] + + assert contents_a == ["A-0", "A-1", "A-2"], ( + f"State A must only contain its own chunks, got {contents_a}" + ) + assert contents_b == ["B-0", "B-1", "B-2"], ( + f"State B must only contain its own chunks, got {contents_b}" + ) + + # No cross-contamination. + for c in contents_a: + assert not c.startswith("B-"), f"Run A received run B chunk: {c}" + for c in contents_b: + assert not c.startswith("A-"), f"Run B received run A chunk: {c}" + + # Cleanup. + _unregister_handler(state_a.handler) + _unregister_handler(state_b.handler) + + def test_concurrent_threads_isolated(self) -> None: + """Simulate two concurrent streaming runs in separate threads and + verify that each collects only its own chunks. + """ + import contextvars + import threading + import time + + from crewai.utilities.streaming import ( + create_streaming_state, + get_current_stream_run_id, + TaskInfo, + _unregister_handler, + ) + + results: dict[str, list[str]] = {"A": [], "B": []} + errors: list[Exception] = [] + + def run_streaming(label: str, task_info: TaskInfo) -> None: + try: + state = create_streaming_state(task_info, []) + run_id = get_current_stream_run_id() + assert run_id is not None + + # Simulate LLM emitting chunks stamped with this run's id. + for i in range(5): + crewai_event_bus.emit( + None, + LLMStreamChunkEvent( + chunk=f"{label}-{i}", + call_id=f"cid-{label}", + run_id=run_id, + ), + ) + time.sleep(0.005) + + # Drain the queue. + while not state.sync_queue.empty(): + item = state.sync_queue.get_nowait() + results[label].append(item.content) + + _unregister_handler(state.handler) + except Exception as exc: + errors.append(exc) + + task_a: TaskInfo = { + "index": 0, + "name": "task-a", + "id": "tid-a", + "agent_role": "Agent-A", + "agent_id": "aid-a", + } + task_b: TaskInfo = { + "index": 1, + "name": "task-b", + "id": "tid-b", + "agent_role": "Agent-B", + "agent_id": "aid-b", + } + + t_a = threading.Thread(target=run_streaming, args=("A", task_a)) + t_b = threading.Thread(target=run_streaming, args=("B", task_b)) + + t_a.start() + t_b.start() + t_a.join(timeout=10) + t_b.join(timeout=10) + + assert not errors, f"Threads raised errors: {errors}" + + # Each thread must see only its own chunks. + for c in results["A"]: + assert c.startswith("A-"), f"Run A received foreign chunk: {c}" + for c in results["B"]: + assert c.startswith("B-"), f"Run B received foreign chunk: {c}" + + assert len(results["A"]) == 5, ( + f"Run A expected 5 chunks, got {len(results['A'])}: {results['A']}" + ) + assert len(results["B"]) == 5, ( + f"Run B expected 5 chunks, got {len(results['B'])}: {results['B']}" + ) + + def test_run_id_stamped_on_llm_stream_chunk_event(self) -> None: + """Verify that LLMStreamChunkEvent accepts and stores run_id.""" + event = LLMStreamChunkEvent( + chunk="test", + call_id="cid", + run_id="my-run-id", + ) + assert event.run_id == "my-run-id" + + def test_run_id_defaults_to_none(self) -> None: + """Verify that run_id defaults to None when not provided.""" + event = LLMStreamChunkEvent( + chunk="test", + call_id="cid", + ) + assert event.run_id is None + + class TestStreamingImports: """Tests for correct imports of streaming types."""