Unify flow streaming frame items

This commit is contained in:
lorenzejay
2026-06-29 14:50:17 -07:00
parent a48f45c917
commit 90b06a4523
15 changed files with 739 additions and 220 deletions

View File

@@ -9,12 +9,7 @@ Structure (see ``flow_definition``) and executed here.
from __future__ import annotations
import asyncio
from collections.abc import (
AsyncIterator,
Callable,
Iterator,
Sequence,
)
from collections.abc import Callable, Iterator, Sequence
from concurrent.futures import Future, ThreadPoolExecutor
import contextvars
import copy
@@ -143,7 +138,6 @@ if TYPE_CHECKING:
from crewai.flow.visualization import build_flow_structure, render_interactive
from crewai.types.streaming import (
AsyncStreamSession,
FlowStreamingOutput,
StreamSession,
)
from crewai.types.usage_metrics import UsageMetrics
@@ -152,7 +146,6 @@ from crewai.utilities.streaming import (
create_async_frame_generator,
create_frame_generator,
create_frame_streaming_state,
stream_frame_to_chunk,
)
@@ -1921,7 +1914,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
input_files: dict[str, FileInput] | None = None,
from_checkpoint: CheckpointConfig | None = None,
restore_from_state_id: str | None = None,
) -> Any | FlowStreamingOutput:
) -> Any | StreamSession[Any]:
"""Start the flow execution in a synchronous context.
This method wraps kickoff_async so that all state initialization and event
@@ -1942,7 +1935,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
``from_checkpoint``; passing both raises ``ValueError``.
Returns:
The final output from the flow or FlowStreamingOutput if streaming.
The final output from the flow or StreamSession if streaming.
"""
if from_checkpoint is not None and restore_from_state_id is not None:
raise ValueError(
@@ -1954,28 +1947,11 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
if restored is not None:
return restored.kickoff(inputs=inputs, input_files=input_files)
if self.stream:
streaming_output: FlowStreamingOutput
def chunk_iterator() -> Iterator[Any]:
stream_session = self.stream_events(
inputs=inputs,
input_files=input_files,
restore_from_state_id=restore_from_state_id,
)
try:
with stream_session:
for frame in stream_session.llm:
chunk = stream_frame_to_chunk(frame)
if chunk is not None:
yield chunk
streaming_output._set_result(stream_session.result)
finally:
if not stream_session.is_exhausted:
stream_session.close()
streaming_output = FlowStreamingOutput(sync_iterator=chunk_iterator())
return streaming_output
return self.stream_events(
inputs=inputs,
input_files=input_files,
restore_from_state_id=restore_from_state_id,
)
async def _run_flow() -> Any:
return await self.kickoff_async(
@@ -2002,7 +1978,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
input_files: dict[str, FileInput] | None = None,
from_checkpoint: CheckpointConfig | None = None,
restore_from_state_id: str | None = None,
) -> Any | FlowStreamingOutput:
) -> Any | AsyncStreamSession[Any]:
"""Start the flow execution asynchronously.
This method performs state restoration (if an 'id' is provided and persistence is available)
@@ -2036,28 +2012,11 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
if restored is not None:
return await restored.kickoff_async(inputs=inputs, input_files=input_files)
if self.stream:
streaming_output: FlowStreamingOutput
async def chunk_iterator() -> AsyncIterator[Any]:
stream_session = self.astream(
inputs=inputs,
input_files=input_files,
restore_from_state_id=restore_from_state_id,
)
try:
async with stream_session:
async for frame in stream_session.llm:
chunk = stream_frame_to_chunk(frame)
if chunk is not None:
yield chunk
streaming_output._set_result(stream_session.result)
finally:
if not stream_session.is_exhausted:
await stream_session.aclose()
streaming_output = FlowStreamingOutput(async_iterator=chunk_iterator())
return streaming_output
return self.astream(
inputs=inputs,
input_files=input_files,
restore_from_state_id=restore_from_state_id,
)
ctx = baggage.set_baggage("flow_inputs", inputs or {})
ctx = baggage.set_baggage("flow_input_files", input_files or {}, context=ctx)
@@ -2401,7 +2360,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
input_files: dict[str, FileInput] | None = None,
from_checkpoint: CheckpointConfig | None = None,
restore_from_state_id: str | None = None,
) -> Any | FlowStreamingOutput:
) -> Any | AsyncStreamSession[Any]:
"""Native async method to start the flow execution. Alias for kickoff_async.
Args:

View File

@@ -7,7 +7,7 @@ in CrewAI, including common functionality for native SDK implementations.
from __future__ import annotations
from abc import ABC, abstractmethod
from collections.abc import Generator, Iterator
from collections.abc import Generator
from contextlib import contextmanager
import contextvars
from datetime import datetime
@@ -42,13 +42,12 @@ from crewai.events.types.tool_usage_events import (
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
from crewai.types.streaming import LLMStreamingOutput, StreamSession
from crewai.types.streaming import StreamSession
from crewai.types.usage_metrics import UsageMetrics
from crewai.utilities.pydantic_schema_utils import serialize_model_class
from crewai.utilities.streaming import (
create_frame_generator,
create_frame_streaming_state,
stream_frame_to_chunk,
)
@@ -361,42 +360,6 @@ class BaseLLM(BaseModel, ABC):
output_holder.append(stream_session)
return stream_session
def stream_call(
self,
messages: str | list[LLMMessage],
tools: list[dict[str, BaseTool]] | None = None,
callbacks: list[Any] | None = None,
available_functions: dict[str, Any] | None = None,
from_task: Task | None = None,
from_agent: BaseAgent | None = None,
response_model: type[BaseModel] | None = None,
) -> LLMStreamingOutput:
"""Run the LLM call and stream text chunks as they arrive."""
def chunk_iterator() -> Iterator[Any]:
stream_session = self.stream_events(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
try:
with stream_session:
for frame in stream_session.llm:
chunk = stream_frame_to_chunk(frame)
if chunk is not None:
yield chunk
streaming_output._set_result(stream_session.result)
finally:
if not stream_session.is_exhausted:
stream_session.close()
streaming_output = LLMStreamingOutput(sync_iterator=chunk_iterator())
return streaming_output
async def acall(
self,
messages: str | list[LLMMessage],

View File

@@ -42,6 +42,19 @@ class StreamFrame(BaseModel):
previous_id: str | None = None
data: dict[str, Any] = Field(default_factory=dict)
@property
def content(self) -> str:
"""Printable text content for chunk-like consumers."""
chunk = self.data.get("chunk")
if isinstance(chunk, str):
return chunk
return ""
@property
def event(self) -> dict[str, Any]:
"""Structured source event payload."""
return self.data
class StreamSessionBase(Generic[T]):
"""Base stream session with ordered frame iteration and result access."""
@@ -115,6 +128,10 @@ class StreamSession(StreamSessionBase[T]):
"""Iterate over all ordered frames."""
return self.subscribe()
def __iter__(self) -> Iterator[StreamFrame]:
"""Iterate over all ordered frames."""
return self.events
@property
def llm(self) -> Iterator[StreamFrame]:
"""Iterate over LLM token and thinking frames."""
@@ -186,6 +203,10 @@ class AsyncStreamSession(StreamSessionBase[T]):
"""Iterate over all ordered frames."""
return self.subscribe()
def __aiter__(self) -> AsyncIterator[StreamFrame]:
"""Iterate over all ordered frames."""
return self.events
@property
def llm(self) -> AsyncIterator[StreamFrame]:
"""Iterate over LLM token and thinking frames."""
@@ -576,12 +597,3 @@ class FlowStreamingOutput(StreamingOutputBase[Any]):
"""
self._result = result
self._completed = True
class LLMStreamingOutput(StreamingOutputBase[Any]):
"""Streaming output wrapper for direct LLM calls."""
def _set_result(self, result: Any) -> None:
"""Set the final LLM call result after streaming completes."""
self._result = result
self._completed = True

View File

@@ -29,7 +29,7 @@ from crewai.flow.persistence.base import FlowPersistence
from crewai.flow.runtime._actions import FlowScriptExecutionDisabledError
from crewai.state.checkpoint_config import CheckpointConfig
from crewai.tools import BaseTool
from crewai.types.streaming import FlowStreamingOutput
from crewai.types.streaming import StreamSession
class StaticSearchTool(BaseTool):
@@ -2485,7 +2485,7 @@ def test_config_max_method_calls_from_declaration():
def test_config_stream_from_declaration():
flow = Flow.from_declaration(contents=STREAMING_CHAIN_YAML)
streaming = flow.kickoff()
assert isinstance(streaming, FlowStreamingOutput)
assert isinstance(streaming, StreamSession)
for _ in streaming:
pass
assert streaming.result == "confirmed:True"

View File

@@ -14,7 +14,7 @@ from crewai.events.types.llm_events import LLMStreamChunkEvent, LLMThinkingChunk
from crewai.events.types.tool_usage_events import ToolUsageStartedEvent
from crewai.flow.flow import Flow, start
from crewai.llms.base_llm import BaseLLM
from crewai.types.streaming import FlowStreamingOutput, StreamFrame
from crewai.types.streaming import StreamFrame
class FrameFlow(Flow):
@@ -129,36 +129,37 @@ def test_stream_errors_surface_after_failed_frame() -> None:
_ = stream.result
def test_legacy_flow_streaming_uses_llm_frame_projection() -> None:
def test_flow_streaming_returns_iterable_frame_session() -> None:
flow = FrameFlow()
flow.stream = True
streaming = flow.kickoff()
stream = flow.kickoff()
assert isinstance(streaming, FlowStreamingOutput)
chunks = list(streaming)
assert [chunk.content for chunk in chunks] == ["hello", "thinking"]
assert streaming.result == "done"
with stream:
frames = list(stream)
assert all(isinstance(frame, StreamFrame) for frame in frames)
assert [frame.content for frame in frames if frame.content] == [
"hello",
"thinking",
]
first_content_frame = next(frame for frame in frames if frame.content)
assert first_content_frame.event["chunk"] == "hello"
assert stream.result == "done"
def test_direct_llm_stream_events_scope_and_restore_stream_flag() -> None:
llm = DirectStreamingLLM(model="gpt-4o-mini", stream=False)
with llm.stream_events("hello") as stream:
frames = list(stream.llm)
frames = list(stream)
assert [frame.data["chunk"] for frame in frames] == ["hel", "lo"]
assert [frame.content for frame in frames] == ["hel", "lo"]
assert frames[0].event["chunk"] == "hel"
assert stream.result == "hello"
assert llm.stream is False
def test_direct_llm_stream_call_projects_chunks() -> None:
chunks = DirectStreamingLLM(model="gpt-4o-mini").stream_call("hello")
assert [chunk.content for chunk in chunks] == ["hel", "lo"]
assert chunks.result == "hello"
@pytest.mark.asyncio
async def test_astream_scopes_concurrent_executions() -> None:
class ConcurrentFlow(Flow):

View File

@@ -12,10 +12,13 @@ from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.llm_events import LLMStreamChunkEvent, ToolCall, FunctionCall
from crewai.flow.flow import Flow, start
from crewai.types.streaming import (
AsyncStreamSession,
CrewStreamingOutput,
FlowStreamingOutput,
StreamChunk,
StreamChunkType,
StreamFrame,
StreamSession,
ToolCallChunk,
)
@@ -417,8 +420,8 @@ class TestCrewKickoffStreamingAsync:
class TestFlowKickoffStreaming:
"""Tests for Flow(stream=True).kickoff() method."""
def test_kickoff_streaming_returns_streaming_output(self) -> None:
"""Test that flow kickoff with stream=True returns FlowStreamingOutput."""
def test_kickoff_streaming_returns_stream_session(self) -> None:
"""Test that flow kickoff with stream=True returns StreamSession."""
class SimpleFlow(Flow[dict[str, Any]]):
@start()
@@ -428,7 +431,7 @@ class TestFlowKickoffStreaming:
flow = SimpleFlow()
flow.stream = True
streaming = flow.kickoff()
assert isinstance(streaming, FlowStreamingOutput)
assert isinstance(streaming, StreamSession)
def test_flow_kickoff_streaming_captures_chunks(self) -> None:
"""Test that flow streaming captures LLM chunks from crew execution."""
@@ -469,7 +472,7 @@ class TestFlowKickoffStreaming:
with patch.object(Flow, "kickoff", mock_kickoff_fn):
streaming = flow.kickoff()
assert isinstance(streaming, FlowStreamingOutput)
assert isinstance(streaming, StreamSession)
chunks = list(streaming)
assert len(chunks) >= 2
@@ -500,7 +503,7 @@ class TestFlowKickoffStreaming:
with patch.object(Flow, "kickoff", mock_kickoff_fn):
streaming = flow.kickoff()
assert isinstance(streaming, FlowStreamingOutput)
assert isinstance(streaming, StreamSession)
_ = list(streaming)
result = streaming.result
@@ -511,8 +514,8 @@ class TestFlowKickoffStreamingAsync:
"""Tests for Flow(stream=True).kickoff_async() method."""
@pytest.mark.asyncio
async def test_kickoff_streaming_async_returns_streaming_output(self) -> None:
"""Test that flow kickoff_async with stream=True returns FlowStreamingOutput."""
async def test_kickoff_streaming_async_returns_stream_session(self) -> None:
"""Test that flow kickoff_async with stream=True returns AsyncStreamSession."""
class SimpleFlow(Flow[dict[str, Any]]):
@start()
@@ -522,7 +525,7 @@ class TestFlowKickoffStreamingAsync:
flow = SimpleFlow()
flow.stream = True
streaming = await flow.kickoff_async()
assert isinstance(streaming, FlowStreamingOutput)
assert isinstance(streaming, AsyncStreamSession)
@pytest.mark.asyncio
async def test_flow_kickoff_streaming_async_captures_chunks(self) -> None:
@@ -567,8 +570,8 @@ class TestFlowKickoffStreamingAsync:
with patch.object(Flow, "kickoff_async", mock_kickoff_fn):
streaming = await flow.kickoff_async()
assert isinstance(streaming, FlowStreamingOutput)
chunks: list[StreamChunk] = []
assert isinstance(streaming, AsyncStreamSession)
chunks: list[StreamFrame] = []
async for chunk in streaming:
chunks.append(chunk)
@@ -601,7 +604,7 @@ class TestFlowKickoffStreamingAsync:
with patch.object(Flow, "kickoff_async", mock_kickoff_fn):
streaming = await flow.kickoff_async()
assert isinstance(streaming, FlowStreamingOutput)
assert isinstance(streaming, AsyncStreamSession)
async for _ in streaming:
pass

View File

@@ -4,7 +4,7 @@ import pytest
from crewai import Agent, Crew, Task
from crewai.flow.flow import Flow, start
from crewai.types.streaming import CrewStreamingOutput, FlowStreamingOutput
from crewai.types.streaming import AsyncStreamSession, CrewStreamingOutput, StreamSession
@pytest.fixture
@@ -212,7 +212,7 @@ class TestStreamingFlowIntegration:
streaming = flow.kickoff()
assert isinstance(streaming, FlowStreamingOutput)
assert isinstance(streaming, StreamSession)
chunks = []
for chunk in streaming:
@@ -281,7 +281,7 @@ class TestStreamingFlowIntegration:
streaming = await flow.kickoff_async()
assert isinstance(streaming, FlowStreamingOutput)
assert isinstance(streaming, AsyncStreamSession)
chunks = []
async for chunk in streaming: