mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-04-12 14:02:47 +00:00
Compare commits
2 Commits
chore/clea
...
devin/1775
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0a22307962 | ||
|
|
5511f0a089 |
@@ -87,6 +87,7 @@ class LLMStreamChunkEvent(LLMEventBase):
|
||||
tool_call: ToolCall | None = None
|
||||
call_type: LLMCallType | None = None
|
||||
response_id: str | None = None
|
||||
run_id: str | None = None
|
||||
|
||||
|
||||
class LLMThinkingChunkEvent(LLMEventBase):
|
||||
|
||||
@@ -38,6 +38,7 @@ from crewai.llms.base_llm import (
|
||||
get_current_call_id,
|
||||
llm_call_context,
|
||||
)
|
||||
from crewai.utilities.streaming import get_current_stream_run_id
|
||||
from crewai.llms.constants import (
|
||||
ANTHROPIC_MODELS,
|
||||
AZURE_MODELS,
|
||||
@@ -790,6 +791,7 @@ class LLM(BaseLLM):
|
||||
call_type=LLMCallType.LLM_CALL,
|
||||
response_id=response_id,
|
||||
call_id=get_current_call_id(),
|
||||
run_id=get_current_stream_run_id(),
|
||||
),
|
||||
)
|
||||
# --- 4) Fallback to non-streaming if no content received
|
||||
@@ -1003,6 +1005,7 @@ class LLM(BaseLLM):
|
||||
call_type=LLMCallType.TOOL_CALL,
|
||||
response_id=response_id,
|
||||
call_id=get_current_call_id(),
|
||||
run_id=get_current_stream_run_id(),
|
||||
),
|
||||
)
|
||||
|
||||
@@ -1456,6 +1459,7 @@ class LLM(BaseLLM):
|
||||
from_agent=from_agent,
|
||||
response_id=response_id,
|
||||
call_id=get_current_call_id(),
|
||||
run_id=get_current_stream_run_id(),
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
@@ -36,6 +36,7 @@ from crewai.events.types.llm_events import (
|
||||
LLMStreamChunkEvent,
|
||||
LLMThinkingChunkEvent,
|
||||
)
|
||||
from crewai.utilities.streaming import get_current_stream_run_id
|
||||
from crewai.events.types.tool_usage_events import (
|
||||
ToolUsageErrorEvent,
|
||||
ToolUsageFinishedEvent,
|
||||
@@ -527,6 +528,7 @@ class BaseLLM(BaseModel, ABC):
|
||||
call_type=call_type,
|
||||
response_id=response_id,
|
||||
call_id=get_current_call_id(),
|
||||
run_id=get_current_stream_run_id(),
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@ import logging
|
||||
import queue
|
||||
import threading
|
||||
from typing import Any, NamedTuple
|
||||
import uuid
|
||||
|
||||
from typing_extensions import TypedDict
|
||||
|
||||
@@ -25,6 +26,17 @@ from crewai.utilities.string_utils import sanitize_tool_name
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ContextVar that tracks the current streaming run_id.
|
||||
# Set by create_streaming_state() so that LLM emit paths can stamp events.
|
||||
_current_stream_run_id: contextvars.ContextVar[str | None] = contextvars.ContextVar(
|
||||
"_current_stream_run_id", default=None
|
||||
)
|
||||
|
||||
|
||||
def get_current_stream_run_id() -> str | None:
|
||||
"""Return the active streaming run_id for the current context, if any."""
|
||||
return _current_stream_run_id.get()
|
||||
|
||||
|
||||
class TaskInfo(TypedDict):
|
||||
"""Task context information for streaming."""
|
||||
@@ -106,6 +118,7 @@ def _create_stream_handler(
|
||||
sync_queue: queue.Queue[StreamChunk | None | Exception],
|
||||
async_queue: asyncio.Queue[StreamChunk | None | Exception] | None = None,
|
||||
loop: asyncio.AbstractEventLoop | None = None,
|
||||
run_id: str | None = None,
|
||||
) -> Callable[[Any, BaseEvent], None]:
|
||||
"""Create a stream handler function.
|
||||
|
||||
@@ -114,6 +127,9 @@ def _create_stream_handler(
|
||||
sync_queue: Synchronous queue for chunks.
|
||||
async_queue: Optional async queue for chunks.
|
||||
loop: Optional event loop for async operations.
|
||||
run_id: Unique identifier for this streaming run. When set, the handler
|
||||
only accepts events whose ``run_id`` matches, preventing cross-run
|
||||
chunk contamination in concurrent streaming scenarios.
|
||||
|
||||
Returns:
|
||||
Handler function that can be registered with the event bus.
|
||||
@@ -129,6 +145,10 @@ def _create_stream_handler(
|
||||
if not isinstance(event, LLMStreamChunkEvent):
|
||||
return
|
||||
|
||||
# Filter: only accept events belonging to this streaming run.
|
||||
if run_id is not None and event.run_id is not None and event.run_id != run_id:
|
||||
return
|
||||
|
||||
chunk = _create_stream_chunk(event, current_task_info)
|
||||
|
||||
if async_queue is not None and loop is not None:
|
||||
@@ -187,6 +207,16 @@ def create_streaming_state(
|
||||
) -> StreamingState:
|
||||
"""Create and register streaming state.
|
||||
|
||||
Each call assigns a ``run_id`` that is:
|
||||
* stored in a ``contextvars.ContextVar`` so that downstream LLM emit
|
||||
paths can stamp ``LLMStreamChunkEvent.run_id`` automatically, and
|
||||
* passed to the stream handler so it only accepts events with a
|
||||
matching ``run_id``, preventing cross-run chunk contamination.
|
||||
|
||||
If the current context already carries a ``run_id`` (e.g. a parent
|
||||
flow already created a streaming state), the existing value is reused
|
||||
so that nested streaming (flow → crew) shares the same scope.
|
||||
|
||||
Args:
|
||||
current_task_info: Task context info.
|
||||
result_holder: List to hold the final result.
|
||||
@@ -195,6 +225,9 @@ def create_streaming_state(
|
||||
Returns:
|
||||
Initialized StreamingState with registered handler.
|
||||
"""
|
||||
run_id = _current_stream_run_id.get() or str(uuid.uuid4())
|
||||
_current_stream_run_id.set(run_id)
|
||||
|
||||
sync_queue: queue.Queue[StreamChunk | None | Exception] = queue.Queue()
|
||||
async_queue: asyncio.Queue[StreamChunk | None | Exception] | None = None
|
||||
loop: asyncio.AbstractEventLoop | None = None
|
||||
@@ -203,7 +236,9 @@ def create_streaming_state(
|
||||
async_queue = asyncio.Queue()
|
||||
loop = asyncio.get_event_loop()
|
||||
|
||||
handler = _create_stream_handler(current_task_info, sync_queue, async_queue, loop)
|
||||
handler = _create_stream_handler(
|
||||
current_task_info, sync_queue, async_queue, loop, run_id=run_id
|
||||
)
|
||||
crewai_event_bus.register_handler(LLMStreamChunkEvent, handler)
|
||||
|
||||
return StreamingState(
|
||||
|
||||
@@ -861,6 +861,265 @@ class TestStreamingCancellation:
|
||||
assert not streaming.is_cancelled
|
||||
|
||||
|
||||
class TestStreamingRunIsolation:
|
||||
"""Tests for concurrent streaming run isolation (issue #5376).
|
||||
|
||||
The singleton event bus fans out events to all registered handlers.
|
||||
Without run_id scoping, concurrent streaming runs receive each other's
|
||||
chunks. These tests verify that the run_id filtering prevents
|
||||
cross-run chunk contamination.
|
||||
"""
|
||||
|
||||
def test_handler_ignores_events_from_different_run(self) -> None:
|
||||
"""A handler with run_id must reject events carrying a different run_id."""
|
||||
import queue as _queue
|
||||
|
||||
from crewai.utilities.streaming import _create_stream_handler, TaskInfo
|
||||
|
||||
task_info: TaskInfo = {
|
||||
"index": 0,
|
||||
"name": "task-a",
|
||||
"id": "tid-a",
|
||||
"agent_role": "Agent",
|
||||
"agent_id": "aid-a",
|
||||
}
|
||||
q: _queue.Queue[StreamChunk | None | Exception] = _queue.Queue()
|
||||
handler = _create_stream_handler(task_info, q, run_id="run-A")
|
||||
|
||||
# Event from a *different* run – must be silently dropped.
|
||||
foreign_event = LLMStreamChunkEvent(
|
||||
chunk="foreign-chunk",
|
||||
call_id="cid",
|
||||
run_id="run-B",
|
||||
)
|
||||
handler(None, foreign_event)
|
||||
assert q.empty(), "Handler must not enqueue events from a different run_id"
|
||||
|
||||
# Event from the *same* run – must be enqueued.
|
||||
own_event = LLMStreamChunkEvent(
|
||||
chunk="own-chunk",
|
||||
call_id="cid",
|
||||
run_id="run-A",
|
||||
)
|
||||
handler(None, own_event)
|
||||
assert not q.empty(), "Handler must enqueue events with matching run_id"
|
||||
item = q.get_nowait()
|
||||
assert item.content == "own-chunk"
|
||||
|
||||
def test_concurrent_streaming_states_do_not_cross_contaminate(self) -> None:
|
||||
"""Two streaming states created in separate contexts (simulating
|
||||
concurrent runs) must each receive only their own events, even
|
||||
though both handlers are registered on the same global event bus.
|
||||
"""
|
||||
import contextvars
|
||||
|
||||
from crewai.utilities.streaming import (
|
||||
create_streaming_state,
|
||||
_current_stream_run_id,
|
||||
TaskInfo,
|
||||
_unregister_handler,
|
||||
)
|
||||
|
||||
task_a: TaskInfo = {
|
||||
"index": 0,
|
||||
"name": "task-a",
|
||||
"id": "tid-a",
|
||||
"agent_role": "Agent-A",
|
||||
"agent_id": "aid-a",
|
||||
}
|
||||
task_b: TaskInfo = {
|
||||
"index": 1,
|
||||
"name": "task-b",
|
||||
"id": "tid-b",
|
||||
"agent_role": "Agent-B",
|
||||
"agent_id": "aid-b",
|
||||
}
|
||||
|
||||
def _create_in_fresh_context(
|
||||
task_info: TaskInfo,
|
||||
) -> "StreamingState":
|
||||
"""Reset the run_id contextvar and create streaming state."""
|
||||
_current_stream_run_id.set(None)
|
||||
return create_streaming_state(task_info, [])
|
||||
|
||||
# Create each streaming state in a *separate* context so they get
|
||||
# distinct run_ids (simulates truly concurrent runs).
|
||||
state_a = contextvars.copy_context().run(_create_in_fresh_context, task_a)
|
||||
state_b = contextvars.copy_context().run(_create_in_fresh_context, task_b)
|
||||
|
||||
# Extract run_ids from handler closures.
|
||||
def _get_run_id_from_handler(handler: Any) -> str | None:
|
||||
"""Extract the run_id captured in the handler closure."""
|
||||
fn = handler
|
||||
if hasattr(fn, "__wrapped__"):
|
||||
fn = fn.__wrapped__
|
||||
for cell in (fn.__closure__ or []):
|
||||
try:
|
||||
val = cell.cell_contents
|
||||
if isinstance(val, str) and len(val) == 36 and val.count("-") == 4:
|
||||
return val
|
||||
except ValueError:
|
||||
continue
|
||||
return None
|
||||
|
||||
rid_a = _get_run_id_from_handler(state_a.handler)
|
||||
rid_b = _get_run_id_from_handler(state_b.handler)
|
||||
assert rid_a is not None and rid_b is not None
|
||||
assert rid_a != rid_b, "Each streaming state must have a unique run_id"
|
||||
|
||||
# Emit events for run A.
|
||||
for i in range(3):
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
LLMStreamChunkEvent(
|
||||
chunk=f"A-{i}",
|
||||
call_id="cid-a",
|
||||
run_id=rid_a,
|
||||
),
|
||||
)
|
||||
|
||||
# Emit events for run B.
|
||||
for i in range(3):
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
LLMStreamChunkEvent(
|
||||
chunk=f"B-{i}",
|
||||
call_id="cid-b",
|
||||
run_id=rid_b,
|
||||
),
|
||||
)
|
||||
|
||||
# Drain queues.
|
||||
chunks_a = []
|
||||
while not state_a.sync_queue.empty():
|
||||
chunks_a.append(state_a.sync_queue.get_nowait())
|
||||
|
||||
chunks_b = []
|
||||
while not state_b.sync_queue.empty():
|
||||
chunks_b.append(state_b.sync_queue.get_nowait())
|
||||
|
||||
# Verify isolation.
|
||||
contents_a = [c.content for c in chunks_a]
|
||||
contents_b = [c.content for c in chunks_b]
|
||||
|
||||
assert contents_a == ["A-0", "A-1", "A-2"], (
|
||||
f"State A must only contain its own chunks, got {contents_a}"
|
||||
)
|
||||
assert contents_b == ["B-0", "B-1", "B-2"], (
|
||||
f"State B must only contain its own chunks, got {contents_b}"
|
||||
)
|
||||
|
||||
# No cross-contamination.
|
||||
for c in contents_a:
|
||||
assert not c.startswith("B-"), f"Run A received run B chunk: {c}"
|
||||
for c in contents_b:
|
||||
assert not c.startswith("A-"), f"Run B received run A chunk: {c}"
|
||||
|
||||
# Cleanup.
|
||||
_unregister_handler(state_a.handler)
|
||||
_unregister_handler(state_b.handler)
|
||||
|
||||
def test_concurrent_threads_isolated(self) -> None:
|
||||
"""Simulate two concurrent streaming runs in separate threads and
|
||||
verify that each collects only its own chunks.
|
||||
"""
|
||||
import contextvars
|
||||
import threading
|
||||
import time
|
||||
|
||||
from crewai.utilities.streaming import (
|
||||
create_streaming_state,
|
||||
get_current_stream_run_id,
|
||||
TaskInfo,
|
||||
_unregister_handler,
|
||||
)
|
||||
|
||||
results: dict[str, list[str]] = {"A": [], "B": []}
|
||||
errors: list[Exception] = []
|
||||
|
||||
def run_streaming(label: str, task_info: TaskInfo) -> None:
|
||||
try:
|
||||
state = create_streaming_state(task_info, [])
|
||||
run_id = get_current_stream_run_id()
|
||||
assert run_id is not None
|
||||
|
||||
# Simulate LLM emitting chunks stamped with this run's id.
|
||||
for i in range(5):
|
||||
crewai_event_bus.emit(
|
||||
None,
|
||||
LLMStreamChunkEvent(
|
||||
chunk=f"{label}-{i}",
|
||||
call_id=f"cid-{label}",
|
||||
run_id=run_id,
|
||||
),
|
||||
)
|
||||
time.sleep(0.005)
|
||||
|
||||
# Drain the queue.
|
||||
while not state.sync_queue.empty():
|
||||
item = state.sync_queue.get_nowait()
|
||||
results[label].append(item.content)
|
||||
|
||||
_unregister_handler(state.handler)
|
||||
except Exception as exc:
|
||||
errors.append(exc)
|
||||
|
||||
task_a: TaskInfo = {
|
||||
"index": 0,
|
||||
"name": "task-a",
|
||||
"id": "tid-a",
|
||||
"agent_role": "Agent-A",
|
||||
"agent_id": "aid-a",
|
||||
}
|
||||
task_b: TaskInfo = {
|
||||
"index": 1,
|
||||
"name": "task-b",
|
||||
"id": "tid-b",
|
||||
"agent_role": "Agent-B",
|
||||
"agent_id": "aid-b",
|
||||
}
|
||||
|
||||
t_a = threading.Thread(target=run_streaming, args=("A", task_a))
|
||||
t_b = threading.Thread(target=run_streaming, args=("B", task_b))
|
||||
|
||||
t_a.start()
|
||||
t_b.start()
|
||||
t_a.join(timeout=10)
|
||||
t_b.join(timeout=10)
|
||||
|
||||
assert not errors, f"Threads raised errors: {errors}"
|
||||
|
||||
# Each thread must see only its own chunks.
|
||||
for c in results["A"]:
|
||||
assert c.startswith("A-"), f"Run A received foreign chunk: {c}"
|
||||
for c in results["B"]:
|
||||
assert c.startswith("B-"), f"Run B received foreign chunk: {c}"
|
||||
|
||||
assert len(results["A"]) == 5, (
|
||||
f"Run A expected 5 chunks, got {len(results['A'])}: {results['A']}"
|
||||
)
|
||||
assert len(results["B"]) == 5, (
|
||||
f"Run B expected 5 chunks, got {len(results['B'])}: {results['B']}"
|
||||
)
|
||||
|
||||
def test_run_id_stamped_on_llm_stream_chunk_event(self) -> None:
|
||||
"""Verify that LLMStreamChunkEvent accepts and stores run_id."""
|
||||
event = LLMStreamChunkEvent(
|
||||
chunk="test",
|
||||
call_id="cid",
|
||||
run_id="my-run-id",
|
||||
)
|
||||
assert event.run_id == "my-run-id"
|
||||
|
||||
def test_run_id_defaults_to_none(self) -> None:
|
||||
"""Verify that run_id defaults to None when not provided."""
|
||||
event = LLMStreamChunkEvent(
|
||||
chunk="test",
|
||||
call_id="cid",
|
||||
)
|
||||
assert event.run_id is None
|
||||
|
||||
|
||||
class TestStreamingImports:
|
||||
"""Tests for correct imports of streaming types."""
|
||||
|
||||
|
||||
Reference in New Issue
Block a user