mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-06-30 12:48:10 +00:00
Compare commits
11 Commits
fix/codeql
...
lorenze/im
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bb290fa967 | ||
|
|
5e6fdc8374 | ||
|
|
4fda0c0eec | ||
|
|
926057635d | ||
|
|
7de7e32bb2 | ||
|
|
01c7915528 | ||
|
|
71edf39fee | ||
|
|
90b06a4523 | ||
|
|
a48f45c917 | ||
|
|
5ebf686254 | ||
|
|
72d78387bc |
@@ -364,6 +364,7 @@
|
||||
"edge/en/learn/human-feedback-in-flows",
|
||||
"edge/en/learn/kickoff-async",
|
||||
"edge/en/learn/kickoff-for-each",
|
||||
"edge/en/learn/streaming-runtime-contract",
|
||||
"edge/en/learn/llm-connections",
|
||||
"edge/en/learn/litellm-removal-guide",
|
||||
"edge/en/learn/multimodal-agents",
|
||||
@@ -9587,6 +9588,7 @@
|
||||
"edge/pt-BR/learn/human-feedback-in-flows",
|
||||
"edge/pt-BR/learn/kickoff-async",
|
||||
"edge/pt-BR/learn/kickoff-for-each",
|
||||
"edge/pt-BR/learn/streaming-runtime-contract",
|
||||
"edge/pt-BR/learn/llm-connections",
|
||||
"edge/pt-BR/learn/multimodal-agents",
|
||||
"edge/pt-BR/learn/replay-tasks-from-latest-crew-kickoff",
|
||||
@@ -18477,6 +18479,7 @@
|
||||
"edge/ko/learn/human-feedback-in-flows",
|
||||
"edge/ko/learn/kickoff-async",
|
||||
"edge/ko/learn/kickoff-for-each",
|
||||
"edge/ko/learn/streaming-runtime-contract",
|
||||
"edge/ko/learn/llm-connections",
|
||||
"edge/ko/learn/multimodal-agents",
|
||||
"edge/ko/learn/replay-tasks-from-latest-crew-kickoff",
|
||||
@@ -27583,6 +27586,7 @@
|
||||
"edge/ar/learn/human-feedback-in-flows",
|
||||
"edge/ar/learn/kickoff-async",
|
||||
"edge/ar/learn/kickoff-for-each",
|
||||
"edge/ar/learn/streaming-runtime-contract",
|
||||
"edge/ar/learn/llm-connections",
|
||||
"edge/ar/learn/multimodal-agents",
|
||||
"edge/ar/learn/replay-tasks-from-latest-crew-kickoff",
|
||||
|
||||
@@ -11,7 +11,7 @@ mode: "wide"
|
||||
|
||||
## كيف يعمل بث التدفق
|
||||
|
||||
عند تفعيل البث في تدفق، يلتقط CrewAI ويبث المخرجات من أي أطقم أو استدعاءات LLM داخل التدفق. يقدم البث أجزاء منظمة تحتوي على المحتوى وسياق المهمة ومعلومات الوكيل مع تقدم التنفيذ.
|
||||
عند تفعيل البث في تدفق، يلتقط CrewAI ويبث المخرجات من أي أطقم أو استدعاءات LLM أو أدوات أو أحداث دورة حياة داخل التدفق. يقدم البث عناصر `StreamFrame` مرتبة تحتوي على محتوى قابل للطباعة وبيانات حدث مهيكلة مع تقدم التنفيذ.
|
||||
|
||||
## تفعيل البث
|
||||
|
||||
@@ -52,7 +52,7 @@ class ResearchFlow(Flow):
|
||||
|
||||
## البث المتزامن
|
||||
|
||||
عند استدعاء `kickoff()` على تدفق مع تفعيل البث، يُرجع كائن `FlowStreamingOutput` يمكنك التكرار عليه:
|
||||
عند استدعاء `kickoff()` على تدفق مع تفعيل البث، يُرجع جلسة stream تنتج عناصر `StreamFrame` مرتبة:
|
||||
|
||||
```python Code
|
||||
flow = ResearchFlow()
|
||||
@@ -60,44 +60,43 @@ flow = ResearchFlow()
|
||||
# Start streaming execution
|
||||
streaming = flow.kickoff()
|
||||
|
||||
# Iterate over chunks as they arrive
|
||||
for chunk in streaming:
|
||||
print(chunk.content, end="", flush=True)
|
||||
# Iterate over stream items as they arrive
|
||||
for item in streaming:
|
||||
print(item.content, end="", flush=True)
|
||||
|
||||
# Access the final result after streaming completes
|
||||
result = streaming.result
|
||||
print(f"\n\nFinal output: {result}")
|
||||
```
|
||||
|
||||
### معلومات جزء البث
|
||||
### معلومات عنصر البث
|
||||
|
||||
يوفر كل جزء سياقاً حول مصدره في التدفق:
|
||||
يوفر كل عنصر محتوى قابلاً للطباعة وبيانات حدث مهيكلة:
|
||||
|
||||
```python Code
|
||||
streaming = flow.kickoff()
|
||||
|
||||
for chunk in streaming:
|
||||
print(f"Agent: {chunk.agent_role}")
|
||||
print(f"Task: {chunk.task_name}")
|
||||
print(f"Content: {chunk.content}")
|
||||
print(f"Type: {chunk.chunk_type}") # TEXT or TOOL_CALL
|
||||
for item in streaming:
|
||||
print(f"Channel: {item.channel}")
|
||||
print(f"Type: {item.type}")
|
||||
print(f"Content: {item.content}")
|
||||
print(f"Event payload: {item.event}")
|
||||
```
|
||||
|
||||
### الوصول إلى خصائص البث
|
||||
|
||||
يوفر كائن `FlowStreamingOutput` خصائص وطرق مفيدة:
|
||||
توفر جلسة stream خصائص وطرق مفيدة:
|
||||
|
||||
```python Code
|
||||
streaming = flow.kickoff()
|
||||
|
||||
# Iterate and collect chunks
|
||||
for chunk in streaming:
|
||||
print(chunk.content, end="", flush=True)
|
||||
# Iterate and collect items
|
||||
for item in streaming:
|
||||
print(item.content, end="", flush=True)
|
||||
|
||||
# After iteration completes
|
||||
print(f"\nCompleted: {streaming.is_completed}")
|
||||
print(f"Full text: {streaming.get_full_text()}")
|
||||
print(f"Total chunks: {len(streaming.chunks)}")
|
||||
print(f"Total frames: {len(streaming.frames)}")
|
||||
print(f"Final result: {streaming.result}")
|
||||
```
|
||||
|
||||
@@ -114,9 +113,9 @@ async def stream_flow():
|
||||
# Start async streaming
|
||||
streaming = await flow.kickoff_async()
|
||||
|
||||
# Async iteration over chunks
|
||||
async for chunk in streaming:
|
||||
print(chunk.content, end="", flush=True)
|
||||
# Async iteration over stream items
|
||||
async for item in streaming:
|
||||
print(item.content, end="", flush=True)
|
||||
|
||||
# Access final result
|
||||
result = streaming.result
|
||||
@@ -181,13 +180,14 @@ flow = MultiStepFlow()
|
||||
streaming = flow.kickoff()
|
||||
|
||||
current_step = ""
|
||||
for chunk in streaming:
|
||||
for item in streaming:
|
||||
# Track which flow step is executing
|
||||
if chunk.task_name != current_step:
|
||||
current_step = chunk.task_name
|
||||
print(f"\n\n=== {chunk.task_name} ===\n")
|
||||
step_name = item.event.get("method_name") or item.event.get("task_name")
|
||||
if step_name and step_name != current_step:
|
||||
current_step = step_name
|
||||
print(f"\n\n=== {step_name} ===\n")
|
||||
|
||||
print(chunk.content, end="", flush=True)
|
||||
print(item.content, end="", flush=True)
|
||||
|
||||
result = streaming.result
|
||||
print(f"\n\nFinal analysis: {result}")
|
||||
@@ -201,7 +201,6 @@ print(f"\n\nFinal analysis: {result}")
|
||||
import asyncio
|
||||
from crewai.flow.flow import Flow, listen, start
|
||||
from crewai import Agent, Crew, Task
|
||||
from crewai.types.streaming import StreamChunkType
|
||||
|
||||
class ResearchPipeline(Flow):
|
||||
stream = True
|
||||
@@ -254,33 +253,35 @@ async def run_with_dashboard():
|
||||
|
||||
current_agent = ""
|
||||
current_task = ""
|
||||
chunk_count = 0
|
||||
frame_count = 0
|
||||
|
||||
async for chunk in streaming:
|
||||
chunk_count += 1
|
||||
async for item in streaming:
|
||||
frame_count += 1
|
||||
|
||||
# Display phase transitions
|
||||
if chunk.task_name != current_task:
|
||||
current_task = chunk.task_name
|
||||
current_agent = chunk.agent_role
|
||||
task_name = item.event.get("task_name", "")
|
||||
agent_role = item.event.get("agent_role", "")
|
||||
if task_name and task_name != current_task:
|
||||
current_task = task_name
|
||||
current_agent = agent_role
|
||||
print(f"\n\n📋 Phase: {current_task}")
|
||||
print(f"👤 Agent: {current_agent}")
|
||||
print("-" * 60)
|
||||
|
||||
# Display text output
|
||||
if chunk.chunk_type == StreamChunkType.TEXT:
|
||||
print(chunk.content, end="", flush=True)
|
||||
if item.content:
|
||||
print(item.content, end="", flush=True)
|
||||
|
||||
# Display tool usage
|
||||
elif chunk.chunk_type == StreamChunkType.TOOL_CALL and chunk.tool_call:
|
||||
print(f"\n🔧 Tool: {chunk.tool_call.tool_name}")
|
||||
elif item.channel == "tools":
|
||||
print(f"\n🔧 Tool event: {item.type}")
|
||||
|
||||
# Show completion summary
|
||||
result = streaming.result
|
||||
print(f"\n\n{'='*60}")
|
||||
print("PIPELINE COMPLETE")
|
||||
print(f"{'='*60}")
|
||||
print(f"Total chunks: {chunk_count}")
|
||||
print(f"Total frames: {frame_count}")
|
||||
print(f"Final output length: {len(str(result))} characters")
|
||||
|
||||
asyncio.run(run_with_dashboard())
|
||||
@@ -353,8 +354,8 @@ class StatefulStreamingFlow(Flow[AnalysisState]):
|
||||
flow = StatefulStreamingFlow()
|
||||
streaming = flow.kickoff(inputs={"topic": "quantum computing"})
|
||||
|
||||
for chunk in streaming:
|
||||
print(chunk.content, end="", flush=True)
|
||||
for item in streaming:
|
||||
print(item.content, end="", flush=True)
|
||||
|
||||
result = streaming.result
|
||||
print(f"\n\nFinal state:")
|
||||
@@ -374,29 +375,29 @@ print(f"Insights length: {len(flow.state.insights)}")
|
||||
- **تتبع التقدم**: إظهار المرحلة الحالية من سير العمل للمستخدمين
|
||||
- **لوحات المعلومات الحية**: إنشاء واجهات مراقبة لتدفقات الإنتاج
|
||||
|
||||
## أنواع أجزاء البث
|
||||
## قنوات إطارات البث
|
||||
|
||||
مثل بث الطاقم، يمكن أن تكون أجزاء التدفق من أنواع مختلفة:
|
||||
ينتج بث التدفق عناصر `StreamFrame` عبر عدة قنوات:
|
||||
|
||||
### أجزاء TEXT
|
||||
### إطارات LLM
|
||||
|
||||
محتوى نصي قياسي من استجابات LLM:
|
||||
|
||||
```python Code
|
||||
for chunk in streaming:
|
||||
if chunk.chunk_type == StreamChunkType.TEXT:
|
||||
print(chunk.content, end="", flush=True)
|
||||
for item in streaming:
|
||||
if item.channel == "llm" and item.content:
|
||||
print(item.content, end="", flush=True)
|
||||
```
|
||||
|
||||
### أجزاء TOOL_CALL
|
||||
### إطارات الأدوات
|
||||
|
||||
معلومات حول استدعاءات الأدوات داخل التدفق:
|
||||
|
||||
```python Code
|
||||
for chunk in streaming:
|
||||
if chunk.chunk_type == StreamChunkType.TOOL_CALL and chunk.tool_call:
|
||||
print(f"\nTool: {chunk.tool_call.tool_name}")
|
||||
print(f"Args: {chunk.tool_call.arguments}")
|
||||
for item in streaming:
|
||||
if item.channel == "tools":
|
||||
print(f"\nTool event: {item.type}")
|
||||
print(f"Payload: {item.event}")
|
||||
```
|
||||
|
||||
## معالجة الأخطاء
|
||||
@@ -408,8 +409,8 @@ flow = ResearchFlow()
|
||||
streaming = flow.kickoff()
|
||||
|
||||
try:
|
||||
for chunk in streaming:
|
||||
print(chunk.content, end="", flush=True)
|
||||
for item in streaming:
|
||||
print(item.content, end="", flush=True)
|
||||
|
||||
result = streaming.result
|
||||
print(f"\nSuccess! Result: {result}")
|
||||
@@ -422,7 +423,7 @@ except Exception as e:
|
||||
|
||||
## الإلغاء وتنظيف الموارد
|
||||
|
||||
يدعم `FlowStreamingOutput` الإلغاء السلس بحيث يتوقف العمل الجاري فوراً عند انقطاع اتصال المستهلك.
|
||||
تدعم جلسة stream الإلغاء السلس بحيث يتوقف العمل الجاري فوراً عند انقطاع اتصال المستهلك.
|
||||
|
||||
### مدير السياق غير المتزامن
|
||||
|
||||
@@ -430,8 +431,8 @@ except Exception as e:
|
||||
streaming = await flow.kickoff_async()
|
||||
|
||||
async with streaming:
|
||||
async for chunk in streaming:
|
||||
print(chunk.content, end="", flush=True)
|
||||
async for item in streaming:
|
||||
print(item.content, end="", flush=True)
|
||||
```
|
||||
|
||||
### الإلغاء الصريح
|
||||
@@ -439,8 +440,8 @@ async with streaming:
|
||||
```python Code
|
||||
streaming = await flow.kickoff_async()
|
||||
try:
|
||||
async for chunk in streaming:
|
||||
print(chunk.content, end="", flush=True)
|
||||
async for item in streaming:
|
||||
print(item.content, end="", flush=True)
|
||||
finally:
|
||||
await streaming.aclose() # غير متزامن
|
||||
# streaming.close() # المكافئ المتزامن
|
||||
@@ -451,10 +452,10 @@ finally:
|
||||
## ملاحظات مهمة
|
||||
|
||||
- يفعّل البث تلقائياً بث LLM لأي أطقم مستخدمة داخل التدفق
|
||||
- يجب التكرار عبر جميع الأجزاء قبل الوصول إلى خاصية `.result`
|
||||
- يجب التكرار عبر جميع عناصر stream قبل الوصول إلى خاصية `.result`
|
||||
- يعمل البث مع كل من حالة التدفق المنظمة وغير المنظمة
|
||||
- يلتقط بث التدفق المخرجات من جميع الأطقم واستدعاءات LLM في التدفق
|
||||
- يتضمن كل جزء سياقاً حول الوكيل والمهمة التي ولدته
|
||||
- يتضمن كل إطار سياق حدث مهيكلاً مثل القناة والنوع والنطاق والحمولة
|
||||
- يضيف البث حملاً ضئيلاً لتنفيذ التدفق
|
||||
|
||||
## الدمج مع تصور التدفق
|
||||
@@ -468,8 +469,8 @@ flow.plot("research_flow") # Creates HTML visualization
|
||||
|
||||
# Run with streaming
|
||||
streaming = flow.kickoff()
|
||||
for chunk in streaming:
|
||||
print(chunk.content, end="", flush=True)
|
||||
for item in streaming:
|
||||
print(item.content, end="", flush=True)
|
||||
|
||||
result = streaming.result
|
||||
print(f"\nFlow complete! View structure at: research_flow.html")
|
||||
|
||||
194
docs/edge/ar/learn/streaming-runtime-contract.mdx
Normal file
194
docs/edge/ar/learn/streaming-runtime-contract.mdx
Normal file
@@ -0,0 +1,194 @@
|
||||
---
|
||||
title: عقد بث وقت التشغيل
|
||||
description: بث إطارات وقت تشغيل مرتبة من التدفقات واستدعاءات LLM المباشرة ودورات المحادثة.
|
||||
icon: tower-broadcast
|
||||
mode: "wide"
|
||||
---
|
||||
|
||||
## نظرة عامة
|
||||
|
||||
يوفر CrewAI عقد بث قائمًا على الإطارات للأنظمة التي تحتاج إلى أكثر من أجزاء نصية بسيطة. يصدر العقد كائنات `StreamFrame` مرتبة لأحداث دورة حياة Flow، وتوكنات LLM المباشرة، ونشاط الأدوات، ورسائل المحادثة، والأحداث المخصصة.
|
||||
|
||||
استخدم هذه الواجهة عندما تبني واجهة مستخدم، أو جسر خدمة، أو تطبيق طرفية، أو وقت تشغيل نشر يحتاج إلى تدفق ثابت من الأحداث المهيكلة أثناء تشغيل Flow أو دورة محادثة أو استدعاء LLM مباشر.
|
||||
|
||||
## StreamFrame
|
||||
|
||||
لكل إطار نفس الغلاف:
|
||||
|
||||
```python
|
||||
from crewai.types.streaming import StreamFrame
|
||||
|
||||
frame.id # معرف إطار فريد
|
||||
frame.seq # ترتيب محلي للتنفيذ، عند توفره
|
||||
frame.type # نوع الحدث المصدر، مثل "flow_started"
|
||||
frame.channel # "llm", "flow", "tools", "messages", "lifecycle", or "custom"
|
||||
frame.namespace # نطاق المصدر/وقت التشغيل
|
||||
frame.timestamp # طابع وقت الحدث
|
||||
frame.parent_id # معرف الحدث الأب، عند توفره
|
||||
frame.previous_id # معرف الحدث السابق، عند توفره
|
||||
frame.data # حمولة الحدث
|
||||
frame.event # اسم بديل لـ frame.data
|
||||
frame.content # نص قابل للطباعة لإطارات التوكن، وإلا ""
|
||||
```
|
||||
|
||||
حقل `channel` هو أسرع طريقة لتوجيه الإطارات في المستهلكين:
|
||||
|
||||
| القناة | تحتوي على |
|
||||
|--------|-----------|
|
||||
| `llm` | توكنات وأجزاء التفكير من أحداث بث LLM |
|
||||
| `flow` | دورة حياة Flow، وتنفيذ الدوال، والتوجيه، وأحداث الإيقاف/الاستئناف |
|
||||
| `tools` | أحداث استخدام الأدوات |
|
||||
| `messages` | أحداث سجل المحادثة |
|
||||
| `lifecycle` | أحداث دورة حياة وقت التشغيل التي لا تخص قناة أخرى |
|
||||
| `custom` | أحداث لا تُطابق قناة مدمجة |
|
||||
|
||||
يحافظ `frame.type` على نوع الحدث المصدر، حتى يتمكن المستهلكون من التعامل مع أحداث محددة داخل القناة.
|
||||
|
||||
## بث Flow
|
||||
|
||||
عيّن `stream=True` على Flow لجعل `kickoff()` يعيد جلسة stream:
|
||||
|
||||
```python
|
||||
from crewai.flow import Flow, start
|
||||
|
||||
|
||||
class ReportFlow(Flow):
|
||||
@start()
|
||||
def generate(self):
|
||||
return "done"
|
||||
|
||||
|
||||
flow = ReportFlow(stream=True)
|
||||
stream = flow.kickoff()
|
||||
|
||||
with stream:
|
||||
for chunk in stream:
|
||||
print(chunk.content, end="", flush=True)
|
||||
if chunk.type == "tool_usage_started":
|
||||
print(chunk.event["tool_name"])
|
||||
|
||||
result = stream.result
|
||||
```
|
||||
|
||||
يجب استهلاك stream قبل قراءة `stream.result`. يؤدي الوصول إلى النتيجة مبكرًا إلى رفع `RuntimeError` حتى لا يتعامل المستهلكون بالخطأ مع تشغيل جزئي على أنه مكتمل.
|
||||
|
||||
يمكنك أيضًا استدعاء `flow.stream_events(...)` مباشرة عندما تريد البث لاستدعاء واحد بدون تعيين `stream=True` على مثيل Flow.
|
||||
|
||||
## التصفية حسب القناة
|
||||
|
||||
يوفر `StreamSession` إسقاطات حسب القناة تحافظ على ترتيب الإطارات العالمي داخل القناة المحددة:
|
||||
|
||||
```python
|
||||
stream = flow.stream_events()
|
||||
|
||||
with stream:
|
||||
for frame in stream.llm:
|
||||
print(frame.content, end="", flush=True)
|
||||
|
||||
result = stream.result
|
||||
```
|
||||
|
||||
الإسقاطات المتاحة هي:
|
||||
|
||||
| الإسقاط | الإطارات |
|
||||
|---------|----------|
|
||||
| `stream.events` | كل الإطارات |
|
||||
| `stream.llm` | إطارات LLM |
|
||||
| `stream.messages` | إطارات رسائل المحادثة |
|
||||
| `stream.flow` | إطارات Flow |
|
||||
| `stream.tools` | إطارات الأدوات |
|
||||
| `stream.interleave([...])` | مجموعة مختارة من القنوات |
|
||||
|
||||
استخدم `stream.interleave(["flow", "llm", "messages"])` عندما يريد المستهلك بعض القنوات فقط لكنه ما زال يحتاج إلى ترتيبها النسبي.
|
||||
|
||||
## البث غير المتزامن
|
||||
|
||||
استخدم `astream()` للمستهلكين غير المتزامنين:
|
||||
|
||||
```python
|
||||
flow = ReportFlow()
|
||||
stream = flow.astream()
|
||||
|
||||
async with stream:
|
||||
async for chunk in stream.events:
|
||||
print(chunk.channel, chunk.type, chunk.content)
|
||||
|
||||
result = stream.result
|
||||
```
|
||||
|
||||
تملك الجلسة غير المتزامنة نفس إسقاطات الجلسة المتزامنة.
|
||||
|
||||
## بث استدعاء LLM مباشر
|
||||
|
||||
ما زال `llm.call(...)` يعيد النتيجة النهائية المجمعة. استخدم `llm.stream_events(...)` عندما تريد التكرار على الأجزاء فور وصولها مع الحفاظ على حمولة الحدث المهيكلة:
|
||||
|
||||
```python
|
||||
from crewai import LLM
|
||||
|
||||
|
||||
llm = LLM(model="gpt-4o-mini")
|
||||
stream = llm.stream_events(
|
||||
messages=[
|
||||
{
|
||||
"role": "user",
|
||||
"content": "Explain CrewAI streaming in two short sentences.",
|
||||
}
|
||||
]
|
||||
)
|
||||
|
||||
with stream:
|
||||
for chunk in stream:
|
||||
print(chunk.content, end="", flush=True)
|
||||
|
||||
result = stream.result
|
||||
```
|
||||
|
||||
يفعل `llm.stream_events(...)` البث مؤقتًا للاستدعاء المغلف ثم يستعيد إعداد `stream` السابق في LLM بعد ذلك. تستمر تكاملات المزودين في إصدار أحداث بث LLM الأساسية؛ يوفر هذا المساعد واجهة مكرر مشتركة فوق تلك الأحداث لكل مزودي LLM.
|
||||
|
||||
## دورات المحادثة
|
||||
|
||||
يمكن للتدفقات المحادثية بث دورة مستخدم واحدة باستخدام `stream_turn()`:
|
||||
|
||||
```python
|
||||
from crewai import Flow
|
||||
from crewai.experimental.conversational import ConversationConfig, ConversationState
|
||||
|
||||
|
||||
@ConversationConfig(llm="gpt-4o-mini", defer_trace_finalization=True)
|
||||
class ChatFlow(Flow[ConversationState]):
|
||||
conversational = True
|
||||
|
||||
|
||||
flow = ChatFlow()
|
||||
stream = flow.stream_turn("What can you help me with?", session_id="session-1")
|
||||
|
||||
with stream:
|
||||
for frame in stream.events:
|
||||
if frame.channel == "llm" and frame.type == "llm_stream_chunk":
|
||||
print(frame.content, end="", flush=True)
|
||||
|
||||
reply = stream.result
|
||||
```
|
||||
|
||||
أثناء `stream_turn()`، يفعّل مسار الإجابة المحادثية المدمج بث توكنات LLM لذلك الدور ثم يستعيد إعداد `stream` السابق في LLM بعد ذلك. يجب على معالجات المسارات المخصصة التي تنشئ Agents أو مثيلات LLM خاصة بها تهيئة تلك النماذج للبث إذا احتاجت إلى إخراج على مستوى التوكن.
|
||||
|
||||
## التنظيف
|
||||
|
||||
استخدم الجلسة كمدير سياق عندما يكون ذلك ممكنًا. إذا انقطع اتصال العميل قبل استهلاك stream بالكامل، فأغلق الجلسة صراحة:
|
||||
|
||||
```python
|
||||
stream = flow.stream_events()
|
||||
|
||||
try:
|
||||
for frame in stream.events:
|
||||
print(frame.type)
|
||||
finally:
|
||||
if not stream.is_exhausted:
|
||||
stream.close()
|
||||
```
|
||||
|
||||
للتدفقات غير المتزامنة، استخدم `await stream.aclose()`.
|
||||
|
||||
## بث الأجزاء القديم
|
||||
|
||||
ما زال بث Crew مع `stream=True` يعيد واجهة `CrewStreamingOutput` المعتمدة على الأجزاء والموضحة في [بث تنفيذ Crew](/ar/learn/streaming-crew-execution). وما زالت استدعاءات `llm.call(...)` المباشرة تعيد نتيجة LLM النهائية. عقد الإطارات مخصص لأوقات التشغيل التي تحتاج إلى غلاف حدث ثابت عبر Flows، واستدعاءات LLM المباشرة، ودورات المحادثة، والأدوات، والرسائل.
|
||||
@@ -25,6 +25,7 @@ Use **`flow.handle_turn(message, session_id=...)`** for every user message from
|
||||
| API | Use for |
|
||||
|-----|---------|
|
||||
| `handle_turn(message, session_id=...)` | Ergonomic one-turn wrapper for conversational `Flow` |
|
||||
| `stream_turn(message, session_id=...)` | Stream one conversational turn as ordered runtime frames |
|
||||
| `chat()` | Local terminal REPL for conversational `Flow` |
|
||||
| `kickoff(inputs={...})` | Advanced flow execution without conversational turn handling |
|
||||
| `ask()` | Blocking prompt **inside** one step (wizard, clarification) |
|
||||
@@ -85,6 +86,23 @@ finally:
|
||||
flow.finalize_session_traces() # one trace link for the whole chat
|
||||
```
|
||||
|
||||
## Streaming a turn
|
||||
|
||||
Use `stream_turn()` when a UI or runtime needs structured events for one chat turn. It returns a stream session with ordered frames for Flow routing, LLM chunks, tool activity, and conversation messages.
|
||||
|
||||
```python
|
||||
stream = flow.stream_turn("Where is my order?", session_id=session_id)
|
||||
|
||||
with stream:
|
||||
for frame in stream.events:
|
||||
if frame.channel == "llm" and frame.type == "llm_stream_chunk":
|
||||
print(frame.data.get("chunk", ""), end="", flush=True)
|
||||
|
||||
result = stream.result
|
||||
```
|
||||
|
||||
For the full frame contract, channel list, and async API, see [Streaming Runtime Contract](/edge/en/learn/streaming-runtime-contract).
|
||||
|
||||
## Turn lifecycle
|
||||
|
||||
Each `handle_turn` runs this pipeline:
|
||||
|
||||
@@ -11,7 +11,7 @@ CrewAI Flows support streaming output, allowing you to receive real-time updates
|
||||
|
||||
## How Flow Streaming Works
|
||||
|
||||
When streaming is enabled on a Flow, CrewAI captures and streams output from any crews or LLM calls within the flow. The stream delivers structured chunks containing the content, task context, and agent information as execution progresses.
|
||||
When streaming is enabled on a Flow, CrewAI captures and streams output from any crews, LLM calls, tools, and lifecycle events within the flow. The stream delivers ordered `StreamFrame` items with printable content plus structured event data as execution progresses.
|
||||
|
||||
## Enabling Streaming
|
||||
|
||||
@@ -52,7 +52,7 @@ class ResearchFlow(Flow):
|
||||
|
||||
## Synchronous Streaming
|
||||
|
||||
When you call `kickoff()` on a flow with streaming enabled, it returns a `FlowStreamingOutput` object that you can iterate over:
|
||||
When you call `kickoff()` on a flow with streaming enabled, it returns a stream session that yields ordered `StreamFrame` items:
|
||||
|
||||
```python Code
|
||||
flow = ResearchFlow()
|
||||
@@ -60,44 +60,43 @@ flow = ResearchFlow()
|
||||
# Start streaming execution
|
||||
streaming = flow.kickoff()
|
||||
|
||||
# Iterate over chunks as they arrive
|
||||
for chunk in streaming:
|
||||
print(chunk.content, end="", flush=True)
|
||||
# Iterate over stream items as they arrive
|
||||
for item in streaming:
|
||||
print(item.content, end="", flush=True)
|
||||
|
||||
# Access the final result after streaming completes
|
||||
result = streaming.result
|
||||
print(f"\n\nFinal output: {result}")
|
||||
```
|
||||
|
||||
### Stream Chunk Information
|
||||
### Stream Item Information
|
||||
|
||||
Each chunk provides context about where it originated in the flow:
|
||||
Each item provides both printable content and structured event data:
|
||||
|
||||
```python Code
|
||||
streaming = flow.kickoff()
|
||||
|
||||
for chunk in streaming:
|
||||
print(f"Agent: {chunk.agent_role}")
|
||||
print(f"Task: {chunk.task_name}")
|
||||
print(f"Content: {chunk.content}")
|
||||
print(f"Type: {chunk.chunk_type}") # TEXT or TOOL_CALL
|
||||
for item in streaming:
|
||||
print(f"Channel: {item.channel}")
|
||||
print(f"Type: {item.type}")
|
||||
print(f"Content: {item.content}")
|
||||
print(f"Event payload: {item.event}")
|
||||
```
|
||||
|
||||
### Accessing Streaming Properties
|
||||
|
||||
The `FlowStreamingOutput` object provides useful properties and methods:
|
||||
The stream session provides useful properties and methods:
|
||||
|
||||
```python Code
|
||||
streaming = flow.kickoff()
|
||||
|
||||
# Iterate and collect chunks
|
||||
for chunk in streaming:
|
||||
print(chunk.content, end="", flush=True)
|
||||
# Iterate and collect items
|
||||
for item in streaming:
|
||||
print(item.content, end="", flush=True)
|
||||
|
||||
# After iteration completes
|
||||
print(f"\nCompleted: {streaming.is_completed}")
|
||||
print(f"Full text: {streaming.get_full_text()}")
|
||||
print(f"Total chunks: {len(streaming.chunks)}")
|
||||
print(f"Total frames: {len(streaming.frames)}")
|
||||
print(f"Final result: {streaming.result}")
|
||||
```
|
||||
|
||||
@@ -114,9 +113,9 @@ async def stream_flow():
|
||||
# Start async streaming
|
||||
streaming = await flow.kickoff_async()
|
||||
|
||||
# Async iteration over chunks
|
||||
async for chunk in streaming:
|
||||
print(chunk.content, end="", flush=True)
|
||||
# Async iteration over stream items
|
||||
async for item in streaming:
|
||||
print(item.content, end="", flush=True)
|
||||
|
||||
# Access final result
|
||||
result = streaming.result
|
||||
@@ -181,13 +180,14 @@ flow = MultiStepFlow()
|
||||
streaming = flow.kickoff()
|
||||
|
||||
current_step = ""
|
||||
for chunk in streaming:
|
||||
for item in streaming:
|
||||
# Track which flow step is executing
|
||||
if chunk.task_name != current_step:
|
||||
current_step = chunk.task_name
|
||||
print(f"\n\n=== {chunk.task_name} ===\n")
|
||||
step_name = item.event.get("method_name") or item.event.get("task_name")
|
||||
if step_name and step_name != current_step:
|
||||
current_step = step_name
|
||||
print(f"\n\n=== {step_name} ===\n")
|
||||
|
||||
print(chunk.content, end="", flush=True)
|
||||
print(item.content, end="", flush=True)
|
||||
|
||||
result = streaming.result
|
||||
print(f"\n\nFinal analysis: {result}")
|
||||
@@ -201,7 +201,6 @@ Here's a complete example showing how to build a progress dashboard with streami
|
||||
import asyncio
|
||||
from crewai.flow.flow import Flow, listen, start
|
||||
from crewai import Agent, Crew, Task
|
||||
from crewai.types.streaming import StreamChunkType
|
||||
|
||||
class ResearchPipeline(Flow):
|
||||
stream = True
|
||||
@@ -254,33 +253,35 @@ async def run_with_dashboard():
|
||||
|
||||
current_agent = ""
|
||||
current_task = ""
|
||||
chunk_count = 0
|
||||
frame_count = 0
|
||||
|
||||
async for chunk in streaming:
|
||||
chunk_count += 1
|
||||
async for item in streaming:
|
||||
frame_count += 1
|
||||
|
||||
# Display phase transitions
|
||||
if chunk.task_name != current_task:
|
||||
current_task = chunk.task_name
|
||||
current_agent = chunk.agent_role
|
||||
task_name = item.event.get("task_name", "")
|
||||
agent_role = item.event.get("agent_role", "")
|
||||
if task_name and task_name != current_task:
|
||||
current_task = task_name
|
||||
current_agent = agent_role
|
||||
print(f"\n\n📋 Phase: {current_task}")
|
||||
print(f"👤 Agent: {current_agent}")
|
||||
print("-" * 60)
|
||||
|
||||
# Display text output
|
||||
if chunk.chunk_type == StreamChunkType.TEXT:
|
||||
print(chunk.content, end="", flush=True)
|
||||
if item.content:
|
||||
print(item.content, end="", flush=True)
|
||||
|
||||
# Display tool usage
|
||||
elif chunk.chunk_type == StreamChunkType.TOOL_CALL and chunk.tool_call:
|
||||
print(f"\n🔧 Tool: {chunk.tool_call.tool_name}")
|
||||
elif item.channel == "tools":
|
||||
print(f"\n🔧 Tool event: {item.type}")
|
||||
|
||||
# Show completion summary
|
||||
result = streaming.result
|
||||
print(f"\n\n{'='*60}")
|
||||
print("PIPELINE COMPLETE")
|
||||
print(f"{'='*60}")
|
||||
print(f"Total chunks: {chunk_count}")
|
||||
print(f"Total frames: {frame_count}")
|
||||
print(f"Final output length: {len(str(result))} characters")
|
||||
|
||||
asyncio.run(run_with_dashboard())
|
||||
@@ -353,8 +354,8 @@ class StatefulStreamingFlow(Flow[AnalysisState]):
|
||||
flow = StatefulStreamingFlow()
|
||||
streaming = flow.kickoff(inputs={"topic": "quantum computing"})
|
||||
|
||||
for chunk in streaming:
|
||||
print(chunk.content, end="", flush=True)
|
||||
for item in streaming:
|
||||
print(item.content, end="", flush=True)
|
||||
|
||||
result = streaming.result
|
||||
print(f"\n\nFinal state:")
|
||||
@@ -374,29 +375,29 @@ Flow streaming is particularly valuable for:
|
||||
- **Progress Tracking**: Show users which stage of the workflow is currently executing
|
||||
- **Live Dashboards**: Create monitoring interfaces for production flows
|
||||
|
||||
## Stream Chunk Types
|
||||
## Stream Frame Channels
|
||||
|
||||
Like crew streaming, flow chunks can be of different types:
|
||||
Flow streaming yields `StreamFrame` items across several channels:
|
||||
|
||||
### TEXT Chunks
|
||||
### LLM Frames
|
||||
|
||||
Standard text content from LLM responses:
|
||||
|
||||
```python Code
|
||||
for chunk in streaming:
|
||||
if chunk.chunk_type == StreamChunkType.TEXT:
|
||||
print(chunk.content, end="", flush=True)
|
||||
for item in streaming:
|
||||
if item.channel == "llm" and item.content:
|
||||
print(item.content, end="", flush=True)
|
||||
```
|
||||
|
||||
### TOOL_CALL Chunks
|
||||
### Tool Frames
|
||||
|
||||
Information about tool calls within the flow:
|
||||
|
||||
```python Code
|
||||
for chunk in streaming:
|
||||
if chunk.chunk_type == StreamChunkType.TOOL_CALL and chunk.tool_call:
|
||||
print(f"\nTool: {chunk.tool_call.tool_name}")
|
||||
print(f"Args: {chunk.tool_call.arguments}")
|
||||
for item in streaming:
|
||||
if item.channel == "tools":
|
||||
print(f"\nTool event: {item.type}")
|
||||
print(f"Payload: {item.event}")
|
||||
```
|
||||
|
||||
## Error Handling
|
||||
@@ -408,8 +409,8 @@ flow = ResearchFlow()
|
||||
streaming = flow.kickoff()
|
||||
|
||||
try:
|
||||
for chunk in streaming:
|
||||
print(chunk.content, end="", flush=True)
|
||||
for item in streaming:
|
||||
print(item.content, end="", flush=True)
|
||||
|
||||
result = streaming.result
|
||||
print(f"\nSuccess! Result: {result}")
|
||||
@@ -422,7 +423,7 @@ except Exception as e:
|
||||
|
||||
## Cancellation and Resource Cleanup
|
||||
|
||||
`FlowStreamingOutput` supports graceful cancellation so that in-flight work stops promptly when the consumer disconnects.
|
||||
The stream session supports graceful cancellation so that in-flight work stops promptly when the consumer disconnects.
|
||||
|
||||
### Async Context Manager
|
||||
|
||||
@@ -430,8 +431,8 @@ except Exception as e:
|
||||
streaming = await flow.kickoff_async()
|
||||
|
||||
async with streaming:
|
||||
async for chunk in streaming:
|
||||
print(chunk.content, end="", flush=True)
|
||||
async for item in streaming:
|
||||
print(item.content, end="", flush=True)
|
||||
```
|
||||
|
||||
### Explicit Cancellation
|
||||
@@ -439,8 +440,8 @@ async with streaming:
|
||||
```python Code
|
||||
streaming = await flow.kickoff_async()
|
||||
try:
|
||||
async for chunk in streaming:
|
||||
print(chunk.content, end="", flush=True)
|
||||
async for item in streaming:
|
||||
print(item.content, end="", flush=True)
|
||||
finally:
|
||||
await streaming.aclose() # async
|
||||
# streaming.close() # sync equivalent
|
||||
@@ -451,10 +452,10 @@ After cancellation, `streaming.is_cancelled` and `streaming.is_completed` are bo
|
||||
## Important Notes
|
||||
|
||||
- Streaming automatically enables LLM streaming for any crews used within the flow
|
||||
- You must iterate through all chunks before accessing the `.result` property
|
||||
- You must iterate through all stream items before accessing the `.result` property
|
||||
- Streaming works with both structured and unstructured flow state
|
||||
- Flow streaming captures output from all crews and LLM calls in the flow
|
||||
- Each chunk includes context about which agent and task generated it
|
||||
- Each frame includes structured event context such as channel, type, namespace, and payload
|
||||
- Streaming adds minimal overhead to flow execution
|
||||
|
||||
## Combining with Flow Visualization
|
||||
@@ -468,11 +469,11 @@ flow.plot("research_flow") # Creates HTML visualization
|
||||
|
||||
# Run with streaming
|
||||
streaming = flow.kickoff()
|
||||
for chunk in streaming:
|
||||
print(chunk.content, end="", flush=True)
|
||||
for item in streaming:
|
||||
print(item.content, end="", flush=True)
|
||||
|
||||
result = streaming.result
|
||||
print(f"\nFlow complete! View structure at: research_flow.html")
|
||||
```
|
||||
|
||||
By leveraging flow streaming, you can build sophisticated, responsive applications that provide users with real-time visibility into complex multi-stage workflows, making your AI automations more transparent and engaging.
|
||||
By leveraging flow streaming, you can build sophisticated, responsive applications that provide users with real-time visibility into complex multi-stage workflows, making your AI automations more transparent and engaging.
|
||||
|
||||
194
docs/edge/en/learn/streaming-runtime-contract.mdx
Normal file
194
docs/edge/en/learn/streaming-runtime-contract.mdx
Normal file
@@ -0,0 +1,194 @@
|
||||
---
|
||||
title: Streaming Runtime Contract
|
||||
description: Stream ordered runtime frames from Flows, direct LLM calls, and conversational turns.
|
||||
icon: tower-broadcast
|
||||
mode: "wide"
|
||||
---
|
||||
|
||||
## Overview
|
||||
|
||||
CrewAI exposes a frame-based streaming contract for runtimes that need more than plain text chunks. The contract emits ordered `StreamFrame` objects for Flow lifecycle events, direct LLM tokens, tool activity, conversation messages, and custom events.
|
||||
|
||||
Use this API when you are building a UI, service bridge, terminal app, or deployment runtime that needs a stable stream of structured events while a Flow, chat turn, or direct LLM call is running.
|
||||
|
||||
## StreamFrame
|
||||
|
||||
Every frame has the same envelope:
|
||||
|
||||
```python
|
||||
from crewai.types.streaming import StreamFrame
|
||||
|
||||
frame.id # unique frame id
|
||||
frame.seq # execution-local order, when available
|
||||
frame.type # source event type, such as "flow_started"
|
||||
frame.channel # "llm", "flow", "tools", "messages", "lifecycle", or "custom"
|
||||
frame.namespace # source/runtime namespace
|
||||
frame.timestamp # event timestamp
|
||||
frame.parent_id # parent event id, when available
|
||||
frame.previous_id # previous event id, when available
|
||||
frame.data # event payload
|
||||
frame.event # alias for frame.data
|
||||
frame.content # printable text for token-like frames, otherwise ""
|
||||
```
|
||||
|
||||
The `channel` field is the fastest way to route frames in consumers:
|
||||
|
||||
| Channel | Contains |
|
||||
|---------|----------|
|
||||
| `llm` | Token and thinking chunks from LLM streaming events |
|
||||
| `flow` | Flow lifecycle, method execution, routing, and pause/resume events |
|
||||
| `tools` | Tool usage events |
|
||||
| `messages` | Conversation transcript events |
|
||||
| `lifecycle` | Runtime lifecycle events that are not specific to another channel |
|
||||
| `custom` | Events that do not map to a built-in channel |
|
||||
|
||||
`frame.type` preserves the source event type, so consumers can handle specific events inside a channel.
|
||||
|
||||
## Stream a Flow
|
||||
|
||||
Set `stream=True` on a Flow to make `kickoff()` return a stream session:
|
||||
|
||||
```python
|
||||
from crewai.flow import Flow, start
|
||||
|
||||
|
||||
class ReportFlow(Flow):
|
||||
@start()
|
||||
def generate(self):
|
||||
return "done"
|
||||
|
||||
|
||||
flow = ReportFlow(stream=True)
|
||||
stream = flow.kickoff()
|
||||
|
||||
with stream:
|
||||
for chunk in stream:
|
||||
print(chunk.content, end="", flush=True)
|
||||
if chunk.type == "tool_usage_started":
|
||||
print(chunk.event["tool_name"])
|
||||
|
||||
result = stream.result
|
||||
```
|
||||
|
||||
You must consume the stream before reading `stream.result`. Accessing the result early raises a `RuntimeError` so consumers do not accidentally treat a partial run as complete.
|
||||
|
||||
You can also call `flow.stream_events(...)` directly when you want streaming for a single invocation without setting `stream=True` on the Flow instance.
|
||||
|
||||
## Filter by Channel
|
||||
|
||||
`StreamSession` exposes channel projections that preserve global frame order within the selected channel:
|
||||
|
||||
```python
|
||||
stream = flow.stream_events()
|
||||
|
||||
with stream:
|
||||
for frame in stream.llm:
|
||||
print(frame.content, end="", flush=True)
|
||||
|
||||
result = stream.result
|
||||
```
|
||||
|
||||
Available projections are:
|
||||
|
||||
| Projection | Frames |
|
||||
|------------|--------|
|
||||
| `stream.events` | All frames |
|
||||
| `stream.llm` | LLM frames |
|
||||
| `stream.messages` | Conversation message frames |
|
||||
| `stream.flow` | Flow frames |
|
||||
| `stream.tools` | Tool frames |
|
||||
| `stream.interleave([...])` | A selected set of channels |
|
||||
|
||||
Use `stream.interleave(["flow", "llm", "messages"])` when a consumer wants only some channels but still needs their relative order.
|
||||
|
||||
## Async Streaming
|
||||
|
||||
Use `astream()` for async consumers:
|
||||
|
||||
```python
|
||||
flow = ReportFlow()
|
||||
stream = flow.astream()
|
||||
|
||||
async with stream:
|
||||
async for chunk in stream.events:
|
||||
print(chunk.channel, chunk.type, chunk.content)
|
||||
|
||||
result = stream.result
|
||||
```
|
||||
|
||||
The async session has the same projections as the sync session.
|
||||
|
||||
## Stream a Direct LLM Call
|
||||
|
||||
`llm.call(...)` still returns the final assembled result. Use `llm.stream_events(...)` when you want to iterate over chunks as they arrive while keeping the structured event payload:
|
||||
|
||||
```python
|
||||
from crewai import LLM
|
||||
|
||||
|
||||
llm = LLM(model="gpt-4o-mini")
|
||||
stream = llm.stream_events(
|
||||
messages=[
|
||||
{
|
||||
"role": "user",
|
||||
"content": "Explain CrewAI streaming in two short sentences.",
|
||||
}
|
||||
]
|
||||
)
|
||||
|
||||
with stream:
|
||||
for chunk in stream:
|
||||
print(chunk.content, end="", flush=True)
|
||||
|
||||
result = stream.result
|
||||
```
|
||||
|
||||
`llm.stream_events(...)` temporarily enables streaming for the wrapped call and restores the LLM's previous `stream` setting afterward. Provider integrations continue to emit the underlying LLM stream events; this helper provides a common iterator API over those events for every LLM provider.
|
||||
|
||||
## Conversational Turns
|
||||
|
||||
Conversational Flows can stream one user turn with `stream_turn()`:
|
||||
|
||||
```python
|
||||
from crewai import Flow
|
||||
from crewai.experimental.conversational import ConversationConfig, ConversationState
|
||||
|
||||
|
||||
@ConversationConfig(llm="gpt-4o-mini", defer_trace_finalization=True)
|
||||
class ChatFlow(Flow[ConversationState]):
|
||||
conversational = True
|
||||
|
||||
|
||||
flow = ChatFlow()
|
||||
stream = flow.stream_turn("What can you help me with?", session_id="session-1")
|
||||
|
||||
with stream:
|
||||
for frame in stream.events:
|
||||
if frame.channel == "llm" and frame.type == "llm_stream_chunk":
|
||||
print(frame.content, end="", flush=True)
|
||||
|
||||
reply = stream.result
|
||||
```
|
||||
|
||||
During `stream_turn()`, the built-in conversational answer path enables LLM token streaming for that turn and restores the LLM's previous `stream` setting afterward. Custom route handlers that create their own agents or LLM instances should configure those LLMs for streaming if they need token-level output.
|
||||
|
||||
## Cleanup
|
||||
|
||||
Use the session as a context manager when possible. If a client disconnects before the stream is exhausted, close the session explicitly:
|
||||
|
||||
```python
|
||||
stream = flow.stream_events()
|
||||
|
||||
try:
|
||||
for frame in stream.events:
|
||||
print(frame.type)
|
||||
finally:
|
||||
if not stream.is_exhausted:
|
||||
stream.close()
|
||||
```
|
||||
|
||||
For async streams, use `await stream.aclose()`.
|
||||
|
||||
## Legacy Chunk Streaming
|
||||
|
||||
Crew streaming with `stream=True` still returns the chunk-oriented `CrewStreamingOutput` API described in [Streaming Crew Execution](/en/learn/streaming-crew-execution). Direct `llm.call(...)` still returns the final LLM result. The frame contract is intended for runtimes that need a stable event envelope across Flows, direct LLM calls, conversational turns, tools, and messages.
|
||||
194
docs/edge/ko/learn/streaming-runtime-contract.mdx
Normal file
194
docs/edge/ko/learn/streaming-runtime-contract.mdx
Normal file
@@ -0,0 +1,194 @@
|
||||
---
|
||||
title: 스트리밍 런타임 계약
|
||||
description: Flow, 직접 LLM 호출, 대화 턴에서 정렬된 런타임 프레임을 스트리밍합니다.
|
||||
icon: tower-broadcast
|
||||
mode: "wide"
|
||||
---
|
||||
|
||||
## 개요
|
||||
|
||||
CrewAI는 단순한 텍스트 청크보다 더 많은 정보가 필요한 런타임을 위해 프레임 기반 스트리밍 계약을 제공합니다. 이 계약은 Flow 생명주기 이벤트, 직접 LLM 토큰, 도구 활동, 대화 메시지, 사용자 지정 이벤트에 대해 정렬된 `StreamFrame` 객체를 방출합니다.
|
||||
|
||||
UI, 서비스 브리지, 터미널 앱, 배포 런타임을 만들 때 Flow, 채팅 턴, 직접 LLM 호출이 실행되는 동안 안정적인 구조화 이벤트 스트림이 필요하다면 이 API를 사용하세요.
|
||||
|
||||
## StreamFrame
|
||||
|
||||
모든 프레임은 같은 envelope를 가집니다:
|
||||
|
||||
```python
|
||||
from crewai.types.streaming import StreamFrame
|
||||
|
||||
frame.id # 고유 프레임 id
|
||||
frame.seq # 사용 가능한 경우 실행 로컬 순서
|
||||
frame.type # "flow_started" 같은 원본 이벤트 타입
|
||||
frame.channel # "llm", "flow", "tools", "messages", "lifecycle", "custom"
|
||||
frame.namespace # 소스/런타임 namespace
|
||||
frame.timestamp # 이벤트 timestamp
|
||||
frame.parent_id # 사용 가능한 경우 부모 이벤트 id
|
||||
frame.previous_id # 사용 가능한 경우 이전 이벤트 id
|
||||
frame.data # 이벤트 payload
|
||||
frame.event # frame.data의 alias
|
||||
frame.content # 토큰류 프레임의 출력 가능한 텍스트, 그 외에는 ""
|
||||
```
|
||||
|
||||
`channel` 필드는 소비자에서 프레임을 라우팅하는 가장 빠른 방법입니다:
|
||||
|
||||
| 채널 | 포함 내용 |
|
||||
|------|-----------|
|
||||
| `llm` | LLM 스트리밍 이벤트의 토큰 및 thinking 청크 |
|
||||
| `flow` | Flow 생명주기, 메서드 실행, 라우팅, pause/resume 이벤트 |
|
||||
| `tools` | 도구 사용 이벤트 |
|
||||
| `messages` | 대화 transcript 이벤트 |
|
||||
| `lifecycle` | 다른 채널에 속하지 않는 런타임 생명주기 이벤트 |
|
||||
| `custom` | 내장 채널에 매핑되지 않는 이벤트 |
|
||||
|
||||
`frame.type`은 원본 이벤트 타입을 보존하므로, 소비자는 채널 안에서 특정 이벤트를 처리할 수 있습니다.
|
||||
|
||||
## Flow 스트리밍
|
||||
|
||||
Flow에 `stream=True`를 설정하면 `kickoff()`가 stream session을 반환합니다:
|
||||
|
||||
```python
|
||||
from crewai.flow import Flow, start
|
||||
|
||||
|
||||
class ReportFlow(Flow):
|
||||
@start()
|
||||
def generate(self):
|
||||
return "done"
|
||||
|
||||
|
||||
flow = ReportFlow(stream=True)
|
||||
stream = flow.kickoff()
|
||||
|
||||
with stream:
|
||||
for chunk in stream:
|
||||
print(chunk.content, end="", flush=True)
|
||||
if chunk.type == "tool_usage_started":
|
||||
print(chunk.event["tool_name"])
|
||||
|
||||
result = stream.result
|
||||
```
|
||||
|
||||
`stream.result`를 읽기 전에 stream을 소비해야 합니다. 결과를 너무 일찍 접근하면 `RuntimeError`가 발생하여, 소비자가 부분 실행을 완료된 실행으로 잘못 처리하지 않도록 합니다.
|
||||
|
||||
Flow 인스턴스에 `stream=True`를 설정하지 않고 단일 호출만 스트리밍하려면 `flow.stream_events(...)`를 직접 호출할 수도 있습니다.
|
||||
|
||||
## 채널별 필터링
|
||||
|
||||
`StreamSession`은 선택한 채널 안에서 전역 프레임 순서를 보존하는 채널 projection을 제공합니다:
|
||||
|
||||
```python
|
||||
stream = flow.stream_events()
|
||||
|
||||
with stream:
|
||||
for frame in stream.llm:
|
||||
print(frame.content, end="", flush=True)
|
||||
|
||||
result = stream.result
|
||||
```
|
||||
|
||||
사용 가능한 projection은 다음과 같습니다:
|
||||
|
||||
| Projection | 프레임 |
|
||||
|------------|--------|
|
||||
| `stream.events` | 모든 프레임 |
|
||||
| `stream.llm` | LLM 프레임 |
|
||||
| `stream.messages` | 대화 메시지 프레임 |
|
||||
| `stream.flow` | Flow 프레임 |
|
||||
| `stream.tools` | 도구 프레임 |
|
||||
| `stream.interleave([...])` | 선택한 채널 집합 |
|
||||
|
||||
소비자가 일부 채널만 원하지만 상대 순서도 필요하다면 `stream.interleave(["flow", "llm", "messages"])`를 사용하세요.
|
||||
|
||||
## 비동기 스트리밍
|
||||
|
||||
비동기 소비자는 `astream()`을 사용하세요:
|
||||
|
||||
```python
|
||||
flow = ReportFlow()
|
||||
stream = flow.astream()
|
||||
|
||||
async with stream:
|
||||
async for chunk in stream.events:
|
||||
print(chunk.channel, chunk.type, chunk.content)
|
||||
|
||||
result = stream.result
|
||||
```
|
||||
|
||||
비동기 세션은 동기 세션과 같은 projection을 제공합니다.
|
||||
|
||||
## 직접 LLM 호출 스트리밍
|
||||
|
||||
`llm.call(...)`은 계속 최종 조립 결과를 반환합니다. 구조화된 이벤트 payload를 유지하면서 청크가 도착하는 대로 반복 처리하려면 `llm.stream_events(...)`를 사용하세요:
|
||||
|
||||
```python
|
||||
from crewai import LLM
|
||||
|
||||
|
||||
llm = LLM(model="gpt-4o-mini")
|
||||
stream = llm.stream_events(
|
||||
messages=[
|
||||
{
|
||||
"role": "user",
|
||||
"content": "Explain CrewAI streaming in two short sentences.",
|
||||
}
|
||||
]
|
||||
)
|
||||
|
||||
with stream:
|
||||
for chunk in stream:
|
||||
print(chunk.content, end="", flush=True)
|
||||
|
||||
result = stream.result
|
||||
```
|
||||
|
||||
`llm.stream_events(...)`는 감싼 호출 동안 일시적으로 streaming을 활성화하고, 이후 LLM의 이전 `stream` 설정을 복원합니다. provider 통합은 계속 기본 LLM stream 이벤트를 방출하며, 이 helper는 모든 LLM provider에서 그 이벤트 위에 공통 iterator API를 제공합니다.
|
||||
|
||||
## 대화 턴
|
||||
|
||||
대화형 Flow는 `stream_turn()`으로 사용자 턴 하나를 스트리밍할 수 있습니다:
|
||||
|
||||
```python
|
||||
from crewai import Flow
|
||||
from crewai.experimental.conversational import ConversationConfig, ConversationState
|
||||
|
||||
|
||||
@ConversationConfig(llm="gpt-4o-mini", defer_trace_finalization=True)
|
||||
class ChatFlow(Flow[ConversationState]):
|
||||
conversational = True
|
||||
|
||||
|
||||
flow = ChatFlow()
|
||||
stream = flow.stream_turn("What can you help me with?", session_id="session-1")
|
||||
|
||||
with stream:
|
||||
for frame in stream.events:
|
||||
if frame.channel == "llm" and frame.type == "llm_stream_chunk":
|
||||
print(frame.content, end="", flush=True)
|
||||
|
||||
reply = stream.result
|
||||
```
|
||||
|
||||
`stream_turn()` 중에는 내장 대화 응답 경로가 해당 턴에 대해 LLM 토큰 스트리밍을 활성화하고 이후 LLM의 이전 `stream` 설정을 복원합니다. 자체 agent 또는 LLM 인스턴스를 만드는 사용자 지정 route handler는 토큰 단위 출력이 필요하다면 해당 LLM을 streaming으로 구성해야 합니다.
|
||||
|
||||
## 정리
|
||||
|
||||
가능하면 세션을 context manager로 사용하세요. stream이 끝나기 전에 클라이언트 연결이 끊기면 세션을 명시적으로 닫으세요:
|
||||
|
||||
```python
|
||||
stream = flow.stream_events()
|
||||
|
||||
try:
|
||||
for frame in stream.events:
|
||||
print(frame.type)
|
||||
finally:
|
||||
if not stream.is_exhausted:
|
||||
stream.close()
|
||||
```
|
||||
|
||||
비동기 stream에서는 `await stream.aclose()`를 사용하세요.
|
||||
|
||||
## 레거시 청크 스트리밍
|
||||
|
||||
`stream=True`를 사용하는 Crew 스트리밍은 계속 [스트리밍 Crew 실행](/ko/learn/streaming-crew-execution)에 설명된 청크 중심 `CrewStreamingOutput` API를 반환합니다. 직접 `llm.call(...)` 호출도 계속 최종 LLM 결과를 반환합니다. 프레임 계약은 Flow, 직접 LLM 호출, 대화 턴, 도구, 메시지 전반에서 안정적인 이벤트 envelope가 필요한 런타임을 위한 것입니다.
|
||||
194
docs/edge/pt-BR/learn/streaming-runtime-contract.mdx
Normal file
194
docs/edge/pt-BR/learn/streaming-runtime-contract.mdx
Normal file
@@ -0,0 +1,194 @@
|
||||
---
|
||||
title: Contrato de Streaming do Runtime
|
||||
description: Transmita frames ordenados do runtime a partir de Flows, chamadas diretas de LLM e turnos conversacionais.
|
||||
icon: tower-broadcast
|
||||
mode: "wide"
|
||||
---
|
||||
|
||||
## Visão geral
|
||||
|
||||
O CrewAI expõe um contrato de streaming baseado em frames para runtimes que precisam de mais do que chunks de texto simples. O contrato emite objetos `StreamFrame` ordenados para eventos de ciclo de vida de Flow, tokens de LLM diretos, atividade de ferramentas, mensagens de conversa e eventos personalizados.
|
||||
|
||||
Use esta API ao criar uma UI, ponte de serviço, aplicativo de terminal ou runtime de implantação que precise de um fluxo estável de eventos estruturados enquanto um Flow, turno de chat ou chamada direta de LLM está em execução.
|
||||
|
||||
## StreamFrame
|
||||
|
||||
Todo frame tem o mesmo envelope:
|
||||
|
||||
```python
|
||||
from crewai.types.streaming import StreamFrame
|
||||
|
||||
frame.id # id único do frame
|
||||
frame.seq # ordem local da execução, quando disponível
|
||||
frame.type # tipo do evento de origem, como "flow_started"
|
||||
frame.channel # "llm", "flow", "tools", "messages", "lifecycle" ou "custom"
|
||||
frame.namespace # namespace de origem/runtime
|
||||
frame.timestamp # timestamp do evento
|
||||
frame.parent_id # id do evento pai, quando disponível
|
||||
frame.previous_id # id do evento anterior, quando disponível
|
||||
frame.data # payload do evento
|
||||
frame.event # alias para frame.data
|
||||
frame.content # texto imprimível para frames de token, caso contrário ""
|
||||
```
|
||||
|
||||
O campo `channel` é a forma mais rápida de rotear frames em consumidores:
|
||||
|
||||
| Canal | Contém |
|
||||
|-------|--------|
|
||||
| `llm` | Tokens e chunks de raciocínio de eventos de streaming de LLM |
|
||||
| `flow` | Ciclo de vida do Flow, execução de métodos, roteamento e eventos de pausa/retomada |
|
||||
| `tools` | Eventos de uso de ferramentas |
|
||||
| `messages` | Eventos do transcript da conversa |
|
||||
| `lifecycle` | Eventos de ciclo de vida do runtime que não pertencem a outro canal |
|
||||
| `custom` | Eventos que não mapeiam para um canal integrado |
|
||||
|
||||
`frame.type` preserva o tipo do evento de origem, para que consumidores possam tratar eventos específicos dentro de um canal.
|
||||
|
||||
## Transmitir um Flow
|
||||
|
||||
Defina `stream=True` em um Flow para fazer `kickoff()` retornar uma sessão de stream:
|
||||
|
||||
```python
|
||||
from crewai.flow import Flow, start
|
||||
|
||||
|
||||
class ReportFlow(Flow):
|
||||
@start()
|
||||
def generate(self):
|
||||
return "done"
|
||||
|
||||
|
||||
flow = ReportFlow(stream=True)
|
||||
stream = flow.kickoff()
|
||||
|
||||
with stream:
|
||||
for chunk in stream:
|
||||
print(chunk.content, end="", flush=True)
|
||||
if chunk.type == "tool_usage_started":
|
||||
print(chunk.event["tool_name"])
|
||||
|
||||
result = stream.result
|
||||
```
|
||||
|
||||
Você deve consumir o stream antes de ler `stream.result`. Acessar o resultado cedo demais gera um `RuntimeError`, para que consumidores não tratem uma execução parcial como concluída.
|
||||
|
||||
Você também pode chamar `flow.stream_events(...)` diretamente quando quiser streaming para uma única invocação sem definir `stream=True` na instância do Flow.
|
||||
|
||||
## Filtrar por canal
|
||||
|
||||
`StreamSession` expõe projeções por canal que preservam a ordem global dos frames dentro do canal selecionado:
|
||||
|
||||
```python
|
||||
stream = flow.stream_events()
|
||||
|
||||
with stream:
|
||||
for frame in stream.llm:
|
||||
print(frame.content, end="", flush=True)
|
||||
|
||||
result = stream.result
|
||||
```
|
||||
|
||||
As projeções disponíveis são:
|
||||
|
||||
| Projeção | Frames |
|
||||
|----------|--------|
|
||||
| `stream.events` | Todos os frames |
|
||||
| `stream.llm` | Frames de LLM |
|
||||
| `stream.messages` | Frames de mensagens de conversa |
|
||||
| `stream.flow` | Frames de Flow |
|
||||
| `stream.tools` | Frames de ferramentas |
|
||||
| `stream.interleave([...])` | Um conjunto selecionado de canais |
|
||||
|
||||
Use `stream.interleave(["flow", "llm", "messages"])` quando um consumidor quiser apenas alguns canais, mas ainda precisar da ordem relativa entre eles.
|
||||
|
||||
## Streaming assíncrono
|
||||
|
||||
Use `astream()` para consumidores assíncronos:
|
||||
|
||||
```python
|
||||
flow = ReportFlow()
|
||||
stream = flow.astream()
|
||||
|
||||
async with stream:
|
||||
async for chunk in stream.events:
|
||||
print(chunk.channel, chunk.type, chunk.content)
|
||||
|
||||
result = stream.result
|
||||
```
|
||||
|
||||
A sessão assíncrona tem as mesmas projeções da sessão síncrona.
|
||||
|
||||
## Transmitir uma chamada direta de LLM
|
||||
|
||||
`llm.call(...)` ainda retorna o resultado final montado. Use `llm.stream_events(...)` quando quiser iterar pelos chunks conforme eles chegam, mantendo o payload estruturado do evento:
|
||||
|
||||
```python
|
||||
from crewai import LLM
|
||||
|
||||
|
||||
llm = LLM(model="gpt-4o-mini")
|
||||
stream = llm.stream_events(
|
||||
messages=[
|
||||
{
|
||||
"role": "user",
|
||||
"content": "Explain CrewAI streaming in two short sentences.",
|
||||
}
|
||||
]
|
||||
)
|
||||
|
||||
with stream:
|
||||
for chunk in stream:
|
||||
print(chunk.content, end="", flush=True)
|
||||
|
||||
result = stream.result
|
||||
```
|
||||
|
||||
`llm.stream_events(...)` ativa temporariamente o streaming para a chamada encapsulada e restaura a configuração anterior de `stream` do LLM depois. As integrações de provedores continuam emitindo os eventos de stream de LLM subjacentes; esse helper fornece uma API de iterador comum sobre esses eventos para todos os provedores de LLM.
|
||||
|
||||
## Turnos conversacionais
|
||||
|
||||
Flows conversacionais podem transmitir um turno de usuário com `stream_turn()`:
|
||||
|
||||
```python
|
||||
from crewai import Flow
|
||||
from crewai.experimental.conversational import ConversationConfig, ConversationState
|
||||
|
||||
|
||||
@ConversationConfig(llm="gpt-4o-mini", defer_trace_finalization=True)
|
||||
class ChatFlow(Flow[ConversationState]):
|
||||
conversational = True
|
||||
|
||||
|
||||
flow = ChatFlow()
|
||||
stream = flow.stream_turn("What can you help me with?", session_id="session-1")
|
||||
|
||||
with stream:
|
||||
for frame in stream.events:
|
||||
if frame.channel == "llm" and frame.type == "llm_stream_chunk":
|
||||
print(frame.content, end="", flush=True)
|
||||
|
||||
reply = stream.result
|
||||
```
|
||||
|
||||
Durante `stream_turn()`, o caminho de resposta conversacional integrado ativa o streaming de tokens de LLM para esse turno e restaura a configuração anterior de `stream` do LLM depois. Handlers de rota personalizados que criam seus próprios agentes ou instâncias de LLM devem configurar esses LLMs para streaming se precisarem de saída em nível de token.
|
||||
|
||||
## Limpeza
|
||||
|
||||
Use a sessão como gerenciador de contexto quando possível. Se um cliente se desconectar antes de o stream ser esgotado, feche a sessão explicitamente:
|
||||
|
||||
```python
|
||||
stream = flow.stream_events()
|
||||
|
||||
try:
|
||||
for frame in stream.events:
|
||||
print(frame.type)
|
||||
finally:
|
||||
if not stream.is_exhausted:
|
||||
stream.close()
|
||||
```
|
||||
|
||||
Para streams assíncronos, use `await stream.aclose()`.
|
||||
|
||||
## Streaming de chunks legado
|
||||
|
||||
O streaming de Crew com `stream=True` ainda retorna a API orientada a chunks `CrewStreamingOutput` descrita em [Streaming da Execução de Crew](/pt-BR/learn/streaming-crew-execution). Chamadas diretas `llm.call(...)` ainda retornam o resultado final do LLM. O contrato de frames é destinado a runtimes que precisam de um envelope de evento estável em Flows, chamadas diretas de LLM, turnos conversacionais, ferramentas e mensagens.
|
||||
@@ -164,13 +164,7 @@ def test_navigate_command(mock_run, stagehand_tool):
|
||||
)
|
||||
|
||||
# Assertions
|
||||
assert result == "Successfully navigated to https://example.com"
|
||||
mock_run.assert_called_once_with(
|
||||
stagehand_tool,
|
||||
instruction="Go to example.com",
|
||||
url="https://example.com",
|
||||
command_type="navigate",
|
||||
)
|
||||
assert "https://example.com" in result
|
||||
|
||||
|
||||
@patch(
|
||||
|
||||
@@ -40,6 +40,7 @@ from crewai.events.event_context import (
|
||||
set_last_event_id,
|
||||
)
|
||||
from crewai.events.handler_graph import build_execution_plan
|
||||
from crewai.events.stream_context import publish_stream_event
|
||||
from crewai.events.types.event_bus_types import (
|
||||
AsyncHandler,
|
||||
AsyncHandlerSet,
|
||||
@@ -565,6 +566,7 @@ class CrewAIEventsBus:
|
||||
|
||||
set_last_event_id(event.event_id)
|
||||
|
||||
publish_stream_event(source, event)
|
||||
self._record_event(event)
|
||||
|
||||
def emit(self, source: Any, event: BaseEvent) -> Future[None] | None:
|
||||
@@ -775,9 +777,7 @@ class CrewAIEventsBus:
|
||||
source: The object emitting the event
|
||||
event: The event instance to emit
|
||||
"""
|
||||
self._register_source(source)
|
||||
event.emission_sequence = get_next_emission_sequence()
|
||||
self._record_event(event)
|
||||
self._prepare_event(source, event)
|
||||
|
||||
event_type = type(event)
|
||||
|
||||
|
||||
30
lib/crewai/src/crewai/events/stream_context.py
Normal file
30
lib/crewai/src/crewai/events/stream_context.py
Normal file
@@ -0,0 +1,30 @@
|
||||
"""Scoped stream sinks for converting emitted events into public frames."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Callable
|
||||
import contextvars
|
||||
from typing import Any
|
||||
|
||||
|
||||
StreamSink = Callable[[Any, Any], None]
|
||||
|
||||
_stream_sinks: contextvars.ContextVar[tuple[StreamSink, ...]] = contextvars.ContextVar(
|
||||
"crewai_stream_sinks", default=()
|
||||
)
|
||||
|
||||
|
||||
def add_stream_sink(sink: StreamSink) -> contextvars.Token[tuple[StreamSink, ...]]:
|
||||
"""Register a sink in the current context."""
|
||||
return _stream_sinks.set((*_stream_sinks.get(), sink))
|
||||
|
||||
|
||||
def reset_stream_sinks(token: contextvars.Token[tuple[StreamSink, ...]]) -> None:
|
||||
"""Restore the stream sink context."""
|
||||
_stream_sinks.reset(token)
|
||||
|
||||
|
||||
def publish_stream_event(source: Any, event: Any) -> None:
|
||||
"""Publish a prepared event to sinks scoped to the current execution."""
|
||||
for sink in _stream_sinks.get():
|
||||
sink(source, event)
|
||||
@@ -18,7 +18,8 @@ Import surface:
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Callable, Mapping, Sequence
|
||||
from collections.abc import Callable, Iterator, Mapping, Sequence
|
||||
from contextlib import contextmanager
|
||||
from enum import Enum
|
||||
import json
|
||||
import logging
|
||||
@@ -44,6 +45,7 @@ from crewai.experimental.conversational import (
|
||||
_conversational_only,
|
||||
message_to_llm_dict,
|
||||
)
|
||||
from crewai.flow.async_feedback import HumanFeedbackPending
|
||||
from crewai.flow.conversation import (
|
||||
append_message as _append_conversation_message,
|
||||
get_conversation_messages,
|
||||
@@ -221,7 +223,9 @@ class _ConversationalMixin:
|
||||
messages.append({"role": "system", "content": system_prompt})
|
||||
messages.extend(self.conversation_messages)
|
||||
|
||||
response = self._coerce_llm(llm).call(messages=messages)
|
||||
llm_instance = self._coerce_llm(llm)
|
||||
with self._conversation_streaming_enabled(llm_instance):
|
||||
response = llm_instance.call(messages=messages)
|
||||
content = self._stringify_result(response)
|
||||
self.append_assistant_message(content)
|
||||
return content
|
||||
@@ -254,7 +258,8 @@ class _ConversationalMixin:
|
||||
},
|
||||
*self.build_agent_context("answer_from_history"),
|
||||
]
|
||||
response = llm_instance.call(messages=messages)
|
||||
with self._conversation_streaming_enabled(llm_instance):
|
||||
response = llm_instance.call(messages=messages)
|
||||
content = self._stringify_result(response)
|
||||
self.append_assistant_message(content)
|
||||
return content
|
||||
@@ -337,6 +342,102 @@ class _ConversationalMixin:
|
||||
)
|
||||
return result
|
||||
|
||||
def stream_turn(
|
||||
self,
|
||||
message: str,
|
||||
*,
|
||||
session_id: str | None = None,
|
||||
intents: Sequence[str] | None = None,
|
||||
intent_llm: str | BaseLLM | None = None,
|
||||
**kickoff_kwargs: Any,
|
||||
) -> Any:
|
||||
"""Append a user message and stream one conversational turn as frames."""
|
||||
if not self._is_conversational_enabled():
|
||||
raise ValueError(
|
||||
"Flow.stream_turn() is only available on conversational flows"
|
||||
)
|
||||
|
||||
from crewai.types.streaming import StreamSession
|
||||
from crewai.utilities.streaming import (
|
||||
create_frame_generator,
|
||||
create_frame_streaming_state,
|
||||
)
|
||||
|
||||
state = cast(ConversationState, self.state)
|
||||
sid = session_id or state.id
|
||||
result_holder: list[Any] = []
|
||||
frame_state = create_frame_streaming_state(result_holder, use_async=False)
|
||||
output_holder: list[StreamSession[Any]] = []
|
||||
|
||||
def run_turn() -> Any:
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
ConversationTurnStartedEvent(
|
||||
type="conversation_turn_started",
|
||||
flow_name=self.name or self.__class__.__name__,
|
||||
session_id=sid,
|
||||
),
|
||||
)
|
||||
|
||||
self._pending_user_message = message
|
||||
self._pending_intents = list(intents) if intents else None
|
||||
self._pending_intent_llm = intent_llm
|
||||
|
||||
try:
|
||||
if "from_checkpoint" not in kickoff_kwargs:
|
||||
self._reset_turn_execution_state()
|
||||
|
||||
assistant_count = self._assistant_message_count()
|
||||
original_stream = bool(getattr(self, "stream", False))
|
||||
original_streaming_turn = getattr(
|
||||
self, "_streaming_conversation_turn", False
|
||||
)
|
||||
try:
|
||||
object.__setattr__(self, "stream", False)
|
||||
object.__setattr__(self, "_streaming_conversation_turn", True)
|
||||
result = self.kickoff(inputs={"id": sid}, **kickoff_kwargs)
|
||||
finally:
|
||||
object.__setattr__(self, "stream", original_stream)
|
||||
object.__setattr__(
|
||||
self, "_streaming_conversation_turn", original_streaming_turn
|
||||
)
|
||||
if (
|
||||
result is not None
|
||||
and self._assistant_message_count() == assistant_count
|
||||
and self._is_public_turn_result(result)
|
||||
):
|
||||
self.append_assistant_message(self._stringify_result(result))
|
||||
except HumanFeedbackPending as exc:
|
||||
return exc
|
||||
except Exception as exc:
|
||||
failed_event = ConversationTurnFailedEvent(
|
||||
type="conversation_turn_failed",
|
||||
flow_name=self.name or self.__class__.__name__,
|
||||
session_id=sid,
|
||||
error=exc,
|
||||
)
|
||||
self._emit_terminal_conversation_turn_event(failed_event)
|
||||
raise
|
||||
finally:
|
||||
self._pending_user_message = None
|
||||
self._pending_intents = None
|
||||
self._pending_intent_llm = None
|
||||
|
||||
self._emit_terminal_conversation_turn_event(
|
||||
ConversationTurnCompletedEvent(
|
||||
type="conversation_turn_completed",
|
||||
flow_name=self.name or self.__class__.__name__,
|
||||
session_id=sid,
|
||||
),
|
||||
)
|
||||
return result
|
||||
|
||||
stream_session: StreamSession[Any] = StreamSession(
|
||||
sync_iterator=create_frame_generator(frame_state, run_turn, output_holder)
|
||||
)
|
||||
output_holder.append(stream_session)
|
||||
return stream_session
|
||||
|
||||
def _emit_terminal_conversation_turn_event(
|
||||
self,
|
||||
event: ConversationTurnCompletedEvent | ConversationTurnFailedEvent,
|
||||
@@ -685,6 +786,8 @@ class _ConversationalMixin:
|
||||
object.__setattr__(self, "_pending_intents", None)
|
||||
if not hasattr(self, "_pending_intent_llm"):
|
||||
object.__setattr__(self, "_pending_intent_llm", None)
|
||||
if not hasattr(self, "_streaming_conversation_turn"):
|
||||
object.__setattr__(self, "_streaming_conversation_turn", False)
|
||||
|
||||
def _create_default_extension_state(self) -> ConversationState | None:
|
||||
initial_state_t = getattr(self, "_initial_state_t", None)
|
||||
@@ -1055,6 +1158,21 @@ class _ConversationalMixin:
|
||||
return llm
|
||||
raise ValueError(f"Invalid llm type: {type(llm)}. Expected str or BaseLLM.")
|
||||
|
||||
@contextmanager
|
||||
def _conversation_streaming_enabled(self, llm: Any) -> Iterator[None]:
|
||||
if not getattr(self, "_streaming_conversation_turn", False) or not hasattr(
|
||||
llm, "stream"
|
||||
):
|
||||
yield
|
||||
return
|
||||
|
||||
original_stream = llm.stream
|
||||
try:
|
||||
llm.stream = True
|
||||
yield
|
||||
finally:
|
||||
llm.stream = original_stream
|
||||
|
||||
def finalize_session_traces(self) -> None:
|
||||
"""Emit a final ``FlowFinishedEvent`` and finalize the trace batch.
|
||||
|
||||
|
||||
@@ -9,11 +9,7 @@ Structure (see ``flow_definition``) and executed here.
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from collections.abc import (
|
||||
Callable,
|
||||
Iterator,
|
||||
Sequence,
|
||||
)
|
||||
from collections.abc import Callable, Iterator, Sequence
|
||||
from concurrent.futures import Future, ThreadPoolExecutor
|
||||
import contextvars
|
||||
import copy
|
||||
@@ -140,17 +136,16 @@ if TYPE_CHECKING:
|
||||
from crewai.llms.base_llm import BaseLLM
|
||||
|
||||
from crewai.flow.visualization import build_flow_structure, render_interactive
|
||||
from crewai.types.streaming import CrewStreamingOutput, FlowStreamingOutput
|
||||
from crewai.types.streaming import (
|
||||
AsyncStreamSession,
|
||||
StreamSession,
|
||||
)
|
||||
from crewai.types.usage_metrics import UsageMetrics
|
||||
from crewai.utilities.env import get_env_context
|
||||
from crewai.utilities.streaming import (
|
||||
TaskInfo,
|
||||
create_async_chunk_generator,
|
||||
create_chunk_generator,
|
||||
create_streaming_state,
|
||||
register_cleanup,
|
||||
signal_end,
|
||||
signal_error,
|
||||
create_async_frame_generator,
|
||||
create_frame_generator,
|
||||
create_frame_streaming_state,
|
||||
)
|
||||
|
||||
|
||||
@@ -1832,13 +1827,94 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
if hasattr(self._state, key):
|
||||
object.__setattr__(self._state, key, value)
|
||||
|
||||
def stream_events(
|
||||
self,
|
||||
inputs: dict[str, Any] | None = None,
|
||||
input_files: dict[str, FileInput] | None = None,
|
||||
from_checkpoint: CheckpointConfig | None = None,
|
||||
restore_from_state_id: str | None = None,
|
||||
) -> StreamSession[Any]:
|
||||
"""Run the flow and stream all scoped public ``StreamFrame`` events."""
|
||||
result_holder: list[Any] = []
|
||||
state = create_frame_streaming_state(result_holder, use_async=False)
|
||||
output_holder: list[StreamSession[Any]] = []
|
||||
|
||||
def run_flow() -> Any:
|
||||
original_stream = self.stream
|
||||
try:
|
||||
self.stream = False
|
||||
return self.kickoff(
|
||||
inputs=inputs,
|
||||
input_files=input_files,
|
||||
from_checkpoint=from_checkpoint,
|
||||
restore_from_state_id=restore_from_state_id,
|
||||
)
|
||||
except HumanFeedbackPending as e:
|
||||
return e
|
||||
finally:
|
||||
self.stream = original_stream
|
||||
|
||||
stream_session: StreamSession[Any] = StreamSession(
|
||||
sync_iterator=create_frame_generator(state, run_flow, output_holder)
|
||||
)
|
||||
output_holder.append(stream_session)
|
||||
return stream_session
|
||||
|
||||
def stream_frames(
|
||||
self,
|
||||
inputs: dict[str, Any] | None = None,
|
||||
input_files: dict[str, FileInput] | None = None,
|
||||
from_checkpoint: CheckpointConfig | None = None,
|
||||
restore_from_state_id: str | None = None,
|
||||
) -> StreamSession[Any]:
|
||||
"""Alias for :meth:`stream_events`."""
|
||||
return self.stream_events(
|
||||
inputs=inputs,
|
||||
input_files=input_files,
|
||||
from_checkpoint=from_checkpoint,
|
||||
restore_from_state_id=restore_from_state_id,
|
||||
)
|
||||
|
||||
def astream(
|
||||
self,
|
||||
inputs: dict[str, Any] | None = None,
|
||||
input_files: dict[str, FileInput] | None = None,
|
||||
from_checkpoint: CheckpointConfig | None = None,
|
||||
restore_from_state_id: str | None = None,
|
||||
) -> AsyncStreamSession[Any]:
|
||||
"""Run the flow asynchronously and stream scoped public frames."""
|
||||
result_holder: list[Any] = []
|
||||
state = create_frame_streaming_state(result_holder, use_async=True)
|
||||
output_holder: list[AsyncStreamSession[Any]] = []
|
||||
|
||||
async def run_flow() -> Any:
|
||||
original_stream = self.stream
|
||||
try:
|
||||
self.stream = False
|
||||
return await self.kickoff_async(
|
||||
inputs=inputs,
|
||||
input_files=input_files,
|
||||
from_checkpoint=from_checkpoint,
|
||||
restore_from_state_id=restore_from_state_id,
|
||||
)
|
||||
except HumanFeedbackPending as e:
|
||||
return e
|
||||
finally:
|
||||
self.stream = original_stream
|
||||
|
||||
stream_session: AsyncStreamSession[Any] = AsyncStreamSession(
|
||||
async_iterator=create_async_frame_generator(state, run_flow, output_holder)
|
||||
)
|
||||
output_holder.append(stream_session)
|
||||
return stream_session
|
||||
|
||||
def kickoff(
|
||||
self,
|
||||
inputs: dict[str, Any] | None = None,
|
||||
input_files: dict[str, FileInput] | None = None,
|
||||
from_checkpoint: CheckpointConfig | None = None,
|
||||
restore_from_state_id: str | None = None,
|
||||
) -> Any | FlowStreamingOutput:
|
||||
) -> Any | StreamSession[Any]:
|
||||
"""Start the flow execution in a synchronous context.
|
||||
|
||||
This method wraps kickoff_async so that all state initialization and event
|
||||
@@ -1859,7 +1935,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
``from_checkpoint``; passing both raises ``ValueError``.
|
||||
|
||||
Returns:
|
||||
The final output from the flow or FlowStreamingOutput if streaming.
|
||||
The final output from the flow or StreamSession if streaming.
|
||||
"""
|
||||
if from_checkpoint is not None and restore_from_state_id is not None:
|
||||
raise ValueError(
|
||||
@@ -1871,46 +1947,12 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
if restored is not None:
|
||||
return restored.kickoff(inputs=inputs, input_files=input_files)
|
||||
if self.stream:
|
||||
result_holder: list[Any] = []
|
||||
current_task_info: TaskInfo = {
|
||||
"index": 0,
|
||||
"name": "",
|
||||
"id": "",
|
||||
"agent_role": "",
|
||||
"agent_id": "",
|
||||
}
|
||||
|
||||
state = create_streaming_state(
|
||||
current_task_info, result_holder, use_async=False
|
||||
return self.stream_events(
|
||||
inputs=inputs,
|
||||
input_files=input_files,
|
||||
from_checkpoint=from_checkpoint,
|
||||
restore_from_state_id=restore_from_state_id,
|
||||
)
|
||||
output_holder: list[CrewStreamingOutput | FlowStreamingOutput] = []
|
||||
|
||||
def run_flow() -> None:
|
||||
try:
|
||||
self.stream = False
|
||||
result = self.kickoff(
|
||||
inputs=inputs,
|
||||
input_files=input_files,
|
||||
restore_from_state_id=restore_from_state_id,
|
||||
)
|
||||
result_holder.append(result)
|
||||
except Exception as e:
|
||||
# HumanFeedbackPending is expected control flow, not an error
|
||||
if isinstance(e, HumanFeedbackPending):
|
||||
result_holder.append(e)
|
||||
else:
|
||||
signal_error(state, e)
|
||||
finally:
|
||||
self.stream = True
|
||||
signal_end(state)
|
||||
|
||||
streaming_output = FlowStreamingOutput(
|
||||
sync_iterator=create_chunk_generator(state, run_flow, output_holder)
|
||||
)
|
||||
register_cleanup(streaming_output, state)
|
||||
output_holder.append(streaming_output)
|
||||
|
||||
return streaming_output
|
||||
|
||||
async def _run_flow() -> Any:
|
||||
return await self.kickoff_async(
|
||||
@@ -1937,7 +1979,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
input_files: dict[str, FileInput] | None = None,
|
||||
from_checkpoint: CheckpointConfig | None = None,
|
||||
restore_from_state_id: str | None = None,
|
||||
) -> Any | FlowStreamingOutput:
|
||||
) -> Any | AsyncStreamSession[Any]:
|
||||
"""Start the flow execution asynchronously.
|
||||
|
||||
This method performs state restoration (if an 'id' is provided and persistence is available)
|
||||
@@ -1971,48 +2013,12 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
if restored is not None:
|
||||
return await restored.kickoff_async(inputs=inputs, input_files=input_files)
|
||||
if self.stream:
|
||||
result_holder: list[Any] = []
|
||||
current_task_info: TaskInfo = {
|
||||
"index": 0,
|
||||
"name": "",
|
||||
"id": "",
|
||||
"agent_role": "",
|
||||
"agent_id": "",
|
||||
}
|
||||
|
||||
state = create_streaming_state(
|
||||
current_task_info, result_holder, use_async=True
|
||||
return self.astream(
|
||||
inputs=inputs,
|
||||
input_files=input_files,
|
||||
from_checkpoint=from_checkpoint,
|
||||
restore_from_state_id=restore_from_state_id,
|
||||
)
|
||||
output_holder: list[CrewStreamingOutput | FlowStreamingOutput] = []
|
||||
|
||||
async def run_flow() -> None:
|
||||
try:
|
||||
self.stream = False
|
||||
result = await self.kickoff_async(
|
||||
inputs=inputs,
|
||||
input_files=input_files,
|
||||
restore_from_state_id=restore_from_state_id,
|
||||
)
|
||||
result_holder.append(result)
|
||||
except Exception as e:
|
||||
# HumanFeedbackPending is expected control flow, not an error
|
||||
if isinstance(e, HumanFeedbackPending):
|
||||
result_holder.append(e)
|
||||
else:
|
||||
signal_error(state, e, is_async=True)
|
||||
finally:
|
||||
self.stream = True
|
||||
signal_end(state, is_async=True)
|
||||
|
||||
streaming_output = FlowStreamingOutput(
|
||||
async_iterator=create_async_chunk_generator(
|
||||
state, run_flow, output_holder
|
||||
)
|
||||
)
|
||||
register_cleanup(streaming_output, state)
|
||||
output_holder.append(streaming_output)
|
||||
|
||||
return streaming_output
|
||||
|
||||
ctx = baggage.set_baggage("flow_inputs", inputs or {})
|
||||
ctx = baggage.set_baggage("flow_input_files", input_files or {}, context=ctx)
|
||||
@@ -2356,7 +2362,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
input_files: dict[str, FileInput] | None = None,
|
||||
from_checkpoint: CheckpointConfig | None = None,
|
||||
restore_from_state_id: str | None = None,
|
||||
) -> Any | FlowStreamingOutput:
|
||||
) -> Any | AsyncStreamSession[Any]:
|
||||
"""Native async method to start the flow execution. Alias for kickoff_async.
|
||||
|
||||
Args:
|
||||
|
||||
@@ -749,7 +749,7 @@ class LLM(BaseLLM):
|
||||
"base_url": self.base_url,
|
||||
"api_version": self.api_version,
|
||||
"api_key": self.api_key,
|
||||
"stream": self.stream,
|
||||
"stream": self._effective_stream(),
|
||||
"tools": tools,
|
||||
"reasoning_effort": self.reasoning_effort,
|
||||
**self.additional_params,
|
||||
@@ -1841,7 +1841,7 @@ class LLM(BaseLLM):
|
||||
self.set_callbacks(callbacks)
|
||||
try:
|
||||
params = self._prepare_completion_params(messages, tools)
|
||||
if self.stream:
|
||||
if self._effective_stream():
|
||||
result = self._handle_streaming_response(
|
||||
params=params,
|
||||
callbacks=callbacks,
|
||||
@@ -1983,7 +1983,7 @@ class LLM(BaseLLM):
|
||||
messages, tools, skip_file_processing=True
|
||||
)
|
||||
|
||||
if self.stream:
|
||||
if self._effective_stream():
|
||||
return await self._ahandle_streaming_response(
|
||||
params=params,
|
||||
callbacks=callbacks,
|
||||
|
||||
@@ -42,8 +42,13 @@ from crewai.events.types.tool_usage_events import (
|
||||
ToolUsageFinishedEvent,
|
||||
ToolUsageStartedEvent,
|
||||
)
|
||||
from crewai.types.streaming import StreamSession
|
||||
from crewai.types.usage_metrics import UsageMetrics
|
||||
from crewai.utilities.pydantic_schema_utils import serialize_model_class
|
||||
from crewai.utilities.streaming import (
|
||||
create_frame_generator,
|
||||
create_frame_streaming_state,
|
||||
)
|
||||
|
||||
|
||||
try:
|
||||
@@ -77,6 +82,9 @@ _current_call_id: contextvars.ContextVar[str | None] = contextvars.ContextVar(
|
||||
_call_stop_override_var: contextvars.ContextVar[dict[int, list[str]] | None] = (
|
||||
contextvars.ContextVar("_call_stop_override_var", default=None)
|
||||
)
|
||||
_call_stream_override_var: contextvars.ContextVar[dict[int, bool] | None] = (
|
||||
contextvars.ContextVar("_call_stream_override_var", default=None)
|
||||
)
|
||||
|
||||
|
||||
@contextmanager
|
||||
@@ -115,6 +123,19 @@ def call_stop_override(
|
||||
_call_stop_override_var.reset(token)
|
||||
|
||||
|
||||
@contextmanager
|
||||
def call_stream_override(llm: BaseLLM, stream: bool) -> Generator[None, None, None]:
|
||||
"""Override streaming for ``llm`` within the current call scope."""
|
||||
current = _call_stream_override_var.get()
|
||||
new_overrides: dict[int, bool] = dict(current) if current else {}
|
||||
new_overrides[id(llm)] = stream
|
||||
token = _call_stream_override_var.set(new_overrides)
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
_call_stream_override_var.reset(token)
|
||||
|
||||
|
||||
def get_current_call_id() -> str:
|
||||
"""Get current call_id from context"""
|
||||
call_id = _current_call_id.get()
|
||||
@@ -213,6 +234,13 @@ class BaseLLM(BaseModel, ABC):
|
||||
return override
|
||||
return self.stop
|
||||
|
||||
def _effective_stream(self) -> bool | None:
|
||||
"""Return the call-scoped streaming mode for this instance."""
|
||||
overrides = _call_stream_override_var.get()
|
||||
if overrides is not None and id(self) in overrides:
|
||||
return overrides[id(self)]
|
||||
return self.stream
|
||||
|
||||
_token_usage: dict[str, int] = PrivateAttr(
|
||||
default_factory=lambda: {
|
||||
"total_tokens": 0,
|
||||
@@ -318,6 +346,39 @@ class BaseLLM(BaseModel, ABC):
|
||||
RuntimeError: If the LLM request fails for other reasons.
|
||||
"""
|
||||
|
||||
def stream_events(
|
||||
self,
|
||||
messages: str | list[LLMMessage],
|
||||
tools: list[dict[str, BaseTool]] | None = None,
|
||||
callbacks: list[Any] | None = None,
|
||||
available_functions: dict[str, Any] | None = None,
|
||||
from_task: Task | None = None,
|
||||
from_agent: BaseAgent | None = None,
|
||||
response_model: type[BaseModel] | None = None,
|
||||
) -> StreamSession[Any]:
|
||||
"""Run the LLM call and stream scoped public ``StreamFrame`` events."""
|
||||
result_holder: list[Any] = []
|
||||
state = create_frame_streaming_state(result_holder, use_async=False)
|
||||
output_holder: list[StreamSession[Any]] = []
|
||||
|
||||
def run_llm_call() -> Any:
|
||||
with call_stream_override(self, True):
|
||||
return self.call(
|
||||
messages=messages,
|
||||
tools=tools,
|
||||
callbacks=callbacks,
|
||||
available_functions=available_functions,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
response_model=response_model,
|
||||
)
|
||||
|
||||
stream_session: StreamSession[Any] = StreamSession(
|
||||
sync_iterator=create_frame_generator(state, run_llm_call, output_holder)
|
||||
)
|
||||
output_holder.append(stream_session)
|
||||
return stream_session
|
||||
|
||||
async def acall(
|
||||
self,
|
||||
messages: str | list[LLMMessage],
|
||||
@@ -509,7 +570,7 @@ class BaseLLM(BaseModel, ABC):
|
||||
if max_tokens is None:
|
||||
max_tokens = self._effective_max_tokens()
|
||||
if stream is None:
|
||||
stream = self.stream
|
||||
stream = self._effective_stream()
|
||||
if seed is None:
|
||||
seed = self.seed
|
||||
if stop_sequences is None:
|
||||
|
||||
@@ -323,7 +323,7 @@ class AnthropicCompletion(BaseLLM):
|
||||
|
||||
effective_response_model = response_model or self.response_format
|
||||
|
||||
if self.stream:
|
||||
if self._effective_stream():
|
||||
return self._handle_streaming_completion(
|
||||
completion_params,
|
||||
available_functions,
|
||||
@@ -393,7 +393,7 @@ class AnthropicCompletion(BaseLLM):
|
||||
|
||||
effective_response_model = response_model or self.response_format
|
||||
|
||||
if self.stream:
|
||||
if self._effective_stream():
|
||||
return await self._ahandle_streaming_completion(
|
||||
completion_params,
|
||||
available_functions,
|
||||
@@ -441,7 +441,7 @@ class AnthropicCompletion(BaseLLM):
|
||||
"model": self.model,
|
||||
"messages": messages,
|
||||
"max_tokens": self.max_tokens,
|
||||
"stream": self.stream,
|
||||
"stream": self._effective_stream(),
|
||||
}
|
||||
|
||||
if system_message:
|
||||
|
||||
@@ -42,7 +42,7 @@ try:
|
||||
)
|
||||
|
||||
from crewai.events.types.llm_events import LLMCallType
|
||||
from crewai.llms.base_llm import BaseLLM, llm_call_context
|
||||
from crewai.llms.base_llm import BaseLLM, call_stream_override, llm_call_context
|
||||
|
||||
except ImportError:
|
||||
raise ImportError(
|
||||
@@ -493,15 +493,18 @@ class AzureCompletion(BaseLLM):
|
||||
Completion response or tool call result
|
||||
"""
|
||||
if self.api == "responses":
|
||||
return self._responses_delegate.call(
|
||||
messages=messages,
|
||||
tools=tools,
|
||||
callbacks=callbacks,
|
||||
available_functions=available_functions,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
response_model=response_model,
|
||||
)
|
||||
with call_stream_override(
|
||||
self._responses_delegate, bool(self._effective_stream())
|
||||
):
|
||||
return self._responses_delegate.call(
|
||||
messages=messages,
|
||||
tools=tools,
|
||||
callbacks=callbacks,
|
||||
available_functions=available_functions,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
response_model=response_model,
|
||||
)
|
||||
|
||||
with llm_call_context():
|
||||
try:
|
||||
@@ -527,7 +530,7 @@ class AzureCompletion(BaseLLM):
|
||||
formatted_messages, tools, effective_response_model
|
||||
)
|
||||
|
||||
if self.stream:
|
||||
if self._effective_stream():
|
||||
return self._handle_streaming_completion(
|
||||
completion_params,
|
||||
available_functions,
|
||||
@@ -572,15 +575,18 @@ class AzureCompletion(BaseLLM):
|
||||
Completion response or tool call result
|
||||
"""
|
||||
if self.api == "responses":
|
||||
return await self._responses_delegate.acall(
|
||||
messages=messages,
|
||||
tools=tools,
|
||||
callbacks=callbacks,
|
||||
available_functions=available_functions,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
response_model=response_model,
|
||||
)
|
||||
with call_stream_override(
|
||||
self._responses_delegate, bool(self._effective_stream())
|
||||
):
|
||||
return await self._responses_delegate.acall(
|
||||
messages=messages,
|
||||
tools=tools,
|
||||
callbacks=callbacks,
|
||||
available_functions=available_functions,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
response_model=response_model,
|
||||
)
|
||||
|
||||
with llm_call_context():
|
||||
try:
|
||||
@@ -601,7 +607,7 @@ class AzureCompletion(BaseLLM):
|
||||
formatted_messages, tools, effective_response_model
|
||||
)
|
||||
|
||||
if self.stream:
|
||||
if self._effective_stream():
|
||||
return await self._ahandle_streaming_completion(
|
||||
completion_params,
|
||||
available_functions,
|
||||
@@ -639,11 +645,11 @@ class AzureCompletion(BaseLLM):
|
||||
"""
|
||||
params: AzureCompletionParams = {
|
||||
"messages": messages,
|
||||
"stream": self.stream,
|
||||
"stream": bool(self._effective_stream()),
|
||||
}
|
||||
|
||||
model_extras: dict[str, Any] = {}
|
||||
if self.stream:
|
||||
if self._effective_stream():
|
||||
model_extras["stream_options"] = {"include_usage": True}
|
||||
|
||||
if response_model and self.is_openai_model:
|
||||
|
||||
@@ -428,7 +428,7 @@ class BedrockCompletion(BaseLLM):
|
||||
self.additional_model_response_field_paths
|
||||
)
|
||||
|
||||
if self.stream:
|
||||
if self._effective_stream():
|
||||
return self._handle_streaming_converse(
|
||||
formatted_messages,
|
||||
body,
|
||||
@@ -556,7 +556,7 @@ class BedrockCompletion(BaseLLM):
|
||||
self.additional_model_response_field_paths
|
||||
)
|
||||
|
||||
if self.stream:
|
||||
if self._effective_stream():
|
||||
return await self._ahandle_streaming_converse(
|
||||
formatted_messages,
|
||||
body,
|
||||
|
||||
@@ -322,7 +322,7 @@ class GeminiCompletion(BaseLLM):
|
||||
system_instruction, tools, effective_response_model
|
||||
)
|
||||
|
||||
if self.stream:
|
||||
if self._effective_stream():
|
||||
return self._handle_streaming_completion(
|
||||
formatted_content,
|
||||
config,
|
||||
@@ -401,7 +401,7 @@ class GeminiCompletion(BaseLLM):
|
||||
system_instruction, tools, effective_response_model
|
||||
)
|
||||
|
||||
if self.stream:
|
||||
if self._effective_stream():
|
||||
return await self._ahandle_streaming_completion(
|
||||
formatted_content,
|
||||
config,
|
||||
|
||||
@@ -469,7 +469,7 @@ class OpenAICompletion(BaseLLM):
|
||||
messages=messages, tools=tools
|
||||
)
|
||||
|
||||
if self.stream:
|
||||
if self._effective_stream():
|
||||
return self._handle_streaming_completion(
|
||||
params=completion_params,
|
||||
available_functions=available_functions,
|
||||
@@ -564,7 +564,7 @@ class OpenAICompletion(BaseLLM):
|
||||
messages=messages, tools=tools
|
||||
)
|
||||
|
||||
if self.stream:
|
||||
if self._effective_stream():
|
||||
return await self._ahandle_streaming_completion(
|
||||
params=completion_params,
|
||||
available_functions=available_functions,
|
||||
@@ -595,7 +595,7 @@ class OpenAICompletion(BaseLLM):
|
||||
messages=messages, tools=tools, response_model=response_model
|
||||
)
|
||||
|
||||
if self.stream:
|
||||
if self._effective_stream():
|
||||
return self._handle_streaming_responses(
|
||||
params=params,
|
||||
available_functions=available_functions,
|
||||
@@ -626,7 +626,7 @@ class OpenAICompletion(BaseLLM):
|
||||
messages=messages, tools=tools, response_model=response_model
|
||||
)
|
||||
|
||||
if self.stream:
|
||||
if self._effective_stream():
|
||||
return await self._ahandle_streaming_responses(
|
||||
params=params,
|
||||
available_functions=available_functions,
|
||||
@@ -685,7 +685,7 @@ class OpenAICompletion(BaseLLM):
|
||||
if instructions:
|
||||
params["instructions"] = instructions
|
||||
|
||||
if self.stream:
|
||||
if self._effective_stream():
|
||||
params["stream"] = True
|
||||
|
||||
if self.store is not None:
|
||||
@@ -1540,8 +1540,8 @@ class OpenAICompletion(BaseLLM):
|
||||
"model": self.model,
|
||||
"messages": messages,
|
||||
}
|
||||
if self.stream:
|
||||
params["stream"] = self.stream
|
||||
if self._effective_stream():
|
||||
params["stream"] = self._effective_stream()
|
||||
params["stream_options"] = {"include_usage": True}
|
||||
|
||||
params.update(self.additional_params)
|
||||
|
||||
@@ -2,9 +2,10 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import AsyncIterator, Callable, Iterator
|
||||
from collections.abc import AsyncIterator, Callable, Iterator, Sequence
|
||||
from datetime import datetime
|
||||
from enum import Enum
|
||||
from typing import TYPE_CHECKING, Any, Generic, TypeVar
|
||||
from typing import TYPE_CHECKING, Any, Generic, Literal, TypeVar
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
from typing_extensions import Self
|
||||
@@ -15,6 +16,262 @@ if TYPE_CHECKING:
|
||||
|
||||
|
||||
T = TypeVar("T")
|
||||
_MISSING = object()
|
||||
|
||||
StreamChannel = Literal[
|
||||
"llm",
|
||||
"flow",
|
||||
"tools",
|
||||
"messages",
|
||||
"lifecycle",
|
||||
"custom",
|
||||
]
|
||||
|
||||
|
||||
class StreamFrame(BaseModel):
|
||||
"""Stable public stream frame emitted by streamable runtimes."""
|
||||
|
||||
id: str = Field(description="Unique frame/event identifier")
|
||||
seq: int | None = Field(default=None, description="Execution-local order")
|
||||
type: str = Field(description="Source event type")
|
||||
channel: StreamChannel = Field(description="High-level stream channel")
|
||||
namespace: list[str] = Field(default_factory=list)
|
||||
timestamp: datetime
|
||||
parent_id: str | None = None
|
||||
previous_id: str | None = None
|
||||
data: dict[str, Any] = Field(default_factory=dict)
|
||||
|
||||
@property
|
||||
def content(self) -> str:
|
||||
"""Printable text content for chunk-like consumers."""
|
||||
chunk = self.data.get("chunk")
|
||||
if isinstance(chunk, str):
|
||||
return chunk
|
||||
return ""
|
||||
|
||||
@property
|
||||
def event(self) -> dict[str, Any]:
|
||||
"""Structured source event payload."""
|
||||
return self.data
|
||||
|
||||
|
||||
class StreamSessionBase(Generic[T]):
|
||||
"""Base stream session with ordered frame iteration and result access."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
sync_iterator: Iterator[StreamFrame] | None = None,
|
||||
async_iterator: AsyncIterator[StreamFrame] | None = None,
|
||||
) -> None:
|
||||
self._result: T | object = _MISSING
|
||||
self._completed = False
|
||||
self._frames: list[StreamFrame] = []
|
||||
self._error: Exception | None = None
|
||||
self._cancelled = False
|
||||
self._exhausted = False
|
||||
self._on_cleanup: Callable[[], None] | None = None
|
||||
self._sync_iterator = sync_iterator
|
||||
self._async_iterator = async_iterator
|
||||
|
||||
@property
|
||||
def result(self) -> T:
|
||||
"""Return the final result after stream exhaustion or completion."""
|
||||
if not self._completed:
|
||||
raise RuntimeError(
|
||||
"Streaming has not completed yet. "
|
||||
"Iterate over all frames before accessing result."
|
||||
)
|
||||
if self._error is not None:
|
||||
raise self._error
|
||||
if self._result is _MISSING:
|
||||
raise RuntimeError("No result available")
|
||||
return self._result # type: ignore[return-value]
|
||||
|
||||
@property
|
||||
def is_completed(self) -> bool:
|
||||
"""Check if the stream has completed."""
|
||||
return self._completed
|
||||
|
||||
@property
|
||||
def is_cancelled(self) -> bool:
|
||||
"""Check if the stream was cancelled."""
|
||||
return self._cancelled
|
||||
|
||||
@property
|
||||
def is_exhausted(self) -> bool:
|
||||
"""Check if the stream iterator was fully consumed."""
|
||||
return self._exhausted
|
||||
|
||||
@property
|
||||
def frames(self) -> list[StreamFrame]:
|
||||
"""Return collected frames."""
|
||||
return self._frames.copy()
|
||||
|
||||
def _set_result(self, result: T) -> None:
|
||||
self._result = result
|
||||
self._completed = True
|
||||
|
||||
|
||||
class StreamSession(StreamSessionBase[T]):
|
||||
"""Synchronous stream session for ordered public frames."""
|
||||
|
||||
def __enter__(self) -> Self:
|
||||
return self
|
||||
|
||||
def __exit__(self, *exc_info: Any) -> None:
|
||||
if not self._exhausted:
|
||||
self.close()
|
||||
|
||||
@property
|
||||
def events(self) -> Iterator[StreamFrame]:
|
||||
"""Iterate over all ordered frames."""
|
||||
return self.subscribe()
|
||||
|
||||
def __iter__(self) -> Iterator[StreamFrame]:
|
||||
"""Iterate over all ordered frames."""
|
||||
return self.events
|
||||
|
||||
@property
|
||||
def llm(self) -> Iterator[StreamFrame]:
|
||||
"""Iterate over LLM token and thinking frames."""
|
||||
return self.subscribe(channels=["llm"])
|
||||
|
||||
@property
|
||||
def messages(self) -> Iterator[StreamFrame]:
|
||||
"""Iterate over conversation message frames."""
|
||||
return self.subscribe(channels=["messages"])
|
||||
|
||||
@property
|
||||
def flow(self) -> Iterator[StreamFrame]:
|
||||
"""Iterate over Flow lifecycle and method frames."""
|
||||
return self.subscribe(channels=["flow"])
|
||||
|
||||
@property
|
||||
def tools(self) -> Iterator[StreamFrame]:
|
||||
"""Iterate over tool execution frames."""
|
||||
return self.subscribe(channels=["tools"])
|
||||
|
||||
def interleave(self, channels: Sequence[StreamChannel]) -> Iterator[StreamFrame]:
|
||||
"""Iterate over selected channels while preserving global order."""
|
||||
return self.subscribe(channels=channels)
|
||||
|
||||
def subscribe(
|
||||
self, channels: Sequence[StreamChannel] | None = None
|
||||
) -> Iterator[StreamFrame]:
|
||||
"""Iterate over frames, optionally filtered by channel."""
|
||||
selected = set(channels) if channels is not None else None
|
||||
if self._exhausted:
|
||||
for frame in self._frames:
|
||||
if selected is None or frame.channel in selected:
|
||||
yield frame
|
||||
return
|
||||
if self._sync_iterator is None:
|
||||
raise RuntimeError("Sync iterator not available")
|
||||
try:
|
||||
for frame in self._sync_iterator:
|
||||
self._frames.append(frame)
|
||||
if selected is None or frame.channel in selected:
|
||||
yield frame
|
||||
self._exhausted = True
|
||||
except Exception as e:
|
||||
self._error = e
|
||||
raise
|
||||
finally:
|
||||
self._completed = True
|
||||
|
||||
def close(self) -> None:
|
||||
"""Cancel streaming and clean up resources."""
|
||||
if self._cancelled or self._exhausted or self._error is not None:
|
||||
return
|
||||
self._cancelled = True
|
||||
self._completed = True
|
||||
if self._sync_iterator is not None and hasattr(self._sync_iterator, "close"):
|
||||
self._sync_iterator.close()
|
||||
if self._on_cleanup is not None:
|
||||
self._on_cleanup()
|
||||
self._on_cleanup = None
|
||||
|
||||
|
||||
class AsyncStreamSession(StreamSessionBase[T]):
|
||||
"""Asynchronous stream session for ordered public frames."""
|
||||
|
||||
async def __aenter__(self) -> Self:
|
||||
return self
|
||||
|
||||
async def __aexit__(self, *exc_info: Any) -> None:
|
||||
if not self._exhausted:
|
||||
await self.aclose()
|
||||
|
||||
@property
|
||||
def events(self) -> AsyncIterator[StreamFrame]:
|
||||
"""Iterate over all ordered frames."""
|
||||
return self.subscribe()
|
||||
|
||||
def __aiter__(self) -> AsyncIterator[StreamFrame]:
|
||||
"""Iterate over all ordered frames."""
|
||||
return self.events
|
||||
|
||||
@property
|
||||
def llm(self) -> AsyncIterator[StreamFrame]:
|
||||
"""Iterate over LLM token and thinking frames."""
|
||||
return self.subscribe(channels=["llm"])
|
||||
|
||||
@property
|
||||
def messages(self) -> AsyncIterator[StreamFrame]:
|
||||
"""Iterate over conversation message frames."""
|
||||
return self.subscribe(channels=["messages"])
|
||||
|
||||
@property
|
||||
def flow(self) -> AsyncIterator[StreamFrame]:
|
||||
"""Iterate over Flow lifecycle and method frames."""
|
||||
return self.subscribe(channels=["flow"])
|
||||
|
||||
@property
|
||||
def tools(self) -> AsyncIterator[StreamFrame]:
|
||||
"""Iterate over tool execution frames."""
|
||||
return self.subscribe(channels=["tools"])
|
||||
|
||||
def interleave(
|
||||
self, channels: Sequence[StreamChannel]
|
||||
) -> AsyncIterator[StreamFrame]:
|
||||
"""Iterate over selected channels while preserving global order."""
|
||||
return self.subscribe(channels=channels)
|
||||
|
||||
async def subscribe(
|
||||
self, channels: Sequence[StreamChannel] | None = None
|
||||
) -> AsyncIterator[StreamFrame]:
|
||||
"""Iterate over frames, optionally filtered by channel."""
|
||||
selected = set(channels) if channels is not None else None
|
||||
if self._exhausted:
|
||||
for frame in self._frames:
|
||||
if selected is None or frame.channel in selected:
|
||||
yield frame
|
||||
return
|
||||
if self._async_iterator is None:
|
||||
raise RuntimeError("Async iterator not available")
|
||||
try:
|
||||
async for frame in self._async_iterator:
|
||||
self._frames.append(frame)
|
||||
if selected is None or frame.channel in selected:
|
||||
yield frame
|
||||
self._exhausted = True
|
||||
except Exception as e:
|
||||
self._error = e
|
||||
raise
|
||||
finally:
|
||||
self._completed = True
|
||||
|
||||
async def aclose(self) -> None:
|
||||
"""Cancel streaming and clean up resources."""
|
||||
if self._cancelled or self._exhausted or self._error is not None:
|
||||
return
|
||||
self._cancelled = True
|
||||
self._completed = True
|
||||
if self._async_iterator is not None and hasattr(self._async_iterator, "aclose"):
|
||||
await self._async_iterator.aclose()
|
||||
if self._on_cleanup is not None:
|
||||
self._on_cleanup()
|
||||
self._on_cleanup = None
|
||||
|
||||
|
||||
class StreamChunkType(Enum):
|
||||
|
||||
@@ -6,19 +6,32 @@ import contextvars
|
||||
import logging
|
||||
import queue
|
||||
import threading
|
||||
from typing import Any, NamedTuple
|
||||
from typing import Any, NamedTuple, cast
|
||||
import uuid
|
||||
|
||||
from typing_extensions import TypedDict
|
||||
|
||||
from crewai.events.base_events import BaseEvent
|
||||
from crewai.events.event_bus import crewai_event_bus
|
||||
from crewai.events.types.llm_events import LLMStreamChunkEvent
|
||||
from crewai.events.stream_context import add_stream_sink, reset_stream_sinks
|
||||
from crewai.events.types.flow_events import ConversationMessageAddedEvent, FlowEvent
|
||||
from crewai.events.types.llm_events import (
|
||||
LLMEventBase,
|
||||
LLMStreamChunkEvent,
|
||||
)
|
||||
from crewai.events.types.tool_usage_events import (
|
||||
ToolExecutionErrorEvent,
|
||||
ToolUsageEvent,
|
||||
)
|
||||
from crewai.types.streaming import (
|
||||
AsyncStreamSession,
|
||||
CrewStreamingOutput,
|
||||
FlowStreamingOutput,
|
||||
StreamChannel,
|
||||
StreamChunk,
|
||||
StreamChunkType,
|
||||
StreamFrame,
|
||||
StreamSession,
|
||||
ToolCallChunk,
|
||||
)
|
||||
from crewai.utilities.string_utils import sanitize_tool_name
|
||||
@@ -53,6 +66,16 @@ class StreamingState(NamedTuple):
|
||||
stream_id: str | None = None
|
||||
|
||||
|
||||
class FrameStreamingState(NamedTuple):
|
||||
"""Immutable state for public frame streaming execution."""
|
||||
|
||||
result_holder: list[Any]
|
||||
sync_queue: queue.Queue[StreamFrame | None | Exception]
|
||||
async_queue: asyncio.Queue[StreamFrame | None | Exception] | None
|
||||
loop: asyncio.AbstractEventLoop | None
|
||||
sink: Callable[[Any, BaseEvent], None]
|
||||
|
||||
|
||||
def _extract_tool_call_info(
|
||||
event: LLMStreamChunkEvent,
|
||||
) -> tuple[StreamChunkType, ToolCallChunk | None]:
|
||||
@@ -80,6 +103,46 @@ def _extract_tool_call_info(
|
||||
return StreamChunkType.TEXT, None
|
||||
|
||||
|
||||
def _extract_tool_call_chunk(data: dict[str, Any]) -> ToolCallChunk | None:
|
||||
tool_call = data.get("tool_call")
|
||||
if not isinstance(tool_call, dict):
|
||||
return None
|
||||
function = tool_call.get("function")
|
||||
if not isinstance(function, dict):
|
||||
function = {}
|
||||
function_name = function.get("name")
|
||||
return ToolCallChunk(
|
||||
tool_id=tool_call.get("id"),
|
||||
tool_name=sanitize_tool_name(str(function_name)) if function_name else None,
|
||||
arguments=function.get("arguments") or "",
|
||||
index=tool_call.get("index") or 0,
|
||||
)
|
||||
|
||||
|
||||
def stream_frame_to_chunk(frame: StreamFrame) -> StreamChunk | None:
|
||||
"""Project an LLM stream frame into the legacy ``StreamChunk`` shape."""
|
||||
if frame.channel != "llm" or frame.type not in {
|
||||
"llm_stream_chunk",
|
||||
"llm_thinking_chunk",
|
||||
}:
|
||||
return None
|
||||
|
||||
tool_call = _extract_tool_call_chunk(frame.data)
|
||||
chunk_type = (
|
||||
StreamChunkType.TOOL_CALL if tool_call is not None else StreamChunkType.TEXT
|
||||
)
|
||||
return StreamChunk(
|
||||
content=str(frame.data.get("chunk") or ""),
|
||||
chunk_type=chunk_type,
|
||||
task_index=0,
|
||||
task_name=str(frame.data.get("task_name") or ""),
|
||||
task_id=str(frame.data.get("task_id") or ""),
|
||||
agent_role=str(frame.data.get("agent_role") or ""),
|
||||
agent_id=str(frame.data.get("agent_id") or ""),
|
||||
tool_call=tool_call,
|
||||
)
|
||||
|
||||
|
||||
def _create_stream_chunk(
|
||||
event: LLMStreamChunkEvent,
|
||||
current_task_info: TaskInfo,
|
||||
@@ -107,6 +170,207 @@ def _create_stream_chunk(
|
||||
)
|
||||
|
||||
|
||||
_FRAME_DATA_EXCLUDE = {
|
||||
"timestamp",
|
||||
"type",
|
||||
"event_id",
|
||||
"parent_event_id",
|
||||
"previous_event_id",
|
||||
"emission_sequence",
|
||||
}
|
||||
|
||||
|
||||
def _stream_channel(event: BaseEvent) -> StreamChannel:
|
||||
if isinstance(event, LLMEventBase):
|
||||
return "llm"
|
||||
if isinstance(event, ConversationMessageAddedEvent):
|
||||
return "messages"
|
||||
if isinstance(event, FlowEvent):
|
||||
return "flow"
|
||||
if isinstance(event, ToolUsageEvent | ToolExecutionErrorEvent):
|
||||
return "tools"
|
||||
if "error" in event.type or "failed" in event.type:
|
||||
return "lifecycle"
|
||||
return "custom"
|
||||
|
||||
|
||||
def _stream_namespace(event: BaseEvent, channel: StreamChannel) -> list[str]:
|
||||
namespace: list[str] = [channel]
|
||||
for attr in (
|
||||
"flow_name",
|
||||
"method_name",
|
||||
"session_id",
|
||||
"call_id",
|
||||
"tool_name",
|
||||
"agent_role",
|
||||
"task_name",
|
||||
):
|
||||
value = getattr(event, attr, None)
|
||||
if value is not None:
|
||||
namespace.append(str(value))
|
||||
return namespace
|
||||
|
||||
|
||||
def stream_frame_from_event(event: BaseEvent) -> StreamFrame:
|
||||
"""Convert an internal CrewAI event into the public stream frame contract."""
|
||||
channel = _stream_channel(event)
|
||||
data = event.to_json(exclude=_FRAME_DATA_EXCLUDE)
|
||||
if not isinstance(data, dict):
|
||||
data = {"value": data}
|
||||
return StreamFrame(
|
||||
id=event.event_id,
|
||||
seq=event.emission_sequence,
|
||||
type=event.type,
|
||||
channel=channel,
|
||||
namespace=_stream_namespace(event, channel),
|
||||
timestamp=event.timestamp,
|
||||
parent_id=event.parent_event_id,
|
||||
previous_id=event.previous_event_id,
|
||||
data=cast(dict[str, Any], data),
|
||||
)
|
||||
|
||||
|
||||
def _create_frame_sink(
|
||||
sync_queue: queue.Queue[StreamFrame | None | Exception],
|
||||
async_queue: asyncio.Queue[StreamFrame | None | Exception] | None = None,
|
||||
loop: asyncio.AbstractEventLoop | None = None,
|
||||
) -> Callable[[Any, BaseEvent], None]:
|
||||
def frame_sink(_: Any, event: BaseEvent) -> None:
|
||||
frame = stream_frame_from_event(event)
|
||||
if async_queue is not None and loop is not None:
|
||||
loop.call_soon_threadsafe(async_queue.put_nowait, frame)
|
||||
else:
|
||||
sync_queue.put(frame)
|
||||
|
||||
return frame_sink
|
||||
|
||||
|
||||
def create_frame_streaming_state(
|
||||
result_holder: list[Any],
|
||||
use_async: bool = False,
|
||||
) -> FrameStreamingState:
|
||||
"""Create state for a scoped public frame stream."""
|
||||
sync_queue: queue.Queue[StreamFrame | None | Exception] = queue.Queue()
|
||||
async_queue: asyncio.Queue[StreamFrame | None | Exception] | None = None
|
||||
loop: asyncio.AbstractEventLoop | None = None
|
||||
if use_async:
|
||||
async_queue = asyncio.Queue()
|
||||
loop = asyncio.get_running_loop()
|
||||
sink = _create_frame_sink(sync_queue, async_queue, loop)
|
||||
return FrameStreamingState(
|
||||
result_holder=result_holder,
|
||||
sync_queue=sync_queue,
|
||||
async_queue=async_queue,
|
||||
loop=loop,
|
||||
sink=sink,
|
||||
)
|
||||
|
||||
|
||||
def _signal_frame_end(state: FrameStreamingState, is_async: bool = False) -> None:
|
||||
if is_async and state.async_queue is not None and state.loop is not None:
|
||||
state.loop.call_soon_threadsafe(state.async_queue.put_nowait, None)
|
||||
else:
|
||||
state.sync_queue.put(None)
|
||||
|
||||
|
||||
def _signal_frame_error(
|
||||
state: FrameStreamingState, error: Exception, is_async: bool = False
|
||||
) -> None:
|
||||
if is_async and state.async_queue is not None and state.loop is not None:
|
||||
state.loop.call_soon_threadsafe(state.async_queue.put_nowait, error)
|
||||
else:
|
||||
state.sync_queue.put(error)
|
||||
|
||||
|
||||
def _finalize_frame_streaming(
|
||||
state: FrameStreamingState,
|
||||
stream_session: StreamSession[Any] | AsyncStreamSession[Any],
|
||||
) -> None:
|
||||
stream_session._on_cleanup = None
|
||||
if state.result_holder:
|
||||
stream_session._set_result(state.result_holder[0])
|
||||
|
||||
|
||||
def create_frame_generator(
|
||||
state: FrameStreamingState,
|
||||
run_func: Callable[[], Any],
|
||||
output_holder: list[StreamSession[Any]],
|
||||
) -> Iterator[StreamFrame]:
|
||||
"""Create a scoped synchronous public frame generator."""
|
||||
|
||||
def run_with_sink() -> None:
|
||||
token = add_stream_sink(state.sink)
|
||||
try:
|
||||
result = run_func()
|
||||
state.result_holder.append(result)
|
||||
except Exception as e:
|
||||
_signal_frame_error(state, e)
|
||||
finally:
|
||||
reset_stream_sinks(token)
|
||||
_signal_frame_end(state)
|
||||
|
||||
ctx = contextvars.copy_context()
|
||||
thread = threading.Thread(target=ctx.run, args=(run_with_sink,), daemon=True)
|
||||
thread.start()
|
||||
|
||||
try:
|
||||
while True:
|
||||
item = state.sync_queue.get()
|
||||
if item is None:
|
||||
break
|
||||
if isinstance(item, Exception):
|
||||
raise item
|
||||
yield item
|
||||
finally:
|
||||
thread.join()
|
||||
if output_holder:
|
||||
_finalize_frame_streaming(state, output_holder[0])
|
||||
|
||||
|
||||
async def create_async_frame_generator(
|
||||
state: FrameStreamingState,
|
||||
run_coro: Callable[[], Any],
|
||||
output_holder: list[AsyncStreamSession[Any]],
|
||||
) -> AsyncIterator[StreamFrame]:
|
||||
"""Create a scoped asynchronous public frame generator."""
|
||||
if state.async_queue is None:
|
||||
raise RuntimeError(
|
||||
"Async queue not initialized. Use create_frame_streaming_state(use_async=True)."
|
||||
)
|
||||
|
||||
async def run_with_sink() -> None:
|
||||
token = add_stream_sink(state.sink)
|
||||
try:
|
||||
result = await run_coro()
|
||||
state.result_holder.append(result)
|
||||
except Exception as e:
|
||||
_signal_frame_error(state, e, is_async=True)
|
||||
finally:
|
||||
reset_stream_sinks(token)
|
||||
_signal_frame_end(state, is_async=True)
|
||||
|
||||
task = asyncio.create_task(run_with_sink())
|
||||
try:
|
||||
while True:
|
||||
item = await state.async_queue.get()
|
||||
if item is None:
|
||||
break
|
||||
if isinstance(item, Exception):
|
||||
raise item
|
||||
yield item
|
||||
finally:
|
||||
if not task.done():
|
||||
task.cancel()
|
||||
try:
|
||||
await task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
except Exception:
|
||||
logger.debug("Background frame streaming task failed", exc_info=True)
|
||||
if output_holder:
|
||||
_finalize_frame_streaming(state, output_holder[0])
|
||||
|
||||
|
||||
def _create_stream_handler(
|
||||
current_task_info: TaskInfo,
|
||||
sync_queue: queue.Queue[StreamChunk | None | Exception],
|
||||
|
||||
@@ -21,7 +21,7 @@ from crewai.events.types.flow_events import (
|
||||
MethodExecutionFinishedEvent,
|
||||
MethodExecutionStartedEvent,
|
||||
)
|
||||
from crewai.events.types.llm_events import LLMCallStartedEvent
|
||||
from crewai.events.types.llm_events import LLMCallStartedEvent, LLMStreamChunkEvent
|
||||
from crewai.experimental import (
|
||||
ConversationConfig,
|
||||
ConversationMessage,
|
||||
@@ -29,6 +29,7 @@ from crewai.experimental import (
|
||||
RouterConfig,
|
||||
)
|
||||
from crewai.flow import Flow, ChatState, listen, start
|
||||
from crewai.flow.async_feedback import HumanFeedbackPending, PendingFeedbackContext
|
||||
from crewai.flow.flow_context import (
|
||||
current_flow_defer_trace_finalization,
|
||||
current_flow_id,
|
||||
@@ -137,6 +138,111 @@ class TestClassifyIntent:
|
||||
|
||||
|
||||
class TestConversationalFlow:
|
||||
def test_stream_turn_emits_ordered_conversation_frames(self) -> None:
|
||||
flow = ConversationalFlow()
|
||||
flow.stream = True
|
||||
stream_values_seen_by_kickoff: list[bool] = []
|
||||
|
||||
def kickoff_side_effect(*_: Any, **__: Any) -> str:
|
||||
stream_values_seen_by_kickoff.append(flow.stream)
|
||||
crewai_event_bus.emit(
|
||||
flow,
|
||||
LLMStreamChunkEvent(
|
||||
type="llm_stream_chunk",
|
||||
chunk="pong",
|
||||
call_id="call-1",
|
||||
),
|
||||
)
|
||||
return "pong"
|
||||
|
||||
with patch.object(flow, "kickoff", side_effect=kickoff_side_effect):
|
||||
stream = flow.stream_turn("ping", session_id="session-1")
|
||||
|
||||
with pytest.raises(RuntimeError, match="Streaming has not completed yet"):
|
||||
_ = stream.result
|
||||
|
||||
frames = list(stream.events)
|
||||
|
||||
assert stream.result == "pong"
|
||||
assert stream_values_seen_by_kickoff == [False]
|
||||
assert flow.stream is True
|
||||
assert [frame.seq for frame in frames] == sorted(frame.seq for frame in frames)
|
||||
assert [frame.type for frame in frames] == [
|
||||
"conversation_turn_started",
|
||||
"llm_stream_chunk",
|
||||
"conversation_message_added",
|
||||
"conversation_turn_completed",
|
||||
]
|
||||
assert [frame.channel for frame in frames] == [
|
||||
"flow",
|
||||
"llm",
|
||||
"messages",
|
||||
"flow",
|
||||
]
|
||||
assert frames[1].data["chunk"] == "pong"
|
||||
assert flow.state.messages[-1].content == "pong"
|
||||
|
||||
def test_stream_turn_enables_streaming_on_conversation_llm(self) -> None:
|
||||
class FakeLLM:
|
||||
stream = False
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.stream_values: list[bool] = []
|
||||
|
||||
def call(self, *, messages: list[dict[str, Any]]) -> str:
|
||||
self.stream_values.append(self.stream)
|
||||
for chunk in ("po", "ng"):
|
||||
crewai_event_bus.emit(
|
||||
flow,
|
||||
LLMStreamChunkEvent(
|
||||
type="llm_stream_chunk",
|
||||
chunk=chunk,
|
||||
call_id="call-1",
|
||||
),
|
||||
)
|
||||
return "pong"
|
||||
|
||||
llm = FakeLLM()
|
||||
|
||||
@ConversationConfig(llm=llm)
|
||||
class StreamingChatFlow(ConversationalFlow):
|
||||
pass
|
||||
|
||||
flow = StreamingChatFlow()
|
||||
stream = flow.stream_turn("ping", session_id="session-1")
|
||||
frames = list(stream.events)
|
||||
|
||||
assert stream.result == "pong"
|
||||
assert llm.stream_values == [True]
|
||||
assert llm.stream is False
|
||||
assert [
|
||||
frame.data["chunk"]
|
||||
for frame in frames
|
||||
if frame.type == "llm_stream_chunk"
|
||||
] == ["po", "ng"]
|
||||
|
||||
def test_stream_turn_returns_pending_feedback_without_failure_event(self) -> None:
|
||||
flow = ConversationalFlow()
|
||||
pending = HumanFeedbackPending(
|
||||
context=PendingFeedbackContext(
|
||||
flow_id="session-1",
|
||||
flow_class="tests.PendingFeedbackFlow",
|
||||
method_name="review",
|
||||
method_output="draft",
|
||||
message="Please review",
|
||||
)
|
||||
)
|
||||
|
||||
def kickoff_side_effect(*_: Any, **__: Any) -> None:
|
||||
raise pending
|
||||
|
||||
with patch.object(flow, "kickoff", side_effect=kickoff_side_effect):
|
||||
stream = flow.stream_turn("review this", session_id="session-1")
|
||||
frames = list(stream.events)
|
||||
|
||||
assert stream.result is pending
|
||||
assert [frame.type for frame in frames] == ["conversation_turn_started"]
|
||||
|
||||
def test_deferred_multi_turn_emits_single_flow_finished(self) -> None:
|
||||
"""A deferred multi-turn session lands as one trace: exactly one
|
||||
``FlowFinishedEvent`` is emitted at ``finalize_session_traces()``, not
|
||||
|
||||
@@ -29,7 +29,7 @@ from crewai.flow.persistence.base import FlowPersistence
|
||||
from crewai.flow.runtime._actions import FlowScriptExecutionDisabledError
|
||||
from crewai.state.checkpoint_config import CheckpointConfig
|
||||
from crewai.tools import BaseTool
|
||||
from crewai.types.streaming import FlowStreamingOutput
|
||||
from crewai.types.streaming import StreamSession
|
||||
|
||||
|
||||
class StaticSearchTool(BaseTool):
|
||||
@@ -2485,7 +2485,7 @@ def test_config_max_method_calls_from_declaration():
|
||||
def test_config_stream_from_declaration():
|
||||
flow = Flow.from_declaration(contents=STREAMING_CHAIN_YAML)
|
||||
streaming = flow.kickoff()
|
||||
assert isinstance(streaming, FlowStreamingOutput)
|
||||
assert isinstance(streaming, StreamSession)
|
||||
for _ in streaming:
|
||||
pass
|
||||
assert streaming.result == "confirmed:True"
|
||||
|
||||
300
lib/crewai/tests/test_stream_frames.py
Normal file
300
lib/crewai/tests/test_stream_frames.py
Normal file
@@ -0,0 +1,300 @@
|
||||
"""Tests for the public stream frame contract."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from collections.abc import AsyncIterator
|
||||
from typing import Any, ClassVar
|
||||
|
||||
import pytest
|
||||
|
||||
from crewai.events.event_bus import crewai_event_bus
|
||||
from crewai.events.types.flow_events import ConversationMessageAddedEvent
|
||||
from crewai.events.types.llm_events import LLMStreamChunkEvent, LLMThinkingChunkEvent
|
||||
from crewai.events.types.tool_usage_events import ToolUsageStartedEvent
|
||||
from crewai.flow.flow import Flow, start
|
||||
from crewai.llms.base_llm import BaseLLM, call_stop_override
|
||||
from crewai.types.streaming import StreamFrame
|
||||
|
||||
|
||||
class FrameFlow(Flow):
|
||||
@start()
|
||||
def run(self) -> str:
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
LLMStreamChunkEvent(
|
||||
type="llm_stream_chunk",
|
||||
chunk="hello",
|
||||
call_id="call-1",
|
||||
),
|
||||
)
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
LLMThinkingChunkEvent(
|
||||
type="llm_thinking_chunk",
|
||||
chunk="thinking",
|
||||
call_id="call-1",
|
||||
),
|
||||
)
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
ConversationMessageAddedEvent(
|
||||
type="conversation_message_added",
|
||||
flow_name=self._definition.name,
|
||||
session_id="session-1",
|
||||
role="assistant",
|
||||
content="hello",
|
||||
message_index=0,
|
||||
),
|
||||
)
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
ToolUsageStartedEvent(
|
||||
type="tool_usage_started",
|
||||
tool_name="search",
|
||||
tool_args={"query": "crew"},
|
||||
),
|
||||
)
|
||||
return "done"
|
||||
|
||||
|
||||
class DirectStreamingLLM(BaseLLM):
|
||||
call_stream_values: ClassVar[list[bool | None]] = []
|
||||
raw_stream_values: ClassVar[list[bool | None]] = []
|
||||
call_instance_ids: ClassVar[list[int]] = []
|
||||
call_stop_values: ClassVar[list[list[str]]] = []
|
||||
|
||||
def call(self, messages: Any, *args: Any, **kwargs: Any) -> str:
|
||||
self.call_stream_values.append(self._effective_stream())
|
||||
self.raw_stream_values.append(self.stream)
|
||||
self.call_instance_ids.append(id(self))
|
||||
self.call_stop_values.append(list(self.stop_sequences))
|
||||
self._track_token_usage_internal(
|
||||
{
|
||||
"prompt_tokens": 1,
|
||||
"completion_tokens": 2,
|
||||
"total_tokens": 3,
|
||||
}
|
||||
)
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
LLMStreamChunkEvent(
|
||||
type="llm_stream_chunk",
|
||||
chunk="hel",
|
||||
call_id="call-1",
|
||||
),
|
||||
)
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
LLMStreamChunkEvent(
|
||||
type="llm_stream_chunk",
|
||||
chunk="lo",
|
||||
call_id="call-1",
|
||||
),
|
||||
)
|
||||
return "hello"
|
||||
|
||||
|
||||
def test_stream_frame_contract_and_ordering() -> None:
|
||||
stream = FrameFlow().stream_events()
|
||||
|
||||
with pytest.raises(RuntimeError, match="Streaming has not completed yet"):
|
||||
_ = stream.result
|
||||
|
||||
with stream:
|
||||
frames = list(stream.events)
|
||||
|
||||
assert stream.result == "done"
|
||||
assert all(isinstance(frame, StreamFrame) for frame in frames)
|
||||
assert [frame.seq for frame in frames] == sorted(frame.seq for frame in frames)
|
||||
|
||||
by_type = {frame.type: frame for frame in frames}
|
||||
assert by_type["flow_started"].channel == "flow"
|
||||
assert by_type["method_execution_started"].parent_id == by_type["flow_started"].id
|
||||
assert by_type["llm_stream_chunk"].channel == "llm"
|
||||
assert by_type["llm_thinking_chunk"].channel == "llm"
|
||||
assert by_type["conversation_message_added"].channel == "messages"
|
||||
assert by_type["tool_usage_started"].channel == "tools"
|
||||
assert "FrameFlow" in by_type["method_execution_started"].namespace
|
||||
assert "run" in by_type["method_execution_started"].namespace
|
||||
|
||||
|
||||
def test_stream_subscribe_filters_channels_without_losing_order() -> None:
|
||||
with FrameFlow().stream_events() as stream:
|
||||
frames = list(stream.interleave(["messages", "tools"]))
|
||||
|
||||
assert [frame.channel for frame in frames] == ["messages", "tools"]
|
||||
assert [frame.seq for frame in frames] == sorted(frame.seq for frame in frames)
|
||||
assert stream.result == "done"
|
||||
|
||||
|
||||
def test_stream_projections_replay_cached_frames_after_exhaustion() -> None:
|
||||
with FrameFlow().stream_events() as stream:
|
||||
all_frames = list(stream.events)
|
||||
|
||||
assert [frame.content for frame in stream.llm if frame.content] == [
|
||||
"hello",
|
||||
"thinking",
|
||||
]
|
||||
assert [frame.type for frame in stream.tools] == ["tool_usage_started"]
|
||||
assert list(stream.events) == all_frames
|
||||
|
||||
|
||||
def test_stream_channel_projection_can_be_followed_by_cached_projection() -> None:
|
||||
with FrameFlow().stream_events() as stream:
|
||||
llm_frames = list(stream.llm)
|
||||
|
||||
assert [frame.content for frame in llm_frames if frame.content] == [
|
||||
"hello",
|
||||
"thinking",
|
||||
]
|
||||
assert [frame.type for frame in stream.flow] == [
|
||||
"flow_started",
|
||||
"method_execution_started",
|
||||
"method_execution_finished",
|
||||
"flow_finished",
|
||||
]
|
||||
|
||||
|
||||
def test_stream_errors_surface_after_failed_frame() -> None:
|
||||
class ErrorFlow(Flow):
|
||||
@start()
|
||||
def run(self) -> str:
|
||||
raise ValueError("boom")
|
||||
|
||||
stream = ErrorFlow().stream_events()
|
||||
|
||||
with pytest.raises(ValueError, match="boom"):
|
||||
list(stream.events)
|
||||
|
||||
assert any(frame.type == "method_execution_failed" for frame in stream.frames)
|
||||
with pytest.raises(ValueError, match="boom"):
|
||||
_ = stream.result
|
||||
|
||||
|
||||
def test_flow_streaming_returns_iterable_frame_session() -> None:
|
||||
flow = FrameFlow()
|
||||
flow.stream = True
|
||||
|
||||
stream = flow.kickoff()
|
||||
|
||||
with stream:
|
||||
frames = list(stream)
|
||||
|
||||
assert all(isinstance(frame, StreamFrame) for frame in frames)
|
||||
assert [frame.content for frame in frames if frame.content] == [
|
||||
"hello",
|
||||
"thinking",
|
||||
]
|
||||
first_content_frame = next(frame for frame in frames if frame.content)
|
||||
assert first_content_frame.event["chunk"] == "hello"
|
||||
assert stream.result == "done"
|
||||
|
||||
|
||||
def test_direct_llm_stream_events_scope_and_restore_stream_flag() -> None:
|
||||
DirectStreamingLLM.call_stream_values = []
|
||||
DirectStreamingLLM.raw_stream_values = []
|
||||
DirectStreamingLLM.call_instance_ids = []
|
||||
DirectStreamingLLM.call_stop_values = []
|
||||
llm = DirectStreamingLLM(model="gpt-4o-mini", stream=False)
|
||||
|
||||
with call_stop_override(llm, ["STOP"]):
|
||||
with llm.stream_events("hello") as stream:
|
||||
frames = list(stream)
|
||||
|
||||
assert [frame.content for frame in frames] == ["hel", "lo"]
|
||||
assert frames[0].event["chunk"] == "hel"
|
||||
assert stream.result == "hello"
|
||||
assert llm.stream is False
|
||||
assert DirectStreamingLLM.call_stream_values == [True]
|
||||
assert DirectStreamingLLM.raw_stream_values == [False]
|
||||
assert DirectStreamingLLM.call_instance_ids == [id(llm)]
|
||||
assert DirectStreamingLLM.call_stop_values == [["STOP"]]
|
||||
usage = llm.get_token_usage_summary()
|
||||
assert usage.total_tokens == 3
|
||||
assert usage.prompt_tokens == 1
|
||||
assert usage.completion_tokens == 2
|
||||
assert usage.successful_requests == 1
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_astream_scopes_concurrent_executions() -> None:
|
||||
class ConcurrentFlow(Flow):
|
||||
@start()
|
||||
async def run(self) -> str:
|
||||
label = str(self.state["label"])
|
||||
await asyncio.sleep(0)
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
LLMStreamChunkEvent(
|
||||
type="llm_stream_chunk",
|
||||
chunk=label,
|
||||
call_id=label,
|
||||
),
|
||||
)
|
||||
return label
|
||||
|
||||
async def collect(label: str) -> tuple[str, list[str]]:
|
||||
async with ConcurrentFlow().astream(inputs={"label": label}) as stream:
|
||||
frames = [frame async for frame in stream.llm]
|
||||
return stream.result, [frame.data["chunk"] for frame in frames]
|
||||
|
||||
first, second = await asyncio.gather(collect("first"), collect("second"))
|
||||
|
||||
assert first == ("first", ["first"])
|
||||
assert second == ("second", ["second"])
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_stream_projections_replay_cached_frames_after_exhaustion() -> None:
|
||||
async with FrameFlow().astream() as stream:
|
||||
all_frames = [frame async for frame in stream.events]
|
||||
|
||||
llm_frames = [frame async for frame in stream.llm]
|
||||
tool_frames = [frame async for frame in stream.tools]
|
||||
replayed_frames = [frame async for frame in stream.events]
|
||||
|
||||
assert [frame.content for frame in llm_frames if frame.content] == [
|
||||
"hello",
|
||||
"thinking",
|
||||
]
|
||||
assert [frame.type for frame in tool_frames] == ["tool_usage_started"]
|
||||
assert replayed_frames == all_frames
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_stream_channel_projection_can_be_followed_by_cached_projection() -> None:
|
||||
async with FrameFlow().astream() as stream:
|
||||
llm_frames = [frame async for frame in stream.llm]
|
||||
|
||||
flow_frames = [frame async for frame in stream.flow]
|
||||
|
||||
assert [frame.content for frame in llm_frames if frame.content] == [
|
||||
"hello",
|
||||
"thinking",
|
||||
]
|
||||
assert [frame.type for frame in flow_frames] == [
|
||||
"flow_started",
|
||||
"method_execution_started",
|
||||
"method_execution_finished",
|
||||
"flow_finished",
|
||||
]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_astream_cancellation_cleans_up_task() -> None:
|
||||
class SlowFlow(Flow):
|
||||
@start()
|
||||
async def run(self) -> str:
|
||||
await asyncio.sleep(10)
|
||||
return "too late"
|
||||
|
||||
stream = SlowFlow().astream()
|
||||
events: AsyncIterator[StreamFrame] = stream.events
|
||||
first_frame = await anext(events)
|
||||
|
||||
assert first_frame.type == "flow_started"
|
||||
await stream.aclose()
|
||||
|
||||
assert stream.is_cancelled is True
|
||||
assert stream.is_completed is True
|
||||
@@ -11,11 +11,15 @@ from crewai import Agent, Crew, Task
|
||||
from crewai.events.event_bus import crewai_event_bus
|
||||
from crewai.events.types.llm_events import LLMStreamChunkEvent, ToolCall, FunctionCall
|
||||
from crewai.flow.flow import Flow, start
|
||||
from crewai.state.checkpoint_config import CheckpointConfig
|
||||
from crewai.types.streaming import (
|
||||
AsyncStreamSession,
|
||||
CrewStreamingOutput,
|
||||
FlowStreamingOutput,
|
||||
StreamChunk,
|
||||
StreamChunkType,
|
||||
StreamFrame,
|
||||
StreamSession,
|
||||
ToolCallChunk,
|
||||
)
|
||||
|
||||
@@ -417,8 +421,8 @@ class TestCrewKickoffStreamingAsync:
|
||||
class TestFlowKickoffStreaming:
|
||||
"""Tests for Flow(stream=True).kickoff() method."""
|
||||
|
||||
def test_kickoff_streaming_returns_streaming_output(self) -> None:
|
||||
"""Test that flow kickoff with stream=True returns FlowStreamingOutput."""
|
||||
def test_kickoff_streaming_returns_stream_session(self) -> None:
|
||||
"""Test that flow kickoff with stream=True returns StreamSession."""
|
||||
|
||||
class SimpleFlow(Flow[dict[str, Any]]):
|
||||
@start()
|
||||
@@ -428,7 +432,7 @@ class TestFlowKickoffStreaming:
|
||||
flow = SimpleFlow()
|
||||
flow.stream = True
|
||||
streaming = flow.kickoff()
|
||||
assert isinstance(streaming, FlowStreamingOutput)
|
||||
assert isinstance(streaming, StreamSession)
|
||||
|
||||
def test_flow_kickoff_streaming_captures_chunks(self) -> None:
|
||||
"""Test that flow streaming captures LLM chunks from crew execution."""
|
||||
@@ -469,7 +473,7 @@ class TestFlowKickoffStreaming:
|
||||
|
||||
with patch.object(Flow, "kickoff", mock_kickoff_fn):
|
||||
streaming = flow.kickoff()
|
||||
assert isinstance(streaming, FlowStreamingOutput)
|
||||
assert isinstance(streaming, StreamSession)
|
||||
chunks = list(streaming)
|
||||
|
||||
assert len(chunks) >= 2
|
||||
@@ -500,19 +504,38 @@ class TestFlowKickoffStreaming:
|
||||
|
||||
with patch.object(Flow, "kickoff", mock_kickoff_fn):
|
||||
streaming = flow.kickoff()
|
||||
assert isinstance(streaming, FlowStreamingOutput)
|
||||
assert isinstance(streaming, StreamSession)
|
||||
_ = list(streaming)
|
||||
|
||||
result = streaming.result
|
||||
assert result == "flow result"
|
||||
|
||||
def test_streaming_kickoff_passes_checkpoint_config_to_stream_events(self) -> None:
|
||||
"""stream=True preserves checkpoint config when routing to stream_events."""
|
||||
|
||||
class TestFlow(Flow[dict[str, Any]]):
|
||||
@start()
|
||||
def generate(self) -> str:
|
||||
return "flow result"
|
||||
|
||||
flow = TestFlow()
|
||||
flow.stream = True
|
||||
checkpoint = CheckpointConfig()
|
||||
|
||||
with patch.object(flow, "stream_events", wraps=flow.stream_events) as spy:
|
||||
streaming = flow.kickoff(from_checkpoint=checkpoint)
|
||||
list(streaming)
|
||||
|
||||
assert spy.call_args.kwargs["from_checkpoint"] is checkpoint
|
||||
assert streaming.result == "flow result"
|
||||
|
||||
|
||||
class TestFlowKickoffStreamingAsync:
|
||||
"""Tests for Flow(stream=True).kickoff_async() method."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_kickoff_streaming_async_returns_streaming_output(self) -> None:
|
||||
"""Test that flow kickoff_async with stream=True returns FlowStreamingOutput."""
|
||||
async def test_kickoff_streaming_async_returns_stream_session(self) -> None:
|
||||
"""Test that flow kickoff_async with stream=True returns AsyncStreamSession."""
|
||||
|
||||
class SimpleFlow(Flow[dict[str, Any]]):
|
||||
@start()
|
||||
@@ -522,7 +545,7 @@ class TestFlowKickoffStreamingAsync:
|
||||
flow = SimpleFlow()
|
||||
flow.stream = True
|
||||
streaming = await flow.kickoff_async()
|
||||
assert isinstance(streaming, FlowStreamingOutput)
|
||||
assert isinstance(streaming, AsyncStreamSession)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_flow_kickoff_streaming_async_captures_chunks(self) -> None:
|
||||
@@ -567,8 +590,8 @@ class TestFlowKickoffStreamingAsync:
|
||||
|
||||
with patch.object(Flow, "kickoff_async", mock_kickoff_fn):
|
||||
streaming = await flow.kickoff_async()
|
||||
assert isinstance(streaming, FlowStreamingOutput)
|
||||
chunks: list[StreamChunk] = []
|
||||
assert isinstance(streaming, AsyncStreamSession)
|
||||
chunks: list[StreamFrame] = []
|
||||
async for chunk in streaming:
|
||||
chunks.append(chunk)
|
||||
|
||||
@@ -601,13 +624,36 @@ class TestFlowKickoffStreamingAsync:
|
||||
|
||||
with patch.object(Flow, "kickoff_async", mock_kickoff_fn):
|
||||
streaming = await flow.kickoff_async()
|
||||
assert isinstance(streaming, FlowStreamingOutput)
|
||||
assert isinstance(streaming, AsyncStreamSession)
|
||||
async for _ in streaming:
|
||||
pass
|
||||
|
||||
result = streaming.result
|
||||
assert result == "async flow result"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_streaming_kickoff_async_passes_checkpoint_config_to_astream(
|
||||
self,
|
||||
) -> None:
|
||||
"""stream=True preserves checkpoint config when routing to astream."""
|
||||
|
||||
class TestFlow(Flow[dict[str, Any]]):
|
||||
@start()
|
||||
async def generate(self) -> str:
|
||||
return "async flow result"
|
||||
|
||||
flow = TestFlow()
|
||||
flow.stream = True
|
||||
checkpoint = CheckpointConfig()
|
||||
|
||||
with patch.object(flow, "astream", wraps=flow.astream) as spy:
|
||||
streaming = await flow.kickoff_async(from_checkpoint=checkpoint)
|
||||
async for _ in streaming:
|
||||
pass
|
||||
|
||||
assert spy.call_args.kwargs["from_checkpoint"] is checkpoint
|
||||
assert streaming.result == "async flow result"
|
||||
|
||||
|
||||
class TestStreamingEdgeCases:
|
||||
"""Tests for edge cases in streaming functionality."""
|
||||
|
||||
@@ -3,8 +3,10 @@
|
||||
import pytest
|
||||
|
||||
from crewai import Agent, Crew, Task
|
||||
from crewai.events.event_bus import crewai_event_bus
|
||||
from crewai.events.types.llm_events import LLMStreamChunkEvent
|
||||
from crewai.flow.flow import Flow, start
|
||||
from crewai.types.streaming import CrewStreamingOutput, FlowStreamingOutput
|
||||
from crewai.types.streaming import AsyncStreamSession, CrewStreamingOutput, StreamSession
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@@ -212,7 +214,7 @@ class TestStreamingFlowIntegration:
|
||||
|
||||
streaming = flow.kickoff()
|
||||
|
||||
assert isinstance(streaming, FlowStreamingOutput)
|
||||
assert isinstance(streaming, StreamSession)
|
||||
|
||||
chunks = []
|
||||
for chunk in streaming:
|
||||
@@ -232,6 +234,14 @@ class TestStreamingFlowIntegration:
|
||||
|
||||
@start()
|
||||
def execute(self) -> str:
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
LLMStreamChunkEvent(
|
||||
type="llm_stream_chunk",
|
||||
chunk="Flow result",
|
||||
call_id="call-1",
|
||||
),
|
||||
)
|
||||
return "Flow result"
|
||||
|
||||
flow = SimpleFlow()
|
||||
@@ -241,8 +251,11 @@ class TestStreamingFlowIntegration:
|
||||
pass
|
||||
|
||||
assert streaming.is_completed is True
|
||||
streaming.get_full_text()
|
||||
assert len(streaming.chunks) >= 0
|
||||
content_frames = [frame for frame in streaming.frames if frame.content]
|
||||
full_text = "".join(frame.content for frame in content_frames)
|
||||
assert full_text == "Flow result"
|
||||
assert len(content_frames) == 1
|
||||
assert len(streaming.frames) > 0
|
||||
|
||||
result = streaming.result
|
||||
assert result is not None
|
||||
@@ -281,7 +294,7 @@ class TestStreamingFlowIntegration:
|
||||
|
||||
streaming = await flow.kickoff_async()
|
||||
|
||||
assert isinstance(streaming, FlowStreamingOutput)
|
||||
assert isinstance(streaming, AsyncStreamSession)
|
||||
|
||||
chunks = []
|
||||
async for chunk in streaming:
|
||||
|
||||
@@ -9,6 +9,7 @@ import pytest
|
||||
|
||||
from crewai.events.base_events import BaseEvent
|
||||
from crewai.events.event_bus import crewai_event_bus
|
||||
from crewai.events.stream_context import add_stream_sink, reset_stream_sinks
|
||||
|
||||
|
||||
class AsyncTestEvent(BaseEvent):
|
||||
@@ -53,6 +54,24 @@ async def test_aemit_with_async_handlers():
|
||||
assert received_events[0] == event
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_aemit_publishes_to_active_stream_sinks():
|
||||
published_events = []
|
||||
|
||||
def sink(source: object, event: BaseEvent) -> None:
|
||||
published_events.append((source, event))
|
||||
|
||||
event = AsyncTestEvent(type="async_test")
|
||||
token = add_stream_sink(sink)
|
||||
try:
|
||||
await crewai_event_bus.aemit("test_source", event)
|
||||
finally:
|
||||
reset_stream_sinks(token)
|
||||
|
||||
assert published_events == [("test_source", event)]
|
||||
assert event.emission_sequence is not None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_multiple_async_handlers():
|
||||
received_events_1 = []
|
||||
|
||||
Reference in New Issue
Block a user