From a48f45c9179fff4a2383f3842f40d781deb7cb6b Mon Sep 17 00:00:00 2001 From: lorenzejay Date: Mon, 29 Jun 2026 14:16:08 -0700 Subject: [PATCH] Add direct LLM streaming helpers --- .../en/learn/streaming-runtime-contract.mdx | 47 ++++++++++- examples/stream_frame_debug_runner.py | 31 +++++++ lib/crewai/src/crewai/llms/base_llm.py | 81 ++++++++++++++++++- lib/crewai/src/crewai/types/streaming.py | 9 +++ lib/crewai/tests/test_stream_frames.py | 40 +++++++++ 5 files changed, 203 insertions(+), 5 deletions(-) create mode 100644 examples/stream_frame_debug_runner.py diff --git a/docs/edge/en/learn/streaming-runtime-contract.mdx b/docs/edge/en/learn/streaming-runtime-contract.mdx index 1e7b4da72..77739ab2c 100644 --- a/docs/edge/en/learn/streaming-runtime-contract.mdx +++ b/docs/edge/en/learn/streaming-runtime-contract.mdx @@ -1,15 +1,15 @@ --- title: Streaming Runtime Contract -description: Stream ordered runtime frames from Flows and conversational turns. +description: Stream ordered runtime frames from Flows, direct LLM calls, and conversational turns. icon: tower-broadcast mode: "wide" --- ## Overview -CrewAI exposes a frame-based streaming contract for runtimes that need more than plain text chunks. The contract emits ordered `StreamFrame` objects for Flow lifecycle events, LLM tokens, tool activity, conversation messages, and custom events. +CrewAI exposes a frame-based streaming contract for runtimes that need more than plain text chunks. The contract emits ordered `StreamFrame` objects for Flow lifecycle events, direct LLM tokens, tool activity, conversation messages, and custom events. -Use this API when you are building a UI, service bridge, terminal app, or deployment runtime that needs a stable stream of structured events while a Flow is running. +Use this API when you are building a UI, service bridge, terminal app, or deployment runtime that needs a stable stream of structured events while a Flow, chat turn, or direct LLM call is running. 
## StreamFrame @@ -113,6 +113,45 @@ result = stream.result The async session has the same projections as the sync session. +## Stream a Direct LLM Call + +`llm.call(...)` still returns the final assembled result. Use `llm.stream_call(...)` when you want to iterate over chunks as they arrive: + +```python +from crewai import LLM + + +llm = LLM(model="gpt-4o-mini") +chunks = llm.stream_call( + messages=[ + { + "role": "user", + "content": "Explain CrewAI streaming in two short sentences.", + } + ] +) + +for chunk in chunks: + print(chunk.content, end="", flush=True) + +result = chunks.result +``` + +Use `llm.stream_events(...)` when a runtime needs the full `StreamFrame` envelope rather than text chunks: + +```python +stream = llm.stream_events("Explain CrewAI streaming in two short sentences.") + +with stream: + for frame in stream.llm: + if frame.type == "llm_stream_chunk": + print(frame.data.get("chunk", ""), end="", flush=True) + +result = stream.result +``` + +Both methods temporarily enable streaming for the wrapped call and restore the LLM's previous `stream` setting afterward. Provider integrations continue to emit the underlying LLM stream events; these helpers provide a common iterator API over those events for every LLM provider. + ## Conversational Turns Conversational Flows can stream one user turn with `stream_turn()`: @@ -159,4 +198,4 @@ For async streams, use `await stream.aclose()`. ## Legacy Chunk Streaming -Crew streaming with `stream=True` still returns the chunk-oriented `CrewStreamingOutput` API described in [Streaming Crew Execution](/en/learn/streaming-crew-execution). The frame contract is intended for runtimes that need a stable event envelope across Flows, conversational turns, LLM output, tools, and messages. +Crew streaming with `stream=True` still returns the chunk-oriented `CrewStreamingOutput` API described in [Streaming Crew Execution](/en/learn/streaming-crew-execution). Direct `llm.call(...)` still returns the final LLM result. 
The frame contract is intended for runtimes that need a stable event envelope across Flows, direct LLM calls, conversational turns, tools, and messages.
diff --git a/examples/stream_frame_debug_runner.py b/examples/stream_frame_debug_runner.py
new file mode 100644
index 000000000..5ff92f57d
--- /dev/null
+++ b/examples/stream_frame_debug_runner.py
@@ -0,0 +1,31 @@
+"""Minimal direct LLM streaming runner.
+
+Run from the repo root:
+
+    uv run python examples/stream_frame_debug_runner.py
+"""
+
+from __future__ import annotations
+
+# ruff: noqa: T201
+import os
+
+from crewai import LLM
+
+
+llm = LLM(model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"))
+messages = [
+    {
+        "role": "user",
+        "content": "Explain CrewAI streaming in two short sentences.",
+    }
+]
+
+chunks = llm.stream_call(messages=messages)
+
+print("--- chunks ---")
+for chunk in chunks:
+    print(chunk.content, end="", flush=True)
+
+print("\n\n--- result ---")
+print(chunks.result)  # populated once the chunk iterator above is exhausted
diff --git a/lib/crewai/src/crewai/llms/base_llm.py b/lib/crewai/src/crewai/llms/base_llm.py
index 94d5eb6b9..04b5708c2 100644
--- a/lib/crewai/src/crewai/llms/base_llm.py
+++ b/lib/crewai/src/crewai/llms/base_llm.py
@@ -7,7 +7,7 @@ in CrewAI, including common functionality for native SDK implementations.
from __future__ import annotations
 
 from abc import ABC, abstractmethod
-from collections.abc import Generator
+from collections.abc import Generator, Iterator
 from contextlib import contextmanager
 import contextvars
 from datetime import datetime
@@ -42,8 +42,14 @@ from crewai.events.types.tool_usage_events import (
     ToolUsageFinishedEvent,
     ToolUsageStartedEvent,
 )
+from crewai.types.streaming import LLMStreamingOutput, StreamSession
 from crewai.types.usage_metrics import UsageMetrics
 from crewai.utilities.pydantic_schema_utils import serialize_model_class
+from crewai.utilities.streaming import (
+    create_frame_generator,
+    create_frame_streaming_state,
+    stream_frame_to_chunk,
+)
 
 
 try:
@@ -318,6 +324,123 @@ class BaseLLM(BaseModel, ABC):
             RuntimeError: If the LLM request fails for other reasons.
         """
 
+    def stream_events(
+        self,
+        messages: str | list[LLMMessage],
+        tools: list[dict[str, BaseTool]] | None = None,
+        callbacks: list[Any] | None = None,
+        available_functions: dict[str, Any] | None = None,
+        from_task: Task | None = None,
+        from_agent: BaseAgent | None = None,
+        response_model: type[BaseModel] | None = None,
+    ) -> StreamSession[Any]:
+        """Run the LLM call and stream scoped public ``StreamFrame`` events.
+
+        ``self.stream`` is set to ``True`` for the wrapped ``call`` and put
+        back to its previous value afterwards, including when ``call`` raises.
+
+        Args:
+            messages: Prompt string or message list passed through to ``call``.
+            tools: Passed through to ``call`` unchanged.
+            callbacks: Passed through to ``call`` unchanged.
+            available_functions: Passed through to ``call`` unchanged.
+            from_task: Passed through to ``call`` unchanged.
+            from_agent: Passed through to ``call`` unchanged.
+            response_model: Passed through to ``call`` unchanged.
+
+        Returns:
+            A ``StreamSession`` whose sync iterator is the frame generator
+            built around the wrapped ``call``.
+        """
+        result_holder: list[Any] = []
+        state = create_frame_streaming_state(result_holder, use_async=False)
+        output_holder: list[StreamSession[Any]] = []
+
+        def run_llm_call() -> Any:
+            # NOTE(review): self.stream is instance state; other readers of this
+            # LLM see True until the finally below restores it - confirm OK.
+            original_stream = self.stream
+            try:
+                self.stream = True
+                return self.call(
+                    messages=messages,
+                    tools=tools,
+                    callbacks=callbacks,
+                    available_functions=available_functions,
+                    from_task=from_task,
+                    from_agent=from_agent,
+                    response_model=response_model,
+                )
+            finally:
+                self.stream = original_stream
+
+        stream_session: StreamSession[Any] = StreamSession(
+            sync_iterator=create_frame_generator(state, run_llm_call, output_holder)
+        )
+        # output_holder was passed to the generator while empty; the session is
+        # added now that it exists.
+        output_holder.append(stream_session)
+        return stream_session
+
+    def stream_call(
+        self,
+        messages: str | list[LLMMessage],
+        tools: list[dict[str, BaseTool]] | None = None,
+        callbacks: list[Any] | None = None,
+        available_functions: dict[str, Any] | None = None,
+        from_task: Task | None = None,
+        from_agent: BaseAgent | None = None,
+        response_model: type[BaseModel] | None = None,
+    ) -> LLMStreamingOutput:
+        """Run the LLM call and stream text chunks as they arrive.
+
+        Built on ``stream_events``: each LLM frame is converted with
+        ``stream_frame_to_chunk`` and frames that convert to ``None`` are
+        dropped. The final result is recorded on the returned object after the
+        last frame has been consumed.
+
+        Args:
+            messages: Prompt string or message list passed to ``stream_events``.
+            tools: Passed through to ``stream_events`` unchanged.
+            callbacks: Passed through to ``stream_events`` unchanged.
+            available_functions: Passed through to ``stream_events`` unchanged.
+            from_task: Passed through to ``stream_events`` unchanged.
+            from_agent: Passed through to ``stream_events`` unchanged.
+            response_model: Passed through to ``stream_events`` unchanged.
+
+        Returns:
+            An ``LLMStreamingOutput`` wrapping the chunk iterator.
+        """
+
+        def chunk_iterator() -> Iterator[Any]:
+            # Generator body: stream_events is not called until this generator
+            # is first advanced.
+            stream_session = self.stream_events(
+                messages=messages,
+                tools=tools,
+                callbacks=callbacks,
+                available_functions=available_functions,
+                from_task=from_task,
+                from_agent=from_agent,
+                response_model=response_model,
+            )
+            try:
+                with stream_session:
+                    for frame in stream_session.llm:
+                        chunk = stream_frame_to_chunk(frame)
+                        if chunk is not None:
+                            yield chunk
+                    # Reached only when the frame loop ran to completion.
+                    streaming_output._set_result(stream_session.result)
+            finally:
+                if not stream_session.is_exhausted:
+                    stream_session.close()
+
+        # Calling chunk_iterator() only creates the generator; its body, which
+        # reads streaming_output, does not run until the generator is advanced.
+        streaming_output = LLMStreamingOutput(sync_iterator=chunk_iterator())
+        return streaming_output
+
     async def acall(
         self,
         messages: str | list[LLMMessage],
diff --git a/lib/crewai/src/crewai/types/streaming.py b/lib/crewai/src/crewai/types/streaming.py
index 11baffe90..1b56cd140 100644
--- a/lib/crewai/src/crewai/types/streaming.py
+++ b/lib/crewai/src/crewai/types/streaming.py
@@ -576,3 +576,12 @@ class FlowStreamingOutput(StreamingOutputBase[Any]):
         """
         self._result = result
         self._completed = True
+
+
+class LLMStreamingOutput(StreamingOutputBase[Any]):
+    """Streaming output wrapper for direct LLM calls."""
+
+    def _set_result(self, result: Any) -> None:
+        """Set the final LLM call result after streaming completes."""
+        self._result = result
+        self._completed = True
diff --git a/lib/crewai/tests/test_stream_frames.py b/lib/crewai/tests/test_stream_frames.py
index bc3efedd9..13ab1869b 100644
--- a/lib/crewai/tests/test_stream_frames.py
+++ b/lib/crewai/tests/test_stream_frames.py
@@ -13,6 +13,7 @@ from crewai.events.types.flow_events import ConversationMessageAddedEvent
 from
crewai.events.types.llm_events import LLMStreamChunkEvent, LLMThinkingChunkEvent
 from crewai.events.types.tool_usage_events import ToolUsageStartedEvent
 from crewai.flow.flow import Flow, start
+from crewai.llms.base_llm import BaseLLM
 from crewai.types.streaming import FlowStreamingOutput, StreamFrame
 
 
@@ -57,6 +58,30 @@ class FrameFlow(Flow):
         return "done"
 
 
+class DirectStreamingLLM(BaseLLM):
+    """Stub LLM: ``call`` emits two stream chunks, then returns ``"hello"``."""
+
+    def call(self, messages: Any, *args: Any, **kwargs: Any) -> str:
+        # Chunks are emitted unconditionally; this stub never reads self.stream.
+        crewai_event_bus.emit(
+            self,
+            LLMStreamChunkEvent(
+                type="llm_stream_chunk",
+                chunk="hel",
+                call_id="call-1",
+            ),
+        )
+        crewai_event_bus.emit(
+            self,
+            LLMStreamChunkEvent(
+                type="llm_stream_chunk",
+                chunk="lo",
+                call_id="call-1",
+            ),
+        )
+        return "hello"
+
+
 def test_stream_frame_contract_and_ordering() -> None:
     stream = FrameFlow().stream_events()
 
@@ -119,6 +144,26 @@ def test_legacy_flow_streaming_uses_llm_frame_projection() -> None:
     assert streaming.result == "done"
 
 
+def test_direct_llm_stream_events_scope_and_restore_stream_flag() -> None:
+    llm = DirectStreamingLLM(model="gpt-4o-mini", stream=False)
+
+    with llm.stream_events("hello") as stream:
+        frames = list(stream.llm)
+
+    assert [frame.data["chunk"] for frame in frames] == ["hel", "lo"]
+    assert stream.result == "hello"
+    # The LLM was built with stream=False; the flag must be back to that.
+    assert llm.stream is False
+
+
+def test_direct_llm_stream_call_projects_chunks() -> None:
+    chunks = DirectStreamingLLM(model="gpt-4o-mini").stream_call("hello")
+
+    assert [chunk.content for chunk in chunks] == ["hel", "lo"]
+    # result is read only after the comprehension above consumed every chunk.
+    assert chunks.result == "hello"
+
+
 @pytest.mark.asyncio
 async def test_astream_scopes_concurrent_executions() -> None:
     class ConcurrentFlow(Flow):