Add streaming docs to the navigation (#6403)

This commit is contained in:
Lorenze Jay
2026-06-30 16:00:38 -07:00
committed by GitHub
parent 694881c7bf
commit b37505bcf9
3 changed files with 316 additions and 0 deletions

View File

@@ -159,6 +159,7 @@
"edge/en/concepts/tasks",
"edge/en/concepts/crews",
"edge/en/concepts/flows",
"edge/en/concepts/streaming",
"edge/en/concepts/production-architecture",
"edge/en/concepts/knowledge",
"edge/en/concepts/skills",
@@ -365,6 +366,7 @@
"edge/en/learn/kickoff-async",
"edge/en/learn/kickoff-for-each",
"edge/en/learn/streaming-runtime-contract",
"edge/en/learn/consuming-streams",
"edge/en/learn/llm-connections",
"edge/en/learn/litellm-removal-guide",
"edge/en/learn/multimodal-agents",

View File

@@ -0,0 +1,137 @@
---
title: Streaming
description: Understand CrewAI's streaming model for Flows, direct LLM calls, tools, and conversational turns.
icon: radio
mode: "wide"
---
## Overview
Streaming lets your application receive execution updates while work is still running. Instead of waiting for the final result, you can render LLM tokens, tool activity, Flow lifecycle events, and conversation messages as they happen.
CrewAI has two streaming surfaces:
| Surface | Used by | Output |
|---------|---------|--------|
| Frame streaming | Flows, direct LLM calls, conversational turns | Ordered `StreamFrame` objects |
| Crew chunk streaming | Crews with `stream=True` | `CrewStreamingOutput` chunks |
For new runtime integrations, UIs, terminal apps, service bridges, and conversational surfaces, use frame streaming. It provides one stable event envelope across the runtime.
## StreamFrame
A `StreamFrame` is the common object emitted by streamable runtimes:
```python
frame.id # unique frame id
frame.seq # execution-local order, when available
frame.type # source event type, such as "llm_stream_chunk"
frame.channel # "llm", "flow", "tools", "messages", "lifecycle", or "custom"
frame.namespace # source/runtime namespace
frame.timestamp # event timestamp
frame.parent_id # parent event id, when available
frame.previous_id # previous event id, when available
frame.data # structured event payload
frame.event # alias for frame.data
frame.content # printable text for token-like frames, otherwise ""
```
The important fields for most consumers are:
| Field | Use it for |
|-------|------------|
| `channel` | Routing frames to the right UI region |
| `type` | Handling a specific event inside a channel |
| `content` | Printing token-like text |
| `event` | Reading structured metadata, such as tool names or message roles |
| `seq` | Preserving execution order |
## Channels
Frames are grouped into high-level channels:
| Channel | Contains |
|---------|----------|
| `llm` | LLM call lifecycle, text chunks, and thinking chunks |
| `flow` | Flow lifecycle, method execution, routing, pause, and resume events |
| `tools` | Tool usage start, finish, and error events |
| `messages` | Conversation transcript events |
| `lifecycle` | Runtime lifecycle events that do not belong to another channel |
| `custom` | Events that do not map to a built-in channel |
The stream itself remains one ordered timeline. Channel projections let consumers focus on only part of that timeline.
```mermaid
flowchart LR
A["flow<br/>flow_started"] --> B["llm<br/>llm_call_started"]
B --> C["llm<br/>llm_stream_chunk"]
C --> D["tools<br/>tool_usage_started"]
D --> E["tools<br/>tool_usage_finished"]
E --> F["llm<br/>llm_stream_chunk"]
F --> G["flow<br/>flow_finished"]
```
## Stream Sessions
Frame streaming returns a stream session:
```python
stream = flow.stream_events(inputs={"topic": "AI agents"})
```
The session is both an iterator and the holder for the final result:
```python
with stream:
for frame in stream:
print(frame.content, end="", flush=True)
result = stream.result
```
Consume the stream before reading `stream.result`. Reading the result too early raises an error because the runtime may still be producing frames.
## Channel Projections
Use channel projections when you only need one kind of frame:
```python
with flow.stream_events(inputs={"topic": "AI agents"}) as stream:
for frame in stream.llm:
print(frame.content, end="", flush=True)
result = stream.result
```
Available projections:
| Projection | Frames |
|------------|--------|
| `stream.events` | All frames |
| `stream.llm` | LLM frames |
| `stream.flow` | Flow frames |
| `stream.tools` | Tool frames |
| `stream.messages` | Conversation message frames |
| `stream.interleave([...])` | Selected channels in relative order |
## Entrypoints
Use the entrypoint that matches the runtime you are streaming:
| Runtime | Streaming entrypoint |
|---------|----------------------|
| Flow | `flow.stream_events(...)` |
| Flow with `stream=True` | `flow.kickoff(...)` returns a stream session |
| Async Flow | `flow.astream(...)` or `await flow.kickoff_async(...)` when `stream=True` |
| Direct LLM call | `llm.stream_events(...)` |
| Conversational Flow turn | `flow.stream_turn(...)` |
| Crew | `Crew(..., stream=True).kickoff(...)` returns `CrewStreamingOutput` |
Direct `llm.call(...)` still returns the final assembled LLM result. Use `llm.stream_events(...)` when you want to iterate over LLM chunks as they arrive.
## Related Guides
- [Consuming Streams](/edge/en/learn/consuming-streams)
- [Streaming Runtime Contract](/edge/en/learn/streaming-runtime-contract)
- [Streaming Flow Execution](/edge/en/learn/streaming-flow-execution)
- [Streaming Crew Execution](/edge/en/learn/streaming-crew-execution)

View File

@@ -0,0 +1,177 @@
---
title: Consuming Streams
description: Print LLM chunks, observe tool events, and read final results from CrewAI streams.
icon: square-terminal
mode: "wide"
---
## Overview
Use this guide when you want to subscribe to a CrewAI stream and print or route frames as they arrive.
The basic pattern is:
```python
stream = flow.stream_events(inputs={"topic": "AI agents"})
with stream:
for frame in stream:
...
result = stream.result
```
Always consume the stream before reading `stream.result`.
## Print LLM Output
If you only care about text generated by LLM calls, subscribe to the `llm` projection and print `frame.content`:
```python
stream = flow.stream_events(inputs={"topic": "AI agents"})
with stream:
for frame in stream.llm:
print(frame.content, end="", flush=True)
print()
result = stream.result
```
`frame.content` is an empty string for frames that do not carry printable text, so this is also safe:
```python
with flow.stream_events(inputs={"topic": "AI agents"}) as stream:
for frame in stream.events:
if frame.channel == "llm" and frame.content:
print(frame.content, end="", flush=True)
result = stream.result
```
## Print Tool Activity
Tool events arrive on the `tools` channel. Use `frame.type` to distinguish starts, finishes, and errors.
```python
with flow.stream_events(inputs={"topic": "AI agents"}) as stream:
for frame in stream.events:
if frame.channel == "llm" and frame.content:
print(frame.content, end="", flush=True)
if frame.channel == "tools" and frame.type == "tool_usage_started":
print(f"\nTool started: {frame.event.get('tool_name')}")
if frame.channel == "tools" and frame.type == "tool_usage_finished":
print(f"\nTool finished: {frame.event.get('tool_name')}")
result = stream.result
```
`frame.event` is the structured payload for the source event. Use it for metadata such as tool names, arguments, message roles, and runtime identifiers.
## Watch Flow Progress
Flow lifecycle and method execution frames arrive on the `flow` channel:
```python
with flow.stream_events(inputs={"topic": "AI agents"}) as stream:
for frame in stream.flow:
print(frame.type, frame.namespace)
result = stream.result
```
Use this when you want a progress log instead of token-level output.
## Interleave Selected Channels
Use `interleave()` when you want a subset of channels while preserving their relative order:
```python
with flow.stream_events(inputs={"topic": "AI agents"}) as stream:
for frame in stream.interleave(["llm", "tools"]):
if frame.channel == "llm":
print(frame.content, end="", flush=True)
elif frame.type == "tool_usage_started":
print(f"\nTool: {frame.event.get('tool_name')}")
result = stream.result
```
## Stream a Direct LLM Call
Direct `llm.call(...)` returns the final assembled result. To stream a direct LLM call, use `llm.stream_events(...)`:
```python
from crewai import LLM
llm = LLM(model="gpt-4o-mini")
stream = llm.stream_events("Explain streaming in one sentence.")
with stream:
for frame in stream.llm:
print(frame.content, end="", flush=True)
print()
result = stream.result
```
## Stream a Conversational Turn
Conversational Flows expose `stream_turn()` for one user message:
```python
stream = flow.stream_turn(
"What can you help me with?",
session_id="session-1",
)
with stream:
for frame in stream.interleave(["llm", "messages"]):
if frame.channel == "llm":
print(frame.content, end="", flush=True)
elif frame.channel == "messages":
print(f"\n{frame.event.get('role')}: {frame.event.get('content')}")
reply = stream.result
```
## Async Consumers
Async streams use the same channel projections:
```python
stream = flow.astream(inputs={"topic": "AI agents"})
async with stream:
async for frame in stream.llm:
print(frame.content, end="", flush=True)
result = stream.result
```
## Cleanup
Use the stream as a context manager when possible. If a client disconnects or you stop consuming early, close the stream:
```python
stream = flow.stream_events(inputs={"topic": "AI agents"})
try:
for frame in stream.events:
print(frame.content, end="", flush=True)
finally:
if not stream.is_exhausted:
stream.close()
```
For async streams, call `await stream.aclose()`.
## See Also
- [Streaming](/edge/en/concepts/streaming)
- [Streaming Runtime Contract](/edge/en/learn/streaming-runtime-contract)
- [Streaming Flow Execution](/edge/en/learn/streaming-flow-execution)
- [Streaming Crew Execution](/edge/en/learn/streaming-crew-execution)