From 72d78387bcdae24a552f7ffa1b42a127a4b6ab4a Mon Sep 17 00:00:00 2001 From: lorenzejay Date: Mon, 29 Jun 2026 13:51:37 -0700 Subject: [PATCH] Define stream frame protocol for flows --- docs/docs.json | 1 + .../en/guides/flows/conversational-flows.mdx | 18 ++ .../en/learn/streaming-runtime-contract.mdx | 162 +++++++++++ lib/crewai/src/crewai/events/event_bus.py | 2 + .../src/crewai/events/stream_context.py | 30 ++ .../experimental/conversational_mixin.py | 122 +++++++- .../src/crewai/flow/runtime/__init__.py | 205 +++++++------ lib/crewai/src/crewai/types/streaming.py | 231 ++++++++++++++- lib/crewai/src/crewai/utilities/streaming.py | 269 +++++++++++++++++- lib/crewai/tests/test_flow_conversation.py | 85 +++++- lib/crewai/tests/test_stream_frames.py | 166 +++++++++++ 11 files changed, 1203 insertions(+), 88 deletions(-) create mode 100644 docs/edge/en/learn/streaming-runtime-contract.mdx create mode 100644 lib/crewai/src/crewai/events/stream_context.py create mode 100644 lib/crewai/tests/test_stream_frames.py diff --git a/docs/docs.json b/docs/docs.json index c71b081ee..f722a9ae3 100644 --- a/docs/docs.json +++ b/docs/docs.json @@ -364,6 +364,7 @@ "edge/en/learn/human-feedback-in-flows", "edge/en/learn/kickoff-async", "edge/en/learn/kickoff-for-each", + "edge/en/learn/streaming-runtime-contract", "edge/en/learn/llm-connections", "edge/en/learn/litellm-removal-guide", "edge/en/learn/multimodal-agents", diff --git a/docs/edge/en/guides/flows/conversational-flows.mdx b/docs/edge/en/guides/flows/conversational-flows.mdx index c231fe010..0a0d6f8a3 100644 --- a/docs/edge/en/guides/flows/conversational-flows.mdx +++ b/docs/edge/en/guides/flows/conversational-flows.mdx @@ -25,6 +25,7 @@ Use **`flow.handle_turn(message, session_id=...)`** for every user message from | API | Use for | |-----|---------| | `handle_turn(message, session_id=...)` | Ergonomic one-turn wrapper for conversational `Flow` | +| `stream_turn(message, session_id=...)` | Stream one conversational turn as ordered runtime frames | | `chat()` | Local terminal REPL for conversational `Flow` | | `kickoff(inputs={...})` | Advanced flow execution without conversational turn handling | | `ask()` | Blocking prompt **inside** one step (wizard, clarification) | @@ -85,6 +86,23 @@ finally: flow.finalize_session_traces() # one trace link for the whole chat ``` +## Streaming a turn + +Use `stream_turn()` when a UI or runtime needs structured events for one chat turn. It returns a stream session with ordered frames for Flow routing, LLM chunks, tool activity, and conversation messages. + +```python +stream = flow.stream_turn("Where is my order?", session_id=session_id) + +with stream: + for frame in stream.events: + if frame.channel == "llm" and frame.type == "llm_stream_chunk": + print(frame.data.get("chunk", ""), end="", flush=True) + +result = stream.result +``` + +For the full frame contract, channel list, and async API, see [Streaming Runtime Contract](/en/learn/streaming-runtime-contract). + ## Turn lifecycle Each `handle_turn` runs this pipeline: diff --git a/docs/edge/en/learn/streaming-runtime-contract.mdx b/docs/edge/en/learn/streaming-runtime-contract.mdx new file mode 100644 index 000000000..1e7b4da72 --- /dev/null +++ b/docs/edge/en/learn/streaming-runtime-contract.mdx @@ -0,0 +1,162 @@ +--- +title: Streaming Runtime Contract +description: Stream ordered runtime frames from Flows and conversational turns. +icon: tower-broadcast +mode: "wide" +--- + +## Overview + +CrewAI exposes a frame-based streaming contract for runtimes that need more than plain text chunks. The contract emits ordered `StreamFrame` objects for Flow lifecycle events, LLM tokens, tool activity, conversation messages, and custom events. + +Use this API when you are building a UI, service bridge, terminal app, or deployment runtime that needs a stable stream of structured events while a Flow is running. + +## StreamFrame + +Every frame has the same envelope: + +```python +from crewai.types.streaming import StreamFrame + +frame.version # "v1" +frame.id # unique frame id +frame.seq # execution-local order, when available +frame.type # source event type, such as "flow_started" +frame.channel # "llm", "flow", "tools", "messages", "lifecycle", or "custom" +frame.namespace # source/runtime namespace +frame.timestamp # event timestamp +frame.parent_id # parent event id, when available +frame.previous_id # previous event id, when available +frame.data # event payload +``` + +The `channel` field is the fastest way to route frames in consumers: + +| Channel | Contains | +|---------|----------| +| `llm` | Token and thinking chunks from LLM streaming events | +| `flow` | Flow lifecycle, method execution, routing, and pause/resume events | +| `tools` | Tool usage events | +| `messages` | Conversation transcript events | +| `lifecycle` | Runtime lifecycle events that are not specific to another channel | +| `custom` | Events that do not map to a built-in channel | + +`frame.type` preserves the source event type, so consumers can handle specific events inside a channel. + +## Stream a Flow + +Use `stream_events()` to run a Flow and iterate over all frames: + +```python +from crewai.flow import Flow, start + + +class ReportFlow(Flow): + @start() + def generate(self): + return "done" + + +flow = ReportFlow() +stream = flow.stream_events() + +with stream: + for frame in stream.events: + print(frame.seq, frame.channel, frame.type, frame.data) + +result = stream.result +``` + +You must consume the stream before reading `stream.result`. Accessing the result early raises a `RuntimeError` so consumers do not accidentally treat a partial run as complete. + +## Filter by Channel + +`StreamSession` exposes channel projections that preserve global frame order within the selected channel: + +```python +stream = flow.stream_events() + +with stream: + for frame in stream.llm: + print(frame.data.get("chunk", ""), end="", flush=True) + +result = stream.result +``` + +Available projections are: + +| Projection | Frames | +|------------|--------| +| `stream.events` | All frames | +| `stream.llm` | LLM frames | +| `stream.messages` | Conversation message frames | +| `stream.flow` | Flow frames | +| `stream.tools` | Tool frames | +| `stream.interleave([...])` | A selected set of channels | + +Use `stream.interleave(["flow", "llm", "messages"])` when a consumer wants only some channels but still needs their relative order. + +## Async Streaming + +Use `astream()` for async consumers: + +```python +flow = ReportFlow() +stream = flow.astream() + +async with stream: + async for frame in stream.events: + print(frame.channel, frame.type) + +result = stream.result +``` + +The async session has the same projections as the sync session. + +## Conversational Turns + +Conversational Flows can stream one user turn with `stream_turn()`: + +```python +from crewai import Flow +from crewai.experimental.conversational import ConversationConfig, ConversationState + + +@ConversationConfig(llm="gpt-4o-mini", defer_trace_finalization=True) +class ChatFlow(Flow[ConversationState]): + conversational = True + + +flow = ChatFlow() +stream = flow.stream_turn("What can you help me with?", session_id="session-1") + +with stream: + for frame in stream.events: + if frame.channel == "llm" and frame.type == "llm_stream_chunk": + print(frame.data.get("chunk", ""), end="", flush=True) + +reply = stream.result +``` + +During `stream_turn()`, the built-in conversational answer path enables LLM token streaming for that turn and restores the LLM's previous `stream` setting afterward. Custom route handlers that create their own agents or LLM instances should configure those LLMs for streaming if they need token-level output. + +## Cleanup + +Use the session as a context manager when possible. If a client disconnects before the stream is exhausted, close the session explicitly: + +```python +stream = flow.stream_events() + +try: + for frame in stream.events: + print(frame.type) +finally: + if not stream.is_exhausted: + stream.close() +``` + +For async streams, use `await stream.aclose()`. + +## Legacy Chunk Streaming + +Crew streaming with `stream=True` still returns the chunk-oriented `CrewStreamingOutput` API described in [Streaming Crew Execution](/en/learn/streaming-crew-execution). The frame contract is intended for runtimes that need a stable event envelope across Flows, conversational turns, LLM output, tools, and messages. diff --git a/lib/crewai/src/crewai/events/event_bus.py b/lib/crewai/src/crewai/events/event_bus.py index 773ffa5bb..b6536b728 100644 --- a/lib/crewai/src/crewai/events/event_bus.py +++ b/lib/crewai/src/crewai/events/event_bus.py @@ -40,6 +40,7 @@ from crewai.events.event_context import ( set_last_event_id, ) from crewai.events.handler_graph import build_execution_plan +from crewai.events.stream_context import publish_stream_event from crewai.events.types.event_bus_types import ( AsyncHandler, AsyncHandlerSet, @@ -565,6 +566,7 @@ class CrewAIEventsBus: set_last_event_id(event.event_id) + publish_stream_event(source, event) self._record_event(event) def emit(self, source: Any, event: BaseEvent) -> Future[None] | None: diff --git a/lib/crewai/src/crewai/events/stream_context.py b/lib/crewai/src/crewai/events/stream_context.py new file mode 100644 index 000000000..671095807 --- /dev/null +++ b/lib/crewai/src/crewai/events/stream_context.py @@ -0,0 +1,30 @@ +"""Scoped stream sinks for converting emitted events into public frames.""" + +from __future__ import annotations + +from collections.abc import Callable +import contextvars +from typing import Any + + +StreamSink = Callable[[Any, Any], None] + +_stream_sinks: contextvars.ContextVar[tuple[StreamSink, ...]] = contextvars.ContextVar( + "crewai_stream_sinks", default=() +) + + +def add_stream_sink(sink: StreamSink) -> contextvars.Token[tuple[StreamSink, ...]]: + """Register a sink in the current context.""" + return _stream_sinks.set((*_stream_sinks.get(), sink)) + + +def reset_stream_sinks(token: contextvars.Token[tuple[StreamSink, ...]]) -> None: + """Restore the stream sink context.""" + _stream_sinks.reset(token) + + +def publish_stream_event(source: Any, event: Any) -> None: + """Publish a prepared event to sinks scoped to the current execution.""" + for sink in _stream_sinks.get(): + sink(source, event) diff --git a/lib/crewai/src/crewai/experimental/conversational_mixin.py b/lib/crewai/src/crewai/experimental/conversational_mixin.py index 95826d9ee..f83db93e0 100644 --- a/lib/crewai/src/crewai/experimental/conversational_mixin.py +++ b/lib/crewai/src/crewai/experimental/conversational_mixin.py @@ -18,7 +18,8 @@ Import surface: from __future__ import annotations -from collections.abc import Callable, Mapping, Sequence +from collections.abc import Callable, Iterator, Mapping, Sequence +from contextlib import contextmanager from enum import Enum import json import logging @@ -221,7 +222,9 @@ class _ConversationalMixin: messages.append({"role": "system", "content": system_prompt}) messages.extend(self.conversation_messages) - response = self._coerce_llm(llm).call(messages=messages) + llm_instance = self._coerce_llm(llm) + with self._conversation_streaming_enabled(llm_instance): + response = llm_instance.call(messages=messages) content = self._stringify_result(response) self.append_assistant_message(content) return content @@ -254,7 +257,8 @@ class _ConversationalMixin: }, *self.build_agent_context("answer_from_history"), ] - response = llm_instance.call(messages=messages) + with self._conversation_streaming_enabled(llm_instance): + response = llm_instance.call(messages=messages) content = self._stringify_result(response) self.append_assistant_message(content) return content @@ -337,6 +341,101 @@ class _ConversationalMixin: ) return result + def stream_turn( + self, + message: str, + *, + session_id: str | None = None, + intents: Sequence[str] | None = None, + intent_llm: str | BaseLLM | None = None, + **kickoff_kwargs: Any, + ) -> Any: + """Append a user message and stream one conversational turn as frames.""" + if not self._is_conversational_enabled(): + raise ValueError( + "Flow.stream_turn() is only available on conversational flows" + ) + + from crewai.types.streaming import StreamSession + from crewai.utilities.streaming import ( + create_frame_generator, + create_frame_streaming_state, + ) + + state = cast(ConversationState, self.state) + sid = session_id or state.id + result_holder: list[Any] = [] + frame_state = create_frame_streaming_state(result_holder, use_async=False) + output_holder: list[StreamSession[Any]] = [] + + def run_turn() -> Any: + crewai_event_bus.emit( + self, + ConversationTurnStartedEvent( + type="conversation_turn_started", + flow_name=self.name or self.__class__.__name__, + session_id=sid, + ), + ) + + self._pending_user_message = message + self._pending_intents = list(intents) if intents else None + self._pending_intent_llm = intent_llm + + failed_event: ConversationTurnFailedEvent | None = None + try: + if "from_checkpoint" not in kickoff_kwargs: + self._reset_turn_execution_state() + + assistant_count = self._assistant_message_count() + original_stream = bool(getattr(self, "stream", False)) + original_streaming_turn = getattr( + self, "_streaming_conversation_turn", False + ) + try: + object.__setattr__(self, "stream", False) + object.__setattr__(self, "_streaming_conversation_turn", True) + result = self.kickoff(inputs={"id": sid}, **kickoff_kwargs) + finally: + object.__setattr__(self, "stream", original_stream) + object.__setattr__( + self, "_streaming_conversation_turn", original_streaming_turn + ) + if ( + result is not None + and self._assistant_message_count() == assistant_count + and self._is_public_turn_result(result) + ): + self.append_assistant_message(self._stringify_result(result)) + except Exception as exc: + failed_event = ConversationTurnFailedEvent( + type="conversation_turn_failed", + flow_name=self.name or self.__class__.__name__, + session_id=sid, + error=exc, + ) + self._emit_terminal_conversation_turn_event(failed_event) + raise + finally: + self._pending_user_message = None + self._pending_intents = None + self._pending_intent_llm = None + + self._emit_terminal_conversation_turn_event( + ConversationTurnCompletedEvent( + type="conversation_turn_completed", + flow_name=self.name or self.__class__.__name__, + session_id=sid, + ), + ) + return result + + stream_session: StreamSession[Any] = StreamSession( + sync_iterator=create_frame_generator(frame_state, run_turn, output_holder) + ) + output_holder.append(stream_session) + return stream_session + def _emit_terminal_conversation_turn_event( self, event: ConversationTurnCompletedEvent | ConversationTurnFailedEvent, @@ -685,6 +784,8 @@ class _ConversationalMixin: object.__setattr__(self, "_pending_intents", None) if not hasattr(self, "_pending_intent_llm"): object.__setattr__(self, "_pending_intent_llm", None) + if not hasattr(self, "_streaming_conversation_turn"): + object.__setattr__(self, "_streaming_conversation_turn", False) def _create_default_extension_state(self) -> ConversationState | None: initial_state_t = getattr(self, "_initial_state_t", None) @@ -1055,6 +1156,21 @@ class _ConversationalMixin: return llm raise ValueError(f"Invalid llm type: {type(llm)}. Expected str or BaseLLM.") + @contextmanager + def _conversation_streaming_enabled(self, llm: Any) -> Iterator[None]: + if not getattr(self, "_streaming_conversation_turn", False) or not hasattr( + llm, "stream" + ): + yield + return + + original_stream = llm.stream + try: + llm.stream = True + yield + finally: + llm.stream = original_stream + def finalize_session_traces(self) -> None: """Emit a final ``FlowFinishedEvent`` and finalize the trace batch. diff --git a/lib/crewai/src/crewai/flow/runtime/__init__.py b/lib/crewai/src/crewai/flow/runtime/__init__.py index b28eb5429..1157bb0b1 100644 --- a/lib/crewai/src/crewai/flow/runtime/__init__.py +++ b/lib/crewai/src/crewai/flow/runtime/__init__.py @@ -10,6 +10,7 @@ from __future__ import annotations import asyncio from collections.abc import ( + AsyncIterator, Callable, Iterator, Sequence, @@ -140,17 +141,18 @@ if TYPE_CHECKING: from crewai.llms.base_llm import BaseLLM from crewai.flow.visualization import build_flow_structure, render_interactive -from crewai.types.streaming import CrewStreamingOutput, FlowStreamingOutput +from crewai.types.streaming import ( + AsyncStreamSession, + FlowStreamingOutput, + StreamSession, +) from crewai.types.usage_metrics import UsageMetrics from crewai.utilities.env import get_env_context from crewai.utilities.streaming import ( - TaskInfo, - create_async_chunk_generator, - create_chunk_generator, - create_streaming_state, - register_cleanup, - signal_end, - signal_error, + create_async_frame_generator, + create_frame_generator, + create_frame_streaming_state, + stream_frame_to_chunk, ) @@ -1832,6 +1834,87 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): if hasattr(self._state, key): object.__setattr__(self._state, key, value) + def stream_events( + self, + inputs: dict[str, Any] | None = None, + input_files: dict[str, FileInput] | None = None, + from_checkpoint: CheckpointConfig | None = None, + restore_from_state_id: str | None = None, + ) -> StreamSession[Any]: + """Run the flow and stream all scoped public ``StreamFrame`` events.""" + result_holder: list[Any] = [] + state = create_frame_streaming_state(result_holder, use_async=False) + output_holder: list[StreamSession[Any]] = [] + + def run_flow() -> Any: + original_stream = self.stream + try: + self.stream = False + return self.kickoff( + inputs=inputs, + input_files=input_files, + from_checkpoint=from_checkpoint, + restore_from_state_id=restore_from_state_id, + ) + except HumanFeedbackPending as e: + return e + finally: + self.stream = original_stream + + stream_session: StreamSession[Any] = StreamSession( + sync_iterator=create_frame_generator(state, run_flow, output_holder) + ) + output_holder.append(stream_session) + return stream_session + + def stream_frames( + self, + inputs: dict[str, Any] | None = None, + input_files: dict[str, FileInput] | None = None, + from_checkpoint: CheckpointConfig | None = None, + restore_from_state_id: str | None = None, + ) -> StreamSession[Any]: + """Alias for :meth:`stream_events`.""" + return self.stream_events( + inputs=inputs, + input_files=input_files, + from_checkpoint=from_checkpoint, + restore_from_state_id=restore_from_state_id, + ) + + def astream( + self, + inputs: dict[str, Any] | None = None, + input_files: dict[str, FileInput] | None = None, + from_checkpoint: CheckpointConfig | None = None, + restore_from_state_id: str | None = None, + ) -> AsyncStreamSession[Any]: + """Run the flow asynchronously and stream scoped public frames.""" + result_holder: list[Any] = [] + state = create_frame_streaming_state(result_holder, use_async=True) + output_holder: list[AsyncStreamSession[Any]] = [] + + async def run_flow() -> Any: + original_stream = self.stream + try: + self.stream = False + return await self.kickoff_async( + inputs=inputs, + input_files=input_files, + from_checkpoint=from_checkpoint, + restore_from_state_id=restore_from_state_id, + ) + except HumanFeedbackPending as e: + return e + finally: + self.stream = original_stream + + stream_session: AsyncStreamSession[Any] = AsyncStreamSession( + async_iterator=create_async_frame_generator(state, run_flow, output_holder) + ) + output_holder.append(stream_session) + return stream_session + def kickoff( self, inputs: dict[str, Any] | None = None, @@ -1871,44 +1954,26 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): if restored is not None: return restored.kickoff(inputs=inputs, input_files=input_files) if self.stream: - result_holder: list[Any] = [] - current_task_info: TaskInfo = { - "index": 0, - "name": "", - "id": "", - "agent_role": "", - "agent_id": "", - } + streaming_output: FlowStreamingOutput - state = create_streaming_state( - current_task_info, result_holder, use_async=False - ) - output_holder: list[CrewStreamingOutput | FlowStreamingOutput] = [] - - def run_flow() -> None: + def chunk_iterator() -> Iterator[Any]: + stream_session = self.stream_events( + inputs=inputs, + input_files=input_files, + restore_from_state_id=restore_from_state_id, + ) try: - self.stream = False - result = self.kickoff( - inputs=inputs, - input_files=input_files, - restore_from_state_id=restore_from_state_id, - ) - result_holder.append(result) - except Exception as e: - # HumanFeedbackPending is expected control flow, not an error - if isinstance(e, HumanFeedbackPending): - result_holder.append(e) - else: - signal_error(state, e) + with stream_session: + for frame in stream_session.llm: + chunk = stream_frame_to_chunk(frame) + if chunk is not None: + yield chunk + streaming_output._set_result(stream_session.result) finally: - self.stream = True - signal_end(state) + if not stream_session.is_exhausted: + stream_session.close() - streaming_output = FlowStreamingOutput( - sync_iterator=create_chunk_generator(state, run_flow, output_holder) - ) - register_cleanup(streaming_output, state) - output_holder.append(streaming_output) + streaming_output = FlowStreamingOutput(sync_iterator=chunk_iterator()) return streaming_output @@ -1971,46 +2036,26 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): if restored is not None: return await restored.kickoff_async(inputs=inputs, input_files=input_files) if self.stream: - result_holder: list[Any] = [] - current_task_info: TaskInfo = { - "index": 0, - "name": "", - "id": "", - "agent_role": "", - "agent_id": "", - } + streaming_output: FlowStreamingOutput - state = create_streaming_state( - current_task_info, result_holder, use_async=True - ) - output_holder: list[CrewStreamingOutput | FlowStreamingOutput] = [] - - async def run_flow() -> None: - try: - self.stream = False - result = await self.kickoff_async( - inputs=inputs, - input_files=input_files, - restore_from_state_id=restore_from_state_id, - ) - result_holder.append(result) - except Exception as e: - # HumanFeedbackPending is expected control flow, not an error - if isinstance(e, HumanFeedbackPending): - result_holder.append(e) - else: - signal_error(state, e, is_async=True) - finally: - self.stream = True - signal_end(state, is_async=True) - - streaming_output = FlowStreamingOutput( - async_iterator=create_async_chunk_generator( - state, run_flow, output_holder + async def chunk_iterator() -> AsyncIterator[Any]: + stream_session = self.astream( + inputs=inputs, + input_files=input_files, + restore_from_state_id=restore_from_state_id, ) - ) - register_cleanup(streaming_output, state) - output_holder.append(streaming_output) + try: + async with stream_session: + async for frame in stream_session.llm: + chunk = stream_frame_to_chunk(frame) + if chunk is not None: + yield chunk + streaming_output._set_result(stream_session.result) + finally: + if not stream_session.is_exhausted: + await stream_session.aclose() + + streaming_output = FlowStreamingOutput(async_iterator=chunk_iterator()) return streaming_output diff --git a/lib/crewai/src/crewai/types/streaming.py b/lib/crewai/src/crewai/types/streaming.py index eb3ddbde1..11baffe90 100644 --- a/lib/crewai/src/crewai/types/streaming.py +++ b/lib/crewai/src/crewai/types/streaming.py @@ -2,9 +2,10 @@ from __future__ import annotations -from collections.abc import AsyncIterator, Callable, Iterator +from collections.abc import AsyncIterator, Callable, Iterator, Sequence +from datetime import datetime from enum import Enum -from typing import TYPE_CHECKING, Any, Generic, TypeVar +from typing import TYPE_CHECKING, Any, Generic, Literal, TypeVar from pydantic import BaseModel, Field from typing_extensions import Self @@ -15,6 +16,232 @@ if TYPE_CHECKING: T = TypeVar("T") +_MISSING = object() + +StreamChannel = Literal[ + "llm", + "flow", + "tools", + "messages", + "lifecycle", + "custom", +] + + +class StreamFrame(BaseModel): + """Stable public stream frame emitted by streamable runtimes.""" + + version: Literal["v1"] = "v1" + id: str = Field(description="Unique frame/event identifier") + seq: int | None = Field(default=None, description="Execution-local order") + type: str = Field(description="Source event type") + channel: StreamChannel = Field(description="High-level stream channel") + namespace: list[str] = Field(default_factory=list) + timestamp: datetime + parent_id: str | None = None + previous_id: str | None = None + data: dict[str, Any] = Field(default_factory=dict) + + +class StreamSessionBase(Generic[T]): + """Base stream session with ordered frame iteration and result access.""" + + def __init__( + self, + sync_iterator: Iterator[StreamFrame] | None = None, + async_iterator: AsyncIterator[StreamFrame] | None = None, + ) -> None: + self._result: T | object = _MISSING + self._completed = False + self._frames: list[StreamFrame] = [] + self._error: Exception | None = None + self._cancelled = False + self._exhausted = False + self._on_cleanup: Callable[[], None] | None = None + self._sync_iterator = sync_iterator + self._async_iterator = async_iterator + + @property + def result(self) -> T: + """Return the final result after stream exhaustion or completion.""" + if not self._completed: + raise RuntimeError( + "Streaming has not completed yet. " + "Iterate over all frames before accessing result." + ) + if self._error is not None: + raise self._error + if self._result is _MISSING: + raise RuntimeError("No result available") + return self._result # type: ignore[return-value] + + @property + def is_completed(self) -> bool: + """Check if the stream has completed.""" + return self._completed + + @property + def is_cancelled(self) -> bool: + """Check if the stream was cancelled.""" + return self._cancelled + + @property + def is_exhausted(self) -> bool: + """Check if the stream iterator was fully consumed.""" + return self._exhausted + + @property + def frames(self) -> list[StreamFrame]: + """Return collected frames.""" + return self._frames.copy() + + def _set_result(self, result: T) -> None: + self._result = result + self._completed = True + + +class StreamSession(StreamSessionBase[T]): + """Synchronous stream session for ordered public frames.""" + + def __enter__(self) -> Self: + return self + + def __exit__(self, *exc_info: Any) -> None: + if not self._exhausted: + self.close() + + @property + def events(self) -> Iterator[StreamFrame]: + """Iterate over all ordered frames.""" + return self.subscribe() + + @property + def llm(self) -> Iterator[StreamFrame]: + """Iterate over LLM token and thinking frames.""" + return self.subscribe(channels=["llm"]) + + @property + def messages(self) -> Iterator[StreamFrame]: + """Iterate over conversation message frames.""" + return self.subscribe(channels=["messages"]) + + @property + def flow(self) -> Iterator[StreamFrame]: + """Iterate over Flow lifecycle and method frames.""" + return self.subscribe(channels=["flow"]) + + @property + def tools(self) -> Iterator[StreamFrame]: + """Iterate over tool execution frames.""" + return self.subscribe(channels=["tools"]) + + def interleave(self, channels: Sequence[StreamChannel]) -> Iterator[StreamFrame]: + """Iterate over selected channels while preserving global order.""" + return self.subscribe(channels=channels) + + def subscribe( + self, channels: Sequence[StreamChannel] | None = None + ) -> Iterator[StreamFrame]: + """Iterate over frames, optionally filtered by channel.""" + if self._sync_iterator is None: + raise RuntimeError("Sync iterator not available") + selected = set(channels) if channels is not None else None + try: + for frame in self._sync_iterator: + self._frames.append(frame) + if selected is None or frame.channel in selected: + yield frame + self._exhausted = True + except Exception as e: + self._error = e + raise + finally: + self._completed = True + + def close(self) -> None: + """Cancel streaming and clean up resources.""" + if self._cancelled or self._exhausted or self._error is not None: + return + self._cancelled = True + self._completed = True + if self._sync_iterator is not None and hasattr(self._sync_iterator, "close"): + self._sync_iterator.close() + if self._on_cleanup is not None: + self._on_cleanup() + self._on_cleanup = None + + +class AsyncStreamSession(StreamSessionBase[T]): + """Asynchronous stream session for ordered public frames.""" + + async def __aenter__(self) -> Self: + return self + + async def __aexit__(self, *exc_info: Any) -> None: + if not self._exhausted: + await self.aclose() + + @property + def events(self) -> AsyncIterator[StreamFrame]: + """Iterate over all ordered frames.""" + return self.subscribe() + + @property + def llm(self) -> AsyncIterator[StreamFrame]: + """Iterate over LLM token and thinking frames.""" + return self.subscribe(channels=["llm"]) + + @property + def messages(self) -> AsyncIterator[StreamFrame]: + """Iterate over conversation message frames.""" + return self.subscribe(channels=["messages"]) + + @property + def flow(self) -> AsyncIterator[StreamFrame]: + """Iterate over Flow lifecycle and method frames.""" + return self.subscribe(channels=["flow"]) + + @property + def tools(self) -> AsyncIterator[StreamFrame]: + """Iterate over tool execution frames.""" + return self.subscribe(channels=["tools"]) + + def interleave( + self, channels: Sequence[StreamChannel] + ) -> AsyncIterator[StreamFrame]: + """Iterate over selected channels while preserving global order.""" + return self.subscribe(channels=channels) + + async def subscribe( + self, channels: Sequence[StreamChannel] | None = None + ) -> AsyncIterator[StreamFrame]: + """Iterate over frames, optionally filtered by channel.""" + if self._async_iterator is None: + raise RuntimeError("Async iterator not available") + selected = set(channels) if channels is not None else None + try: + async for frame in self._async_iterator: + self._frames.append(frame) + if selected is None or frame.channel in selected: + yield frame + self._exhausted = True + except Exception as e: + self._error = e + raise + finally: + self._completed = True + + async def aclose(self) -> None: + """Cancel streaming and clean up resources.""" + if self._cancelled or self._exhausted or self._error is not None: + return + self._cancelled = True + self._completed = True + if self._async_iterator is not None and hasattr(self._async_iterator, "aclose"): + await self._async_iterator.aclose() + if self._on_cleanup is not None: + self._on_cleanup() + self._on_cleanup = None class StreamChunkType(Enum): diff --git a/lib/crewai/src/crewai/utilities/streaming.py b/lib/crewai/src/crewai/utilities/streaming.py index 99bc9b199..766fb00a8 100644 --- a/lib/crewai/src/crewai/utilities/streaming.py +++ b/lib/crewai/src/crewai/utilities/streaming.py @@ -6,19 +6,32 @@ import contextvars import logging import queue import threading -from typing import Any, NamedTuple +from typing import Any, NamedTuple, cast import uuid from typing_extensions import TypedDict from crewai.events.base_events import BaseEvent from crewai.events.event_bus import crewai_event_bus -from crewai.events.types.llm_events import LLMStreamChunkEvent +from crewai.events.stream_context import add_stream_sink, reset_stream_sinks +from crewai.events.types.flow_events import ConversationMessageAddedEvent, FlowEvent +from crewai.events.types.llm_events import ( + LLMEventBase, + LLMStreamChunkEvent, +) +from crewai.events.types.tool_usage_events import ( + ToolExecutionErrorEvent, + ToolUsageEvent, +) from crewai.types.streaming import ( + AsyncStreamSession, CrewStreamingOutput, FlowStreamingOutput, + StreamChannel, StreamChunk, StreamChunkType, + StreamFrame, + StreamSession, ToolCallChunk, ) from crewai.utilities.string_utils import sanitize_tool_name @@ -53,6 +66,16 @@ class StreamingState(NamedTuple): stream_id: str | None = None +class FrameStreamingState(NamedTuple): + """Immutable state for public frame streaming execution.""" + + result_holder: list[Any] + sync_queue: queue.Queue[StreamFrame | None | Exception] + async_queue: asyncio.Queue[StreamFrame | None | Exception] | None + loop: asyncio.AbstractEventLoop | None + sink: Callable[[Any, BaseEvent], None] + + def _extract_tool_call_info( event: LLMStreamChunkEvent, ) -> tuple[StreamChunkType, ToolCallChunk | None]: @@ -80,6 +103,46 @@ def _extract_tool_call_info( return StreamChunkType.TEXT, None +def _extract_tool_call_chunk(data: dict[str, Any]) -> ToolCallChunk | None: + tool_call = data.get("tool_call") + if not isinstance(tool_call, dict): + return None + function = tool_call.get("function") + if not isinstance(function, dict): + function = {} + function_name = function.get("name") + return ToolCallChunk( + tool_id=tool_call.get("id"), + tool_name=sanitize_tool_name(str(function_name)) if function_name else None, + arguments=function.get("arguments") or "", + index=tool_call.get("index") or 0, + ) + + +def stream_frame_to_chunk(frame: StreamFrame) -> StreamChunk | None: + """Project an LLM stream frame into the legacy ``StreamChunk`` shape.""" + if frame.channel != "llm" or frame.type not in { + "llm_stream_chunk", + "llm_thinking_chunk", + }: + return None + + tool_call = _extract_tool_call_chunk(frame.data) + chunk_type = ( + StreamChunkType.TOOL_CALL if tool_call is not None else StreamChunkType.TEXT + ) + return StreamChunk( + content=str(frame.data.get("chunk") or ""), + chunk_type=chunk_type, + task_index=0, + task_name=str(frame.data.get("task_name") or ""), + task_id=str(frame.data.get("task_id") or ""), + agent_role=str(frame.data.get("agent_role") or ""), + agent_id=str(frame.data.get("agent_id") or ""), + tool_call=tool_call, + ) + + def _create_stream_chunk( event: LLMStreamChunkEvent, current_task_info: TaskInfo, @@ -107,6 +170,208 @@ def _create_stream_chunk( ) +_FRAME_DATA_EXCLUDE = { + "timestamp", + "type", + "event_id", + "parent_event_id", + "previous_event_id", + "emission_sequence", +} + + +def _stream_channel(event: BaseEvent) -> StreamChannel: + if isinstance(event, LLMEventBase): + return "llm" + if isinstance(event, ConversationMessageAddedEvent): + return "messages" + if isinstance(event, FlowEvent): + return "flow" + if isinstance(event, ToolUsageEvent | ToolExecutionErrorEvent): + return "tools" + if "error" in event.type or "failed" in event.type: + return "lifecycle" + return "custom" + + +def _stream_namespace(event: BaseEvent, channel: StreamChannel) -> list[str]: + namespace: list[str] = [channel] + for attr in ( + "flow_name", + "method_name", + "session_id", + "call_id", + "tool_name", + "agent_role", + "task_name", + ): + value = getattr(event, attr, None) + if value is not None: + namespace.append(str(value)) + return namespace + + +def stream_frame_from_event(event: BaseEvent) -> StreamFrame: + """Convert an internal CrewAI event into the public stream frame contract.""" + channel = _stream_channel(event) + data = event.to_json(exclude=_FRAME_DATA_EXCLUDE) + if not isinstance(data, dict): + data = {"value": data} + return StreamFrame( + version="v1", + id=event.event_id, + seq=event.emission_sequence, + type=event.type, + channel=channel, + namespace=_stream_namespace(event, channel), + timestamp=event.timestamp, + parent_id=event.parent_event_id, + previous_id=event.previous_event_id, + data=cast(dict[str, Any], data), + ) + + +def _create_frame_sink( + sync_queue: queue.Queue[StreamFrame | None | Exception], + async_queue: asyncio.Queue[StreamFrame | None | Exception] | None = None, + loop: asyncio.AbstractEventLoop | None = None, +) -> Callable[[Any, BaseEvent], None]: + def frame_sink(_: Any, event: BaseEvent) -> None: + frame = stream_frame_from_event(event) + if async_queue is not None and loop is not None: + loop.call_soon_threadsafe(async_queue.put_nowait, frame) + else: + sync_queue.put(frame) + + return frame_sink + + +def create_frame_streaming_state( + result_holder: list[Any], + use_async: bool = False, +) -> FrameStreamingState: + """Create state for a scoped public frame stream.""" + sync_queue: queue.Queue[StreamFrame | None | Exception] = queue.Queue() + async_queue: asyncio.Queue[StreamFrame | None | Exception] | None = None + loop: asyncio.AbstractEventLoop | None = None + if use_async: + async_queue = asyncio.Queue() + loop = asyncio.get_running_loop() + sink = _create_frame_sink(sync_queue, async_queue, loop) + return FrameStreamingState( + result_holder=result_holder, + sync_queue=sync_queue, + async_queue=async_queue, + loop=loop, + sink=sink, + ) + + +def _signal_frame_end(state: FrameStreamingState, is_async: bool = False) -> None: + if is_async and state.async_queue is not None and state.loop is not None: + state.loop.call_soon_threadsafe(state.async_queue.put_nowait, None) + else: + state.sync_queue.put(None) + + +def _signal_frame_error( + state: FrameStreamingState, error: Exception, is_async: bool = False +) -> None: + if is_async and state.async_queue is not None and state.loop is not None: + state.loop.call_soon_threadsafe(state.async_queue.put_nowait, error) + else: + state.sync_queue.put(error) + + +def _finalize_frame_streaming( + state: FrameStreamingState, + stream_session: StreamSession[Any] | AsyncStreamSession[Any], +) -> None: + stream_session._on_cleanup = None + if state.result_holder: + stream_session._set_result(state.result_holder[0]) + + +def create_frame_generator( + state: FrameStreamingState, + run_func: Callable[[], Any], + output_holder: list[StreamSession[Any]], +) -> Iterator[StreamFrame]: + """Create a scoped synchronous public frame generator.""" + + def run_with_sink() -> None: + token = add_stream_sink(state.sink) + try: + result = run_func() + state.result_holder.append(result) + except Exception as e: + _signal_frame_error(state, e) + finally: + reset_stream_sinks(token) + _signal_frame_end(state) + + ctx = contextvars.copy_context() + thread = threading.Thread(target=ctx.run, args=(run_with_sink,), daemon=True) + thread.start() + + try: + while True: + item = state.sync_queue.get() + if item is None: + break + if isinstance(item, Exception): + raise item + yield item + finally: + thread.join() + if output_holder: + _finalize_frame_streaming(state, output_holder[0]) + + +async def create_async_frame_generator( + state: FrameStreamingState, + run_coro: Callable[[], Any], + output_holder: list[AsyncStreamSession[Any]], +) -> AsyncIterator[StreamFrame]: + """Create a scoped asynchronous public frame generator.""" + if state.async_queue is None: + raise RuntimeError( + "Async queue not initialized. Use create_frame_streaming_state(use_async=True)." + ) + + async def run_with_sink() -> None: + token = add_stream_sink(state.sink) + try: + result = await run_coro() + state.result_holder.append(result) + except Exception as e: + _signal_frame_error(state, e, is_async=True) + finally: + reset_stream_sinks(token) + _signal_frame_end(state, is_async=True) + + task = asyncio.create_task(run_with_sink()) + try: + while True: + item = await state.async_queue.get() + if item is None: + break + if isinstance(item, Exception): + raise item + yield item + finally: + if not task.done(): + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + except Exception: + logger.debug("Background frame streaming task failed", exc_info=True) + if output_holder: + _finalize_frame_streaming(state, output_holder[0]) + + def _create_stream_handler( current_task_info: TaskInfo, sync_queue: queue.Queue[StreamChunk | None | Exception], diff --git a/lib/crewai/tests/test_flow_conversation.py b/lib/crewai/tests/test_flow_conversation.py index dd20cc61d..8aa2ca296 100644 --- a/lib/crewai/tests/test_flow_conversation.py +++ b/lib/crewai/tests/test_flow_conversation.py @@ -21,7 +21,7 @@ from crewai.events.types.flow_events import ( MethodExecutionFinishedEvent, MethodExecutionStartedEvent, ) -from crewai.events.types.llm_events import LLMCallStartedEvent +from crewai.events.types.llm_events import LLMCallStartedEvent, LLMStreamChunkEvent from crewai.experimental import ( ConversationConfig, ConversationMessage, @@ -137,6 +137,89 @@ class TestClassifyIntent: class TestConversationalFlow: + def test_stream_turn_emits_ordered_conversation_frames(self) -> None: + flow = ConversationalFlow() + flow.stream = True + stream_values_seen_by_kickoff: list[bool] = [] + + def kickoff_side_effect(*_: Any, **__: Any) -> str: + stream_values_seen_by_kickoff.append(flow.stream) + crewai_event_bus.emit( + flow, + LLMStreamChunkEvent( + type="llm_stream_chunk", + chunk="pong", + call_id="call-1", + ), + ) + return "pong" + + with patch.object(flow, "kickoff", side_effect=kickoff_side_effect): + stream = flow.stream_turn("ping", session_id="session-1") + + with pytest.raises(RuntimeError, match="Streaming has not completed yet"): + _ = stream.result + + frames = list(stream.events) + + assert stream.result == "pong" + assert stream_values_seen_by_kickoff == [False] + assert flow.stream is True + assert [frame.seq for frame in frames] == sorted(frame.seq for frame in frames) + assert [frame.type for frame in frames] == [ + "conversation_turn_started", + "llm_stream_chunk", + "conversation_message_added", + "conversation_turn_completed", + ] + assert [frame.channel for frame in frames] == [ + "flow", + "llm", + "messages", + "flow", + ] + assert frames[1].data["chunk"] == "pong" + assert flow.state.messages[-1].content == "pong" + + def test_stream_turn_enables_streaming_on_conversation_llm(self) -> None: + class FakeLLM: + stream = False + + def __init__(self) -> None: + self.stream_values: list[bool] = [] + + def call(self, *, messages: list[dict[str, Any]]) -> str: + self.stream_values.append(self.stream) + for chunk in ("po", "ng"): + crewai_event_bus.emit( + flow, + LLMStreamChunkEvent( + type="llm_stream_chunk", + chunk=chunk, + call_id="call-1", + ), + ) + return "pong" + + llm = FakeLLM() + + @ConversationConfig(llm=llm) + class StreamingChatFlow(ConversationalFlow): + pass + + flow = StreamingChatFlow() + stream = flow.stream_turn("ping", session_id="session-1") + frames = list(stream.events) + + assert stream.result == "pong" + assert llm.stream_values == [True] + assert llm.stream is False + assert [ + frame.data["chunk"] + for frame in frames + if frame.type == "llm_stream_chunk" + ] == ["po", "ng"] + def test_deferred_multi_turn_emits_single_flow_finished(self) -> None: """A deferred multi-turn session lands as one trace: exactly one ``FlowFinishedEvent`` is emitted at ``finalize_session_traces()``, not diff --git a/lib/crewai/tests/test_stream_frames.py b/lib/crewai/tests/test_stream_frames.py new file mode 100644 index 000000000..bc3efedd9 --- /dev/null +++ b/lib/crewai/tests/test_stream_frames.py @@ -0,0 +1,166 @@ +"""Tests for the public stream frame contract.""" + +from __future__ import annotations + +import asyncio +from collections.abc import AsyncIterator +from typing import Any + +import pytest + +from crewai.events.event_bus import crewai_event_bus +from crewai.events.types.flow_events import ConversationMessageAddedEvent +from crewai.events.types.llm_events import LLMStreamChunkEvent, LLMThinkingChunkEvent +from crewai.events.types.tool_usage_events import ToolUsageStartedEvent +from crewai.flow.flow import Flow, start +from crewai.types.streaming import FlowStreamingOutput, StreamFrame + + +class FrameFlow(Flow): + @start() + def run(self) -> str: + crewai_event_bus.emit( + self, + LLMStreamChunkEvent( + type="llm_stream_chunk", + chunk="hello", + call_id="call-1", + ), + ) + crewai_event_bus.emit( + self, + LLMThinkingChunkEvent( + type="llm_thinking_chunk", + chunk="thinking", + call_id="call-1", + ), + ) + crewai_event_bus.emit( + self, + ConversationMessageAddedEvent( + type="conversation_message_added", + flow_name=self._definition.name, + session_id="session-1", + role="assistant", + content="hello", + message_index=0, + ), + ) + crewai_event_bus.emit( + self, + ToolUsageStartedEvent( + type="tool_usage_started", + tool_name="search", + tool_args={"query": "crew"}, + ), + ) + return "done" + + +def test_stream_frame_contract_and_ordering() -> None: + stream = FrameFlow().stream_events() + + with pytest.raises(RuntimeError, match="Streaming has not completed yet"): + _ = stream.result + + with stream: + frames = list(stream.events) + + assert stream.result == "done" + assert all(isinstance(frame, StreamFrame) for frame in frames) + assert all(frame.version == "v1" for frame in frames) + assert [frame.seq for frame in frames] == sorted(frame.seq for frame in frames) + + by_type = {frame.type: frame for frame in frames} + assert by_type["flow_started"].channel == "flow" + assert by_type["method_execution_started"].parent_id == by_type["flow_started"].id + assert by_type["llm_stream_chunk"].channel == "llm" + assert by_type["llm_thinking_chunk"].channel == "llm" + assert by_type["conversation_message_added"].channel == "messages" + assert by_type["tool_usage_started"].channel == "tools" + assert "FrameFlow" in by_type["method_execution_started"].namespace + assert "run" in by_type["method_execution_started"].namespace + + +def test_stream_subscribe_filters_channels_without_losing_order() -> None: + with FrameFlow().stream_events() as stream: + frames = list(stream.interleave(["messages", "tools"])) + + assert [frame.channel for frame in frames] == ["messages", "tools"] + assert [frame.seq for frame in frames] == sorted(frame.seq for frame in frames) + assert stream.result == "done" + + +def test_stream_errors_surface_after_failed_frame() -> None: + class ErrorFlow(Flow): + @start() + def run(self) -> str: + raise ValueError("boom") + + stream = ErrorFlow().stream_events() + + with pytest.raises(ValueError, match="boom"): + list(stream.events) + + assert any(frame.type == "method_execution_failed" for frame in stream.frames) + with pytest.raises(ValueError, match="boom"): + _ = stream.result + + +def test_legacy_flow_streaming_uses_llm_frame_projection() -> None: + flow = FrameFlow() + flow.stream = True + + streaming = flow.kickoff() + + assert isinstance(streaming, FlowStreamingOutput) + chunks = list(streaming) + assert [chunk.content for chunk in chunks] == ["hello", "thinking"] + assert streaming.result == "done" + + +@pytest.mark.asyncio +async def test_astream_scopes_concurrent_executions() -> None: + class ConcurrentFlow(Flow): + @start() + async def run(self) -> str: + label = str(self.state["label"]) + await asyncio.sleep(0) + crewai_event_bus.emit( + self, + LLMStreamChunkEvent( + type="llm_stream_chunk", + chunk=label, + call_id=label, + ), + ) + return label + + async def collect(label: str) -> tuple[str, list[str]]: + async with ConcurrentFlow().astream(inputs={"label": label}) as stream: + frames = [frame async for frame in stream.llm] + return stream.result, [frame.data["chunk"] for frame in frames] + + first, second = await asyncio.gather(collect("first"), collect("second")) + + assert first == ("first", ["first"]) + assert second == ("second", ["second"]) + + +@pytest.mark.asyncio +async def test_astream_cancellation_cleans_up_task() -> None: + class SlowFlow(Flow): + @start() + async def run(self) -> str: + await asyncio.sleep(10) + return "too late" + + stream = SlowFlow().astream() + events: AsyncIterator[StreamFrame] = stream.events + first_frame = await anext(events) + + assert first_frame.type == "flow_started" + await stream.aclose() + + assert stream.is_cancelled is True + assert stream.is_completed is True