Files
crewAI/docs/edge/ar/learn/streaming-flow-execution.mdx
Lorenze Jay 8eed457e70 Define stream frame protocol for flows (#6391)
* Define stream frame protocol for flows

* Add direct LLM streaming helpers

* Unify flow streaming frame items

* Update flow streaming integration properties

* Drop stream frame debug runner example

* Address streaming contract review feedback

* Replay cached stream frame projections

* Remove stream frame version field

* Fix streaming contract docs link

* Preserve LLM instance state for stream events

* Address streaming review cleanup
2026-06-30 10:53:48 -07:00

480 lines
14 KiB
Plaintext

---
title: بث تنفيذ التدفق
description: بث المخرجات في الوقت الفعلي من تنفيذ تدفق CrewAI الخاص بك
icon: wave-pulse
mode: "wide"
---
## مقدمة
تدعم تدفقات CrewAI بث المخرجات، مما يتيح لك استلام تحديثات فورية أثناء تنفيذ تدفقك. تمكّنك هذه الميزة من بناء تطبيقات متجاوبة تعرض النتائج تدريجياً وتوفر تحديثات تقدم حية وتخلق تجربة مستخدم أفضل لسير العمل طويلة التشغيل.
## كيف يعمل بث التدفق
عند تفعيل البث في تدفق، يلتقط CrewAI ويبث المخرجات من أي أطقم أو استدعاءات LLM أو أدوات أو أحداث دورة حياة داخل التدفق. يقدم البث عناصر `StreamFrame` مرتبة تحتوي على محتوى قابل للطباعة وبيانات حدث مهيكلة مع تقدم التنفيذ.
## تفعيل البث
لتفعيل البث، عيّن خاصية `stream` إلى `True` في فئة التدفق الخاصة بك:
```python Code
from crewai.flow.flow import Flow, listen, start
from crewai import Agent, Crew, Task
class ResearchFlow(Flow):
stream = True # Enable streaming for the entire flow
@start()
def initialize(self):
return {"topic": "AI trends"}
@listen(initialize)
def research_topic(self, data):
researcher = Agent(
role="Research Analyst",
goal="Research topics thoroughly",
backstory="Expert researcher with analytical skills",
)
task = Task(
description="Research {topic} and provide insights",
expected_output="Detailed research findings",
agent=researcher,
)
crew = Crew(
agents=[researcher],
tasks=[task],
)
return crew.kickoff(inputs=data)
```
## البث المتزامن
عند استدعاء `kickoff()` على تدفق مع تفعيل البث، يُرجع جلسة stream تنتج عناصر `StreamFrame` مرتبة:
```python Code
flow = ResearchFlow()
# Start streaming execution
streaming = flow.kickoff()
# Iterate over stream items as they arrive
for item in streaming:
print(item.content, end="", flush=True)
# Access the final result after streaming completes
result = streaming.result
print(f"\n\nFinal output: {result}")
```
### معلومات عنصر البث
يوفر كل عنصر محتوى قابلاً للطباعة وبيانات حدث مهيكلة:
```python Code
streaming = flow.kickoff()
for item in streaming:
print(f"Channel: {item.channel}")
print(f"Type: {item.type}")
print(f"Content: {item.content}")
print(f"Event payload: {item.event}")
```
### الوصول إلى خصائص البث
توفر جلسة stream خصائص وطرق مفيدة:
```python Code
streaming = flow.kickoff()
# Iterate and collect items
for item in streaming:
print(item.content, end="", flush=True)
# After iteration completes
print(f"\nCompleted: {streaming.is_completed}")
print(f"Total frames: {len(streaming.frames)}")
print(f"Final result: {streaming.result}")
```
## البث غير المتزامن
للتطبيقات غير المتزامنة، استخدم `kickoff_async()` مع التكرار غير المتزامن:
```python Code
import asyncio
async def stream_flow():
flow = ResearchFlow()
# Start async streaming
streaming = await flow.kickoff_async()
# Async iteration over stream items
async for item in streaming:
print(item.content, end="", flush=True)
# Access final result
result = streaming.result
print(f"\n\nFinal output: {result}")
asyncio.run(stream_flow())
```
## البث مع التدفقات متعددة الخطوات
يعمل البث بسلاسة عبر خطوات تدفق متعددة، بما في ذلك التدفقات التي تنفذ أطقم متعددة:
```python Code
from crewai.flow.flow import Flow, listen, start
from crewai import Agent, Crew, Task
class MultiStepFlow(Flow):
stream = True
@start()
def research_phase(self):
"""First crew: Research the topic."""
researcher = Agent(
role="Research Analyst",
goal="Gather comprehensive information",
backstory="Expert at finding relevant information",
)
task = Task(
description="Research AI developments in healthcare",
expected_output="Research findings on AI in healthcare",
agent=researcher,
)
crew = Crew(agents=[researcher], tasks=[task])
result = crew.kickoff()
self.state["research"] = result.raw
return result.raw
@listen(research_phase)
def analysis_phase(self, research_data):
"""Second crew: Analyze the research."""
analyst = Agent(
role="Data Analyst",
goal="Analyze information and extract insights",
backstory="Expert at identifying patterns and trends",
)
task = Task(
description="Analyze this research: {research}",
expected_output="Key insights and trends",
agent=analyst,
)
crew = Crew(agents=[analyst], tasks=[task])
return crew.kickoff(inputs={"research": research_data})
# Stream across both phases
flow = MultiStepFlow()
streaming = flow.kickoff()
current_step = ""
for item in streaming:
# Track which flow step is executing
step_name = item.event.get("method_name") or item.event.get("task_name")
if step_name and step_name != current_step:
current_step = step_name
print(f"\n\n=== {step_name} ===\n")
print(item.content, end="", flush=True)
result = streaming.result
print(f"\n\nFinal analysis: {result}")
```
## مثال عملي: لوحة معلومات التقدم
إليك مثالاً كاملاً يوضح كيفية بناء لوحة معلومات تقدم مع البث:
```python Code
import asyncio
from crewai.flow.flow import Flow, listen, start
from crewai import Agent, Crew, Task
class ResearchPipeline(Flow):
stream = True
@start()
def gather_data(self):
researcher = Agent(
role="Data Gatherer",
goal="Collect relevant information",
backstory="Skilled at finding quality sources",
)
task = Task(
description="Gather data on renewable energy trends",
expected_output="Collection of relevant data points",
agent=researcher,
)
crew = Crew(agents=[researcher], tasks=[task])
result = crew.kickoff()
self.state["data"] = result.raw
return result.raw
@listen(gather_data)
def analyze_data(self, data):
analyst = Agent(
role="Data Analyst",
goal="Extract meaningful insights",
backstory="Expert at data analysis",
)
task = Task(
description="Analyze: {data}",
expected_output="Key insights and trends",
agent=analyst,
)
crew = Crew(agents=[analyst], tasks=[task])
return crew.kickoff(inputs={"data": data})
async def run_with_dashboard():
flow = ResearchPipeline()
print("="*60)
print("RESEARCH PIPELINE DASHBOARD")
print("="*60)
streaming = await flow.kickoff_async()
current_agent = ""
current_task = ""
frame_count = 0
async for item in streaming:
frame_count += 1
# Display phase transitions
task_name = item.event.get("task_name", "")
agent_role = item.event.get("agent_role", "")
if task_name and task_name != current_task:
current_task = task_name
current_agent = agent_role
print(f"\n\n📋 Phase: {current_task}")
print(f"👤 Agent: {current_agent}")
print("-" * 60)
# Display text output
if item.content:
print(item.content, end="", flush=True)
# Display tool usage
elif item.channel == "tools":
print(f"\n🔧 Tool event: {item.type}")
# Show completion summary
result = streaming.result
print(f"\n\n{'='*60}")
print("PIPELINE COMPLETE")
print(f"{'='*60}")
print(f"Total frames: {frame_count}")
print(f"Final output length: {len(str(result))} characters")
asyncio.run(run_with_dashboard())
```
## البث مع إدارة الحالة
يعمل البث بشكل طبيعي مع إدارة حالة التدفق:
```python Code
from pydantic import BaseModel
class AnalysisState(BaseModel):
topic: str = ""
research: str = ""
insights: str = ""
class StatefulStreamingFlow(Flow[AnalysisState]):
stream = True
@start()
def research(self):
# State is available during streaming
topic = self.state.topic
print(f"Researching: {topic}")
researcher = Agent(
role="Researcher",
goal="Research topics thoroughly",
backstory="Expert researcher",
)
task = Task(
description=f"Research {topic}",
expected_output="Research findings",
agent=researcher,
)
crew = Crew(agents=[researcher], tasks=[task])
result = crew.kickoff()
self.state.research = result.raw
return result.raw
@listen(research)
def analyze(self, research):
# Access updated state
print(f"Analyzing {len(self.state.research)} chars of research")
analyst = Agent(
role="Analyst",
goal="Extract insights",
backstory="Expert analyst",
)
task = Task(
description="Analyze: {research}",
expected_output="Key insights",
agent=analyst,
)
crew = Crew(agents=[analyst], tasks=[task])
result = crew.kickoff(inputs={"research": research})
self.state.insights = result.raw
return result.raw
# Run with streaming
flow = StatefulStreamingFlow()
streaming = flow.kickoff(inputs={"topic": "quantum computing"})
for item in streaming:
print(item.content, end="", flush=True)
result = streaming.result
print(f"\n\nFinal state:")
print(f"Topic: {flow.state.topic}")
print(f"Research length: {len(flow.state.research)}")
print(f"Insights length: {len(flow.state.insights)}")
```
## حالات الاستخدام
بث التدفق ذو قيمة خاصة لـ:
- **سير العمل متعددة المراحل**: عرض التقدم عبر مراحل البحث والتحليل والتوليف
- **خطوط الأنابيب المعقدة**: توفير رؤية لتدفقات معالجة البيانات طويلة التشغيل
- **التطبيقات التفاعلية**: بناء واجهات مستخدم متجاوبة تعرض النتائج الوسيطة
- **المراقبة والتصحيح**: مراقبة تنفيذ التدفق وتفاعلات الأطقم في الوقت الفعلي
- **تتبع التقدم**: إظهار المرحلة الحالية من سير العمل للمستخدمين
- **لوحات المعلومات الحية**: إنشاء واجهات مراقبة لتدفقات الإنتاج
## قنوات إطارات البث
ينتج بث التدفق عناصر `StreamFrame` عبر عدة قنوات:
### إطارات LLM
محتوى نصي قياسي من استجابات LLM:
```python Code
for item in streaming:
if item.channel == "llm" and item.content:
print(item.content, end="", flush=True)
```
### إطارات الأدوات
معلومات حول استدعاءات الأدوات داخل التدفق:
```python Code
for item in streaming:
if item.channel == "tools":
print(f"\nTool event: {item.type}")
print(f"Payload: {item.event}")
```
## معالجة الأخطاء
التعامل مع الأخطاء بأناقة أثناء البث:
```python Code
flow = ResearchFlow()
streaming = flow.kickoff()
try:
for item in streaming:
print(item.content, end="", flush=True)
result = streaming.result
print(f"\nSuccess! Result: {result}")
except Exception as e:
print(f"\nError during flow execution: {e}")
if streaming.is_completed:
print("Streaming completed but flow encountered an error")
```
## الإلغاء وتنظيف الموارد
تدعم جلسة stream الإلغاء السلس بحيث يتوقف العمل الجاري فوراً عند انقطاع اتصال المستهلك.
### مدير السياق غير المتزامن
```python Code
streaming = await flow.kickoff_async()
async with streaming:
async for item in streaming:
print(item.content, end="", flush=True)
```
### الإلغاء الصريح
```python Code
streaming = await flow.kickoff_async()
try:
async for item in streaming:
print(item.content, end="", flush=True)
finally:
await streaming.aclose() # غير متزامن
# streaming.close() # المكافئ المتزامن
```
بعد الإلغاء، يكون كل من `streaming.is_cancelled` و `streaming.is_completed` بقيمة `True`. كل من `aclose()` و `close()` متساويان القوة.
## ملاحظات مهمة
- يفعّل البث تلقائياً بث LLM لأي أطقم مستخدمة داخل التدفق
- يجب التكرار عبر جميع عناصر stream قبل الوصول إلى خاصية `.result`
- يعمل البث مع كل من حالة التدفق المنظمة وغير المنظمة
- يلتقط بث التدفق المخرجات من جميع الأطقم واستدعاءات LLM في التدفق
- يتضمن كل إطار سياق حدث مهيكلاً مثل القناة والنوع والنطاق والحمولة
- يضيف البث حملاً ضئيلاً لتنفيذ التدفق
## الدمج مع تصور التدفق
يمكنك دمج البث مع تصور التدفق لتوفير صورة كاملة:
```python Code
# Generate flow visualization
flow = ResearchFlow()
flow.plot("research_flow") # Creates HTML visualization
# Run with streaming
streaming = flow.kickoff()
for item in streaming:
print(item.content, end="", flush=True)
result = streaming.result
print(f"\nFlow complete! View structure at: research_flow.html")
```
من خلال الاستفادة من بث التدفق، يمكنك بناء تطبيقات متطورة ومتجاوبة توفر للمستخدمين رؤية فورية لسير العمل المعقدة متعددة المراحل، مما يجعل أتمتة الذكاء الاصطناعي الخاصة بك أكثر شفافية وجاذبية.