Compare commits

..

11 Commits

Author SHA1 Message Date
lorenzejay
bb290fa967 Preserve LLM instance state for stream events 2026-06-29 16:50:46 -07:00
lorenzejay
5e6fdc8374 Fix streaming contract docs link 2026-06-29 16:36:42 -07:00
lorenzejay
4fda0c0eec Remove stream frame version field 2026-06-29 16:09:23 -07:00
lorenzejay
926057635d Replay cached stream frame projections 2026-06-29 15:54:37 -07:00
lorenzejay
7de7e32bb2 Address streaming contract review feedback 2026-06-29 15:35:18 -07:00
lorenzejay
01c7915528 Drop stream frame debug runner example 2026-06-29 15:30:31 -07:00
lorenzejay
71edf39fee Update flow streaming integration properties 2026-06-29 15:06:21 -07:00
lorenzejay
90b06a4523 Unify flow streaming frame items 2026-06-29 14:50:17 -07:00
lorenzejay
a48f45c917 Add direct LLM streaming helpers 2026-06-29 14:16:08 -07:00
lorenzejay
5ebf686254 Merge branch 'main' of github.com:crewAIInc/crewAI into lorenze/imp/streaming-contract 2026-06-29 13:57:25 -07:00
lorenzejay
72d78387bc Define stream frame protocol for flows 2026-06-29 13:51:37 -07:00
28 changed files with 2319 additions and 299 deletions

View File

@@ -364,6 +364,7 @@
"edge/en/learn/human-feedback-in-flows",
"edge/en/learn/kickoff-async",
"edge/en/learn/kickoff-for-each",
"edge/en/learn/streaming-runtime-contract",
"edge/en/learn/llm-connections",
"edge/en/learn/litellm-removal-guide",
"edge/en/learn/multimodal-agents",
@@ -9587,6 +9588,7 @@
"edge/pt-BR/learn/human-feedback-in-flows",
"edge/pt-BR/learn/kickoff-async",
"edge/pt-BR/learn/kickoff-for-each",
"edge/pt-BR/learn/streaming-runtime-contract",
"edge/pt-BR/learn/llm-connections",
"edge/pt-BR/learn/multimodal-agents",
"edge/pt-BR/learn/replay-tasks-from-latest-crew-kickoff",
@@ -18477,6 +18479,7 @@
"edge/ko/learn/human-feedback-in-flows",
"edge/ko/learn/kickoff-async",
"edge/ko/learn/kickoff-for-each",
"edge/ko/learn/streaming-runtime-contract",
"edge/ko/learn/llm-connections",
"edge/ko/learn/multimodal-agents",
"edge/ko/learn/replay-tasks-from-latest-crew-kickoff",
@@ -27583,6 +27586,7 @@
"edge/ar/learn/human-feedback-in-flows",
"edge/ar/learn/kickoff-async",
"edge/ar/learn/kickoff-for-each",
"edge/ar/learn/streaming-runtime-contract",
"edge/ar/learn/llm-connections",
"edge/ar/learn/multimodal-agents",
"edge/ar/learn/replay-tasks-from-latest-crew-kickoff",

View File

@@ -11,7 +11,7 @@ mode: "wide"
## كيف يعمل بث التدفق
عند تفعيل البث في تدفق، يلتقط CrewAI ويبث المخرجات من أي أطقم أو استدعاءات LLM داخل التدفق. يقدم البث أجزاء منظمة تحتوي على المحتوى وسياق المهمة ومعلومات الوكيل مع تقدم التنفيذ.
عند تفعيل البث في تدفق، يلتقط CrewAI ويبث المخرجات من أي أطقم أو استدعاءات LLM أو أدوات أو أحداث دورة حياة داخل التدفق. يقدم البث عناصر `StreamFrame` مرتبة تحتوي على محتوى قابل للطباعة وبيانات حدث مهيكلة مع تقدم التنفيذ.
## تفعيل البث
@@ -52,7 +52,7 @@ class ResearchFlow(Flow):
## البث المتزامن
عند استدعاء `kickoff()` على تدفق مع تفعيل البث، يُرجع كائن `FlowStreamingOutput` يمكنك التكرار عليه:
عند استدعاء `kickoff()` على تدفق مع تفعيل البث، يُرجع جلسة stream تنتج عناصر `StreamFrame` مرتبة:
```python Code
flow = ResearchFlow()
@@ -60,44 +60,43 @@ flow = ResearchFlow()
# Start streaming execution
streaming = flow.kickoff()
# Iterate over chunks as they arrive
for chunk in streaming:
print(chunk.content, end="", flush=True)
# Iterate over stream items as they arrive
for item in streaming:
print(item.content, end="", flush=True)
# Access the final result after streaming completes
result = streaming.result
print(f"\n\nFinal output: {result}")
```
### معلومات جزء البث
### معلومات عنصر البث
يوفر كل جزء سياقاً حول مصدره في التدفق:
يوفر كل عنصر محتوى قابلاً للطباعة وبيانات حدث مهيكلة:
```python Code
streaming = flow.kickoff()
for chunk in streaming:
print(f"Agent: {chunk.agent_role}")
print(f"Task: {chunk.task_name}")
print(f"Content: {chunk.content}")
print(f"Type: {chunk.chunk_type}") # TEXT or TOOL_CALL
for item in streaming:
print(f"Channel: {item.channel}")
print(f"Type: {item.type}")
print(f"Content: {item.content}")
print(f"Event payload: {item.event}")
```
### الوصول إلى خصائص البث
يوفر كائن `FlowStreamingOutput` خصائص وطرق مفيدة:
توفر جلسة stream خصائص وطرق مفيدة:
```python Code
streaming = flow.kickoff()
# Iterate and collect chunks
for chunk in streaming:
print(chunk.content, end="", flush=True)
# Iterate and collect items
for item in streaming:
print(item.content, end="", flush=True)
# After iteration completes
print(f"\nCompleted: {streaming.is_completed}")
print(f"Full text: {streaming.get_full_text()}")
print(f"Total chunks: {len(streaming.chunks)}")
print(f"Total frames: {len(streaming.frames)}")
print(f"Final result: {streaming.result}")
```
@@ -114,9 +113,9 @@ async def stream_flow():
# Start async streaming
streaming = await flow.kickoff_async()
# Async iteration over chunks
async for chunk in streaming:
print(chunk.content, end="", flush=True)
# Async iteration over stream items
async for item in streaming:
print(item.content, end="", flush=True)
# Access final result
result = streaming.result
@@ -181,13 +180,14 @@ flow = MultiStepFlow()
streaming = flow.kickoff()
current_step = ""
for chunk in streaming:
for item in streaming:
# Track which flow step is executing
if chunk.task_name != current_step:
current_step = chunk.task_name
print(f"\n\n=== {chunk.task_name} ===\n")
step_name = item.event.get("method_name") or item.event.get("task_name")
if step_name and step_name != current_step:
current_step = step_name
print(f"\n\n=== {step_name} ===\n")
print(chunk.content, end="", flush=True)
print(item.content, end="", flush=True)
result = streaming.result
print(f"\n\nFinal analysis: {result}")
@@ -201,7 +201,6 @@ print(f"\n\nFinal analysis: {result}")
import asyncio
from crewai.flow.flow import Flow, listen, start
from crewai import Agent, Crew, Task
from crewai.types.streaming import StreamChunkType
class ResearchPipeline(Flow):
stream = True
@@ -254,33 +253,35 @@ async def run_with_dashboard():
current_agent = ""
current_task = ""
chunk_count = 0
frame_count = 0
async for chunk in streaming:
chunk_count += 1
async for item in streaming:
frame_count += 1
# Display phase transitions
if chunk.task_name != current_task:
current_task = chunk.task_name
current_agent = chunk.agent_role
task_name = item.event.get("task_name", "")
agent_role = item.event.get("agent_role", "")
if task_name and task_name != current_task:
current_task = task_name
current_agent = agent_role
print(f"\n\n📋 Phase: {current_task}")
print(f"👤 Agent: {current_agent}")
print("-" * 60)
# Display text output
if chunk.chunk_type == StreamChunkType.TEXT:
print(chunk.content, end="", flush=True)
if item.content:
print(item.content, end="", flush=True)
# Display tool usage
elif chunk.chunk_type == StreamChunkType.TOOL_CALL and chunk.tool_call:
print(f"\n🔧 Tool: {chunk.tool_call.tool_name}")
elif item.channel == "tools":
print(f"\n🔧 Tool event: {item.type}")
# Show completion summary
result = streaming.result
print(f"\n\n{'='*60}")
print("PIPELINE COMPLETE")
print(f"{'='*60}")
print(f"Total chunks: {chunk_count}")
print(f"Total frames: {frame_count}")
print(f"Final output length: {len(str(result))} characters")
asyncio.run(run_with_dashboard())
@@ -353,8 +354,8 @@ class StatefulStreamingFlow(Flow[AnalysisState]):
flow = StatefulStreamingFlow()
streaming = flow.kickoff(inputs={"topic": "quantum computing"})
for chunk in streaming:
print(chunk.content, end="", flush=True)
for item in streaming:
print(item.content, end="", flush=True)
result = streaming.result
print(f"\n\nFinal state:")
@@ -374,29 +375,29 @@ print(f"Insights length: {len(flow.state.insights)}")
- **تتبع التقدم**: إظهار المرحلة الحالية من سير العمل للمستخدمين
- **لوحات المعلومات الحية**: إنشاء واجهات مراقبة لتدفقات الإنتاج
## أنواع أجزاء البث
## قنوات إطارات البث
مثل بث الطاقم، يمكن أن تكون أجزاء التدفق من أنواع مختلفة:
ينتج بث التدفق عناصر `StreamFrame` عبر عدة قنوات:
### أجزاء TEXT
### إطارات LLM
محتوى نصي قياسي من استجابات LLM:
```python Code
for chunk in streaming:
if chunk.chunk_type == StreamChunkType.TEXT:
print(chunk.content, end="", flush=True)
for item in streaming:
if item.channel == "llm" and item.content:
print(item.content, end="", flush=True)
```
### أجزاء TOOL_CALL
### إطارات الأدوات
معلومات حول استدعاءات الأدوات داخل التدفق:
```python Code
for chunk in streaming:
if chunk.chunk_type == StreamChunkType.TOOL_CALL and chunk.tool_call:
print(f"\nTool: {chunk.tool_call.tool_name}")
print(f"Args: {chunk.tool_call.arguments}")
for item in streaming:
if item.channel == "tools":
print(f"\nTool event: {item.type}")
print(f"Payload: {item.event}")
```
## معالجة الأخطاء
@@ -408,8 +409,8 @@ flow = ResearchFlow()
streaming = flow.kickoff()
try:
for chunk in streaming:
print(chunk.content, end="", flush=True)
for item in streaming:
print(item.content, end="", flush=True)
result = streaming.result
print(f"\nSuccess! Result: {result}")
@@ -422,7 +423,7 @@ except Exception as e:
## الإلغاء وتنظيف الموارد
يدعم `FlowStreamingOutput` الإلغاء السلس بحيث يتوقف العمل الجاري فوراً عند انقطاع اتصال المستهلك.
تدعم جلسة stream الإلغاء السلس بحيث يتوقف العمل الجاري فوراً عند انقطاع اتصال المستهلك.
### مدير السياق غير المتزامن
@@ -430,8 +431,8 @@ except Exception as e:
streaming = await flow.kickoff_async()
async with streaming:
async for chunk in streaming:
print(chunk.content, end="", flush=True)
async for item in streaming:
print(item.content, end="", flush=True)
```
### الإلغاء الصريح
@@ -439,8 +440,8 @@ async with streaming:
```python Code
streaming = await flow.kickoff_async()
try:
async for chunk in streaming:
print(chunk.content, end="", flush=True)
async for item in streaming:
print(item.content, end="", flush=True)
finally:
await streaming.aclose() # غير متزامن
# streaming.close() # المكافئ المتزامن
@@ -451,10 +452,10 @@ finally:
## ملاحظات مهمة
- يفعّل البث تلقائياً بث LLM لأي أطقم مستخدمة داخل التدفق
- يجب التكرار عبر جميع الأجزاء قبل الوصول إلى خاصية `.result`
- يجب التكرار عبر جميع عناصر stream قبل الوصول إلى خاصية `.result`
- يعمل البث مع كل من حالة التدفق المنظمة وغير المنظمة
- يلتقط بث التدفق المخرجات من جميع الأطقم واستدعاءات LLM والأدوات وأحداث دورة الحياة في التدفق
- يتضمن كل جزء سياقاً حول الوكيل والمهمة التي ولدته
- يتضمن كل إطار سياق حدث مهيكلاً مثل القناة والنوع والنطاق والحمولة
- يضيف البث حملاً ضئيلاً لتنفيذ التدفق
## الدمج مع تصور التدفق
@@ -468,8 +469,8 @@ flow.plot("research_flow") # Creates HTML visualization
# Run with streaming
streaming = flow.kickoff()
for chunk in streaming:
print(chunk.content, end="", flush=True)
for item in streaming:
print(item.content, end="", flush=True)
result = streaming.result
print(f"\nFlow complete! View structure at: research_flow.html")

View File

@@ -0,0 +1,194 @@
---
title: عقد بث وقت التشغيل
description: بث إطارات وقت تشغيل مرتبة من التدفقات واستدعاءات LLM المباشرة ودورات المحادثة.
icon: tower-broadcast
mode: "wide"
---
## نظرة عامة
يوفر CrewAI عقد بث قائمًا على الإطارات للأنظمة التي تحتاج إلى أكثر من أجزاء نصية بسيطة. يصدر العقد كائنات `StreamFrame` مرتبة لأحداث دورة حياة Flow، وتوكنات LLM المباشرة، ونشاط الأدوات، ورسائل المحادثة، والأحداث المخصصة.
استخدم هذه الواجهة عندما تبني واجهة مستخدم، أو جسر خدمة، أو تطبيق طرفية، أو وقت تشغيل نشر يحتاج إلى تدفق ثابت من الأحداث المهيكلة أثناء تشغيل Flow أو دورة محادثة أو استدعاء LLM مباشر.
## StreamFrame
لكل إطار نفس الغلاف:
```python
from crewai.types.streaming import StreamFrame
frame.id # معرف إطار فريد
frame.seq # ترتيب محلي للتنفيذ، عند توفره
frame.type # نوع الحدث المصدر، مثل "flow_started"
frame.channel # "llm", "flow", "tools", "messages", "lifecycle", or "custom"
frame.namespace # نطاق المصدر/وقت التشغيل
frame.timestamp # طابع وقت الحدث
frame.parent_id # معرف الحدث الأب، عند توفره
frame.previous_id # معرف الحدث السابق، عند توفره
frame.data # حمولة الحدث
frame.event # اسم بديل لـ frame.data
frame.content # نص قابل للطباعة لإطارات التوكن، وإلا ""
```
حقل `channel` هو أسرع طريقة لتوجيه الإطارات في المستهلكين:
| القناة | تحتوي على |
|--------|-----------|
| `llm` | توكنات وأجزاء التفكير من أحداث بث LLM |
| `flow` | دورة حياة Flow، وتنفيذ الدوال، والتوجيه، وأحداث الإيقاف/الاستئناف |
| `tools` | أحداث استخدام الأدوات |
| `messages` | أحداث سجل المحادثة |
| `lifecycle` | أحداث دورة حياة وقت التشغيل التي لا تخص قناة أخرى |
| `custom` | أحداث لا تُطابق قناة مدمجة |
يحافظ `frame.type` على نوع الحدث المصدر، حتى يتمكن المستهلكون من التعامل مع أحداث محددة داخل القناة.
## بث Flow
عيّن `stream=True` على Flow لجعل `kickoff()` يعيد جلسة stream:
```python
from crewai.flow import Flow, start
class ReportFlow(Flow):
@start()
def generate(self):
return "done"
flow = ReportFlow(stream=True)
stream = flow.kickoff()
with stream:
for chunk in stream:
print(chunk.content, end="", flush=True)
if chunk.type == "tool_usage_started":
print(chunk.event["tool_name"])
result = stream.result
```
يجب استهلاك stream قبل قراءة `stream.result`. يؤدي الوصول إلى النتيجة مبكرًا إلى رفع `RuntimeError` حتى لا يتعامل المستهلكون بالخطأ مع تشغيل جزئي على أنه مكتمل.
يمكنك أيضًا استدعاء `flow.stream_events(...)` مباشرة عندما تريد البث لاستدعاء واحد بدون تعيين `stream=True` على مثيل Flow.
## التصفية حسب القناة
يوفر `StreamSession` إسقاطات حسب القناة تحافظ على ترتيب الإطارات العالمي داخل القناة المحددة:
```python
stream = flow.stream_events()
with stream:
for frame in stream.llm:
print(frame.content, end="", flush=True)
result = stream.result
```
الإسقاطات المتاحة هي:
| الإسقاط | الإطارات |
|---------|----------|
| `stream.events` | كل الإطارات |
| `stream.llm` | إطارات LLM |
| `stream.messages` | إطارات رسائل المحادثة |
| `stream.flow` | إطارات Flow |
| `stream.tools` | إطارات الأدوات |
| `stream.interleave([...])` | مجموعة مختارة من القنوات |
استخدم `stream.interleave(["flow", "llm", "messages"])` عندما يريد المستهلك بعض القنوات فقط لكنه ما زال يحتاج إلى ترتيبها النسبي.
## البث غير المتزامن
استخدم `astream()` للمستهلكين غير المتزامنين:
```python
flow = ReportFlow()
stream = flow.astream()
async with stream:
async for chunk in stream.events:
print(chunk.channel, chunk.type, chunk.content)
result = stream.result
```
تملك الجلسة غير المتزامنة نفس إسقاطات الجلسة المتزامنة.
## بث استدعاء LLM مباشر
ما زال `llm.call(...)` يعيد النتيجة النهائية المجمعة. استخدم `llm.stream_events(...)` عندما تريد التكرار على الأجزاء فور وصولها مع الحفاظ على حمولة الحدث المهيكلة:
```python
from crewai import LLM
llm = LLM(model="gpt-4o-mini")
stream = llm.stream_events(
messages=[
{
"role": "user",
"content": "Explain CrewAI streaming in two short sentences.",
}
]
)
with stream:
for chunk in stream:
print(chunk.content, end="", flush=True)
result = stream.result
```
يفعل `llm.stream_events(...)` البث مؤقتًا للاستدعاء المغلف ثم يستعيد إعداد `stream` السابق في LLM بعد ذلك. تستمر تكاملات المزودين في إصدار أحداث بث LLM الأساسية؛ يوفر هذا المساعد واجهة مكرر مشتركة فوق تلك الأحداث لكل مزودي LLM.
## دورات المحادثة
يمكن للتدفقات المحادثية بث دورة مستخدم واحدة باستخدام `stream_turn()`:
```python
from crewai import Flow
from crewai.experimental.conversational import ConversationConfig, ConversationState
@ConversationConfig(llm="gpt-4o-mini", defer_trace_finalization=True)
class ChatFlow(Flow[ConversationState]):
conversational = True
flow = ChatFlow()
stream = flow.stream_turn("What can you help me with?", session_id="session-1")
with stream:
for frame in stream.events:
if frame.channel == "llm" and frame.type == "llm_stream_chunk":
print(frame.content, end="", flush=True)
reply = stream.result
```
أثناء `stream_turn()`، يفعّل مسار الإجابة المحادثية المدمج بث توكنات LLM لذلك الدور ثم يستعيد إعداد `stream` السابق في LLM بعد ذلك. يجب على معالجات المسارات المخصصة التي تنشئ Agents أو مثيلات LLM خاصة بها تهيئة تلك النماذج للبث إذا احتاجت إلى إخراج على مستوى التوكن.
## التنظيف
استخدم الجلسة كمدير سياق عندما يكون ذلك ممكنًا. إذا انقطع اتصال العميل قبل استهلاك stream بالكامل، فأغلق الجلسة صراحة:
```python
stream = flow.stream_events()
try:
for frame in stream.events:
print(frame.type)
finally:
if not stream.is_exhausted:
stream.close()
```
للتدفقات غير المتزامنة، استخدم `await stream.aclose()`.
## بث الأجزاء القديم
ما زال بث Crew مع `stream=True` يعيد واجهة `CrewStreamingOutput` المعتمدة على الأجزاء والموضحة في [بث تنفيذ Crew](/ar/learn/streaming-crew-execution). وما زالت استدعاءات `llm.call(...)` المباشرة تعيد نتيجة LLM النهائية. عقد الإطارات مخصص لأوقات التشغيل التي تحتاج إلى غلاف حدث ثابت عبر Flows، واستدعاءات LLM المباشرة، ودورات المحادثة، والأدوات، والرسائل.

View File

@@ -25,6 +25,7 @@ Use **`flow.handle_turn(message, session_id=...)`** for every user message from
| API | Use for |
|-----|---------|
| `handle_turn(message, session_id=...)` | Ergonomic one-turn wrapper for conversational `Flow` |
| `stream_turn(message, session_id=...)` | Stream one conversational turn as ordered runtime frames |
| `chat()` | Local terminal REPL for conversational `Flow` |
| `kickoff(inputs={...})` | Advanced flow execution without conversational turn handling |
| `ask()` | Blocking prompt **inside** one step (wizard, clarification) |
@@ -85,6 +86,23 @@ finally:
flow.finalize_session_traces() # one trace link for the whole chat
```
## Streaming a turn
Use `stream_turn()` when a UI or runtime needs structured events for one chat turn. It returns a stream session with ordered frames for Flow routing, LLM chunks, tool activity, and conversation messages.
```python
stream = flow.stream_turn("Where is my order?", session_id=session_id)
with stream:
for frame in stream.events:
if frame.channel == "llm" and frame.type == "llm_stream_chunk":
print(frame.data.get("chunk", ""), end="", flush=True)
result = stream.result
```
For the full frame contract, channel list, and async API, see [Streaming Runtime Contract](/edge/en/learn/streaming-runtime-contract).
## Turn lifecycle
Each `handle_turn` runs this pipeline:

View File

@@ -11,7 +11,7 @@ CrewAI Flows support streaming output, allowing you to receive real-time updates
## How Flow Streaming Works
When streaming is enabled on a Flow, CrewAI captures and streams output from any crews or LLM calls within the flow. The stream delivers structured chunks containing the content, task context, and agent information as execution progresses.
When streaming is enabled on a Flow, CrewAI captures and streams output from any crews, LLM calls, tools, and lifecycle events within the flow. The stream delivers ordered `StreamFrame` items with printable content plus structured event data as execution progresses.
## Enabling Streaming
@@ -52,7 +52,7 @@ class ResearchFlow(Flow):
## Synchronous Streaming
When you call `kickoff()` on a flow with streaming enabled, it returns a `FlowStreamingOutput` object that you can iterate over:
When you call `kickoff()` on a flow with streaming enabled, it returns a stream session that yields ordered `StreamFrame` items:
```python Code
flow = ResearchFlow()
@@ -60,44 +60,43 @@ flow = ResearchFlow()
# Start streaming execution
streaming = flow.kickoff()
# Iterate over chunks as they arrive
for chunk in streaming:
print(chunk.content, end="", flush=True)
# Iterate over stream items as they arrive
for item in streaming:
print(item.content, end="", flush=True)
# Access the final result after streaming completes
result = streaming.result
print(f"\n\nFinal output: {result}")
```
### Stream Chunk Information
### Stream Item Information
Each chunk provides context about where it originated in the flow:
Each item provides both printable content and structured event data:
```python Code
streaming = flow.kickoff()
for chunk in streaming:
print(f"Agent: {chunk.agent_role}")
print(f"Task: {chunk.task_name}")
print(f"Content: {chunk.content}")
print(f"Type: {chunk.chunk_type}") # TEXT or TOOL_CALL
for item in streaming:
print(f"Channel: {item.channel}")
print(f"Type: {item.type}")
print(f"Content: {item.content}")
print(f"Event payload: {item.event}")
```
### Accessing Streaming Properties
The `FlowStreamingOutput` object provides useful properties and methods:
The stream session provides useful properties and methods:
```python Code
streaming = flow.kickoff()
# Iterate and collect chunks
for chunk in streaming:
print(chunk.content, end="", flush=True)
# Iterate and collect items
for item in streaming:
print(item.content, end="", flush=True)
# After iteration completes
print(f"\nCompleted: {streaming.is_completed}")
print(f"Full text: {streaming.get_full_text()}")
print(f"Total chunks: {len(streaming.chunks)}")
print(f"Total frames: {len(streaming.frames)}")
print(f"Final result: {streaming.result}")
```
@@ -114,9 +113,9 @@ async def stream_flow():
# Start async streaming
streaming = await flow.kickoff_async()
# Async iteration over chunks
async for chunk in streaming:
print(chunk.content, end="", flush=True)
# Async iteration over stream items
async for item in streaming:
print(item.content, end="", flush=True)
# Access final result
result = streaming.result
@@ -181,13 +180,14 @@ flow = MultiStepFlow()
streaming = flow.kickoff()
current_step = ""
for chunk in streaming:
for item in streaming:
# Track which flow step is executing
if chunk.task_name != current_step:
current_step = chunk.task_name
print(f"\n\n=== {chunk.task_name} ===\n")
step_name = item.event.get("method_name") or item.event.get("task_name")
if step_name and step_name != current_step:
current_step = step_name
print(f"\n\n=== {step_name} ===\n")
print(chunk.content, end="", flush=True)
print(item.content, end="", flush=True)
result = streaming.result
print(f"\n\nFinal analysis: {result}")
@@ -201,7 +201,6 @@ Here's a complete example showing how to build a progress dashboard with streami
import asyncio
from crewai.flow.flow import Flow, listen, start
from crewai import Agent, Crew, Task
from crewai.types.streaming import StreamChunkType
class ResearchPipeline(Flow):
stream = True
@@ -254,33 +253,35 @@ async def run_with_dashboard():
current_agent = ""
current_task = ""
chunk_count = 0
frame_count = 0
async for chunk in streaming:
chunk_count += 1
async for item in streaming:
frame_count += 1
# Display phase transitions
if chunk.task_name != current_task:
current_task = chunk.task_name
current_agent = chunk.agent_role
task_name = item.event.get("task_name", "")
agent_role = item.event.get("agent_role", "")
if task_name and task_name != current_task:
current_task = task_name
current_agent = agent_role
print(f"\n\n📋 Phase: {current_task}")
print(f"👤 Agent: {current_agent}")
print("-" * 60)
# Display text output
if chunk.chunk_type == StreamChunkType.TEXT:
print(chunk.content, end="", flush=True)
if item.content:
print(item.content, end="", flush=True)
# Display tool usage
elif chunk.chunk_type == StreamChunkType.TOOL_CALL and chunk.tool_call:
print(f"\n🔧 Tool: {chunk.tool_call.tool_name}")
elif item.channel == "tools":
print(f"\n🔧 Tool event: {item.type}")
# Show completion summary
result = streaming.result
print(f"\n\n{'='*60}")
print("PIPELINE COMPLETE")
print(f"{'='*60}")
print(f"Total chunks: {chunk_count}")
print(f"Total frames: {frame_count}")
print(f"Final output length: {len(str(result))} characters")
asyncio.run(run_with_dashboard())
@@ -353,8 +354,8 @@ class StatefulStreamingFlow(Flow[AnalysisState]):
flow = StatefulStreamingFlow()
streaming = flow.kickoff(inputs={"topic": "quantum computing"})
for chunk in streaming:
print(chunk.content, end="", flush=True)
for item in streaming:
print(item.content, end="", flush=True)
result = streaming.result
print(f"\n\nFinal state:")
@@ -374,29 +375,29 @@ Flow streaming is particularly valuable for:
- **Progress Tracking**: Show users which stage of the workflow is currently executing
- **Live Dashboards**: Create monitoring interfaces for production flows
## Stream Chunk Types
## Stream Frame Channels
Like crew streaming, flow chunks can be of different types:
Flow streaming yields `StreamFrame` items across several channels:
### TEXT Chunks
### LLM Frames
Standard text content from LLM responses:
```python Code
for chunk in streaming:
if chunk.chunk_type == StreamChunkType.TEXT:
print(chunk.content, end="", flush=True)
for item in streaming:
if item.channel == "llm" and item.content:
print(item.content, end="", flush=True)
```
### TOOL_CALL Chunks
### Tool Frames
Information about tool calls within the flow:
```python Code
for chunk in streaming:
if chunk.chunk_type == StreamChunkType.TOOL_CALL and chunk.tool_call:
print(f"\nTool: {chunk.tool_call.tool_name}")
print(f"Args: {chunk.tool_call.arguments}")
for item in streaming:
if item.channel == "tools":
print(f"\nTool event: {item.type}")
print(f"Payload: {item.event}")
```
## Error Handling
@@ -408,8 +409,8 @@ flow = ResearchFlow()
streaming = flow.kickoff()
try:
for chunk in streaming:
print(chunk.content, end="", flush=True)
for item in streaming:
print(item.content, end="", flush=True)
result = streaming.result
print(f"\nSuccess! Result: {result}")
@@ -422,7 +423,7 @@ except Exception as e:
## Cancellation and Resource Cleanup
`FlowStreamingOutput` supports graceful cancellation so that in-flight work stops promptly when the consumer disconnects.
The stream session supports graceful cancellation so that in-flight work stops promptly when the consumer disconnects.
### Async Context Manager
@@ -430,8 +431,8 @@ except Exception as e:
streaming = await flow.kickoff_async()
async with streaming:
async for chunk in streaming:
print(chunk.content, end="", flush=True)
async for item in streaming:
print(item.content, end="", flush=True)
```
### Explicit Cancellation
@@ -439,8 +440,8 @@ async with streaming:
```python Code
streaming = await flow.kickoff_async()
try:
async for chunk in streaming:
print(chunk.content, end="", flush=True)
async for item in streaming:
print(item.content, end="", flush=True)
finally:
await streaming.aclose() # async
# streaming.close() # sync equivalent
@@ -451,10 +452,10 @@ After cancellation, `streaming.is_cancelled` and `streaming.is_completed` are bo
## Important Notes
- Streaming automatically enables LLM streaming for any crews used within the flow
- You must iterate through all chunks before accessing the `.result` property
- You must iterate through all stream items before accessing the `.result` property
- Streaming works with both structured and unstructured flow state
- Flow streaming captures output from all crews, LLM calls, tools, and lifecycle events in the flow
- Each chunk includes context about which agent and task generated it
- Each frame includes structured event context such as channel, type, namespace, and payload
- Streaming adds minimal overhead to flow execution
## Combining with Flow Visualization
@@ -468,11 +469,11 @@ flow.plot("research_flow") # Creates HTML visualization
# Run with streaming
streaming = flow.kickoff()
for chunk in streaming:
print(chunk.content, end="", flush=True)
for item in streaming:
print(item.content, end="", flush=True)
result = streaming.result
print(f"\nFlow complete! View structure at: research_flow.html")
```
By leveraging flow streaming, you can build sophisticated, responsive applications that provide users with real-time visibility into complex multi-stage workflows, making your AI automations more transparent and engaging.
By leveraging flow streaming, you can build sophisticated, responsive applications that provide users with real-time visibility into complex multi-stage workflows, making your AI automations more transparent and engaging.

View File

@@ -0,0 +1,194 @@
---
title: Streaming Runtime Contract
description: Stream ordered runtime frames from Flows, direct LLM calls, and conversational turns.
icon: tower-broadcast
mode: "wide"
---
## Overview
CrewAI exposes a frame-based streaming contract for runtimes that need more than plain text chunks. The contract emits ordered `StreamFrame` objects for Flow lifecycle events, direct LLM tokens, tool activity, conversation messages, and custom events.
Use this API when you are building a UI, service bridge, terminal app, or deployment runtime that needs a stable stream of structured events while a Flow, chat turn, or direct LLM call is running.
## StreamFrame
Every frame has the same envelope:
```python
from crewai.types.streaming import StreamFrame
frame.id # unique frame id
frame.seq # execution-local order, when available
frame.type # source event type, such as "flow_started"
frame.channel # "llm", "flow", "tools", "messages", "lifecycle", or "custom"
frame.namespace # source/runtime namespace
frame.timestamp # event timestamp
frame.parent_id # parent event id, when available
frame.previous_id # previous event id, when available
frame.data # event payload
frame.event # alias for frame.data
frame.content # printable text for token-like frames, otherwise ""
```
The `channel` field is the fastest way to route frames in consumers:
| Channel | Contains |
|---------|----------|
| `llm` | Token and thinking chunks from LLM streaming events |
| `flow` | Flow lifecycle, method execution, routing, and pause/resume events |
| `tools` | Tool usage events |
| `messages` | Conversation transcript events |
| `lifecycle` | Runtime lifecycle events that are not specific to another channel |
| `custom` | Events that do not map to a built-in channel |
`frame.type` preserves the source event type, so consumers can handle specific events inside a channel.
## Stream a Flow
Set `stream=True` on a Flow to make `kickoff()` return a stream session:
```python
from crewai.flow import Flow, start
class ReportFlow(Flow):
@start()
def generate(self):
return "done"
flow = ReportFlow(stream=True)
stream = flow.kickoff()
with stream:
for chunk in stream:
print(chunk.content, end="", flush=True)
if chunk.type == "tool_usage_started":
print(chunk.event["tool_name"])
result = stream.result
```
You must consume the stream before reading `stream.result`. Accessing the result early raises a `RuntimeError` so consumers do not accidentally treat a partial run as complete.
You can also call `flow.stream_events(...)` directly when you want streaming for a single invocation without setting `stream=True` on the Flow instance.
## Filter by Channel
`StreamSession` exposes channel projections. Each projection yields only the frames from the selected channel, in the same relative order they have in the global stream:
```python
stream = flow.stream_events()
with stream:
for frame in stream.llm:
print(frame.content, end="", flush=True)
result = stream.result
```
Available projections are:
| Projection | Frames |
|------------|--------|
| `stream.events` | All frames |
| `stream.llm` | LLM frames |
| `stream.messages` | Conversation message frames |
| `stream.flow` | Flow frames |
| `stream.tools` | Tool frames |
| `stream.interleave([...])` | A selected set of channels |
Use `stream.interleave(["flow", "llm", "messages"])` when a consumer wants only some channels but still needs their relative order.
## Async Streaming
Use `astream()` for async consumers:
```python
flow = ReportFlow()
stream = flow.astream()
async with stream:
async for chunk in stream.events:
print(chunk.channel, chunk.type, chunk.content)
result = stream.result
```
The async session has the same projections as the sync session.
## Stream a Direct LLM Call
`llm.call(...)` still returns the final assembled result. Use `llm.stream_events(...)` when you want to iterate over chunks as they arrive while keeping the structured event payload:
```python
from crewai import LLM
llm = LLM(model="gpt-4o-mini")
stream = llm.stream_events(
messages=[
{
"role": "user",
"content": "Explain CrewAI streaming in two short sentences.",
}
]
)
with stream:
for chunk in stream:
print(chunk.content, end="", flush=True)
result = stream.result
```
`llm.stream_events(...)` temporarily enables streaming for the wrapped call and restores the LLM's previous `stream` setting afterward. Provider integrations continue to emit the underlying LLM stream events; this helper provides a common iterator API over those events for every LLM provider.
## Conversational Turns
Conversational Flows can stream one user turn with `stream_turn()`:
```python
from crewai import Flow
from crewai.experimental.conversational import ConversationConfig, ConversationState
@ConversationConfig(llm="gpt-4o-mini", defer_trace_finalization=True)
class ChatFlow(Flow[ConversationState]):
conversational = True
flow = ChatFlow()
stream = flow.stream_turn("What can you help me with?", session_id="session-1")
with stream:
for frame in stream.events:
if frame.channel == "llm" and frame.type == "llm_stream_chunk":
print(frame.content, end="", flush=True)
reply = stream.result
```
During `stream_turn()`, the built-in conversational answer path enables LLM token streaming for that turn and restores the LLM's previous `stream` setting afterward. Custom route handlers that create their own agents or LLM instances should configure those LLMs for streaming if they need token-level output.
## Cleanup
Use the session as a context manager when possible. If a client disconnects before the stream is exhausted, close the session explicitly:
```python
stream = flow.stream_events()
try:
for frame in stream.events:
print(frame.type)
finally:
if not stream.is_exhausted:
stream.close()
```
For async streams, use `await stream.aclose()`.
## Legacy Chunk Streaming
Crew streaming with `stream=True` still returns the chunk-oriented `CrewStreamingOutput` API described in [Streaming Crew Execution](/en/learn/streaming-crew-execution). Direct `llm.call(...)` still returns the final LLM result. The frame contract is intended for runtimes that need a stable event envelope across Flows, direct LLM calls, conversational turns, tools, and messages.

View File

@@ -0,0 +1,194 @@
---
title: 스트리밍 런타임 계약
description: Flow, 직접 LLM 호출, 대화 턴에서 정렬된 런타임 프레임을 스트리밍합니다.
icon: tower-broadcast
mode: "wide"
---
## 개요
CrewAI는 단순한 텍스트 청크보다 더 많은 정보가 필요한 런타임을 위해 프레임 기반 스트리밍 계약을 제공합니다. 이 계약은 Flow 생명주기 이벤트, 직접 LLM 토큰, 도구 활동, 대화 메시지, 사용자 지정 이벤트에 대해 정렬된 `StreamFrame` 객체를 방출합니다.
UI, 서비스 브리지, 터미널 앱, 배포 런타임을 만들 때 Flow, 채팅 턴, 직접 LLM 호출이 실행되는 동안 안정적인 구조화 이벤트 스트림이 필요하다면 이 API를 사용하세요.
## StreamFrame
모든 프레임은 같은 envelope를 가집니다:
```python
from crewai.types.streaming import StreamFrame
frame.id # 고유 프레임 id
frame.seq # 사용 가능한 경우 실행 로컬 순서
frame.type # "flow_started" 같은 원본 이벤트 타입
frame.channel # "llm", "flow", "tools", "messages", "lifecycle", "custom"
frame.namespace # 소스/런타임 namespace
frame.timestamp # 이벤트 timestamp
frame.parent_id # 사용 가능한 경우 부모 이벤트 id
frame.previous_id # 사용 가능한 경우 이전 이벤트 id
frame.data # 이벤트 payload
frame.event # frame.data의 alias
frame.content # 토큰류 프레임의 출력 가능한 텍스트, 그 외에는 ""
```
`channel` 필드는 소비자에서 프레임을 라우팅하는 가장 빠른 방법입니다:
| 채널 | 포함 내용 |
|------|-----------|
| `llm` | LLM 스트리밍 이벤트의 토큰 및 thinking 청크 |
| `flow` | Flow 생명주기, 메서드 실행, 라우팅, pause/resume 이벤트 |
| `tools` | 도구 사용 이벤트 |
| `messages` | 대화 transcript 이벤트 |
| `lifecycle` | 다른 채널에 속하지 않는 런타임 생명주기 이벤트 |
| `custom` | 내장 채널에 매핑되지 않는 이벤트 |
`frame.type`은 원본 이벤트 타입을 보존하므로, 소비자는 채널 안에서 특정 이벤트를 처리할 수 있습니다.
## Flow 스트리밍
Flow에 `stream=True`를 설정하면 `kickoff()`가 stream session을 반환합니다:
```python
from crewai.flow import Flow, start
class ReportFlow(Flow):
@start()
def generate(self):
return "done"
flow = ReportFlow(stream=True)
stream = flow.kickoff()
with stream:
for chunk in stream:
print(chunk.content, end="", flush=True)
if chunk.type == "tool_usage_started":
print(chunk.event["tool_name"])
result = stream.result
```
`stream.result`를 읽기 전에 stream을 소비해야 합니다. 결과를 너무 일찍 접근하면 `RuntimeError`가 발생하여, 소비자가 부분 실행을 완료된 실행으로 잘못 처리하지 않도록 합니다.
Flow 인스턴스에 `stream=True`를 설정하지 않고 단일 호출만 스트리밍하려면 `flow.stream_events(...)`를 직접 호출할 수도 있습니다.
## 채널별 필터링
`StreamSession`은 선택한 채널 안에서 전역 프레임 순서를 보존하는 채널 projection을 제공합니다:
```python
stream = flow.stream_events()
with stream:
for frame in stream.llm:
print(frame.content, end="", flush=True)
result = stream.result
```
사용 가능한 projection은 다음과 같습니다:
| Projection | 프레임 |
|------------|--------|
| `stream.events` | 모든 프레임 |
| `stream.llm` | LLM 프레임 |
| `stream.messages` | 대화 메시지 프레임 |
| `stream.flow` | Flow 프레임 |
| `stream.tools` | 도구 프레임 |
| `stream.interleave([...])` | 선택한 채널 집합 |
소비자가 일부 채널만 원하지만 상대 순서도 필요하다면 `stream.interleave(["flow", "llm", "messages"])`를 사용하세요.
## 비동기 스트리밍
비동기 소비자는 `astream()`을 사용하세요:
```python
flow = ReportFlow()
stream = flow.astream()
async with stream:
async for chunk in stream.events:
print(chunk.channel, chunk.type, chunk.content)
result = stream.result
```
비동기 세션은 동기 세션과 같은 projection을 제공합니다.
## 직접 LLM 호출 스트리밍
`llm.call(...)`은 계속 최종 조립 결과를 반환합니다. 구조화된 이벤트 payload를 유지하면서 청크가 도착하는 대로 반복 처리하려면 `llm.stream_events(...)`를 사용하세요:
```python
from crewai import LLM
llm = LLM(model="gpt-4o-mini")
stream = llm.stream_events(
messages=[
{
"role": "user",
"content": "Explain CrewAI streaming in two short sentences.",
}
]
)
with stream:
for chunk in stream:
print(chunk.content, end="", flush=True)
result = stream.result
```
`llm.stream_events(...)`는 호출 범위 override를 통해 감싼 호출 동안에만 streaming을 활성화하며, LLM 인스턴스의 `stream` 설정 자체는 변경하지 않습니다. provider 통합은 계속 기본 LLM stream 이벤트를 방출하며, 이 helper는 모든 LLM provider에서 그 이벤트 위에 공통 iterator API를 제공합니다.
## 대화 턴
대화형 Flow는 `stream_turn()`으로 사용자 턴 하나를 스트리밍할 수 있습니다:
```python
from crewai import Flow
from crewai.experimental.conversational import ConversationConfig, ConversationState
@ConversationConfig(llm="gpt-4o-mini", defer_trace_finalization=True)
class ChatFlow(Flow[ConversationState]):
conversational = True
flow = ChatFlow()
stream = flow.stream_turn("What can you help me with?", session_id="session-1")
with stream:
for frame in stream.events:
if frame.channel == "llm" and frame.type == "llm_stream_chunk":
print(frame.content, end="", flush=True)
reply = stream.result
```
`stream_turn()` 중에는 내장 대화 응답 경로가 해당 턴에 대해 LLM 토큰 스트리밍을 활성화하고 이후 LLM의 이전 `stream` 설정을 복원합니다. 자체 agent 또는 LLM 인스턴스를 만드는 사용자 지정 route handler는 토큰 단위 출력이 필요하다면 해당 LLM을 streaming으로 구성해야 합니다.
## 정리
가능하면 세션을 context manager로 사용하세요. stream이 끝나기 전에 클라이언트 연결이 끊기면 세션을 명시적으로 닫으세요:
```python
stream = flow.stream_events()
try:
for frame in stream.events:
print(frame.type)
finally:
if not stream.is_exhausted:
stream.close()
```
비동기 stream에서는 `await stream.aclose()`를 사용하세요.
## 레거시 청크 스트리밍
`stream=True`를 사용하는 Crew 스트리밍은 계속 [스트리밍 Crew 실행](/ko/learn/streaming-crew-execution)에 설명된 청크 중심 `CrewStreamingOutput` API를 반환합니다. 직접 `llm.call(...)` 호출도 계속 최종 LLM 결과를 반환합니다. 프레임 계약은 Flow, 직접 LLM 호출, 대화 턴, 도구, 메시지 전반에서 안정적인 이벤트 envelope가 필요한 런타임을 위한 것입니다.

View File

@@ -0,0 +1,194 @@
---
title: Contrato de Streaming do Runtime
description: Transmita frames ordenados do runtime a partir de Flows, chamadas diretas de LLM e turnos conversacionais.
icon: tower-broadcast
mode: "wide"
---
## Visão geral
O CrewAI expõe um contrato de streaming baseado em frames para runtimes que precisam de mais do que chunks de texto simples. O contrato emite objetos `StreamFrame` ordenados para eventos de ciclo de vida de Flow, tokens de LLM diretos, atividade de ferramentas, mensagens de conversa e eventos personalizados.
Use esta API ao criar uma UI, ponte de serviço, aplicativo de terminal ou runtime de implantação que precise de um fluxo estável de eventos estruturados enquanto um Flow, turno de chat ou chamada direta de LLM está em execução.
## StreamFrame
Todo frame tem o mesmo envelope:
```python
from crewai.types.streaming import StreamFrame
frame.id # id único do frame
frame.seq # ordem local da execução, quando disponível
frame.type # tipo do evento de origem, como "flow_started"
frame.channel # "llm", "flow", "tools", "messages", "lifecycle" ou "custom"
frame.namespace # namespace de origem/runtime
frame.timestamp # timestamp do evento
frame.parent_id # id do evento pai, quando disponível
frame.previous_id # id do evento anterior, quando disponível
frame.data # payload do evento
frame.event # alias para frame.data
frame.content # texto imprimível para frames de token, caso contrário ""
```
O campo `channel` é a forma mais rápida de rotear frames em consumidores:
| Canal | Contém |
|-------|--------|
| `llm` | Tokens e chunks de raciocínio de eventos de streaming de LLM |
| `flow` | Ciclo de vida do Flow, execução de métodos, roteamento e eventos de pausa/retomada |
| `tools` | Eventos de uso de ferramentas |
| `messages` | Eventos do transcript da conversa |
| `lifecycle` | Eventos de ciclo de vida do runtime que não pertencem a outro canal |
| `custom` | Eventos que não mapeiam para um canal integrado |
`frame.type` preserva o tipo do evento de origem, para que consumidores possam tratar eventos específicos dentro de um canal.
## Transmitir um Flow
Defina `stream=True` em um Flow para fazer `kickoff()` retornar uma sessão de stream:
```python
from crewai.flow import Flow, start
class ReportFlow(Flow):
@start()
def generate(self):
return "done"
flow = ReportFlow(stream=True)
stream = flow.kickoff()
with stream:
for chunk in stream:
print(chunk.content, end="", flush=True)
if chunk.type == "tool_usage_started":
print(chunk.event["tool_name"])
result = stream.result
```
Você deve consumir o stream antes de ler `stream.result`. Acessar o resultado cedo demais gera um `RuntimeError`, para que consumidores não tratem uma execução parcial como concluída.
Você também pode chamar `flow.stream_events(...)` diretamente quando quiser streaming para uma única invocação sem definir `stream=True` na instância do Flow.
## Filtrar por canal
`StreamSession` expõe projeções por canal que preservam a ordem global dos frames dentro do canal selecionado:
```python
stream = flow.stream_events()
with stream:
for frame in stream.llm:
print(frame.content, end="", flush=True)
result = stream.result
```
As projeções disponíveis são:
| Projeção | Frames |
|----------|--------|
| `stream.events` | Todos os frames |
| `stream.llm` | Frames de LLM |
| `stream.messages` | Frames de mensagens de conversa |
| `stream.flow` | Frames de Flow |
| `stream.tools` | Frames de ferramentas |
| `stream.interleave([...])` | Um conjunto selecionado de canais |
Use `stream.interleave(["flow", "llm", "messages"])` quando um consumidor quiser apenas alguns canais, mas ainda precisar da ordem relativa entre eles.
## Streaming assíncrono
Use `astream()` para consumidores assíncronos:
```python
flow = ReportFlow()
stream = flow.astream()
async with stream:
async for chunk in stream.events:
print(chunk.channel, chunk.type, chunk.content)
result = stream.result
```
A sessão assíncrona tem as mesmas projeções da sessão síncrona.
## Transmitir uma chamada direta de LLM
`llm.call(...)` ainda retorna o resultado final montado. Use `llm.stream_events(...)` quando quiser iterar pelos chunks conforme eles chegam, mantendo o payload estruturado do evento:
```python
from crewai import LLM
llm = LLM(model="gpt-4o-mini")
stream = llm.stream_events(
messages=[
{
"role": "user",
"content": "Explain CrewAI streaming in two short sentences.",
}
]
)
with stream:
for chunk in stream:
print(chunk.content, end="", flush=True)
result = stream.result
```
`llm.stream_events(...)` ativa o streaming apenas para a chamada encapsulada, por meio de uma substituição com escopo de chamada; a configuração `stream` da própria instância do LLM não é alterada. As integrações de provedores continuam emitindo os eventos de stream de LLM subjacentes; esse helper fornece uma API de iterador comum sobre esses eventos para todos os provedores de LLM.
## Turnos conversacionais
Flows conversacionais podem transmitir um turno de usuário com `stream_turn()`:
```python
from crewai import Flow
from crewai.experimental.conversational import ConversationConfig, ConversationState
@ConversationConfig(llm="gpt-4o-mini", defer_trace_finalization=True)
class ChatFlow(Flow[ConversationState]):
conversational = True
flow = ChatFlow()
stream = flow.stream_turn("What can you help me with?", session_id="session-1")
with stream:
for frame in stream.events:
if frame.channel == "llm" and frame.type == "llm_stream_chunk":
print(frame.content, end="", flush=True)
reply = stream.result
```
Durante `stream_turn()`, o caminho de resposta conversacional integrado ativa o streaming de tokens de LLM para esse turno e restaura a configuração anterior de `stream` do LLM depois. Handlers de rota personalizados que criam seus próprios agentes ou instâncias de LLM devem configurar esses LLMs para streaming se precisarem de saída em nível de token.
## Limpeza
Use a sessão como gerenciador de contexto quando possível. Se um cliente se desconectar antes de o stream ser esgotado, feche a sessão explicitamente:
```python
stream = flow.stream_events()
try:
for frame in stream.events:
print(frame.type)
finally:
if not stream.is_exhausted:
stream.close()
```
Para streams assíncronos, use `await stream.aclose()`.
## Streaming de chunks legado
O streaming de Crew com `stream=True` ainda retorna a API orientada a chunks `CrewStreamingOutput` descrita em [Streaming da Execução de Crew](/pt-BR/learn/streaming-crew-execution). Chamadas diretas `llm.call(...)` ainda retornam o resultado final do LLM. O contrato de frames é destinado a runtimes que precisam de um envelope de evento estável em Flows, chamadas diretas de LLM, turnos conversacionais, ferramentas e mensagens.

View File

@@ -164,13 +164,7 @@ def test_navigate_command(mock_run, stagehand_tool):
)
# Assertions
assert result == "Successfully navigated to https://example.com"
mock_run.assert_called_once_with(
stagehand_tool,
instruction="Go to example.com",
url="https://example.com",
command_type="navigate",
)
assert "https://example.com" in result
@patch(

View File

@@ -40,6 +40,7 @@ from crewai.events.event_context import (
set_last_event_id,
)
from crewai.events.handler_graph import build_execution_plan
from crewai.events.stream_context import publish_stream_event
from crewai.events.types.event_bus_types import (
AsyncHandler,
AsyncHandlerSet,
@@ -565,6 +566,7 @@ class CrewAIEventsBus:
set_last_event_id(event.event_id)
publish_stream_event(source, event)
self._record_event(event)
def emit(self, source: Any, event: BaseEvent) -> Future[None] | None:
@@ -775,9 +777,7 @@ class CrewAIEventsBus:
source: The object emitting the event
event: The event instance to emit
"""
self._register_source(source)
event.emission_sequence = get_next_emission_sequence()
self._record_event(event)
self._prepare_event(source, event)
event_type = type(event)

View File

@@ -0,0 +1,30 @@
"""Scoped stream sinks for converting emitted events into public frames."""
from __future__ import annotations
from collections.abc import Callable
import contextvars
from typing import Any
# A sink is any callable that accepts ``(source, event)``.
StreamSink = Callable[[Any, Any], None]

# Registered sinks are held in a tuple; registering builds a new tuple rather
# than mutating the stored one.
_stream_sinks: contextvars.ContextVar[tuple[StreamSink, ...]] = contextvars.ContextVar(
    "crewai_stream_sinks", default=()
)


def add_stream_sink(sink: StreamSink) -> contextvars.Token[tuple[StreamSink, ...]]:
    """Append ``sink`` to the sinks active in the current context.

    Returns:
        The token produced by the underlying ``ContextVar.set`` call; hand it
        to :func:`reset_stream_sinks` to undo this registration.
    """
    registered = _stream_sinks.get()
    return _stream_sinks.set(registered + (sink,))


def reset_stream_sinks(token: contextvars.Token[tuple[StreamSink, ...]]) -> None:
    """Roll the sink registry back to the value captured by ``token``."""
    _stream_sinks.reset(token)


def publish_stream_event(source: Any, event: Any) -> None:
    """Pass ``source`` and ``event`` to each sink registered in this context.

    Sinks are invoked in registration order; with no sink registered this is
    a no-op.
    """
    active_sinks = _stream_sinks.get()
    for deliver in active_sinks:
        deliver(source, event)

View File

@@ -18,7 +18,8 @@ Import surface:
from __future__ import annotations
from collections.abc import Callable, Mapping, Sequence
from collections.abc import Callable, Iterator, Mapping, Sequence
from contextlib import contextmanager
from enum import Enum
import json
import logging
@@ -44,6 +45,7 @@ from crewai.experimental.conversational import (
_conversational_only,
message_to_llm_dict,
)
from crewai.flow.async_feedback import HumanFeedbackPending
from crewai.flow.conversation import (
append_message as _append_conversation_message,
get_conversation_messages,
@@ -221,7 +223,9 @@ class _ConversationalMixin:
messages.append({"role": "system", "content": system_prompt})
messages.extend(self.conversation_messages)
response = self._coerce_llm(llm).call(messages=messages)
llm_instance = self._coerce_llm(llm)
with self._conversation_streaming_enabled(llm_instance):
response = llm_instance.call(messages=messages)
content = self._stringify_result(response)
self.append_assistant_message(content)
return content
@@ -254,7 +258,8 @@ class _ConversationalMixin:
},
*self.build_agent_context("answer_from_history"),
]
response = llm_instance.call(messages=messages)
with self._conversation_streaming_enabled(llm_instance):
response = llm_instance.call(messages=messages)
content = self._stringify_result(response)
self.append_assistant_message(content)
return content
@@ -337,6 +342,102 @@ class _ConversationalMixin:
)
return result
def stream_turn(
self,
message: str,
*,
session_id: str | None = None,
intents: Sequence[str] | None = None,
intent_llm: str | BaseLLM | None = None,
**kickoff_kwargs: Any,
) -> Any:
"""Append a user message and stream one conversational turn as frames.

The turn itself runs inside ``run_turn``, which is handed to
``create_frame_generator``; the returned session wraps that generator.

Args:
message: The user message for this turn.
session_id: Session identifier; falls back to the conversation
state's ``id`` when omitted.
intents: Optional intents stored on the instance for the turn.
intent_llm: Optional LLM (name or instance) stored on the instance
for the turn.
**kickoff_kwargs: Extra keyword arguments forwarded to ``kickoff()``.
When ``from_checkpoint`` is among them, the per-turn execution
state is not reset.

Returns:
A ``StreamSession`` built around the frame generator for this turn.
Inside the turn, a ``HumanFeedbackPending`` is returned as the turn
result rather than raised.

Raises:
ValueError: If the flow is not conversational.
"""
if not self._is_conversational_enabled():
raise ValueError(
"Flow.stream_turn() is only available on conversational flows"
)
# Imported at call time. NOTE(review): presumably to avoid an import cycle — confirm.
from crewai.types.streaming import StreamSession
from crewai.utilities.streaming import (
create_frame_generator,
create_frame_streaming_state,
)
state = cast(ConversationState, self.state)
# Use the conversation state's id when the caller gave no session id.
sid = session_id or state.id
result_holder: list[Any] = []
frame_state = create_frame_streaming_state(result_holder, use_async=False)
output_holder: list[StreamSession[Any]] = []
# Callable passed to the frame generator; it performs the whole turn.
def run_turn() -> Any:
crewai_event_bus.emit(
self,
ConversationTurnStartedEvent(
type="conversation_turn_started",
flow_name=self.name or self.__class__.__name__,
session_id=sid,
),
)
# Stash the turn inputs on the instance; they are cleared again in the
# ``finally`` clause further down.
self._pending_user_message = message
self._pending_intents = list(intents) if intents else None
self._pending_intent_llm = intent_llm
try:
# The per-turn reset is skipped when a checkpoint is supplied.
if "from_checkpoint" not in kickoff_kwargs:
self._reset_turn_execution_state()
# Snapshot used below to detect whether kickoff() already appended an
# assistant message.
assistant_count = self._assistant_message_count()
original_stream = bool(getattr(self, "stream", False))
original_streaming_turn = getattr(
self, "_streaming_conversation_turn", False
)
try:
# Turn ``stream`` off for the nested kickoff() and mark the turn as
# streaming; both values are restored in the ``finally`` below.
# NOTE(review): presumably ``stream`` is disabled so kickoff() runs the
# flow instead of returning another stream session — confirm.
object.__setattr__(self, "stream", False)
object.__setattr__(self, "_streaming_conversation_turn", True)
result = self.kickoff(inputs={"id": sid}, **kickoff_kwargs)
finally:
object.__setattr__(self, "stream", original_stream)
object.__setattr__(
self, "_streaming_conversation_turn", original_streaming_turn
)
# Append the result as the assistant reply only when there is a result,
# no assistant message was added during kickoff(), and
# ``_is_public_turn_result`` accepts it.
if (
result is not None
and self._assistant_message_count() == assistant_count
and self._is_public_turn_result(result)
):
self.append_assistant_message(self._stringify_result(result))
except HumanFeedbackPending as exc:
# Pending human feedback becomes the turn result instead of an error.
return exc
except Exception as exc:
# Emit the failed-turn event, then re-raise the original error.
failed_event = ConversationTurnFailedEvent(
type="conversation_turn_failed",
flow_name=self.name or self.__class__.__name__,
session_id=sid,
error=exc,
)
self._emit_terminal_conversation_turn_event(failed_event)
raise
finally:
self._pending_user_message = None
self._pending_intents = None
self._pending_intent_llm = None
# NOTE(review): emits the completed-turn event. Whether this statement sits
# inside the ``finally`` clause or after it cannot be determined from this
# flattened view — confirm against the real file.
self._emit_terminal_conversation_turn_event(
ConversationTurnCompletedEvent(
type="conversation_turn_completed",
flow_name=self.name or self.__class__.__name__,
session_id=sid,
),
)
return result
# The generator receives ``output_holder`` before the session exists; the
# session is added to that same list right after construction.
stream_session: StreamSession[Any] = StreamSession(
sync_iterator=create_frame_generator(frame_state, run_turn, output_holder)
)
output_holder.append(stream_session)
return stream_session
def _emit_terminal_conversation_turn_event(
self,
event: ConversationTurnCompletedEvent | ConversationTurnFailedEvent,
@@ -685,6 +786,8 @@ class _ConversationalMixin:
object.__setattr__(self, "_pending_intents", None)
if not hasattr(self, "_pending_intent_llm"):
object.__setattr__(self, "_pending_intent_llm", None)
if not hasattr(self, "_streaming_conversation_turn"):
object.__setattr__(self, "_streaming_conversation_turn", False)
def _create_default_extension_state(self) -> ConversationState | None:
initial_state_t = getattr(self, "_initial_state_t", None)
@@ -1055,6 +1158,21 @@ class _ConversationalMixin:
return llm
raise ValueError(f"Invalid llm type: {type(llm)}. Expected str or BaseLLM.")
@contextmanager
def _conversation_streaming_enabled(self, llm: Any) -> Iterator[None]:
if not getattr(self, "_streaming_conversation_turn", False) or not hasattr(
llm, "stream"
):
yield
return
original_stream = llm.stream
try:
llm.stream = True
yield
finally:
llm.stream = original_stream
def finalize_session_traces(self) -> None:
"""Emit a final ``FlowFinishedEvent`` and finalize the trace batch.

View File

@@ -9,11 +9,7 @@ Structure (see ``flow_definition``) and executed here.
from __future__ import annotations
import asyncio
from collections.abc import (
Callable,
Iterator,
Sequence,
)
from collections.abc import Callable, Iterator, Sequence
from concurrent.futures import Future, ThreadPoolExecutor
import contextvars
import copy
@@ -140,17 +136,16 @@ if TYPE_CHECKING:
from crewai.llms.base_llm import BaseLLM
from crewai.flow.visualization import build_flow_structure, render_interactive
from crewai.types.streaming import CrewStreamingOutput, FlowStreamingOutput
from crewai.types.streaming import (
AsyncStreamSession,
StreamSession,
)
from crewai.types.usage_metrics import UsageMetrics
from crewai.utilities.env import get_env_context
from crewai.utilities.streaming import (
TaskInfo,
create_async_chunk_generator,
create_chunk_generator,
create_streaming_state,
register_cleanup,
signal_end,
signal_error,
create_async_frame_generator,
create_frame_generator,
create_frame_streaming_state,
)
@@ -1832,13 +1827,94 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
if hasattr(self._state, key):
object.__setattr__(self._state, key, value)
def stream_events(
    self,
    inputs: dict[str, Any] | None = None,
    input_files: dict[str, FileInput] | None = None,
    from_checkpoint: CheckpointConfig | None = None,
    restore_from_state_id: str | None = None,
) -> StreamSession[Any]:
    """Execute the flow once and expose its events as a frame stream.

    Args:
        inputs: Forwarded to ``kickoff()``.
        input_files: Forwarded to ``kickoff()``.
        from_checkpoint: Forwarded to ``kickoff()``.
        restore_from_state_id: Forwarded to ``kickoff()``.

    Returns:
        A ``StreamSession`` wrapping the synchronous frame generator for
        this run.
    """
    results: list[Any] = []
    frame_state = create_frame_streaming_state(results, use_async=False)
    sessions: list[StreamSession[Any]] = []

    def _execute() -> Any:
        # kickoff() returns a stream session while ``self.stream`` is truthy,
        # so the flag is cleared for the nested call and put back afterwards.
        previous_stream = self.stream
        try:
            self.stream = False
            outcome = self.kickoff(
                inputs=inputs,
                input_files=input_files,
                from_checkpoint=from_checkpoint,
                restore_from_state_id=restore_from_state_id,
            )
        except HumanFeedbackPending as pending:
            # A pending human-feedback request is the result, not an error.
            outcome = pending
        finally:
            self.stream = previous_stream
        return outcome

    session: StreamSession[Any] = StreamSession(
        sync_iterator=create_frame_generator(frame_state, _execute, sessions)
    )
    sessions.append(session)
    return session
def stream_frames(
    self,
    inputs: dict[str, Any] | None = None,
    input_files: dict[str, FileInput] | None = None,
    from_checkpoint: CheckpointConfig | None = None,
    restore_from_state_id: str | None = None,
) -> StreamSession[Any]:
    """Alias for :meth:`stream_events`; every argument is forwarded as-is."""
    forwarded = {
        "inputs": inputs,
        "input_files": input_files,
        "from_checkpoint": from_checkpoint,
        "restore_from_state_id": restore_from_state_id,
    }
    return self.stream_events(**forwarded)
def astream(
    self,
    inputs: dict[str, Any] | None = None,
    input_files: dict[str, FileInput] | None = None,
    from_checkpoint: CheckpointConfig | None = None,
    restore_from_state_id: str | None = None,
) -> AsyncStreamSession[Any]:
    """Execute the flow asynchronously and expose its events as frames.

    Args:
        inputs: Forwarded to ``kickoff_async()``.
        input_files: Forwarded to ``kickoff_async()``.
        from_checkpoint: Forwarded to ``kickoff_async()``.
        restore_from_state_id: Forwarded to ``kickoff_async()``.

    Returns:
        An ``AsyncStreamSession`` wrapping the asynchronous frame generator
        for this run.
    """
    results: list[Any] = []
    frame_state = create_frame_streaming_state(results, use_async=True)
    sessions: list[AsyncStreamSession[Any]] = []

    async def _execute() -> Any:
        # kickoff_async() returns a stream session while ``self.stream`` is
        # truthy, so the flag is cleared for the nested call and restored after.
        previous_stream = self.stream
        try:
            self.stream = False
            outcome = await self.kickoff_async(
                inputs=inputs,
                input_files=input_files,
                from_checkpoint=from_checkpoint,
                restore_from_state_id=restore_from_state_id,
            )
        except HumanFeedbackPending as pending:
            # A pending human-feedback request is the result, not an error.
            outcome = pending
        finally:
            self.stream = previous_stream
        return outcome

    session: AsyncStreamSession[Any] = AsyncStreamSession(
        async_iterator=create_async_frame_generator(frame_state, _execute, sessions)
    )
    sessions.append(session)
    return session
def kickoff(
self,
inputs: dict[str, Any] | None = None,
input_files: dict[str, FileInput] | None = None,
from_checkpoint: CheckpointConfig | None = None,
restore_from_state_id: str | None = None,
) -> Any | FlowStreamingOutput:
) -> Any | StreamSession[Any]:
"""Start the flow execution in a synchronous context.
This method wraps kickoff_async so that all state initialization and event
@@ -1859,7 +1935,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
``from_checkpoint``; passing both raises ``ValueError``.
Returns:
The final output from the flow or FlowStreamingOutput if streaming.
The final output from the flow or StreamSession if streaming.
"""
if from_checkpoint is not None and restore_from_state_id is not None:
raise ValueError(
@@ -1871,46 +1947,12 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
if restored is not None:
return restored.kickoff(inputs=inputs, input_files=input_files)
if self.stream:
result_holder: list[Any] = []
current_task_info: TaskInfo = {
"index": 0,
"name": "",
"id": "",
"agent_role": "",
"agent_id": "",
}
state = create_streaming_state(
current_task_info, result_holder, use_async=False
return self.stream_events(
inputs=inputs,
input_files=input_files,
from_checkpoint=from_checkpoint,
restore_from_state_id=restore_from_state_id,
)
output_holder: list[CrewStreamingOutput | FlowStreamingOutput] = []
def run_flow() -> None:
try:
self.stream = False
result = self.kickoff(
inputs=inputs,
input_files=input_files,
restore_from_state_id=restore_from_state_id,
)
result_holder.append(result)
except Exception as e:
# HumanFeedbackPending is expected control flow, not an error
if isinstance(e, HumanFeedbackPending):
result_holder.append(e)
else:
signal_error(state, e)
finally:
self.stream = True
signal_end(state)
streaming_output = FlowStreamingOutput(
sync_iterator=create_chunk_generator(state, run_flow, output_holder)
)
register_cleanup(streaming_output, state)
output_holder.append(streaming_output)
return streaming_output
async def _run_flow() -> Any:
return await self.kickoff_async(
@@ -1937,7 +1979,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
input_files: dict[str, FileInput] | None = None,
from_checkpoint: CheckpointConfig | None = None,
restore_from_state_id: str | None = None,
) -> Any | FlowStreamingOutput:
) -> Any | AsyncStreamSession[Any]:
"""Start the flow execution asynchronously.
This method performs state restoration (if an 'id' is provided and persistence is available)
@@ -1971,48 +2013,12 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
if restored is not None:
return await restored.kickoff_async(inputs=inputs, input_files=input_files)
if self.stream:
result_holder: list[Any] = []
current_task_info: TaskInfo = {
"index": 0,
"name": "",
"id": "",
"agent_role": "",
"agent_id": "",
}
state = create_streaming_state(
current_task_info, result_holder, use_async=True
return self.astream(
inputs=inputs,
input_files=input_files,
from_checkpoint=from_checkpoint,
restore_from_state_id=restore_from_state_id,
)
output_holder: list[CrewStreamingOutput | FlowStreamingOutput] = []
async def run_flow() -> None:
try:
self.stream = False
result = await self.kickoff_async(
inputs=inputs,
input_files=input_files,
restore_from_state_id=restore_from_state_id,
)
result_holder.append(result)
except Exception as e:
# HumanFeedbackPending is expected control flow, not an error
if isinstance(e, HumanFeedbackPending):
result_holder.append(e)
else:
signal_error(state, e, is_async=True)
finally:
self.stream = True
signal_end(state, is_async=True)
streaming_output = FlowStreamingOutput(
async_iterator=create_async_chunk_generator(
state, run_flow, output_holder
)
)
register_cleanup(streaming_output, state)
output_holder.append(streaming_output)
return streaming_output
ctx = baggage.set_baggage("flow_inputs", inputs or {})
ctx = baggage.set_baggage("flow_input_files", input_files or {}, context=ctx)
@@ -2356,7 +2362,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
input_files: dict[str, FileInput] | None = None,
from_checkpoint: CheckpointConfig | None = None,
restore_from_state_id: str | None = None,
) -> Any | FlowStreamingOutput:
) -> Any | AsyncStreamSession[Any]:
"""Native async method to start the flow execution. Alias for kickoff_async.
Args:

View File

@@ -749,7 +749,7 @@ class LLM(BaseLLM):
"base_url": self.base_url,
"api_version": self.api_version,
"api_key": self.api_key,
"stream": self.stream,
"stream": self._effective_stream(),
"tools": tools,
"reasoning_effort": self.reasoning_effort,
**self.additional_params,
@@ -1841,7 +1841,7 @@ class LLM(BaseLLM):
self.set_callbacks(callbacks)
try:
params = self._prepare_completion_params(messages, tools)
if self.stream:
if self._effective_stream():
result = self._handle_streaming_response(
params=params,
callbacks=callbacks,
@@ -1983,7 +1983,7 @@ class LLM(BaseLLM):
messages, tools, skip_file_processing=True
)
if self.stream:
if self._effective_stream():
return await self._ahandle_streaming_response(
params=params,
callbacks=callbacks,

View File

@@ -42,8 +42,13 @@ from crewai.events.types.tool_usage_events import (
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
from crewai.types.streaming import StreamSession
from crewai.types.usage_metrics import UsageMetrics
from crewai.utilities.pydantic_schema_utils import serialize_model_class
from crewai.utilities.streaming import (
create_frame_generator,
create_frame_streaming_state,
)
try:
@@ -77,6 +82,9 @@ _current_call_id: contextvars.ContextVar[str | None] = contextvars.ContextVar(
_call_stop_override_var: contextvars.ContextVar[dict[int, list[str]] | None] = (
contextvars.ContextVar("_call_stop_override_var", default=None)
)
_call_stream_override_var: contextvars.ContextVar[dict[int, bool] | None] = (
contextvars.ContextVar("_call_stream_override_var", default=None)
)
@contextmanager
@@ -115,6 +123,19 @@ def call_stop_override(
_call_stop_override_var.reset(token)
@contextmanager
def call_stream_override(llm: BaseLLM, stream: bool) -> Generator[None, None, None]:
    """Force the streaming mode of ``llm`` for the duration of the ``with`` body.

    The override is recorded per instance, keyed by ``id(llm)``, in a context
    variable; the ``stream`` attribute of ``llm`` is not touched. Overrides
    already active for other instances are carried over into the new mapping,
    and the previous mapping is restored on exit.
    """
    active = _call_stream_override_var.get()
    # Copy rather than mutate, so the mapping visible outside stays intact.
    merged: dict[int, bool] = {**(active or {}), id(llm): stream}
    reset_token = _call_stream_override_var.set(merged)
    try:
        yield
    finally:
        _call_stream_override_var.reset(reset_token)
def get_current_call_id() -> str:
"""Get current call_id from context"""
call_id = _current_call_id.get()
@@ -213,6 +234,13 @@ class BaseLLM(BaseModel, ABC):
return override
return self.stop
def _effective_stream(self) -> bool | None:
    """Resolve whether this instance should stream for the current call.

    A call-scoped override registered under ``id(self)`` wins; without one,
    the instance's own ``stream`` attribute is returned.
    """
    scoped = _call_stream_override_var.get()
    key = id(self)
    if scoped is None or key not in scoped:
        return self.stream
    return scoped[key]
_token_usage: dict[str, int] = PrivateAttr(
default_factory=lambda: {
"total_tokens": 0,
@@ -318,6 +346,39 @@ class BaseLLM(BaseModel, ABC):
RuntimeError: If the LLM request fails for other reasons.
"""
def stream_events(
    self,
    messages: str | list[LLMMessage],
    tools: list[dict[str, BaseTool]] | None = None,
    callbacks: list[Any] | None = None,
    available_functions: dict[str, Any] | None = None,
    from_task: Task | None = None,
    from_agent: BaseAgent | None = None,
    response_model: type[BaseModel] | None = None,
) -> StreamSession[Any]:
    """Run one LLM call and expose its events as a frame stream.

    The call is made through ``self.call(...)`` inside
    ``call_stream_override(self, True)``, so streaming is requested for this
    call only. All arguments are forwarded to ``call`` unchanged.

    Returns:
        A ``StreamSession`` wrapping the synchronous frame generator for
        this call.
    """
    call_kwargs: dict[str, Any] = {
        "messages": messages,
        "tools": tools,
        "callbacks": callbacks,
        "available_functions": available_functions,
        "from_task": from_task,
        "from_agent": from_agent,
        "response_model": response_model,
    }
    results: list[Any] = []
    frame_state = create_frame_streaming_state(results, use_async=False)
    sessions: list[StreamSession[Any]] = []

    def _invoke() -> Any:
        with call_stream_override(self, True):
            return self.call(**call_kwargs)

    session: StreamSession[Any] = StreamSession(
        sync_iterator=create_frame_generator(frame_state, _invoke, sessions)
    )
    sessions.append(session)
    return session
async def acall(
self,
messages: str | list[LLMMessage],
@@ -509,7 +570,7 @@ class BaseLLM(BaseModel, ABC):
if max_tokens is None:
max_tokens = self._effective_max_tokens()
if stream is None:
stream = self.stream
stream = self._effective_stream()
if seed is None:
seed = self.seed
if stop_sequences is None:

View File

@@ -323,7 +323,7 @@ class AnthropicCompletion(BaseLLM):
effective_response_model = response_model or self.response_format
if self.stream:
if self._effective_stream():
return self._handle_streaming_completion(
completion_params,
available_functions,
@@ -393,7 +393,7 @@ class AnthropicCompletion(BaseLLM):
effective_response_model = response_model or self.response_format
if self.stream:
if self._effective_stream():
return await self._ahandle_streaming_completion(
completion_params,
available_functions,
@@ -441,7 +441,7 @@ class AnthropicCompletion(BaseLLM):
"model": self.model,
"messages": messages,
"max_tokens": self.max_tokens,
"stream": self.stream,
"stream": self._effective_stream(),
}
if system_message:

View File

@@ -42,7 +42,7 @@ try:
)
from crewai.events.types.llm_events import LLMCallType
from crewai.llms.base_llm import BaseLLM, llm_call_context
from crewai.llms.base_llm import BaseLLM, call_stream_override, llm_call_context
except ImportError:
raise ImportError(
@@ -493,15 +493,18 @@ class AzureCompletion(BaseLLM):
Completion response or tool call result
"""
if self.api == "responses":
return self._responses_delegate.call(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
with call_stream_override(
self._responses_delegate, bool(self._effective_stream())
):
return self._responses_delegate.call(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
with llm_call_context():
try:
@@ -527,7 +530,7 @@ class AzureCompletion(BaseLLM):
formatted_messages, tools, effective_response_model
)
if self.stream:
if self._effective_stream():
return self._handle_streaming_completion(
completion_params,
available_functions,
@@ -572,15 +575,18 @@ class AzureCompletion(BaseLLM):
Completion response or tool call result
"""
if self.api == "responses":
return await self._responses_delegate.acall(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
with call_stream_override(
self._responses_delegate, bool(self._effective_stream())
):
return await self._responses_delegate.acall(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
with llm_call_context():
try:
@@ -601,7 +607,7 @@ class AzureCompletion(BaseLLM):
formatted_messages, tools, effective_response_model
)
if self.stream:
if self._effective_stream():
return await self._ahandle_streaming_completion(
completion_params,
available_functions,
@@ -639,11 +645,11 @@ class AzureCompletion(BaseLLM):
"""
params: AzureCompletionParams = {
"messages": messages,
"stream": self.stream,
"stream": bool(self._effective_stream()),
}
model_extras: dict[str, Any] = {}
if self.stream:
if self._effective_stream():
model_extras["stream_options"] = {"include_usage": True}
if response_model and self.is_openai_model:

View File

@@ -428,7 +428,7 @@ class BedrockCompletion(BaseLLM):
self.additional_model_response_field_paths
)
if self.stream:
if self._effective_stream():
return self._handle_streaming_converse(
formatted_messages,
body,
@@ -556,7 +556,7 @@ class BedrockCompletion(BaseLLM):
self.additional_model_response_field_paths
)
if self.stream:
if self._effective_stream():
return await self._ahandle_streaming_converse(
formatted_messages,
body,

View File

@@ -322,7 +322,7 @@ class GeminiCompletion(BaseLLM):
system_instruction, tools, effective_response_model
)
if self.stream:
if self._effective_stream():
return self._handle_streaming_completion(
formatted_content,
config,
@@ -401,7 +401,7 @@ class GeminiCompletion(BaseLLM):
system_instruction, tools, effective_response_model
)
if self.stream:
if self._effective_stream():
return await self._ahandle_streaming_completion(
formatted_content,
config,

View File

@@ -469,7 +469,7 @@ class OpenAICompletion(BaseLLM):
messages=messages, tools=tools
)
if self.stream:
if self._effective_stream():
return self._handle_streaming_completion(
params=completion_params,
available_functions=available_functions,
@@ -564,7 +564,7 @@ class OpenAICompletion(BaseLLM):
messages=messages, tools=tools
)
if self.stream:
if self._effective_stream():
return await self._ahandle_streaming_completion(
params=completion_params,
available_functions=available_functions,
@@ -595,7 +595,7 @@ class OpenAICompletion(BaseLLM):
messages=messages, tools=tools, response_model=response_model
)
if self.stream:
if self._effective_stream():
return self._handle_streaming_responses(
params=params,
available_functions=available_functions,
@@ -626,7 +626,7 @@ class OpenAICompletion(BaseLLM):
messages=messages, tools=tools, response_model=response_model
)
if self.stream:
if self._effective_stream():
return await self._ahandle_streaming_responses(
params=params,
available_functions=available_functions,
@@ -685,7 +685,7 @@ class OpenAICompletion(BaseLLM):
if instructions:
params["instructions"] = instructions
if self.stream:
if self._effective_stream():
params["stream"] = True
if self.store is not None:
@@ -1540,8 +1540,8 @@ class OpenAICompletion(BaseLLM):
"model": self.model,
"messages": messages,
}
if self.stream:
params["stream"] = self.stream
if self._effective_stream():
params["stream"] = self._effective_stream()
params["stream_options"] = {"include_usage": True}
params.update(self.additional_params)

View File

@@ -2,9 +2,10 @@
from __future__ import annotations
from collections.abc import AsyncIterator, Callable, Iterator
from collections.abc import AsyncIterator, Callable, Iterator, Sequence
from datetime import datetime
from enum import Enum
from typing import TYPE_CHECKING, Any, Generic, TypeVar
from typing import TYPE_CHECKING, Any, Generic, Literal, TypeVar
from pydantic import BaseModel, Field
from typing_extensions import Self
@@ -15,6 +16,262 @@ if TYPE_CHECKING:
T = TypeVar("T")
_MISSING = object()
# Closed set of high-level channel names a ``StreamFrame`` can carry.
StreamChannel = Literal["llm", "flow", "tools", "messages", "lifecycle", "custom"]
class StreamFrame(BaseModel):
    """Public, stable frame shape emitted by streamable runtimes."""

    id: str = Field(description="Unique frame/event identifier")
    seq: int | None = Field(default=None, description="Execution-local order")
    type: str = Field(description="Source event type")
    channel: StreamChannel = Field(description="High-level stream channel")
    namespace: list[str] = Field(default_factory=list)
    timestamp: datetime
    parent_id: str | None = None
    previous_id: str | None = None
    data: dict[str, Any] = Field(default_factory=dict)

    @property
    def content(self) -> str:
        """Text of the ``chunk`` entry in ``data``, or ``""`` when it is not a string."""
        text = self.data.get("chunk")
        return text if isinstance(text, str) else ""

    @property
    def event(self) -> dict[str, Any]:
        """The structured payload; this is the ``data`` dict itself, not a copy."""
        return self.data
class StreamSessionBase(Generic[T]):
    """Shared bookkeeping for stream sessions: cached frames, status flags, result."""

    def __init__(
        self,
        sync_iterator: Iterator[StreamFrame] | None = None,
        async_iterator: AsyncIterator[StreamFrame] | None = None,
    ) -> None:
        # Frame sources; either one may be absent.
        self._sync_iterator = sync_iterator
        self._async_iterator = async_iterator
        # Frames pulled from the source so far, in arrival order.
        self._frames: list[StreamFrame] = []
        # Stays ``_MISSING`` until ``_set_result`` stores the final value.
        self._result: T | object = _MISSING
        self._error: Exception | None = None
        self._completed = False
        self._exhausted = False
        self._cancelled = False
        # Optional hook invoked by the subclasses' close methods.
        self._on_cleanup: Callable[[], None] | None = None

    @property
    def result(self) -> T:
        """Return the final result once the stream has completed.

        Raises:
            RuntimeError: If the stream has not completed, or completed
                without a stored result.
            Exception: The stored streaming error, if one was recorded.
        """
        if not self._completed:
            raise RuntimeError(
                "Streaming has not completed yet. "
                "Iterate over all frames before accessing result."
            )
        failure = self._error
        if failure is not None:
            raise failure
        outcome = self._result
        if outcome is _MISSING:
            raise RuntimeError("No result available")
        return outcome  # type: ignore[return-value]

    @property
    def is_completed(self) -> bool:
        """Whether the stream has been marked completed."""
        return self._completed

    @property
    def is_cancelled(self) -> bool:
        """Whether the stream was cancelled."""
        return self._cancelled

    @property
    def is_exhausted(self) -> bool:
        """Whether the underlying iterator was consumed to its end."""
        return self._exhausted

    @property
    def frames(self) -> list[StreamFrame]:
        """A shallow copy of the frames collected so far."""
        return list(self._frames)

    def _set_result(self, result: T) -> None:
        """Store the final result and mark the session completed."""
        self._result = result
        self._completed = True
class StreamSession(StreamSessionBase[T]):
    """Synchronous stream session for ordered public frames.

    Frames pulled from the wrapped iterator are cached, so once the session
    has reached a terminal state (exhausted, cancelled or failed) every
    projection replays the cached frames instead of touching the iterator.
    """

    def __enter__(self) -> Self:
        return self

    def __exit__(self, *exc_info: Any) -> None:
        # Leaving the ``with`` block early cancels the stream.
        if not self._exhausted:
            self.close()

    @property
    def events(self) -> Iterator[StreamFrame]:
        """Iterate over all ordered frames."""
        return self.subscribe()

    def __iter__(self) -> Iterator[StreamFrame]:
        """Iterate over all ordered frames."""
        return self.events

    @property
    def llm(self) -> Iterator[StreamFrame]:
        """Iterate over LLM token and thinking frames."""
        return self.subscribe(channels=["llm"])

    @property
    def messages(self) -> Iterator[StreamFrame]:
        """Iterate over conversation message frames."""
        return self.subscribe(channels=["messages"])

    @property
    def flow(self) -> Iterator[StreamFrame]:
        """Iterate over Flow lifecycle and method frames."""
        return self.subscribe(channels=["flow"])

    @property
    def tools(self) -> Iterator[StreamFrame]:
        """Iterate over tool execution frames."""
        return self.subscribe(channels=["tools"])

    def interleave(self, channels: Sequence[StreamChannel]) -> Iterator[StreamFrame]:
        """Iterate over selected channels while preserving global order."""
        return self.subscribe(channels=channels)

    def subscribe(
        self, channels: Sequence[StreamChannel] | None = None
    ) -> Iterator[StreamFrame]:
        """Iterate over frames, optionally filtered by channel.

        Args:
            channels: Channels to keep; ``None`` keeps every frame.

        Yields:
            Matching frames in arrival order. Every frame pulled from the
            iterator is cached, including those filtered out here.

        Raises:
            RuntimeError: If the session is live but has no sync iterator.
            Exception: Whatever the underlying iterator raises; it is stored
                and also surfaces through ``result``.
        """
        selected = set(channels) if channels is not None else None

        # A finished, cancelled or failed session has nothing live left to
        # read. Iterating the spent iterator again would yield nothing and
        # wrongly mark the session exhausted, so replay the cache instead.
        if self._exhausted or self._cancelled or self._error is not None:
            for frame in self._frames:
                if selected is None or frame.channel in selected:
                    yield frame
            return

        if self._sync_iterator is None:
            raise RuntimeError("Sync iterator not available")

        try:
            for frame in self._sync_iterator:
                self._frames.append(frame)
                if selected is None or frame.channel in selected:
                    yield frame
            # The loop also ends when ``close()`` closed the iterator while
            # this generator was suspended; that is cancellation, not
            # exhaustion.
            if not self._cancelled:
                self._exhausted = True
        except Exception as e:
            self._error = e
            raise
        finally:
            self._completed = True

    def close(self) -> None:
        """Cancel streaming and clean up resources.

        Does nothing when the session is already cancelled, exhausted or
        failed.
        """
        if self._cancelled or self._exhausted or self._error is not None:
            return
        self._cancelled = True
        self._completed = True
        try:
            if self._sync_iterator is not None and hasattr(
                self._sync_iterator, "close"
            ):
                self._sync_iterator.close()
        finally:
            # Run the hook even if closing the iterator raised, and clear it
            # first so it cannot run twice.
            cleanup, self._on_cleanup = self._on_cleanup, None
            if cleanup is not None:
                cleanup()
class AsyncStreamSession(StreamSessionBase[T]):
    """Asynchronous stream session for ordered public frames.

    Frames pulled from the wrapped async iterator are cached, so once the
    session has reached a terminal state (exhausted, cancelled or failed)
    every projection replays the cached frames instead of touching the
    iterator.
    """

    async def __aenter__(self) -> Self:
        return self

    async def __aexit__(self, *exc_info: Any) -> None:
        # Leaving the ``async with`` block early cancels the stream.
        if not self._exhausted:
            await self.aclose()

    @property
    def events(self) -> AsyncIterator[StreamFrame]:
        """Iterate over all ordered frames."""
        return self.subscribe()

    def __aiter__(self) -> AsyncIterator[StreamFrame]:
        """Iterate over all ordered frames."""
        return self.events

    @property
    def llm(self) -> AsyncIterator[StreamFrame]:
        """Iterate over LLM token and thinking frames."""
        return self.subscribe(channels=["llm"])

    @property
    def messages(self) -> AsyncIterator[StreamFrame]:
        """Iterate over conversation message frames."""
        return self.subscribe(channels=["messages"])

    @property
    def flow(self) -> AsyncIterator[StreamFrame]:
        """Iterate over Flow lifecycle and method frames."""
        return self.subscribe(channels=["flow"])

    @property
    def tools(self) -> AsyncIterator[StreamFrame]:
        """Iterate over tool execution frames."""
        return self.subscribe(channels=["tools"])

    def interleave(
        self, channels: Sequence[StreamChannel]
    ) -> AsyncIterator[StreamFrame]:
        """Iterate over selected channels while preserving global order."""
        return self.subscribe(channels=channels)

    async def subscribe(
        self, channels: Sequence[StreamChannel] | None = None
    ) -> AsyncIterator[StreamFrame]:
        """Iterate over frames, optionally filtered by channel.

        Args:
            channels: Channels to keep; ``None`` keeps every frame.

        Yields:
            Matching frames in arrival order. Every frame pulled from the
            iterator is cached, including those filtered out here.

        Raises:
            RuntimeError: If the session is live but has no async iterator.
            Exception: Whatever the underlying iterator raises; it is stored
                and also surfaces through ``result``.
        """
        selected = set(channels) if channels is not None else None

        # A finished, cancelled or failed session has nothing live left to
        # read. Iterating the spent iterator again would yield nothing and
        # wrongly mark the session exhausted, so replay the cache instead.
        if self._exhausted or self._cancelled or self._error is not None:
            for frame in self._frames:
                if selected is None or frame.channel in selected:
                    yield frame
            return

        if self._async_iterator is None:
            raise RuntimeError("Async iterator not available")

        try:
            async for frame in self._async_iterator:
                self._frames.append(frame)
                if selected is None or frame.channel in selected:
                    yield frame
            # The loop also ends when ``aclose()`` closed the iterator while
            # this generator was suspended; that is cancellation, not
            # exhaustion.
            if not self._cancelled:
                self._exhausted = True
        except Exception as e:
            self._error = e
            raise
        finally:
            self._completed = True

    async def aclose(self) -> None:
        """Cancel streaming and clean up resources.

        Does nothing when the session is already cancelled, exhausted or
        failed.
        """
        if self._cancelled or self._exhausted or self._error is not None:
            return
        self._cancelled = True
        self._completed = True
        try:
            if self._async_iterator is not None and hasattr(
                self._async_iterator, "aclose"
            ):
                await self._async_iterator.aclose()
        finally:
            # Run the hook even if closing the iterator raised, and clear it
            # first so it cannot run twice.
            cleanup, self._on_cleanup = self._on_cleanup, None
            if cleanup is not None:
                cleanup()
class StreamChunkType(Enum):

View File

@@ -6,19 +6,32 @@ import contextvars
import logging
import queue
import threading
from typing import Any, NamedTuple
from typing import Any, NamedTuple, cast
import uuid
from typing_extensions import TypedDict
from crewai.events.base_events import BaseEvent
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.llm_events import LLMStreamChunkEvent
from crewai.events.stream_context import add_stream_sink, reset_stream_sinks
from crewai.events.types.flow_events import ConversationMessageAddedEvent, FlowEvent
from crewai.events.types.llm_events import (
LLMEventBase,
LLMStreamChunkEvent,
)
from crewai.events.types.tool_usage_events import (
ToolExecutionErrorEvent,
ToolUsageEvent,
)
from crewai.types.streaming import (
AsyncStreamSession,
CrewStreamingOutput,
FlowStreamingOutput,
StreamChannel,
StreamChunk,
StreamChunkType,
StreamFrame,
StreamSession,
ToolCallChunk,
)
from crewai.utilities.string_utils import sanitize_tool_name
@@ -53,6 +66,16 @@ class StreamingState(NamedTuple):
stream_id: str | None = None
class FrameStreamingState(NamedTuple):
    """Read-only bundle of the queues and event sink behind one frame stream."""

    # List that receives the final result of the streamed execution.
    result_holder: list[Any]
    # Queue read by the synchronous generator: frames, an exception, or None.
    sync_queue: queue.Queue[StreamFrame | None | Exception]
    # Queue read by the asynchronous generator; None for sync streams.
    async_queue: asyncio.Queue[StreamFrame | None | Exception] | None
    # Event loop used to schedule puts on ``async_queue``; None for sync streams.
    loop: asyncio.AbstractEventLoop | None
    # Callable registered as the stream sink; receives ``(source, event)``.
    sink: Callable[[Any, BaseEvent], None]
def _extract_tool_call_info(
event: LLMStreamChunkEvent,
) -> tuple[StreamChunkType, ToolCallChunk | None]:
@@ -80,6 +103,46 @@ def _extract_tool_call_info(
return StreamChunkType.TEXT, None
def _extract_tool_call_chunk(data: dict[str, Any]) -> ToolCallChunk | None:
    """Build a ``ToolCallChunk`` from the ``tool_call`` entry of a frame payload.

    Returns:
        ``None`` when ``data["tool_call"]`` is missing or not a dict; otherwise
        a chunk whose missing name, arguments and index fall back to ``None``,
        ``""`` and ``0``.
    """
    raw_call = data.get("tool_call")
    if not isinstance(raw_call, dict):
        return None

    # A missing or malformed ``function`` entry is treated as empty.
    candidate = raw_call.get("function")
    fn_payload = candidate if isinstance(candidate, dict) else {}

    raw_name = fn_payload.get("name")
    if raw_name:
        clean_name = sanitize_tool_name(str(raw_name))
    else:
        clean_name = None

    return ToolCallChunk(
        tool_id=raw_call.get("id"),
        tool_name=clean_name,
        arguments=fn_payload.get("arguments") or "",
        index=raw_call.get("index") or 0,
    )
def stream_frame_to_chunk(frame: StreamFrame) -> StreamChunk | None:
    """Convert an LLM chunk frame into the legacy ``StreamChunk`` shape.

    Returns:
        ``None`` unless the frame is on the ``"llm"`` channel with type
        ``"llm_stream_chunk"`` or ``"llm_thinking_chunk"``. Otherwise a
        ``StreamChunk`` whose text fields are read from ``frame.data`` and
        default to ``""``.
    """
    is_llm_chunk = frame.channel == "llm" and frame.type in {
        "llm_stream_chunk",
        "llm_thinking_chunk",
    }
    if not is_llm_chunk:
        return None

    payload = frame.data
    tool_call = _extract_tool_call_chunk(payload)
    if tool_call is None:
        kind = StreamChunkType.TEXT
    else:
        kind = StreamChunkType.TOOL_CALL

    def text_field(key: str) -> str:
        # Missing or falsy values become an empty string.
        return str(payload.get(key) or "")

    return StreamChunk(
        content=text_field("chunk"),
        chunk_type=kind,
        task_index=0,
        task_name=text_field("task_name"),
        task_id=text_field("task_id"),
        agent_role=text_field("agent_role"),
        agent_id=text_field("agent_id"),
        tool_call=tool_call,
    )
def _create_stream_chunk(
event: LLMStreamChunkEvent,
current_task_info: TaskInfo,
@@ -107,6 +170,207 @@ def _create_stream_chunk(
)
# Event fields left out of ``StreamFrame.data`` when an event is serialized.
_FRAME_DATA_EXCLUDE = {
    "event_id",
    "parent_event_id",
    "previous_event_id",
    "emission_sequence",
    "timestamp",
    "type",
}
def _stream_channel(event: BaseEvent) -> StreamChannel:
    """Pick the public channel for an event.

    Event classes are checked in a fixed order (LLM, conversation message,
    flow, tool). Anything else is ``"lifecycle"`` when its ``type`` string
    contains ``"error"`` or ``"failed"``, and ``"custom"`` otherwise.
    """
    # Order matters: the first matching entry wins.
    class_channels: tuple[tuple[Any, StreamChannel], ...] = (
        (LLMEventBase, "llm"),
        (ConversationMessageAddedEvent, "messages"),
        (FlowEvent, "flow"),
        ((ToolUsageEvent, ToolExecutionErrorEvent), "tools"),
    )
    for event_classes, channel in class_channels:
        if isinstance(event, event_classes):
            return channel

    event_type = event.type
    if "error" in event_type or "failed" in event_type:
        return "lifecycle"
    return "custom"
def _stream_namespace(event: BaseEvent, channel: StreamChannel) -> list[str]:
    """Build the namespace path for a frame.

    Returns:
        A list starting with ``channel``, followed by the string form of each
        scoping attribute that exists on ``event`` and is not ``None``, in a
        fixed attribute order.
    """
    scope_attrs = (
        "flow_name",
        "method_name",
        "session_id",
        "call_id",
        "tool_name",
        "agent_role",
        "task_name",
    )
    found = (getattr(event, name, None) for name in scope_attrs)
    return [channel, *(str(value) for value in found if value is not None)]
def stream_frame_from_event(event: BaseEvent) -> StreamFrame:
    """Translate an internal CrewAI event into a public ``StreamFrame``.

    Identity, ordering and timestamp fields are copied from the event. The
    rest of the event is serialized with ``to_json`` (minus the fields in
    ``_FRAME_DATA_EXCLUDE``) into ``data``.
    """
    channel = _stream_channel(event)
    payload = event.to_json(exclude=_FRAME_DATA_EXCLUDE)
    # Non-dict serializations are wrapped so ``data`` is always a dict.
    frame_data = payload if isinstance(payload, dict) else {"value": payload}
    return StreamFrame(
        id=event.event_id,
        seq=event.emission_sequence,
        type=event.type,
        channel=channel,
        namespace=_stream_namespace(event, channel),
        timestamp=event.timestamp,
        parent_id=event.parent_event_id,
        previous_id=event.previous_event_id,
        data=cast(dict[str, Any], frame_data),
    )
def _create_frame_sink(
    sync_queue: queue.Queue[StreamFrame | None | Exception],
    async_queue: asyncio.Queue[StreamFrame | None | Exception] | None = None,
    loop: asyncio.AbstractEventLoop | None = None,
) -> Callable[[Any, BaseEvent], None]:
    """Return a sink that turns each event into a frame and enqueues it.

    When both ``async_queue`` and ``loop`` are given, frames are put on the
    async queue via ``loop.call_soon_threadsafe``. Otherwise they go on
    ``sync_queue``.
    """

    def frame_sink(_: Any, event: BaseEvent) -> None:
        frame = stream_frame_from_event(event)
        if async_queue is None or loop is None:
            sync_queue.put(frame)
            return
        loop.call_soon_threadsafe(async_queue.put_nowait, frame)

    return frame_sink
def create_frame_streaming_state(
    result_holder: list[Any],
    use_async: bool = False,
) -> FrameStreamingState:
    """Assemble the queues and sink for one scoped public frame stream.

    Args:
        result_holder: List stored on the state to receive the final result.
        use_async: When true, also create an ``asyncio.Queue`` and capture
            the running event loop, which requires a running loop.
    """
    frame_queue: queue.Queue[StreamFrame | None | Exception] = queue.Queue()
    async_frame_queue: asyncio.Queue[StreamFrame | None | Exception] | None = (
        asyncio.Queue() if use_async else None
    )
    running_loop: asyncio.AbstractEventLoop | None = (
        asyncio.get_running_loop() if use_async else None
    )
    return FrameStreamingState(
        result_holder=result_holder,
        sync_queue=frame_queue,
        async_queue=async_frame_queue,
        loop=running_loop,
        sink=_create_frame_sink(frame_queue, async_frame_queue, running_loop),
    )
def _signal_frame_end(state: FrameStreamingState, is_async: bool = False) -> None:
    """Enqueue the ``None`` end-of-stream marker.

    The marker goes to the async queue (scheduled on the state's loop) only
    when ``is_async`` is true and both the queue and the loop are present.
    In every other case it goes to the sync queue.
    """
    async_queue, loop = state.async_queue, state.loop
    if not is_async or async_queue is None or loop is None:
        state.sync_queue.put(None)
        return
    loop.call_soon_threadsafe(async_queue.put_nowait, None)
def _signal_frame_error(
    state: FrameStreamingState, error: Exception, is_async: bool = False
) -> None:
    """Enqueue ``error`` so the consuming generator can re-raise it.

    The error goes to the async queue (scheduled on the state's loop) only
    when ``is_async`` is true and both the queue and the loop are present.
    In every other case it goes to the sync queue.
    """
    async_queue, loop = state.async_queue, state.loop
    if not is_async or async_queue is None or loop is None:
        state.sync_queue.put(error)
        return
    loop.call_soon_threadsafe(async_queue.put_nowait, error)
def _finalize_frame_streaming(
    state: FrameStreamingState,
    stream_session: StreamSession[Any] | AsyncStreamSession[Any],
) -> None:
    """Clear the session's cleanup hook and hand over the result, if any.

    The first entry of ``state.result_holder`` is passed to the session's
    ``_set_result``. An empty holder leaves the result unset.
    """
    stream_session._on_cleanup = None
    results = state.result_holder
    if not results:
        return
    stream_session._set_result(results[0])
def create_frame_generator(
    state: FrameStreamingState,
    run_func: Callable[[], Any],
    output_holder: list[StreamSession[Any]],
) -> Iterator[StreamFrame]:
    """Run ``run_func`` on a worker thread and yield the frames it produces.

    The worker registers ``state.sink`` as a stream sink for the duration of
    the call, stores the return value in ``state.result_holder``, and always
    signals end-of-stream. An exception from ``run_func`` is forwarded through
    the queue and re-raised here.

    Args:
        state: Queues and sink created by ``create_frame_streaming_state``.
        run_func: Zero-argument callable performing the streamed work.
        output_holder: Holder whose first entry, if present, is the session
            to finalize when this generator finishes.
    """

    def run_with_sink() -> None:
        sink_token = add_stream_sink(state.sink)
        try:
            outcome = run_func()
            state.result_holder.append(outcome)
        except Exception as exc:
            _signal_frame_error(state, exc)
        finally:
            reset_stream_sinks(sink_token)
            _signal_frame_end(state)

    # Run inside a copy of the caller's context so context variables set by
    # the caller are visible on the worker thread.
    caller_context = contextvars.copy_context()
    worker = threading.Thread(
        target=caller_context.run, args=(run_with_sink,), daemon=True
    )
    worker.start()

    try:
        # ``None`` is the end-of-stream marker.
        while (item := state.sync_queue.get()) is not None:
            if isinstance(item, Exception):
                raise item
            yield item
    finally:
        worker.join()
        if output_holder:
            _finalize_frame_streaming(state, output_holder[0])
async def create_async_frame_generator(
    state: FrameStreamingState,
    run_coro: Callable[[], Any],
    output_holder: list[AsyncStreamSession[Any]],
) -> AsyncIterator[StreamFrame]:
    """Run ``run_coro`` as a task and yield the frames it produces.

    The task registers ``state.sink`` as a stream sink while it runs, stores
    the awaited result in ``state.result_holder``, and always signals
    end-of-stream. An exception from the coroutine is forwarded through the
    queue and re-raised here.

    Raises:
        RuntimeError: If ``state`` was created without an async queue.
    """
    frame_queue = state.async_queue
    if frame_queue is None:
        raise RuntimeError(
            "Async queue not initialized. Use create_frame_streaming_state(use_async=True)."
        )

    async def run_with_sink() -> None:
        sink_token = add_stream_sink(state.sink)
        try:
            outcome = await run_coro()
            state.result_holder.append(outcome)
        except Exception as exc:
            _signal_frame_error(state, exc, is_async=True)
        finally:
            reset_stream_sinks(sink_token)
            _signal_frame_end(state, is_async=True)

    runner = asyncio.create_task(run_with_sink())

    try:
        # ``None`` is the end-of-stream marker.
        while (item := await frame_queue.get()) is not None:
            if isinstance(item, Exception):
                raise item
            yield item
    finally:
        # NOTE(review): the nesting of this try under the ``if`` is inferred
        # from a flattened diff; confirm against the real file.
        if not runner.done():
            runner.cancel()
            try:
                await runner
            except asyncio.CancelledError:
                pass
            except Exception:
                logger.debug("Background frame streaming task failed", exc_info=True)
        if output_holder:
            _finalize_frame_streaming(state, output_holder[0])
def _create_stream_handler(
current_task_info: TaskInfo,
sync_queue: queue.Queue[StreamChunk | None | Exception],

View File

@@ -21,7 +21,7 @@ from crewai.events.types.flow_events import (
MethodExecutionFinishedEvent,
MethodExecutionStartedEvent,
)
from crewai.events.types.llm_events import LLMCallStartedEvent
from crewai.events.types.llm_events import LLMCallStartedEvent, LLMStreamChunkEvent
from crewai.experimental import (
ConversationConfig,
ConversationMessage,
@@ -29,6 +29,7 @@ from crewai.experimental import (
RouterConfig,
)
from crewai.flow import Flow, ChatState, listen, start
from crewai.flow.async_feedback import HumanFeedbackPending, PendingFeedbackContext
from crewai.flow.flow_context import (
current_flow_defer_trace_finalization,
current_flow_id,
@@ -137,6 +138,111 @@ class TestClassifyIntent:
class TestConversationalFlow:
def test_stream_turn_emits_ordered_conversation_frames(self) -> None:
    """``stream_turn`` yields turn, LLM and message frames in sequence order.

    Also checks that the patched ``kickoff`` sees ``flow.stream`` as False
    while the turn runs and that the flag is True again afterwards.
    """
    flow = ConversationalFlow()
    flow.stream = True
    observed_stream_flags: list[bool] = []

    def fake_kickoff(*_: Any, **__: Any) -> str:
        observed_stream_flags.append(flow.stream)
        chunk_event = LLMStreamChunkEvent(
            type="llm_stream_chunk",
            chunk="pong",
            call_id="call-1",
        )
        crewai_event_bus.emit(flow, chunk_event)
        return "pong"

    with patch.object(flow, "kickoff", side_effect=fake_kickoff):
        stream = flow.stream_turn("ping", session_id="session-1")

        # The result is not readable before the frames are consumed.
        with pytest.raises(RuntimeError, match="Streaming has not completed yet"):
            _ = stream.result

        frames = list(stream.events)

    assert stream.result == "pong"
    assert observed_stream_flags == [False]
    assert flow.stream is True

    sequence = [frame.seq for frame in frames]
    assert sequence == sorted(sequence)

    assert [(frame.type, frame.channel) for frame in frames] == [
        ("conversation_turn_started", "flow"),
        ("llm_stream_chunk", "llm"),
        ("conversation_message_added", "messages"),
        ("conversation_turn_completed", "flow"),
    ]
    assert frames[1].data["chunk"] == "pong"
    assert flow.state.messages[-1].content == "pong"
def test_stream_turn_enables_streaming_on_conversation_llm(self) -> None:
    """The configured LLM sees ``stream`` as True during the turn only."""

    class FakeLLM:
        stream = False

        def __init__(self) -> None:
            self.stream_values: list[bool] = []

        def call(self, *, messages: list[dict[str, Any]]) -> str:
            # Record the flag as seen mid-call, then emit two chunks.
            self.stream_values.append(self.stream)
            for piece in ("po", "ng"):
                chunk_event = LLMStreamChunkEvent(
                    type="llm_stream_chunk",
                    chunk=piece,
                    call_id="call-1",
                )
                crewai_event_bus.emit(flow, chunk_event)
            return "pong"

    llm = FakeLLM()

    @ConversationConfig(llm=llm)
    class StreamingChatFlow(ConversationalFlow):
        pass

    flow = StreamingChatFlow()
    stream = flow.stream_turn("ping", session_id="session-1")
    frames = list(stream.events)
    streamed_chunks = [
        frame.data["chunk"] for frame in frames if frame.type == "llm_stream_chunk"
    ]

    assert stream.result == "pong"
    assert llm.stream_values == [True]
    assert llm.stream is False
    assert streamed_chunks == ["po", "ng"]
def test_stream_turn_returns_pending_feedback_without_failure_event(self) -> None:
    """A ``HumanFeedbackPending`` raised by kickoff becomes the stream result.

    Only the turn-started frame is expected.
    """
    flow = ConversationalFlow()
    feedback_context = PendingFeedbackContext(
        flow_id="session-1",
        flow_class="tests.PendingFeedbackFlow",
        method_name="review",
        method_output="draft",
        message="Please review",
    )
    pending = HumanFeedbackPending(context=feedback_context)

    def raise_pending(*_: Any, **__: Any) -> None:
        raise pending

    with patch.object(flow, "kickoff", side_effect=raise_pending):
        stream = flow.stream_turn("review this", session_id="session-1")
        frames = list(stream.events)

    frame_types = [frame.type for frame in frames]
    assert stream.result is pending
    assert frame_types == ["conversation_turn_started"]
def test_deferred_multi_turn_emits_single_flow_finished(self) -> None:
"""A deferred multi-turn session lands as one trace: exactly one
``FlowFinishedEvent`` is emitted at ``finalize_session_traces()``, not

View File

@@ -29,7 +29,7 @@ from crewai.flow.persistence.base import FlowPersistence
from crewai.flow.runtime._actions import FlowScriptExecutionDisabledError
from crewai.state.checkpoint_config import CheckpointConfig
from crewai.tools import BaseTool
from crewai.types.streaming import FlowStreamingOutput
from crewai.types.streaming import StreamSession
class StaticSearchTool(BaseTool):
@@ -2485,7 +2485,7 @@ def test_config_max_method_calls_from_declaration():
def test_config_stream_from_declaration():
flow = Flow.from_declaration(contents=STREAMING_CHAIN_YAML)
streaming = flow.kickoff()
assert isinstance(streaming, FlowStreamingOutput)
assert isinstance(streaming, StreamSession)
for _ in streaming:
pass
assert streaming.result == "confirmed:True"

View File

@@ -0,0 +1,300 @@
"""Tests for the public stream frame contract."""
from __future__ import annotations
import asyncio
from collections.abc import AsyncIterator
from typing import Any, ClassVar
import pytest
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.flow_events import ConversationMessageAddedEvent
from crewai.events.types.llm_events import LLMStreamChunkEvent, LLMThinkingChunkEvent
from crewai.events.types.tool_usage_events import ToolUsageStartedEvent
from crewai.flow.flow import Flow, start
from crewai.llms.base_llm import BaseLLM, call_stop_override
from crewai.types.streaming import StreamFrame
class FrameFlow(Flow):
    """Flow fixture that emits one event per public channel under test."""

    @start()
    def run(self) -> str:
        """Emit an LLM chunk, a thinking chunk, a message and a tool event."""

        def emit(event: Any) -> None:
            crewai_event_bus.emit(self, event)

        emit(
            LLMStreamChunkEvent(
                type="llm_stream_chunk",
                chunk="hello",
                call_id="call-1",
            )
        )
        emit(
            LLMThinkingChunkEvent(
                type="llm_thinking_chunk",
                chunk="thinking",
                call_id="call-1",
            )
        )
        emit(
            ConversationMessageAddedEvent(
                type="conversation_message_added",
                flow_name=self._definition.name,
                session_id="session-1",
                role="assistant",
                content="hello",
                message_index=0,
            )
        )
        emit(
            ToolUsageStartedEvent(
                type="tool_usage_started",
                tool_name="search",
                tool_args={"query": "crew"},
            )
        )
        return "done"
class DirectStreamingLLM(BaseLLM):
    """LLM fixture that records what ``call`` observes and emits two chunks."""

    # Class-level logs; tests reset them before use.
    call_stream_values: ClassVar[list[bool | None]] = []
    raw_stream_values: ClassVar[list[bool | None]] = []
    call_instance_ids: ClassVar[list[int]] = []
    call_stop_values: ClassVar[list[list[str]]] = []

    def call(self, messages: Any, *args: Any, **kwargs: Any) -> str:
        """Record stream/stop state, track token usage, emit "hel" and "lo"."""
        self.call_stream_values.append(self._effective_stream())
        self.raw_stream_values.append(self.stream)
        self.call_instance_ids.append(id(self))
        self.call_stop_values.append(list(self.stop_sequences))

        usage = {
            "prompt_tokens": 1,
            "completion_tokens": 2,
            "total_tokens": 3,
        }
        self._track_token_usage_internal(usage)

        for piece in ("hel", "lo"):
            chunk_event = LLMStreamChunkEvent(
                type="llm_stream_chunk",
                chunk=piece,
                call_id="call-1",
            )
            crewai_event_bus.emit(self, chunk_event)
        return "hello"
def test_stream_frame_contract_and_ordering() -> None:
    """Frames are ``StreamFrame`` objects, ordered by ``seq``, on the right channels."""
    stream = FrameFlow().stream_events()

    # The result is not readable before the frames are consumed.
    with pytest.raises(RuntimeError, match="Streaming has not completed yet"):
        _ = stream.result

    with stream:
        frames = list(stream.events)

    assert stream.result == "done"
    assert all(isinstance(frame, StreamFrame) for frame in frames)
    sequence = [frame.seq for frame in frames]
    assert sequence == sorted(sequence)

    by_type = {frame.type: frame for frame in frames}
    expected_channels = {
        "flow_started": "flow",
        "llm_stream_chunk": "llm",
        "llm_thinking_chunk": "llm",
        "conversation_message_added": "messages",
        "tool_usage_started": "tools",
    }
    for frame_type, channel in expected_channels.items():
        assert by_type[frame_type].channel == channel

    method_started = by_type["method_execution_started"]
    assert method_started.parent_id == by_type["flow_started"].id
    assert "FrameFlow" in method_started.namespace
    assert "run" in method_started.namespace
def test_stream_subscribe_filters_channels_without_losing_order() -> None:
    """``interleave`` keeps only the requested channels, still in ``seq`` order."""
    with FrameFlow().stream_events() as stream:
        selected = list(stream.interleave(["messages", "tools"]))

    channels = [frame.channel for frame in selected]
    sequence = [frame.seq for frame in selected]

    assert channels == ["messages", "tools"]
    assert sequence == sorted(sequence)
    assert stream.result == "done"
def test_stream_projections_replay_cached_frames_after_exhaustion() -> None:
    """After full consumption, every projection replays the cached frames."""
    with FrameFlow().stream_events() as stream:
        all_frames = list(stream.events)

        llm_text = [frame.content for frame in stream.llm if frame.content]
        tool_types = [frame.type for frame in stream.tools]
        replayed = list(stream.events)

    assert llm_text == ["hello", "thinking"]
    assert tool_types == ["tool_usage_started"]
    assert replayed == all_frames
def test_stream_channel_projection_can_be_followed_by_cached_projection() -> None:
    """Draining one channel projection still lets another channel be read."""
    with FrameFlow().stream_events() as stream:
        llm_frames = list(stream.llm)
        flow_types = [frame.type for frame in stream.flow]

    llm_text = [frame.content for frame in llm_frames if frame.content]

    assert llm_text == ["hello", "thinking"]
    assert flow_types == [
        "flow_started",
        "method_execution_started",
        "method_execution_finished",
        "flow_finished",
    ]
def test_stream_errors_surface_after_failed_frame() -> None:
    """A failing flow raises during iteration and again from ``result``."""

    class ErrorFlow(Flow):
        @start()
        def run(self) -> str:
            raise ValueError("boom")

    stream = ErrorFlow().stream_events()

    with pytest.raises(ValueError, match="boom"):
        list(stream.events)

    # The failure frame was cached before the error surfaced.
    collected_types = [frame.type for frame in stream.frames]
    assert any(
        frame_type == "method_execution_failed" for frame_type in collected_types
    )

    with pytest.raises(ValueError, match="boom"):
        _ = stream.result
def test_flow_streaming_returns_iterable_frame_session() -> None:
    """With ``stream`` enabled, ``kickoff`` returns a session iterable as frames."""
    flow = FrameFlow()
    flow.stream = True

    stream = flow.kickoff()
    with stream:
        frames = list(stream)

    with_content = [frame for frame in frames if frame.content]

    assert all(isinstance(frame, StreamFrame) for frame in frames)
    assert [frame.content for frame in with_content] == ["hello", "thinking"]
    # Same check as ``next(...)`` over content frames: the first one is "hello".
    first_content_frame = next(iter(with_content))
    assert first_content_frame.event["chunk"] == "hello"
    assert stream.result == "done"
def test_direct_llm_stream_events_scope_and_restore_stream_flag() -> None:
    """``stream_events`` streams on the same instance without changing ``stream``.

    Also checks that a stop override set by the caller is visible inside the
    call and that token usage is tracked on the original instance.
    """
    # Reset the class-level logs so earlier tests cannot leak into this one.
    for log_name in (
        "call_stream_values",
        "raw_stream_values",
        "call_instance_ids",
        "call_stop_values",
    ):
        setattr(DirectStreamingLLM, log_name, [])

    llm = DirectStreamingLLM(model="gpt-4o-mini", stream=False)

    with call_stop_override(llm, ["STOP"]), llm.stream_events("hello") as stream:
        frames = list(stream)

    assert [frame.content for frame in frames] == ["hel", "lo"]
    assert frames[0].event["chunk"] == "hel"
    assert stream.result == "hello"
    assert llm.stream is False

    assert DirectStreamingLLM.call_stream_values == [True]
    assert DirectStreamingLLM.raw_stream_values == [False]
    assert DirectStreamingLLM.call_instance_ids == [id(llm)]
    assert DirectStreamingLLM.call_stop_values == [["STOP"]]

    usage = llm.get_token_usage_summary()
    observed = (
        usage.total_tokens,
        usage.prompt_tokens,
        usage.completion_tokens,
        usage.successful_requests,
    )
    assert observed == (3, 1, 2, 1)
@pytest.mark.asyncio
async def test_astream_scopes_concurrent_executions() -> None:
    """Two concurrent async streams each see only their own LLM chunk."""

    class ConcurrentFlow(Flow):
        @start()
        async def run(self) -> str:
            label = str(self.state["label"])
            # Yield control once so the two flows interleave.
            await asyncio.sleep(0)
            chunk_event = LLMStreamChunkEvent(
                type="llm_stream_chunk",
                chunk=label,
                call_id=label,
            )
            crewai_event_bus.emit(self, chunk_event)
            return label

    async def collect(label: str) -> tuple[str, list[str]]:
        chunks: list[str] = []
        async with ConcurrentFlow().astream(inputs={"label": label}) as stream:
            async for frame in stream.llm:
                chunks.append(frame.data["chunk"])
        return stream.result, chunks

    outcomes = await asyncio.gather(collect("first"), collect("second"))

    assert list(outcomes) == [("first", ["first"]), ("second", ["second"])]
# Iterates stream.events to the end first, then checks that stream.llm,
# stream.tools and a second pass over stream.events still yield frames.
@pytest.mark.asyncio
async def test_async_stream_projections_replay_cached_frames_after_exhaustion() -> None:
# FrameFlow is defined outside this view; the expected frames below are the
# only description of what it emits that is visible here.
async with FrameFlow().astream() as stream:
# First full pass over every frame.
all_frames = [frame async for frame in stream.events]
# These iterations all start after stream.events has already been exhausted.
llm_frames = [frame async for frame in stream.llm]
tool_frames = [frame async for frame in stream.tools]
replayed_frames = [frame async for frame in stream.events]
# Only llm frames with non-empty content are compared.
assert [frame.content for frame in llm_frames if frame.content] == [
"hello",
"thinking",
]
assert [frame.type for frame in tool_frames] == ["tool_usage_started"]
# A second pass over stream.events must equal the first pass.
assert replayed_frames == all_frames
# Iterates stream.llm first and stream.flow second on the same stream, then
# checks that both iterations produced their expected frames.
@pytest.mark.asyncio
async def test_async_stream_channel_projection_can_be_followed_by_cached_projection() -> None:
# FrameFlow is defined outside this view.
async with FrameFlow().astream() as stream:
# stream.llm is consumed fully before stream.flow is iterated.
llm_frames = [frame async for frame in stream.llm]
flow_frames = [frame async for frame in stream.flow]
# Only llm frames with non-empty content are compared.
assert [frame.content for frame in llm_frames if frame.content] == [
"hello",
"thinking",
]
# stream.flow must yield exactly these frame types, in this order.
assert [frame.type for frame in flow_frames] == [
"flow_started",
"method_execution_started",
"method_execution_finished",
"flow_finished",
]
@pytest.mark.asyncio
async def test_astream_cancellation_cleans_up_task() -> None:
    """Closing an async stream after its first frame marks it cancelled and completed."""

    class SlowFlow(Flow):
        @start()
        async def run(self) -> str:
            # A ten-second sleep: the test calls aclose() right after the
            # first frame, well before this could return.
            await asyncio.sleep(10)
            return "too late"

    session = SlowFlow().astream()
    frame_iter: AsyncIterator[StreamFrame] = session.events

    # Pull exactly one frame so the flow has started before we close.
    opening_frame = await anext(frame_iter)
    assert opening_frame.type == "flow_started"

    await session.aclose()

    # After aclose() the session reports both flags as exactly True.
    assert session.is_cancelled is True
    assert session.is_completed is True

View File

@@ -11,11 +11,15 @@ from crewai import Agent, Crew, Task
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.llm_events import LLMStreamChunkEvent, ToolCall, FunctionCall
from crewai.flow.flow import Flow, start
from crewai.state.checkpoint_config import CheckpointConfig
from crewai.types.streaming import (
AsyncStreamSession,
CrewStreamingOutput,
FlowStreamingOutput,
StreamChunk,
StreamChunkType,
StreamFrame,
StreamSession,
ToolCallChunk,
)
@@ -417,8 +421,8 @@ class TestCrewKickoffStreamingAsync:
class TestFlowKickoffStreaming:
"""Tests for Flow(stream=True).kickoff() method."""
def test_kickoff_streaming_returns_streaming_output(self) -> None:
"""Test that flow kickoff with stream=True returns FlowStreamingOutput."""
def test_kickoff_streaming_returns_stream_session(self) -> None:
"""Test that flow kickoff with stream=True returns StreamSession."""
class SimpleFlow(Flow[dict[str, Any]]):
@start()
@@ -428,7 +432,7 @@ class TestFlowKickoffStreaming:
flow = SimpleFlow()
flow.stream = True
streaming = flow.kickoff()
assert isinstance(streaming, FlowStreamingOutput)
assert isinstance(streaming, StreamSession)
def test_flow_kickoff_streaming_captures_chunks(self) -> None:
"""Test that flow streaming captures LLM chunks from crew execution."""
@@ -469,7 +473,7 @@ class TestFlowKickoffStreaming:
with patch.object(Flow, "kickoff", mock_kickoff_fn):
streaming = flow.kickoff()
assert isinstance(streaming, FlowStreamingOutput)
assert isinstance(streaming, StreamSession)
chunks = list(streaming)
assert len(chunks) >= 2
@@ -500,19 +504,38 @@ class TestFlowKickoffStreaming:
with patch.object(Flow, "kickoff", mock_kickoff_fn):
streaming = flow.kickoff()
assert isinstance(streaming, FlowStreamingOutput)
assert isinstance(streaming, StreamSession)
_ = list(streaming)
result = streaming.result
assert result == "flow result"
def test_streaming_kickoff_passes_checkpoint_config_to_stream_events(self) -> None:
"""stream=True preserves checkpoint config when routing to stream_events."""
class TestFlow(Flow[dict[str, Any]]):
@start()
def generate(self) -> str:
return "flow result"
flow = TestFlow()
# Streaming is switched on via the attribute before kickoff() is called.
flow.stream = True
checkpoint = CheckpointConfig()
# wraps= keeps the real stream_events behaviour while recording the call
# (assuming `patch` is unittest.mock.patch -- its import is outside this view).
with patch.object(flow, "stream_events", wraps=flow.stream_events) as spy:
streaming = flow.kickoff(from_checkpoint=checkpoint)
# Consume every item from the stream before reading the result below.
list(streaming)
# Identity check: the same CheckpointConfig object must arrive as the
# from_checkpoint keyword argument.
assert spy.call_args.kwargs["from_checkpoint"] is checkpoint
assert streaming.result == "flow result"
class TestFlowKickoffStreamingAsync:
"""Tests for Flow(stream=True).kickoff_async() method."""
@pytest.mark.asyncio
async def test_kickoff_streaming_async_returns_streaming_output(self) -> None:
"""Test that flow kickoff_async with stream=True returns FlowStreamingOutput."""
async def test_kickoff_streaming_async_returns_stream_session(self) -> None:
"""Test that flow kickoff_async with stream=True returns AsyncStreamSession."""
class SimpleFlow(Flow[dict[str, Any]]):
@start()
@@ -522,7 +545,7 @@ class TestFlowKickoffStreamingAsync:
flow = SimpleFlow()
flow.stream = True
streaming = await flow.kickoff_async()
assert isinstance(streaming, FlowStreamingOutput)
assert isinstance(streaming, AsyncStreamSession)
@pytest.mark.asyncio
async def test_flow_kickoff_streaming_async_captures_chunks(self) -> None:
@@ -567,8 +590,8 @@ class TestFlowKickoffStreamingAsync:
with patch.object(Flow, "kickoff_async", mock_kickoff_fn):
streaming = await flow.kickoff_async()
assert isinstance(streaming, FlowStreamingOutput)
chunks: list[StreamChunk] = []
assert isinstance(streaming, AsyncStreamSession)
chunks: list[StreamFrame] = []
async for chunk in streaming:
chunks.append(chunk)
@@ -601,13 +624,36 @@ class TestFlowKickoffStreamingAsync:
with patch.object(Flow, "kickoff_async", mock_kickoff_fn):
streaming = await flow.kickoff_async()
assert isinstance(streaming, FlowStreamingOutput)
assert isinstance(streaming, AsyncStreamSession)
async for _ in streaming:
pass
result = streaming.result
assert result == "async flow result"
@pytest.mark.asyncio
async def test_streaming_kickoff_async_passes_checkpoint_config_to_astream(
self,
) -> None:
"""stream=True preserves checkpoint config when routing to astream."""
class TestFlow(Flow[dict[str, Any]]):
@start()
async def generate(self) -> str:
return "async flow result"
flow = TestFlow()
# Streaming is switched on via the attribute before kickoff_async() is called.
flow.stream = True
checkpoint = CheckpointConfig()
# wraps= keeps the real astream behaviour while recording the call
# (assuming `patch` is unittest.mock.patch -- its import is outside this view).
with patch.object(flow, "astream", wraps=flow.astream) as spy:
streaming = await flow.kickoff_async(from_checkpoint=checkpoint)
# Consume every item from the stream before reading the result below.
async for _ in streaming:
pass
# Identity check: the same CheckpointConfig object must arrive as the
# from_checkpoint keyword argument.
assert spy.call_args.kwargs["from_checkpoint"] is checkpoint
assert streaming.result == "async flow result"
class TestStreamingEdgeCases:
"""Tests for edge cases in streaming functionality."""

View File

@@ -3,8 +3,10 @@
import pytest
from crewai import Agent, Crew, Task
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.llm_events import LLMStreamChunkEvent
from crewai.flow.flow import Flow, start
from crewai.types.streaming import CrewStreamingOutput, FlowStreamingOutput
from crewai.types.streaming import AsyncStreamSession, CrewStreamingOutput, StreamSession
@pytest.fixture
@@ -212,7 +214,7 @@ class TestStreamingFlowIntegration:
streaming = flow.kickoff()
assert isinstance(streaming, FlowStreamingOutput)
assert isinstance(streaming, StreamSession)
chunks = []
for chunk in streaming:
@@ -232,6 +234,14 @@ class TestStreamingFlowIntegration:
@start()
def execute(self) -> str:
crewai_event_bus.emit(
self,
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="Flow result",
call_id="call-1",
),
)
return "Flow result"
flow = SimpleFlow()
@@ -241,8 +251,11 @@ class TestStreamingFlowIntegration:
pass
assert streaming.is_completed is True
streaming.get_full_text()
assert len(streaming.chunks) >= 0
content_frames = [frame for frame in streaming.frames if frame.content]
full_text = "".join(frame.content for frame in content_frames)
assert full_text == "Flow result"
assert len(content_frames) == 1
assert len(streaming.frames) > 0
result = streaming.result
assert result is not None
@@ -281,7 +294,7 @@ class TestStreamingFlowIntegration:
streaming = await flow.kickoff_async()
assert isinstance(streaming, FlowStreamingOutput)
assert isinstance(streaming, AsyncStreamSession)
chunks = []
async for chunk in streaming:

View File

@@ -9,6 +9,7 @@ import pytest
from crewai.events.base_events import BaseEvent
from crewai.events.event_bus import crewai_event_bus
from crewai.events.stream_context import add_stream_sink, reset_stream_sinks
class AsyncTestEvent(BaseEvent):
@@ -53,6 +54,24 @@ async def test_aemit_with_async_handlers():
assert received_events[0] == event
@pytest.mark.asyncio
async def test_aemit_publishes_to_active_stream_sinks():
    """aemit() hands the (source, event) pair to a registered stream sink."""
    captured = []

    def recording_sink(source: object, event: BaseEvent) -> None:
        # Record every delivery so the exact call sequence can be asserted.
        captured.append((source, event))

    event = AsyncTestEvent(type="async_test")
    token = add_stream_sink(recording_sink)
    try:
        await crewai_event_bus.aemit("test_source", event)
    finally:
        # Always pass the token back to reset_stream_sinks, even if aemit raises.
        reset_stream_sinks(token)

    # Exactly one delivery, carrying the original source and event object.
    assert captured == [("test_source", event)]
    # After aemit, the event's emission_sequence is no longer None.
    assert event.emission_sequence is not None
@pytest.mark.asyncio
async def test_multiple_async_handlers():
received_events_1 = []