Define stream frame protocol for flows

This commit is contained in:
lorenzejay
2026-06-29 13:51:37 -07:00
parent 2b87098279
commit 72d78387bc
11 changed files with 1203 additions and 88 deletions

View File

@@ -364,6 +364,7 @@
"edge/en/learn/human-feedback-in-flows",
"edge/en/learn/kickoff-async",
"edge/en/learn/kickoff-for-each",
"edge/en/learn/streaming-runtime-contract",
"edge/en/learn/llm-connections",
"edge/en/learn/litellm-removal-guide",
"edge/en/learn/multimodal-agents",

View File

@@ -25,6 +25,7 @@ Use **`flow.handle_turn(message, session_id=...)`** for every user message from
| API | Use for |
|-----|---------|
| `handle_turn(message, session_id=...)` | Ergonomic one-turn wrapper for conversational `Flow` |
| `stream_turn(message, session_id=...)` | Stream one conversational turn as ordered runtime frames |
| `chat()` | Local terminal REPL for conversational `Flow` |
| `kickoff(inputs={...})` | Advanced flow execution without conversational turn handling |
| `ask()` | Blocking prompt **inside** one step (wizard, clarification) |
@@ -85,6 +86,23 @@ finally:
flow.finalize_session_traces() # one trace link for the whole chat
```
## Streaming a turn
Use `stream_turn()` when a UI or runtime needs structured events for one chat turn. It returns a stream session with ordered frames for Flow routing, LLM chunks, tool activity, and conversation messages.
```python
stream = flow.stream_turn("Where is my order?", session_id=session_id)
with stream:
for frame in stream.events:
if frame.channel == "llm" and frame.type == "llm_stream_chunk":
print(frame.data.get("chunk", ""), end="", flush=True)
result = stream.result
```
For the full frame contract, channel list, and async API, see [Streaming Runtime Contract](/en/learn/streaming-runtime-contract).
## Turn lifecycle
Each `handle_turn` runs this pipeline:

View File

@@ -0,0 +1,162 @@
---
title: Streaming Runtime Contract
description: Stream ordered runtime frames from Flows and conversational turns.
icon: tower-broadcast
mode: "wide"
---
## Overview
CrewAI exposes a frame-based streaming contract for runtimes that need more than plain text chunks. The contract emits ordered `StreamFrame` objects for Flow lifecycle events, LLM tokens, tool activity, conversation messages, and custom events.
Use this API when you are building a UI, service bridge, terminal app, or deployment runtime that needs a stable stream of structured events while a Flow is running.
## StreamFrame
Every frame has the same envelope:
```python
from crewai.types.streaming import StreamFrame
frame.version # "v1"
frame.id # unique frame id
frame.seq # execution-local order, when available
frame.type # source event type, such as "flow_started"
frame.channel # "llm", "flow", "tools", "messages", "lifecycle", or "custom"
frame.namespace # source/runtime namespace
frame.timestamp # event timestamp
frame.parent_id # parent event id, when available
frame.previous_id # previous event id, when available
frame.data # event payload
```
The `channel` field is the fastest way to route frames in consumers:
| Channel | Contains |
|---------|----------|
| `llm` | Token and thinking chunks from LLM streaming events |
| `flow` | Flow lifecycle, method execution, routing, and pause/resume events |
| `tools` | Tool usage events |
| `messages` | Conversation transcript events |
| `lifecycle` | Runtime lifecycle events that are not specific to another channel |
| `custom` | Events that do not map to a built-in channel |
`frame.type` preserves the source event type, so consumers can handle specific events inside a channel.
## Stream a Flow
Use `stream_events()` to run a Flow and iterate over all frames:
```python
from crewai.flow import Flow, start
class ReportFlow(Flow):
@start()
def generate(self):
return "done"
flow = ReportFlow()
stream = flow.stream_events()
with stream:
for frame in stream.events:
print(frame.seq, frame.channel, frame.type, frame.data)
result = stream.result
```
You must consume the stream before reading `stream.result`. Accessing the result early raises a `RuntimeError` so consumers do not accidentally treat a partial run as complete.
## Filter by Channel
`StreamSession` exposes channel projections that preserve global frame order within the selected channel:
```python
stream = flow.stream_events()
with stream:
for frame in stream.llm:
print(frame.data.get("chunk", ""), end="", flush=True)
result = stream.result
```
Available projections are:
| Projection | Frames |
|------------|--------|
| `stream.events` | All frames |
| `stream.llm` | LLM frames |
| `stream.messages` | Conversation message frames |
| `stream.flow` | Flow frames |
| `stream.tools` | Tool frames |
| `stream.interleave([...])` | A selected set of channels |
Use `stream.interleave(["flow", "llm", "messages"])` when a consumer wants only some channels but still needs their relative order.
## Async Streaming
Use `astream()` for async consumers:
```python
flow = ReportFlow()
stream = flow.astream()
async with stream:
async for frame in stream.events:
print(frame.channel, frame.type)
result = stream.result
```
The async session has the same projections as the sync session.
## Conversational Turns
Conversational Flows can stream one user turn with `stream_turn()`:
```python
from crewai import Flow
from crewai.experimental.conversational import ConversationConfig, ConversationState
@ConversationConfig(llm="gpt-4o-mini", defer_trace_finalization=True)
class ChatFlow(Flow[ConversationState]):
conversational = True
flow = ChatFlow()
stream = flow.stream_turn("What can you help me with?", session_id="session-1")
with stream:
for frame in stream.events:
if frame.channel == "llm" and frame.type == "llm_stream_chunk":
print(frame.data.get("chunk", ""), end="", flush=True)
reply = stream.result
```
During `stream_turn()`, the built-in conversational answer path enables LLM token streaming for that turn and restores the LLM's previous `stream` setting afterward. Custom route handlers that create their own agents or LLM instances should configure those LLMs for streaming if they need token-level output.
## Cleanup
Use the session as a context manager when possible. If a client disconnects before the stream is exhausted, close the session explicitly:
```python
stream = flow.stream_events()
try:
for frame in stream.events:
print(frame.type)
finally:
if not stream.is_exhausted:
stream.close()
```
For async streams, use `await stream.aclose()`.
## Legacy Chunk Streaming
Crew streaming with `stream=True` still returns the chunk-oriented `CrewStreamingOutput` API described in [Streaming Crew Execution](/en/learn/streaming-crew-execution). The frame contract is intended for runtimes that need a stable event envelope across Flows, conversational turns, LLM output, tools, and messages.

View File

@@ -40,6 +40,7 @@ from crewai.events.event_context import (
set_last_event_id,
)
from crewai.events.handler_graph import build_execution_plan
from crewai.events.stream_context import publish_stream_event
from crewai.events.types.event_bus_types import (
AsyncHandler,
AsyncHandlerSet,
@@ -565,6 +566,7 @@ class CrewAIEventsBus:
set_last_event_id(event.event_id)
publish_stream_event(source, event)
self._record_event(event)
def emit(self, source: Any, event: BaseEvent) -> Future[None] | None:

View File

@@ -0,0 +1,30 @@
"""Scoped stream sinks for converting emitted events into public frames."""
from __future__ import annotations
from collections.abc import Callable
import contextvars
from typing import Any
StreamSink = Callable[[Any, Any], None]
_stream_sinks: contextvars.ContextVar[tuple[StreamSink, ...]] = contextvars.ContextVar(
"crewai_stream_sinks", default=()
)
def add_stream_sink(sink: StreamSink) -> contextvars.Token[tuple[StreamSink, ...]]:
"""Register a sink in the current context."""
return _stream_sinks.set((*_stream_sinks.get(), sink))
def reset_stream_sinks(token: contextvars.Token[tuple[StreamSink, ...]]) -> None:
"""Restore the stream sink context."""
_stream_sinks.reset(token)
def publish_stream_event(source: Any, event: Any) -> None:
"""Publish a prepared event to sinks scoped to the current execution."""
for sink in _stream_sinks.get():
sink(source, event)

View File

@@ -18,7 +18,8 @@ Import surface:
from __future__ import annotations
from collections.abc import Callable, Mapping, Sequence
from collections.abc import Callable, Iterator, Mapping, Sequence
from contextlib import contextmanager
from enum import Enum
import json
import logging
@@ -221,7 +222,9 @@ class _ConversationalMixin:
messages.append({"role": "system", "content": system_prompt})
messages.extend(self.conversation_messages)
response = self._coerce_llm(llm).call(messages=messages)
llm_instance = self._coerce_llm(llm)
with self._conversation_streaming_enabled(llm_instance):
response = llm_instance.call(messages=messages)
content = self._stringify_result(response)
self.append_assistant_message(content)
return content
@@ -254,7 +257,8 @@ class _ConversationalMixin:
},
*self.build_agent_context("answer_from_history"),
]
response = llm_instance.call(messages=messages)
with self._conversation_streaming_enabled(llm_instance):
response = llm_instance.call(messages=messages)
content = self._stringify_result(response)
self.append_assistant_message(content)
return content
@@ -337,6 +341,101 @@ class _ConversationalMixin:
)
return result
def stream_turn(
self,
message: str,
*,
session_id: str | None = None,
intents: Sequence[str] | None = None,
intent_llm: str | BaseLLM | None = None,
**kickoff_kwargs: Any,
) -> Any:
"""Append a user message and stream one conversational turn as frames."""
if not self._is_conversational_enabled():
raise ValueError(
"Flow.stream_turn() is only available on conversational flows"
)
from crewai.types.streaming import StreamSession
from crewai.utilities.streaming import (
create_frame_generator,
create_frame_streaming_state,
)
state = cast(ConversationState, self.state)
sid = session_id or state.id
result_holder: list[Any] = []
frame_state = create_frame_streaming_state(result_holder, use_async=False)
output_holder: list[StreamSession[Any]] = []
def run_turn() -> Any:
crewai_event_bus.emit(
self,
ConversationTurnStartedEvent(
type="conversation_turn_started",
flow_name=self.name or self.__class__.__name__,
session_id=sid,
),
)
self._pending_user_message = message
self._pending_intents = list(intents) if intents else None
self._pending_intent_llm = intent_llm
failed_event: ConversationTurnFailedEvent | None = None
try:
if "from_checkpoint" not in kickoff_kwargs:
self._reset_turn_execution_state()
assistant_count = self._assistant_message_count()
original_stream = bool(getattr(self, "stream", False))
original_streaming_turn = getattr(
self, "_streaming_conversation_turn", False
)
try:
object.__setattr__(self, "stream", False)
object.__setattr__(self, "_streaming_conversation_turn", True)
result = self.kickoff(inputs={"id": sid}, **kickoff_kwargs)
finally:
object.__setattr__(self, "stream", original_stream)
object.__setattr__(
self, "_streaming_conversation_turn", original_streaming_turn
)
if (
result is not None
and self._assistant_message_count() == assistant_count
and self._is_public_turn_result(result)
):
self.append_assistant_message(self._stringify_result(result))
except Exception as exc:
failed_event = ConversationTurnFailedEvent(
type="conversation_turn_failed",
flow_name=self.name or self.__class__.__name__,
session_id=sid,
error=exc,
)
self._emit_terminal_conversation_turn_event(failed_event)
raise
finally:
self._pending_user_message = None
self._pending_intents = None
self._pending_intent_llm = None
self._emit_terminal_conversation_turn_event(
ConversationTurnCompletedEvent(
type="conversation_turn_completed",
flow_name=self.name or self.__class__.__name__,
session_id=sid,
),
)
return result
stream_session: StreamSession[Any] = StreamSession(
sync_iterator=create_frame_generator(frame_state, run_turn, output_holder)
)
output_holder.append(stream_session)
return stream_session
def _emit_terminal_conversation_turn_event(
self,
event: ConversationTurnCompletedEvent | ConversationTurnFailedEvent,
@@ -685,6 +784,8 @@ class _ConversationalMixin:
object.__setattr__(self, "_pending_intents", None)
if not hasattr(self, "_pending_intent_llm"):
object.__setattr__(self, "_pending_intent_llm", None)
if not hasattr(self, "_streaming_conversation_turn"):
object.__setattr__(self, "_streaming_conversation_turn", False)
def _create_default_extension_state(self) -> ConversationState | None:
initial_state_t = getattr(self, "_initial_state_t", None)
@@ -1055,6 +1156,21 @@ class _ConversationalMixin:
return llm
raise ValueError(f"Invalid llm type: {type(llm)}. Expected str or BaseLLM.")
@contextmanager
def _conversation_streaming_enabled(self, llm: Any) -> Iterator[None]:
if not getattr(self, "_streaming_conversation_turn", False) or not hasattr(
llm, "stream"
):
yield
return
original_stream = llm.stream
try:
llm.stream = True
yield
finally:
llm.stream = original_stream
def finalize_session_traces(self) -> None:
"""Emit a final ``FlowFinishedEvent`` and finalize the trace batch.

View File

@@ -10,6 +10,7 @@ from __future__ import annotations
import asyncio
from collections.abc import (
AsyncIterator,
Callable,
Iterator,
Sequence,
@@ -140,17 +141,18 @@ if TYPE_CHECKING:
from crewai.llms.base_llm import BaseLLM
from crewai.flow.visualization import build_flow_structure, render_interactive
from crewai.types.streaming import CrewStreamingOutput, FlowStreamingOutput
from crewai.types.streaming import (
AsyncStreamSession,
FlowStreamingOutput,
StreamSession,
)
from crewai.types.usage_metrics import UsageMetrics
from crewai.utilities.env import get_env_context
from crewai.utilities.streaming import (
TaskInfo,
create_async_chunk_generator,
create_chunk_generator,
create_streaming_state,
register_cleanup,
signal_end,
signal_error,
create_async_frame_generator,
create_frame_generator,
create_frame_streaming_state,
stream_frame_to_chunk,
)
@@ -1832,6 +1834,87 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
if hasattr(self._state, key):
object.__setattr__(self._state, key, value)
def stream_events(
self,
inputs: dict[str, Any] | None = None,
input_files: dict[str, FileInput] | None = None,
from_checkpoint: CheckpointConfig | None = None,
restore_from_state_id: str | None = None,
) -> StreamSession[Any]:
"""Run the flow and stream all scoped public ``StreamFrame`` events."""
result_holder: list[Any] = []
state = create_frame_streaming_state(result_holder, use_async=False)
output_holder: list[StreamSession[Any]] = []
def run_flow() -> Any:
original_stream = self.stream
try:
self.stream = False
return self.kickoff(
inputs=inputs,
input_files=input_files,
from_checkpoint=from_checkpoint,
restore_from_state_id=restore_from_state_id,
)
except HumanFeedbackPending as e:
return e
finally:
self.stream = original_stream
stream_session: StreamSession[Any] = StreamSession(
sync_iterator=create_frame_generator(state, run_flow, output_holder)
)
output_holder.append(stream_session)
return stream_session
def stream_frames(
self,
inputs: dict[str, Any] | None = None,
input_files: dict[str, FileInput] | None = None,
from_checkpoint: CheckpointConfig | None = None,
restore_from_state_id: str | None = None,
) -> StreamSession[Any]:
"""Alias for :meth:`stream_events`."""
return self.stream_events(
inputs=inputs,
input_files=input_files,
from_checkpoint=from_checkpoint,
restore_from_state_id=restore_from_state_id,
)
def astream(
self,
inputs: dict[str, Any] | None = None,
input_files: dict[str, FileInput] | None = None,
from_checkpoint: CheckpointConfig | None = None,
restore_from_state_id: str | None = None,
) -> AsyncStreamSession[Any]:
"""Run the flow asynchronously and stream scoped public frames."""
result_holder: list[Any] = []
state = create_frame_streaming_state(result_holder, use_async=True)
output_holder: list[AsyncStreamSession[Any]] = []
async def run_flow() -> Any:
original_stream = self.stream
try:
self.stream = False
return await self.kickoff_async(
inputs=inputs,
input_files=input_files,
from_checkpoint=from_checkpoint,
restore_from_state_id=restore_from_state_id,
)
except HumanFeedbackPending as e:
return e
finally:
self.stream = original_stream
stream_session: AsyncStreamSession[Any] = AsyncStreamSession(
async_iterator=create_async_frame_generator(state, run_flow, output_holder)
)
output_holder.append(stream_session)
return stream_session
def kickoff(
self,
inputs: dict[str, Any] | None = None,
@@ -1871,44 +1954,26 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
if restored is not None:
return restored.kickoff(inputs=inputs, input_files=input_files)
if self.stream:
result_holder: list[Any] = []
current_task_info: TaskInfo = {
"index": 0,
"name": "",
"id": "",
"agent_role": "",
"agent_id": "",
}
streaming_output: FlowStreamingOutput
state = create_streaming_state(
current_task_info, result_holder, use_async=False
)
output_holder: list[CrewStreamingOutput | FlowStreamingOutput] = []
def run_flow() -> None:
def chunk_iterator() -> Iterator[Any]:
stream_session = self.stream_events(
inputs=inputs,
input_files=input_files,
restore_from_state_id=restore_from_state_id,
)
try:
self.stream = False
result = self.kickoff(
inputs=inputs,
input_files=input_files,
restore_from_state_id=restore_from_state_id,
)
result_holder.append(result)
except Exception as e:
# HumanFeedbackPending is expected control flow, not an error
if isinstance(e, HumanFeedbackPending):
result_holder.append(e)
else:
signal_error(state, e)
with stream_session:
for frame in stream_session.llm:
chunk = stream_frame_to_chunk(frame)
if chunk is not None:
yield chunk
streaming_output._set_result(stream_session.result)
finally:
self.stream = True
signal_end(state)
if not stream_session.is_exhausted:
stream_session.close()
streaming_output = FlowStreamingOutput(
sync_iterator=create_chunk_generator(state, run_flow, output_holder)
)
register_cleanup(streaming_output, state)
output_holder.append(streaming_output)
streaming_output = FlowStreamingOutput(sync_iterator=chunk_iterator())
return streaming_output
@@ -1971,46 +2036,26 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
if restored is not None:
return await restored.kickoff_async(inputs=inputs, input_files=input_files)
if self.stream:
result_holder: list[Any] = []
current_task_info: TaskInfo = {
"index": 0,
"name": "",
"id": "",
"agent_role": "",
"agent_id": "",
}
streaming_output: FlowStreamingOutput
state = create_streaming_state(
current_task_info, result_holder, use_async=True
)
output_holder: list[CrewStreamingOutput | FlowStreamingOutput] = []
async def run_flow() -> None:
try:
self.stream = False
result = await self.kickoff_async(
inputs=inputs,
input_files=input_files,
restore_from_state_id=restore_from_state_id,
)
result_holder.append(result)
except Exception as e:
# HumanFeedbackPending is expected control flow, not an error
if isinstance(e, HumanFeedbackPending):
result_holder.append(e)
else:
signal_error(state, e, is_async=True)
finally:
self.stream = True
signal_end(state, is_async=True)
streaming_output = FlowStreamingOutput(
async_iterator=create_async_chunk_generator(
state, run_flow, output_holder
async def chunk_iterator() -> AsyncIterator[Any]:
stream_session = self.astream(
inputs=inputs,
input_files=input_files,
restore_from_state_id=restore_from_state_id,
)
)
register_cleanup(streaming_output, state)
output_holder.append(streaming_output)
try:
async with stream_session:
async for frame in stream_session.llm:
chunk = stream_frame_to_chunk(frame)
if chunk is not None:
yield chunk
streaming_output._set_result(stream_session.result)
finally:
if not stream_session.is_exhausted:
await stream_session.aclose()
streaming_output = FlowStreamingOutput(async_iterator=chunk_iterator())
return streaming_output

View File

@@ -2,9 +2,10 @@
from __future__ import annotations
from collections.abc import AsyncIterator, Callable, Iterator
from collections.abc import AsyncIterator, Callable, Iterator, Sequence
from datetime import datetime
from enum import Enum
from typing import TYPE_CHECKING, Any, Generic, TypeVar
from typing import TYPE_CHECKING, Any, Generic, Literal, TypeVar
from pydantic import BaseModel, Field
from typing_extensions import Self
@@ -15,6 +16,232 @@ if TYPE_CHECKING:
T = TypeVar("T")
_MISSING = object()
StreamChannel = Literal[
"llm",
"flow",
"tools",
"messages",
"lifecycle",
"custom",
]
class StreamFrame(BaseModel):
"""Stable public stream frame emitted by streamable runtimes."""
version: Literal["v1"] = "v1"
id: str = Field(description="Unique frame/event identifier")
seq: int | None = Field(default=None, description="Execution-local order")
type: str = Field(description="Source event type")
channel: StreamChannel = Field(description="High-level stream channel")
namespace: list[str] = Field(default_factory=list)
timestamp: datetime
parent_id: str | None = None
previous_id: str | None = None
data: dict[str, Any] = Field(default_factory=dict)
class StreamSessionBase(Generic[T]):
"""Base stream session with ordered frame iteration and result access."""
def __init__(
self,
sync_iterator: Iterator[StreamFrame] | None = None,
async_iterator: AsyncIterator[StreamFrame] | None = None,
) -> None:
self._result: T | object = _MISSING
self._completed = False
self._frames: list[StreamFrame] = []
self._error: Exception | None = None
self._cancelled = False
self._exhausted = False
self._on_cleanup: Callable[[], None] | None = None
self._sync_iterator = sync_iterator
self._async_iterator = async_iterator
@property
def result(self) -> T:
"""Return the final result after stream exhaustion or completion."""
if not self._completed:
raise RuntimeError(
"Streaming has not completed yet. "
"Iterate over all frames before accessing result."
)
if self._error is not None:
raise self._error
if self._result is _MISSING:
raise RuntimeError("No result available")
return self._result # type: ignore[return-value]
@property
def is_completed(self) -> bool:
"""Check if the stream has completed."""
return self._completed
@property
def is_cancelled(self) -> bool:
"""Check if the stream was cancelled."""
return self._cancelled
@property
def is_exhausted(self) -> bool:
"""Check if the stream iterator was fully consumed."""
return self._exhausted
@property
def frames(self) -> list[StreamFrame]:
"""Return collected frames."""
return self._frames.copy()
def _set_result(self, result: T) -> None:
self._result = result
self._completed = True
class StreamSession(StreamSessionBase[T]):
"""Synchronous stream session for ordered public frames."""
def __enter__(self) -> Self:
return self
def __exit__(self, *exc_info: Any) -> None:
if not self._exhausted:
self.close()
@property
def events(self) -> Iterator[StreamFrame]:
"""Iterate over all ordered frames."""
return self.subscribe()
@property
def llm(self) -> Iterator[StreamFrame]:
"""Iterate over LLM token and thinking frames."""
return self.subscribe(channels=["llm"])
@property
def messages(self) -> Iterator[StreamFrame]:
"""Iterate over conversation message frames."""
return self.subscribe(channels=["messages"])
@property
def flow(self) -> Iterator[StreamFrame]:
"""Iterate over Flow lifecycle and method frames."""
return self.subscribe(channels=["flow"])
@property
def tools(self) -> Iterator[StreamFrame]:
"""Iterate over tool execution frames."""
return self.subscribe(channels=["tools"])
def interleave(self, channels: Sequence[StreamChannel]) -> Iterator[StreamFrame]:
"""Iterate over selected channels while preserving global order."""
return self.subscribe(channels=channels)
def subscribe(
self, channels: Sequence[StreamChannel] | None = None
) -> Iterator[StreamFrame]:
"""Iterate over frames, optionally filtered by channel."""
if self._sync_iterator is None:
raise RuntimeError("Sync iterator not available")
selected = set(channels) if channels is not None else None
try:
for frame in self._sync_iterator:
self._frames.append(frame)
if selected is None or frame.channel in selected:
yield frame
self._exhausted = True
except Exception as e:
self._error = e
raise
finally:
self._completed = True
def close(self) -> None:
"""Cancel streaming and clean up resources."""
if self._cancelled or self._exhausted or self._error is not None:
return
self._cancelled = True
self._completed = True
if self._sync_iterator is not None and hasattr(self._sync_iterator, "close"):
self._sync_iterator.close()
if self._on_cleanup is not None:
self._on_cleanup()
self._on_cleanup = None
class AsyncStreamSession(StreamSessionBase[T]):
"""Asynchronous stream session for ordered public frames."""
async def __aenter__(self) -> Self:
return self
async def __aexit__(self, *exc_info: Any) -> None:
if not self._exhausted:
await self.aclose()
@property
def events(self) -> AsyncIterator[StreamFrame]:
"""Iterate over all ordered frames."""
return self.subscribe()
@property
def llm(self) -> AsyncIterator[StreamFrame]:
"""Iterate over LLM token and thinking frames."""
return self.subscribe(channels=["llm"])
@property
def messages(self) -> AsyncIterator[StreamFrame]:
"""Iterate over conversation message frames."""
return self.subscribe(channels=["messages"])
@property
def flow(self) -> AsyncIterator[StreamFrame]:
"""Iterate over Flow lifecycle and method frames."""
return self.subscribe(channels=["flow"])
@property
def tools(self) -> AsyncIterator[StreamFrame]:
"""Iterate over tool execution frames."""
return self.subscribe(channels=["tools"])
def interleave(
self, channels: Sequence[StreamChannel]
) -> AsyncIterator[StreamFrame]:
"""Iterate over selected channels while preserving global order."""
return self.subscribe(channels=channels)
async def subscribe(
self, channels: Sequence[StreamChannel] | None = None
) -> AsyncIterator[StreamFrame]:
"""Iterate over frames, optionally filtered by channel."""
if self._async_iterator is None:
raise RuntimeError("Async iterator not available")
selected = set(channels) if channels is not None else None
try:
async for frame in self._async_iterator:
self._frames.append(frame)
if selected is None or frame.channel in selected:
yield frame
self._exhausted = True
except Exception as e:
self._error = e
raise
finally:
self._completed = True
async def aclose(self) -> None:
"""Cancel streaming and clean up resources."""
if self._cancelled or self._exhausted or self._error is not None:
return
self._cancelled = True
self._completed = True
if self._async_iterator is not None and hasattr(self._async_iterator, "aclose"):
await self._async_iterator.aclose()
if self._on_cleanup is not None:
self._on_cleanup()
self._on_cleanup = None
class StreamChunkType(Enum):

View File

@@ -6,19 +6,32 @@ import contextvars
import logging
import queue
import threading
from typing import Any, NamedTuple
from typing import Any, NamedTuple, cast
import uuid
from typing_extensions import TypedDict
from crewai.events.base_events import BaseEvent
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.llm_events import LLMStreamChunkEvent
from crewai.events.stream_context import add_stream_sink, reset_stream_sinks
from crewai.events.types.flow_events import ConversationMessageAddedEvent, FlowEvent
from crewai.events.types.llm_events import (
LLMEventBase,
LLMStreamChunkEvent,
)
from crewai.events.types.tool_usage_events import (
ToolExecutionErrorEvent,
ToolUsageEvent,
)
from crewai.types.streaming import (
AsyncStreamSession,
CrewStreamingOutput,
FlowStreamingOutput,
StreamChannel,
StreamChunk,
StreamChunkType,
StreamFrame,
StreamSession,
ToolCallChunk,
)
from crewai.utilities.string_utils import sanitize_tool_name
@@ -53,6 +66,16 @@ class StreamingState(NamedTuple):
stream_id: str | None = None
class FrameStreamingState(NamedTuple):
"""Immutable state for public frame streaming execution."""
result_holder: list[Any]
sync_queue: queue.Queue[StreamFrame | None | Exception]
async_queue: asyncio.Queue[StreamFrame | None | Exception] | None
loop: asyncio.AbstractEventLoop | None
sink: Callable[[Any, BaseEvent], None]
def _extract_tool_call_info(
event: LLMStreamChunkEvent,
) -> tuple[StreamChunkType, ToolCallChunk | None]:
@@ -80,6 +103,46 @@ def _extract_tool_call_info(
return StreamChunkType.TEXT, None
def _extract_tool_call_chunk(data: dict[str, Any]) -> ToolCallChunk | None:
tool_call = data.get("tool_call")
if not isinstance(tool_call, dict):
return None
function = tool_call.get("function")
if not isinstance(function, dict):
function = {}
function_name = function.get("name")
return ToolCallChunk(
tool_id=tool_call.get("id"),
tool_name=sanitize_tool_name(str(function_name)) if function_name else None,
arguments=function.get("arguments") or "",
index=tool_call.get("index") or 0,
)
def stream_frame_to_chunk(frame: StreamFrame) -> StreamChunk | None:
"""Project an LLM stream frame into the legacy ``StreamChunk`` shape."""
if frame.channel != "llm" or frame.type not in {
"llm_stream_chunk",
"llm_thinking_chunk",
}:
return None
tool_call = _extract_tool_call_chunk(frame.data)
chunk_type = (
StreamChunkType.TOOL_CALL if tool_call is not None else StreamChunkType.TEXT
)
return StreamChunk(
content=str(frame.data.get("chunk") or ""),
chunk_type=chunk_type,
task_index=0,
task_name=str(frame.data.get("task_name") or ""),
task_id=str(frame.data.get("task_id") or ""),
agent_role=str(frame.data.get("agent_role") or ""),
agent_id=str(frame.data.get("agent_id") or ""),
tool_call=tool_call,
)
def _create_stream_chunk(
event: LLMStreamChunkEvent,
current_task_info: TaskInfo,
@@ -107,6 +170,208 @@ def _create_stream_chunk(
)
_FRAME_DATA_EXCLUDE = {
"timestamp",
"type",
"event_id",
"parent_event_id",
"previous_event_id",
"emission_sequence",
}
def _stream_channel(event: BaseEvent) -> StreamChannel:
if isinstance(event, LLMEventBase):
return "llm"
if isinstance(event, ConversationMessageAddedEvent):
return "messages"
if isinstance(event, FlowEvent):
return "flow"
if isinstance(event, ToolUsageEvent | ToolExecutionErrorEvent):
return "tools"
if "error" in event.type or "failed" in event.type:
return "lifecycle"
return "custom"
def _stream_namespace(event: BaseEvent, channel: StreamChannel) -> list[str]:
namespace: list[str] = [channel]
for attr in (
"flow_name",
"method_name",
"session_id",
"call_id",
"tool_name",
"agent_role",
"task_name",
):
value = getattr(event, attr, None)
if value is not None:
namespace.append(str(value))
return namespace
def stream_frame_from_event(event: BaseEvent) -> StreamFrame:
"""Convert an internal CrewAI event into the public stream frame contract."""
channel = _stream_channel(event)
data = event.to_json(exclude=_FRAME_DATA_EXCLUDE)
if not isinstance(data, dict):
data = {"value": data}
return StreamFrame(
version="v1",
id=event.event_id,
seq=event.emission_sequence,
type=event.type,
channel=channel,
namespace=_stream_namespace(event, channel),
timestamp=event.timestamp,
parent_id=event.parent_event_id,
previous_id=event.previous_event_id,
data=cast(dict[str, Any], data),
)
def _create_frame_sink(
sync_queue: queue.Queue[StreamFrame | None | Exception],
async_queue: asyncio.Queue[StreamFrame | None | Exception] | None = None,
loop: asyncio.AbstractEventLoop | None = None,
) -> Callable[[Any, BaseEvent], None]:
def frame_sink(_: Any, event: BaseEvent) -> None:
frame = stream_frame_from_event(event)
if async_queue is not None and loop is not None:
loop.call_soon_threadsafe(async_queue.put_nowait, frame)
else:
sync_queue.put(frame)
return frame_sink
def create_frame_streaming_state(
result_holder: list[Any],
use_async: bool = False,
) -> FrameStreamingState:
"""Create state for a scoped public frame stream."""
sync_queue: queue.Queue[StreamFrame | None | Exception] = queue.Queue()
async_queue: asyncio.Queue[StreamFrame | None | Exception] | None = None
loop: asyncio.AbstractEventLoop | None = None
if use_async:
async_queue = asyncio.Queue()
loop = asyncio.get_running_loop()
sink = _create_frame_sink(sync_queue, async_queue, loop)
return FrameStreamingState(
result_holder=result_holder,
sync_queue=sync_queue,
async_queue=async_queue,
loop=loop,
sink=sink,
)
def _signal_frame_end(state: FrameStreamingState, is_async: bool = False) -> None:
if is_async and state.async_queue is not None and state.loop is not None:
state.loop.call_soon_threadsafe(state.async_queue.put_nowait, None)
else:
state.sync_queue.put(None)
def _signal_frame_error(
state: FrameStreamingState, error: Exception, is_async: bool = False
) -> None:
if is_async and state.async_queue is not None and state.loop is not None:
state.loop.call_soon_threadsafe(state.async_queue.put_nowait, error)
else:
state.sync_queue.put(error)
def _finalize_frame_streaming(
state: FrameStreamingState,
stream_session: StreamSession[Any] | AsyncStreamSession[Any],
) -> None:
stream_session._on_cleanup = None
if state.result_holder:
stream_session._set_result(state.result_holder[0])
def create_frame_generator(
state: FrameStreamingState,
run_func: Callable[[], Any],
output_holder: list[StreamSession[Any]],
) -> Iterator[StreamFrame]:
"""Create a scoped synchronous public frame generator."""
def run_with_sink() -> None:
token = add_stream_sink(state.sink)
try:
result = run_func()
state.result_holder.append(result)
except Exception as e:
_signal_frame_error(state, e)
finally:
reset_stream_sinks(token)
_signal_frame_end(state)
ctx = contextvars.copy_context()
thread = threading.Thread(target=ctx.run, args=(run_with_sink,), daemon=True)
thread.start()
try:
while True:
item = state.sync_queue.get()
if item is None:
break
if isinstance(item, Exception):
raise item
yield item
finally:
thread.join()
if output_holder:
_finalize_frame_streaming(state, output_holder[0])
async def create_async_frame_generator(
state: FrameStreamingState,
run_coro: Callable[[], Any],
output_holder: list[AsyncStreamSession[Any]],
) -> AsyncIterator[StreamFrame]:
"""Create a scoped asynchronous public frame generator."""
if state.async_queue is None:
raise RuntimeError(
"Async queue not initialized. Use create_frame_streaming_state(use_async=True)."
)
async def run_with_sink() -> None:
token = add_stream_sink(state.sink)
try:
result = await run_coro()
state.result_holder.append(result)
except Exception as e:
_signal_frame_error(state, e, is_async=True)
finally:
reset_stream_sinks(token)
_signal_frame_end(state, is_async=True)
task = asyncio.create_task(run_with_sink())
try:
while True:
item = await state.async_queue.get()
if item is None:
break
if isinstance(item, Exception):
raise item
yield item
finally:
if not task.done():
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
except Exception:
logger.debug("Background frame streaming task failed", exc_info=True)
if output_holder:
_finalize_frame_streaming(state, output_holder[0])
def _create_stream_handler(
current_task_info: TaskInfo,
sync_queue: queue.Queue[StreamChunk | None | Exception],

View File

@@ -21,7 +21,7 @@ from crewai.events.types.flow_events import (
MethodExecutionFinishedEvent,
MethodExecutionStartedEvent,
)
from crewai.events.types.llm_events import LLMCallStartedEvent
from crewai.events.types.llm_events import LLMCallStartedEvent, LLMStreamChunkEvent
from crewai.experimental import (
ConversationConfig,
ConversationMessage,
@@ -137,6 +137,89 @@ class TestClassifyIntent:
class TestConversationalFlow:
def test_stream_turn_emits_ordered_conversation_frames(self) -> None:
flow = ConversationalFlow()
flow.stream = True
stream_values_seen_by_kickoff: list[bool] = []
def kickoff_side_effect(*_: Any, **__: Any) -> str:
stream_values_seen_by_kickoff.append(flow.stream)
crewai_event_bus.emit(
flow,
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="pong",
call_id="call-1",
),
)
return "pong"
with patch.object(flow, "kickoff", side_effect=kickoff_side_effect):
stream = flow.stream_turn("ping", session_id="session-1")
with pytest.raises(RuntimeError, match="Streaming has not completed yet"):
_ = stream.result
frames = list(stream.events)
assert stream.result == "pong"
assert stream_values_seen_by_kickoff == [False]
assert flow.stream is True
assert [frame.seq for frame in frames] == sorted(frame.seq for frame in frames)
assert [frame.type for frame in frames] == [
"conversation_turn_started",
"llm_stream_chunk",
"conversation_message_added",
"conversation_turn_completed",
]
assert [frame.channel for frame in frames] == [
"flow",
"llm",
"messages",
"flow",
]
assert frames[1].data["chunk"] == "pong"
assert flow.state.messages[-1].content == "pong"
def test_stream_turn_enables_streaming_on_conversation_llm(self) -> None:
class FakeLLM:
stream = False
def __init__(self) -> None:
self.stream_values: list[bool] = []
def call(self, *, messages: list[dict[str, Any]]) -> str:
self.stream_values.append(self.stream)
for chunk in ("po", "ng"):
crewai_event_bus.emit(
flow,
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk=chunk,
call_id="call-1",
),
)
return "pong"
llm = FakeLLM()
@ConversationConfig(llm=llm)
class StreamingChatFlow(ConversationalFlow):
pass
flow = StreamingChatFlow()
stream = flow.stream_turn("ping", session_id="session-1")
frames = list(stream.events)
assert stream.result == "pong"
assert llm.stream_values == [True]
assert llm.stream is False
assert [
frame.data["chunk"]
for frame in frames
if frame.type == "llm_stream_chunk"
] == ["po", "ng"]
def test_deferred_multi_turn_emits_single_flow_finished(self) -> None:
"""A deferred multi-turn session lands as one trace: exactly one
``FlowFinishedEvent`` is emitted at ``finalize_session_traces()``, not

View File

@@ -0,0 +1,166 @@
"""Tests for the public stream frame contract."""
from __future__ import annotations
import asyncio
from collections.abc import AsyncIterator
from typing import Any
import pytest
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.flow_events import ConversationMessageAddedEvent
from crewai.events.types.llm_events import LLMStreamChunkEvent, LLMThinkingChunkEvent
from crewai.events.types.tool_usage_events import ToolUsageStartedEvent
from crewai.flow.flow import Flow, start
from crewai.types.streaming import FlowStreamingOutput, StreamFrame
class FrameFlow(Flow):
@start()
def run(self) -> str:
crewai_event_bus.emit(
self,
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="hello",
call_id="call-1",
),
)
crewai_event_bus.emit(
self,
LLMThinkingChunkEvent(
type="llm_thinking_chunk",
chunk="thinking",
call_id="call-1",
),
)
crewai_event_bus.emit(
self,
ConversationMessageAddedEvent(
type="conversation_message_added",
flow_name=self._definition.name,
session_id="session-1",
role="assistant",
content="hello",
message_index=0,
),
)
crewai_event_bus.emit(
self,
ToolUsageStartedEvent(
type="tool_usage_started",
tool_name="search",
tool_args={"query": "crew"},
),
)
return "done"
def test_stream_frame_contract_and_ordering() -> None:
stream = FrameFlow().stream_events()
with pytest.raises(RuntimeError, match="Streaming has not completed yet"):
_ = stream.result
with stream:
frames = list(stream.events)
assert stream.result == "done"
assert all(isinstance(frame, StreamFrame) for frame in frames)
assert all(frame.version == "v1" for frame in frames)
assert [frame.seq for frame in frames] == sorted(frame.seq for frame in frames)
by_type = {frame.type: frame for frame in frames}
assert by_type["flow_started"].channel == "flow"
assert by_type["method_execution_started"].parent_id == by_type["flow_started"].id
assert by_type["llm_stream_chunk"].channel == "llm"
assert by_type["llm_thinking_chunk"].channel == "llm"
assert by_type["conversation_message_added"].channel == "messages"
assert by_type["tool_usage_started"].channel == "tools"
assert "FrameFlow" in by_type["method_execution_started"].namespace
assert "run" in by_type["method_execution_started"].namespace
def test_stream_subscribe_filters_channels_without_losing_order() -> None:
with FrameFlow().stream_events() as stream:
frames = list(stream.interleave(["messages", "tools"]))
assert [frame.channel for frame in frames] == ["messages", "tools"]
assert [frame.seq for frame in frames] == sorted(frame.seq for frame in frames)
assert stream.result == "done"
def test_stream_errors_surface_after_failed_frame() -> None:
class ErrorFlow(Flow):
@start()
def run(self) -> str:
raise ValueError("boom")
stream = ErrorFlow().stream_events()
with pytest.raises(ValueError, match="boom"):
list(stream.events)
assert any(frame.type == "method_execution_failed" for frame in stream.frames)
with pytest.raises(ValueError, match="boom"):
_ = stream.result
def test_legacy_flow_streaming_uses_llm_frame_projection() -> None:
flow = FrameFlow()
flow.stream = True
streaming = flow.kickoff()
assert isinstance(streaming, FlowStreamingOutput)
chunks = list(streaming)
assert [chunk.content for chunk in chunks] == ["hello", "thinking"]
assert streaming.result == "done"
@pytest.mark.asyncio
async def test_astream_scopes_concurrent_executions() -> None:
class ConcurrentFlow(Flow):
@start()
async def run(self) -> str:
label = str(self.state["label"])
await asyncio.sleep(0)
crewai_event_bus.emit(
self,
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk=label,
call_id=label,
),
)
return label
async def collect(label: str) -> tuple[str, list[str]]:
async with ConcurrentFlow().astream(inputs={"label": label}) as stream:
frames = [frame async for frame in stream.llm]
return stream.result, [frame.data["chunk"] for frame in frames]
first, second = await asyncio.gather(collect("first"), collect("second"))
assert first == ("first", ["first"])
assert second == ("second", ["second"])
@pytest.mark.asyncio
async def test_astream_cancellation_cleans_up_task() -> None:
class SlowFlow(Flow):
@start()
async def run(self) -> str:
await asyncio.sleep(10)
return "too late"
stream = SlowFlow().astream()
events: AsyncIterator[StreamFrame] = stream.events
first_frame = await anext(events)
assert first_frame.type == "flow_started"
await stream.aclose()
assert stream.is_cancelled is True
assert stream.is_completed is True