Files
crewAI/docs/ar/concepts/flows.mdx

1069 lines
50 KiB
Plaintext

---
title: التدفقات
description: تعلّم كيفية إنشاء وإدارة سير عمل الذكاء الاصطناعي باستخدام تدفقات CrewAI.
icon: arrow-progress
mode: "wide"
---
## نظرة عامة
تدفقات CrewAI هي ميزة قوية مصممة لتبسيط إنشاء وإدارة سير عمل الذكاء الاصطناعي. تتيح التدفقات للمطورين دمج وتنسيق مهام البرمجة وفرق Crew بكفاءة، مما يوفر إطار عمل متين لبناء أتمتة ذكاء اصطناعي متطورة.
تتيح لك التدفقات إنشاء سير عمل منظم يعتمد على الأحداث. فهي توفر طريقة سلسة لربط مهام متعددة وإدارة الحالة والتحكم في تدفق التنفيذ في تطبيقات الذكاء الاصطناعي الخاصة بك. باستخدام التدفقات، يمكنك بسهولة تصميم وتنفيذ عمليات متعددة الخطوات تستفيد من الإمكانيات الكاملة لـ CrewAI.
1. **تبسيط إنشاء سير العمل**: ربط فرق Crew والمهام المتعددة بسهولة لإنشاء سير عمل ذكاء اصطناعي معقد.
2. **إدارة الحالة**: تجعل التدفقات إدارة ومشاركة الحالة بين المهام المختلفة في سير العمل أمرًا سهلًا للغاية.
3. **بنية تعتمد على الأحداث**: مبنية على نموذج يعتمد على الأحداث، مما يتيح سير عمل ديناميكي وسريع الاستجابة.
4. **تحكم مرن في التدفق**: تنفيذ المنطق الشرطي والحلقات والتفرع ضمن سير العمل.
## البدء
لنقم بإنشاء تدفق بسيط حيث ستستخدم OpenAI لإنشاء مدينة عشوائية في مهمة واحدة ثم استخدام تلك المدينة لإنشاء حقيقة ممتعة في مهمة أخرى.
```python Code
from crewai.flow.flow import Flow, listen, start
from dotenv import load_dotenv
from litellm import completion
class ExampleFlow(Flow):
model = "gpt-4o-mini"
@start()
def generate_city(self):
print("Starting flow")
# Each flow state automatically gets a unique ID
print(f"Flow State ID: {self.state['id']}")
response = completion(
model=self.model,
messages=[
{
"role": "user",
"content": "Return the name of a random city in the world.",
},
],
)
random_city = response["choices"][0]["message"]["content"]
# Store the city in our state
self.state["city"] = random_city
print(f"Random City: {random_city}")
return random_city
@listen(generate_city)
def generate_fun_fact(self, random_city):
response = completion(
model=self.model,
messages=[
{
"role": "user",
"content": f"Tell me a fun fact about {random_city}",
},
],
)
fun_fact = response["choices"][0]["message"]["content"]
# Store the fun fact in our state
self.state["fun_fact"] = fun_fact
return fun_fact
flow = ExampleFlow()
flow.plot()
result = flow.kickoff()
print(f"Generated fun fact: {result}")
```
![Flow Visual image](/images/crewai-flow-1.png)
في المثال أعلاه، أنشأنا تدفقًا بسيطًا يولّد مدينة عشوائية باستخدام OpenAI ثم يولّد حقيقة ممتعة عن تلك المدينة. يتكون التدفق من مهمتين: `generate_city` و `generate_fun_fact`. مهمة `generate_city` هي نقطة البداية للتدفق، ومهمة `generate_fun_fact` تستمع لمخرجات مهمة `generate_city`.
يتلقى كل مثيل من التدفق تلقائيًا معرّفًا فريدًا (UUID) في حالته، مما يساعد في تتبع وإدارة عمليات تنفيذ التدفق. يمكن للحالة أيضًا تخزين بيانات إضافية (مثل المدينة المولّدة والحقيقة الممتعة) التي تستمر طوال تنفيذ التدفق.
عند تشغيل التدفق، سيقوم بما يلي:
1. توليد معرّف فريد لحالة التدفق
2. توليد مدينة عشوائية وتخزينها في الحالة
3. توليد حقيقة ممتعة عن تلك المدينة وتخزينها في الحالة
4. طباعة النتائج في وحدة التحكم
يمكن أن يكون المعرّف الفريد للحالة والبيانات المخزّنة مفيدًا لتتبع عمليات تنفيذ التدفق والحفاظ على السياق بين المهام.
**ملاحظة:** تأكد من إعداد ملف `.env` لتخزين `OPENAI_API_KEY` الخاص بك. هذا المفتاح ضروري للمصادقة على طلبات OpenAI API.
### @start()
يحدد المزخرف `@start()` نقاط الدخول للتدفق. يمكنك:
- تعريف عدة نقاط بداية غير مشروطة: `@start()`
- ربط البداية بدالة سابقة أو تسمية موجّه: `@start("method_or_label")`
- توفير شرط قابل للاستدعاء للتحكم في وقت تنفيذ البداية
جميع دوال `@start()` المستوفية للشروط ستُنفَّذ (غالبًا بالتوازي) عند بدء أو استئناف التدفق.
### @listen()
يُستخدم المزخرف `@listen()` لتحديد دالة كمستمع لمخرجات مهمة أخرى في التدفق. ستُنفَّذ الدالة المزخرفة بـ `@listen()` عندما تُصدر المهمة المحددة مخرجاتها. يمكن للدالة الوصول إلى مخرجات المهمة التي تستمع إليها كمعامل.
#### الاستخدام
يمكن استخدام المزخرف `@listen()` بعدة طرق:
1. **الاستماع لدالة بالاسم**: يمكنك تمرير اسم الدالة التي تريد الاستماع إليها كسلسلة نصية. عند اكتمال تلك الدالة، سيتم تشغيل دالة المستمع.
```python Code
@listen("generate_city")
def generate_fun_fact(self, random_city):
# Implementation
```
2. **الاستماع لدالة مباشرة**: يمكنك تمرير الدالة نفسها. عند اكتمال تلك الدالة، سيتم تشغيل دالة المستمع.
```python Code
@listen(generate_city)
def generate_fun_fact(self, random_city):
# Implementation
```
### مخرجات التدفق
الوصول إلى مخرجات التدفق والتعامل معها أمر أساسي لدمج سير عمل الذكاء الاصطناعي في التطبيقات أو الأنظمة الأكبر. توفر تدفقات CrewAI آليات مباشرة لاسترداد المخرجات النهائية والوصول إلى النتائج الوسيطة وإدارة الحالة العامة للتدفق.
#### استرداد المخرجات النهائية
عند تشغيل تدفق، يتم تحديد المخرجات النهائية بواسطة آخر دالة تكتمل. تُعيد دالة `kickoff()` مخرجات هذه الدالة الأخيرة.
إليك كيفية الوصول إلى المخرجات النهائية:
<CodeGroup>
```python Code
from crewai.flow.flow import Flow, listen, start
class OutputExampleFlow(Flow):
@start()
def first_method(self):
return "Output from first_method"
@listen(first_method)
def second_method(self, first_output):
return f"Second method received: {first_output}"
flow = OutputExampleFlow()
flow.plot("my_flow_plot")
final_output = flow.kickoff()
print("---- Final Output ----")
print(final_output)
```
```text Output
---- Final Output ----
Second method received: Output from first_method
```
</CodeGroup>
![Flow Visual image](/images/crewai-flow-2.png)
في هذا المثال، `second_method` هي آخر دالة تكتمل، لذا ستكون مخرجاتها هي المخرجات النهائية للتدفق.
ستُعيد دالة `kickoff()` المخرجات النهائية، التي تُطبع بعد ذلك في وحدة التحكم. ستولّد دالة `plot()` ملف HTML الذي سيساعدك على فهم التدفق.
#### الوصول إلى الحالة وتحديثها
بالإضافة إلى استرداد المخرجات النهائية، يمكنك أيضًا الوصول إلى الحالة وتحديثها داخل التدفق. يمكن استخدام الحالة لتخزين ومشاركة البيانات بين الدوال المختلفة في التدفق. بعد تشغيل التدفق، يمكنك الوصول إلى الحالة لاسترداد أي معلومات تمت إضافتها أو تحديثها أثناء التنفيذ.
إليك مثال على كيفية تحديث الحالة والوصول إليها:
<CodeGroup>
```python Code
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel
class ExampleState(BaseModel):
counter: int = 0
message: str = ""
class StateExampleFlow(Flow[ExampleState]):
@start()
def first_method(self):
self.state.message = "Hello from first_method"
self.state.counter += 1
@listen(first_method)
def second_method(self):
self.state.message += " - updated by second_method"
self.state.counter += 1
return self.state.message
flow = StateExampleFlow()
flow.plot("my_flow_plot")
final_output = flow.kickoff()
print(f"Final Output: {final_output}")
print("Final State:")
print(flow.state)
```
```text Output
Final Output: Hello from first_method - updated by second_method
Final State:
counter=2 message='Hello from first_method - updated by second_method'
```
</CodeGroup>
![Flow Visual image](/images/crewai-flow-2.png)
في هذا المثال، يتم تحديث الحالة بواسطة كل من `first_method` و `second_method`.
بعد تشغيل التدفق، يمكنك الوصول إلى الحالة النهائية لرؤية التحديثات التي أجرتها هذه الدوال.
من خلال ضمان إعادة مخرجات الدالة الأخيرة وتوفير الوصول إلى الحالة، تجعل تدفقات CrewAI من السهل دمج نتائج سير عمل الذكاء الاصطناعي في التطبيقات أو الأنظمة الأكبر،
مع الحفاظ على الوصول إلى الحالة طوال تنفيذ التدفق.
## إدارة حالة التدفق
إدارة الحالة بفعالية أمر بالغ الأهمية لبناء سير عمل ذكاء اصطناعي موثوق وقابل للصيانة. توفر تدفقات CrewAI آليات قوية لإدارة الحالة غير المهيكلة والمهيكلة،
مما يتيح للمطورين اختيار النهج الأنسب لاحتياجات تطبيقاتهم.
### إدارة الحالة غير المهيكلة
في إدارة الحالة غير المهيكلة، يتم تخزين جميع الحالات في خاصية `state` لفئة `Flow`.
يوفر هذا النهج مرونة، مما يمكّن المطورين من إضافة أو تعديل خصائص الحالة أثناء التشغيل دون تحديد مخطط صارم.
حتى مع الحالات غير المهيكلة، تولّد تدفقات CrewAI تلقائيًا معرّفًا فريدًا (UUID) لكل مثيل حالة وتحافظ عليه.
```python Code
from crewai.flow.flow import Flow, listen, start
class UnstructuredExampleFlow(Flow):
@start()
def first_method(self):
# The state automatically includes an 'id' field
print(f"State ID: {self.state['id']}")
self.state['counter'] = 0
self.state['message'] = "Hello from structured flow"
@listen(first_method)
def second_method(self):
self.state['counter'] += 1
self.state['message'] += " - updated"
@listen(second_method)
def third_method(self):
self.state['counter'] += 1
self.state['message'] += " - updated again"
print(f"State after third_method: {self.state}")
flow = UnstructuredExampleFlow()
flow.plot("my_flow_plot")
flow.kickoff()
```
![Flow Visual image](/images/crewai-flow-3.png)
**ملاحظة:** يتم توليد حقل `id` تلقائيًا والحفاظ عليه طوال تنفيذ التدفق. لا تحتاج إلى إدارته أو تعيينه يدويًا، وسيتم الحفاظ عليه حتى عند تحديث الحالة ببيانات جديدة.
**النقاط الرئيسية:**
- **المرونة:** يمكنك إضافة خصائص ديناميكيًا إلى `self.state` دون قيود محددة مسبقًا.
- **البساطة:** مثالي لسير العمل البسيط حيث يكون هيكل الحالة بسيطًا أو متغيرًا بشكل كبير.
### إدارة الحالة المهيكلة
تستفيد إدارة الحالة المهيكلة من مخططات محددة مسبقًا لضمان الاتساق وسلامة الأنواع عبر سير العمل.
باستخدام نماذج مثل `BaseModel` من Pydantic، يمكن للمطورين تحديد الشكل الدقيق للحالة، مما يتيح تحققًا أفضل وإكمالًا تلقائيًا في بيئات التطوير.
تتلقى كل حالة في تدفقات CrewAI تلقائيًا معرّفًا فريدًا (UUID) للمساعدة في تتبع وإدارة مثيلات الحالة. يتم توليد هذا المعرّف وإدارته تلقائيًا بواسطة نظام التدفق.
```python Code
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel
class ExampleState(BaseModel):
# Note: 'id' field is automatically added to all states
counter: int = 0
message: str = ""
class StructuredExampleFlow(Flow[ExampleState]):
@start()
def first_method(self):
# Access the auto-generated ID if needed
print(f"State ID: {self.state.id}")
self.state.message = "Hello from structured flow"
@listen(first_method)
def second_method(self):
self.state.counter += 1
self.state.message += " - updated"
@listen(second_method)
def third_method(self):
self.state.counter += 1
self.state.message += " - updated again"
print(f"State after third_method: {self.state}")
flow = StructuredExampleFlow()
flow.kickoff()
```
![Flow Visual image](/images/crewai-flow-3.png)
**النقاط الرئيسية:**
- **مخطط محدد:** يحدد `ExampleState` هيكل الحالة بوضوح، مما يعزز قابلية قراءة الكود وصيانته.
- **سلامة الأنواع:** يضمن استخدام Pydantic التزام خصائص الحالة بالأنواع المحددة، مما يقلل من أخطاء وقت التشغيل.
- **الإكمال التلقائي:** يمكن لبيئات التطوير المتكاملة توفير إكمال تلقائي أفضل وفحص أخطاء بناءً على نموذج الحالة المحدد.
### الاختيار بين إدارة الحالة غير المهيكلة والمهيكلة
- **استخدم إدارة الحالة غير المهيكلة عندما:**
- يكون حالة سير العمل بسيطة أو ديناميكية للغاية.
- تكون المرونة أولوية على تعريفات الحالة الصارمة.
- يكون النماذج الأولية السريعة مطلوبة دون عبء تحديد المخططات.
- **استخدم إدارة الحالة المهيكلة عندما:**
- يتطلب سير العمل هيكل حالة محدد جيدًا ومتسق.
- تكون سلامة الأنواع والتحقق مهمتين لموثوقية تطبيقك.
- تريد الاستفادة من ميزات بيئة التطوير المتكاملة مثل الإكمال التلقائي وفحص الأنواع لتجربة مطور أفضل.
من خلال توفير خيارات إدارة الحالة غير المهيكلة والمهيكلة، تمكّن تدفقات CrewAI المطورين من بناء سير عمل ذكاء اصطناعي مرن ومتين في آن واحد، ملبيةً مجموعة واسعة من متطلبات التطبيقات.
## استمرارية التدفق
يتيح مزخرف @persist الاستمرارية التلقائية للحالة في تدفقات CrewAI، مما يسمح لك بالحفاظ على حالة التدفق عبر عمليات إعادة التشغيل أو تنفيذات سير العمل المختلفة. يمكن تطبيق هذا المزخرف على مستوى الفئة أو مستوى الدالة، مما يوفر مرونة في كيفية إدارة استمرارية الحالة.
### الاستمرارية على مستوى الفئة
عند التطبيق على مستوى الفئة، يقوم مزخرف @persist باستمرارية حالات جميع دوال التدفق تلقائيًا:
```python
@persist # Using SQLiteFlowPersistence by default
class MyFlow(Flow[MyState]):
@start()
def initialize_flow(self):
# This method will automatically have its state persisted
self.state.counter = 1
print("Initialized flow. State ID:", self.state.id)
@listen(initialize_flow)
def next_step(self):
# The state (including self.state.id) is automatically reloaded
self.state.counter += 1
print("Flow state is persisted. Counter:", self.state.counter)
```
### الاستمرارية على مستوى الدالة
للتحكم الأكثر دقة، يمكنك تطبيق @persist على دوال محددة:
```python
class AnotherFlow(Flow[dict]):
@persist # Persists only this method's state
@start()
def begin(self):
if "runs" not in self.state:
self.state["runs"] = 0
self.state["runs"] += 1
print("Method-level persisted runs:", self.state["runs"])
```
### كيف تعمل
1. **تعريف الحالة الفريد**
- تتلقى كل حالة تدفق UUID فريد تلقائيًا
- يتم الحفاظ على المعرّف عبر تحديثات الحالة واستدعاءات الدوال
- يدعم كلًا من الحالات المهيكلة (Pydantic BaseModel) وغير المهيكلة (القاموس)
2. **واجهة SQLite الافتراضية**
- SQLiteFlowPersistence هي واجهة التخزين الافتراضية
- يتم حفظ الحالات تلقائيًا في قاعدة بيانات SQLite محلية
- معالجة أخطاء متينة تضمن رسائل واضحة في حالة فشل عمليات قاعدة البيانات
3. **معالجة الأخطاء**
- رسائل خطأ شاملة لعمليات قاعدة البيانات
- تحقق تلقائي من الحالة أثناء الحفظ والتحميل
- ملاحظات واضحة عند مواجهة مشاكل في عمليات الاستمرارية
### اعتبارات مهمة
- **أنواع الحالة**: يتم دعم كل من الحالات المهيكلة (Pydantic BaseModel) وغير المهيكلة (القاموس)
- **المعرّف التلقائي**: يتم إضافة حقل `id` تلقائيًا إذا لم يكن موجودًا
- **استعادة الحالة**: يمكن للتدفقات الفاشلة أو المُعاد تشغيلها إعادة تحميل حالتها السابقة تلقائيًا
- **التنفيذ المخصص**: يمكنك توفير تنفيذ FlowPersistence الخاص بك لاحتياجات التخزين المتخصصة
### المزايا التقنية
1. **تحكم دقيق من خلال الوصول المنخفض المستوى**
- وصول مباشر لعمليات الاستمرارية لحالات الاستخدام المتقدمة
- تحكم دقيق عبر مزخرفات الاستمرارية على مستوى الدوال
- قدرات مدمجة لفحص الحالة وتصحيح الأخطاء
- رؤية كاملة لتغييرات الحالة وعمليات الاستمرارية
2. **موثوقية معززة**
- استعادة تلقائية للحالة بعد أعطال النظام أو إعادة التشغيل
- تحديثات حالة قائمة على المعاملات لسلامة البيانات
- معالجة أخطاء شاملة مع رسائل خطأ واضحة
- تحقق متين أثناء عمليات حفظ وتحميل الحالة
3. **بنية قابلة للتوسع**
- واجهة استمرارية قابلة للتخصيص من خلال واجهة FlowPersistence
- دعم لحلول تخزين متخصصة تتجاوز SQLite
- متوافقة مع كل من الحالات المهيكلة (Pydantic) وغير المهيكلة (dict)
- تكامل سلس مع أنماط تدفق CrewAI الحالية
تركز بنية نظام الاستمرارية على الدقة التقنية وخيارات التخصيص، مما يتيح للمطورين الحفاظ على التحكم الكامل في إدارة الحالة مع الاستفادة من ميزات الموثوقية المدمجة.
## التحكم في التدفق
### المنطق الشرطي: `or`
تتيح لك دالة `or_` في التدفقات الاستماع لعدة دوال وتشغيل دالة المستمع عندما تُصدر أي من الدوال المحددة مخرجاتها.
<CodeGroup>
```python Code
from crewai.flow.flow import Flow, listen, or_, start
class OrExampleFlow(Flow):
@start()
def start_method(self):
return "Hello from the start method"
@listen(start_method)
def second_method(self):
return "Hello from the second method"
@listen(or_(start_method, second_method))
def logger(self, result):
print(f"Logger: {result}")
flow = OrExampleFlow()
flow.plot("my_flow_plot")
flow.kickoff()
```
```text Output
Logger: Hello from the start method
Logger: Hello from the second method
```
</CodeGroup>
![Flow Visual image](/images/crewai-flow-4.png)
عند تشغيل هذا التدفق، سيتم تشغيل دالة `logger` بواسطة مخرجات إما `start_method` أو `second_method`.
تُستخدم دالة `or_` للاستماع لعدة دوال وتشغيل دالة المستمع عندما تُصدر أي من الدوال المحددة مخرجاتها.
### المنطق الشرطي: `and`
تتيح لك دالة `and_` في التدفقات الاستماع لعدة دوال وتشغيل دالة المستمع فقط عندما تُصدر جميع الدوال المحددة مخرجاتها.
<CodeGroup>
```python Code
from crewai.flow.flow import Flow, and_, listen, start
class AndExampleFlow(Flow):
@start()
def start_method(self):
self.state["greeting"] = "Hello from the start method"
@listen(start_method)
def second_method(self):
self.state["joke"] = "What do computers eat? Microchips."
@listen(and_(start_method, second_method))
def logger(self):
print("---- Logger ----")
print(self.state)
flow = AndExampleFlow()
flow.plot()
flow.kickoff()
```
```text Output
---- Logger ----
{'greeting': 'Hello from the start method', 'joke': 'What do computers eat? Microchips.'}
```
</CodeGroup>
![Flow Visual image](/images/crewai-flow-5.png)
عند تشغيل هذا التدفق، سيتم تشغيل دالة `logger` فقط عندما يُصدر كل من `start_method` و `second_method` مخرجاتهما.
تُستخدم دالة `and_` للاستماع لعدة دوال وتشغيل دالة المستمع فقط عندما تُصدر جميع الدوال المحددة مخرجاتها.
### الموجّه
يتيح لك مزخرف `@router()` في التدفقات تحديد منطق توجيه شرطي بناءً على مخرجات دالة.
يمكنك تحديد مسارات مختلفة بناءً على مخرجات الدالة، مما يتيح لك التحكم في تدفق التنفيذ ديناميكيًا.
<CodeGroup>
```python Code
import random
from crewai.flow.flow import Flow, listen, router, start
from pydantic import BaseModel
class ExampleState(BaseModel):
success_flag: bool = False
class RouterFlow(Flow[ExampleState]):
@start()
def start_method(self):
print("Starting the structured flow")
random_boolean = random.choice([True, False])
self.state.success_flag = random_boolean
@router(start_method)
def second_method(self):
if self.state.success_flag:
return "success"
else:
return "failed"
@listen("success")
def third_method(self):
print("Third method running")
@listen("failed")
def fourth_method(self):
print("Fourth method running")
flow = RouterFlow()
flow.plot("my_flow_plot")
flow.kickoff()
```
```text Output
Starting the structured flow
Third method running
Fourth method running
```
</CodeGroup>
![Flow Visual image](/images/crewai-flow-6.png)
في المثال أعلاه، تولّد `start_method` قيمة منطقية عشوائية وتعيّنها في الحالة.
تستخدم `second_method` مزخرف `@router()` لتحديد منطق توجيه شرطي بناءً على قيمة المنطقية.
إذا كانت القيمة `True`، تُعيد الدالة `"success"`، وإذا كانت `False`، تُعيد `"failed"`.
تستمع `third_method` و `fourth_method` لمخرجات `second_method` وتُنفَّذ بناءً على القيمة المُعادة.
عند تشغيل هذا التدفق، ستتغير المخرجات بناءً على القيمة المنطقية العشوائية المولّدة بواسطة `start_method`.
### الإنسان في الحلقة (التغذية الراجعة البشرية)
<Note>
يتطلب مزخرف `@human_feedback` **CrewAI الإصدار 1.8.0 أو أعلى**.
</Note>
يتيح مزخرف `@human_feedback` سير عمل يتضمن تدخلًا بشريًا من خلال إيقاف تنفيذ التدفق مؤقتًا لجمع تغذية راجعة من إنسان. هذا مفيد لبوابات الموافقة ومراجعة الجودة ونقاط القرار التي تتطلب حكمًا بشريًا.
```python Code
from crewai.flow.flow import Flow, start, listen
from crewai.flow.human_feedback import human_feedback, HumanFeedbackResult
class ReviewFlow(Flow):
@start()
@human_feedback(
message="Do you approve this content?",
emit=["approved", "rejected", "needs_revision"],
llm="gpt-4o-mini",
default_outcome="needs_revision",
)
def generate_content(self):
return "Content to be reviewed..."
@listen("approved")
def on_approval(self, result: HumanFeedbackResult):
print(f"Approved! Feedback: {result.feedback}")
@listen("rejected")
def on_rejection(self, result: HumanFeedbackResult):
print(f"Rejected. Reason: {result.feedback}")
```
عند تحديد `emit`، يتم تفسير التغذية الراجعة الحرة للإنسان بواسطة LLM وتُختصر إلى إحدى النتائج المحددة، والتي تُشغل بعد ذلك مزخرف `@listen` المقابل.
يمكنك أيضًا استخدام `@human_feedback` دون توجيه لجمع التغذية الراجعة ببساطة:
```python Code
@start()
@human_feedback(message="Any comments on this output?")
def my_method(self):
return "Output for review"
@listen(my_method)
def next_step(self, result: HumanFeedbackResult):
# Access feedback via result.feedback
# Access original output via result.output
pass
```
يمكنك الوصول إلى جميع التغذيات الراجعة المُجمّعة أثناء التدفق عبر `self.last_human_feedback` (الأحدث) أو `self.human_feedback_history` (جميع التغذيات الراجعة كقائمة).
للحصول على دليل كامل حول التغذية الراجعة البشرية في التدفقات، بما في ذلك **التغذية الراجعة غير المتزامنة/غير الحاجبة** مع مزودين مخصصين (Slack، webhooks، إلخ)، انظر [التغذية الراجعة البشرية في التدفقات](/ar/learn/human-feedback-in-flows).
## إضافة Agents إلى التدفقات
يمكن دمج Agents بسلاسة في تدفقاتك، مما يوفر بديلًا خفيف الوزن لفرق Crew الكاملة عندما تحتاج إلى تنفيذ مهام أبسط وأكثر تركيزًا. إليك مثال على كيفية استخدام Agent ضمن تدفق لإجراء أبحاث السوق:
```python
import asyncio
from typing import Any, Dict, List
from crewai_tools import SerperDevTool
from pydantic import BaseModel, Field
from crewai.agent import Agent
from crewai.flow.flow import Flow, listen, start
# Define a structured output format
class MarketAnalysis(BaseModel):
key_trends: List[str] = Field(description="List of identified market trends")
market_size: str = Field(description="Estimated market size")
competitors: List[str] = Field(description="Major competitors in the space")
# Define flow state
class MarketResearchState(BaseModel):
product: str = ""
analysis: MarketAnalysis | None = None
# Create a flow class
class MarketResearchFlow(Flow[MarketResearchState]):
@start()
def initialize_research(self) -> Dict[str, Any]:
print(f"Starting market research for {self.state.product}")
return {"product": self.state.product}
@listen(initialize_research)
async def analyze_market(self) -> Dict[str, Any]:
# Create an Agent for market research
analyst = Agent(
role="Market Research Analyst",
goal=f"Analyze the market for {self.state.product}",
backstory="You are an experienced market analyst with expertise in "
"identifying market trends and opportunities.",
tools=[SerperDevTool()],
verbose=True,
)
# Define the research query
query = f"""
Research the market for {self.state.product}. Include:
1. Key market trends
2. Market size
3. Major competitors
Format your response according to the specified structure.
"""
# Execute the analysis with structured output format
result = await analyst.kickoff_async(query, response_format=MarketAnalysis)
if result.pydantic:
print("result", result.pydantic)
else:
print("result", result)
# Return the analysis to update the state
return {"analysis": result.pydantic}
@listen(analyze_market)
def present_results(self, analysis) -> None:
print("\nMarket Analysis Results")
print("=====================")
if isinstance(analysis, dict):
# If we got a dict with 'analysis' key, extract the actual analysis object
market_analysis = analysis.get("analysis")
else:
market_analysis = analysis
if market_analysis and isinstance(market_analysis, MarketAnalysis):
print("\nKey Market Trends:")
for trend in market_analysis.key_trends:
print(f"- {trend}")
print(f"\nMarket Size: {market_analysis.market_size}")
print("\nMajor Competitors:")
for competitor in market_analysis.competitors:
print(f"- {competitor}")
else:
print("No structured analysis data available.")
print("Raw analysis:", analysis)
# Usage example
async def run_flow():
flow = MarketResearchFlow()
flow.plot("MarketResearchFlowPlot")
result = await flow.kickoff_async(inputs={"product": "AI-powered chatbots"})
return result
# Run the flow
if __name__ == "__main__":
asyncio.run(run_flow())
```
![Flow Visual image](/images/crewai-flow-7.png)
يوضح هذا المثال عدة ميزات رئيسية لاستخدام Agents في التدفقات:
1. **المخرجات المهيكلة**: استخدام نماذج Pydantic لتحديد تنسيق المخرجات المتوقع (`MarketAnalysis`) يضمن سلامة الأنواع والبيانات المهيكلة في جميع أنحاء التدفق.
2. **إدارة الحالة**: تحافظ حالة التدفق (`MarketResearchState`) على السياق بين الخطوات وتخزّن كلًا من المدخلات والمخرجات.
3. **تكامل الأدوات**: يمكن لـ Agents استخدام أدوات (مثل `WebsiteSearchTool`) لتعزيز قدراتهم.
## إضافة فرق Crew إلى التدفقات
إنشاء تدفق مع فرق Crew متعددة في CrewAI أمر مباشر.
يمكنك إنشاء مشروع CrewAI جديد يتضمن جميع الهيكلية اللازمة لإنشاء تدفق مع فرق Crew متعددة عن طريق تشغيل الأمر التالي:
```bash
crewai create flow name_of_flow
```
سيولّد هذا الأمر مشروع CrewAI جديد مع هيكل المجلدات اللازم. يتضمن المشروع المولّد فريق Crew مُعد مسبقًا يُسمى `poem_crew` ويعمل بالفعل. يمكنك استخدام هذا الفريق كقالب بنسخه ولصقه وتعديله لإنشاء فرق أخرى.
### هيكل المجلدات
بعد تشغيل أمر `crewai create flow name_of_flow`، سترى هيكل مجلدات مشابه للتالي:
| المجلد/الملف | الوصف |
| :--------------------- | :----------------------------------------------------------------- |
| `name_of_flow/` | المجلد الجذر للتدفق. |
| ├── `crews/` | يحتوي على مجلدات لفرق Crew المحددة. |
| │ └── `poem_crew/` | مجلد لـ "poem_crew" مع إعداداته وسكربتاته. |
| │ ├── `config/` | مجلد ملفات الإعداد لـ "poem_crew". |
| │ │ ├── `agents.yaml` | ملف YAML يحدد الـ Agents لـ "poem_crew". |
| │ │ └── `tasks.yaml` | ملف YAML يحدد المهام لـ "poem_crew". |
| │ ├── `poem_crew.py` | سكربت وظائف "poem_crew". |
| ├── `tools/` | مجلد للأدوات الإضافية المُستخدمة في التدفق. |
| │ └── `custom_tool.py` | تنفيذ أداة مخصصة. |
| ├── `main.py` | السكربت الرئيسي لتشغيل التدفق. |
| ├── `README.md` | وصف المشروع والتعليمات. |
| ├── `pyproject.toml` | ملف إعداد تبعيات المشروع والإعدادات. |
| └── `.gitignore` | يحدد الملفات والمجلدات المراد تجاهلها في التحكم بالإصدارات. |
### بناء فرق Crew الخاصة بك
في مجلد `crews`، يمكنك تحديد فرق Crew متعددة. سيكون لكل فريق مجلده الخاص الذي يحتوي على ملفات الإعداد وملف تعريف الفريق. على سبيل المثال، يحتوي مجلد `poem_crew` على:
- `config/agents.yaml`: يحدد الـ Agents للفريق.
- `config/tasks.yaml`: يحدد المهام للفريق.
- `poem_crew.py`: يحتوي على تعريف الفريق، بما في ذلك الـ Agents والمهام والفريق نفسه.
يمكنك نسخ ولصق وتعديل `poem_crew` لإنشاء فرق أخرى.
### ربط فرق Crew في `main.py`
ملف `main.py` هو حيث تنشئ التدفق وتربط فرق Crew معًا. يمكنك تحديد التدفق باستخدام فئة `Flow` والمزخرفات `@start` و `@listen` لتحديد تدفق التنفيذ.
إليك مثال على كيفية ربط `poem_crew` في ملف `main.py`:
```python Code
#!/usr/bin/env python
from random import randint
from pydantic import BaseModel
from crewai.flow.flow import Flow, listen, start
from .crews.poem_crew.poem_crew import PoemCrew
class PoemState(BaseModel):
sentence_count: int = 1
poem: str = ""
class PoemFlow(Flow[PoemState]):
@start()
def generate_sentence_count(self):
print("Generating sentence count")
self.state.sentence_count = randint(1, 5)
@listen(generate_sentence_count)
def generate_poem(self):
print("Generating poem")
result = PoemCrew().crew().kickoff(inputs={"sentence_count": self.state.sentence_count})
print("Poem generated", result.raw)
self.state.poem = result.raw
@listen(generate_poem)
def save_poem(self):
print("Saving poem")
with open("poem.txt", "w") as f:
f.write(self.state.poem)
def kickoff():
poem_flow = PoemFlow()
poem_flow.kickoff()
def plot():
poem_flow = PoemFlow()
poem_flow.plot("PoemFlowPlot")
if __name__ == "__main__":
kickoff()
plot()
```
في هذا المثال، تحدد فئة `PoemFlow` تدفقًا يولّد عدد الجمل، ويستخدم `PoemCrew` لتوليد قصيدة، ثم يحفظ القصيدة في ملف. يتم بدء التدفق باستدعاء دالة `kickoff()`. سيتم توليد PoemFlowPlot بواسطة دالة `plot()`.
![Flow Visual image](/images/crewai-flow-8.png)
### تشغيل التدفق
(اختياري) قبل تشغيل التدفق، يمكنك تثبيت التبعيات بتشغيل:
```bash
crewai install
```
بمجرد تثبيت جميع التبعيات، تحتاج إلى تفعيل البيئة الافتراضية بتشغيل:
```bash
source .venv/bin/activate
```
بعد تفعيل البيئة الافتراضية، يمكنك تشغيل التدفق بتنفيذ أحد الأوامر التالية:
```bash
crewai flow kickoff
```
أو
```bash
uv run kickoff
```
سيُنفَّذ التدفق، ويجب أن ترى المخرجات في وحدة التحكم.
## رسم التدفقات
يمكن أن يوفر تصوير سير عمل الذكاء الاصطناعي رؤى قيمة حول هيكل ومسارات تنفيذ تدفقاتك. تقدم CrewAI أداة تصوير قوية تتيح لك إنشاء رسوم بيانية تفاعلية لتدفقاتك، مما يسهّل فهم وتحسين سير عمل الذكاء الاصطناعي.
### ما هي الرسوم البيانية؟
الرسوم البيانية في CrewAI هي تمثيلات بصرية لسير عمل الذكاء الاصطناعي. تعرض المهام المختلفة واتصالاتها وتدفق البيانات بينها. يساعد هذا التصوير في فهم تسلسل العمليات وتحديد الاختناقات وضمان توافق منطق سير العمل مع توقعاتك.
### كيفية إنشاء رسم بياني
توفر CrewAI طريقتين مريحتين لإنشاء رسوم بيانية لتدفقاتك:
#### الخيار 1: استخدام دالة `plot()`
إذا كنت تعمل مباشرة مع مثيل تدفق، يمكنك إنشاء رسم بياني باستدعاء دالة `plot()` على كائن التدفق. ستُنشئ هذه الدالة ملف HTML يحتوي على الرسم البياني التفاعلي لتدفقك.
```python Code
# Assuming you have a flow instance
flow.plot("my_flow_plot")
```
سيُنشئ هذا ملفًا باسم `my_flow_plot.html` في مجلدك الحالي. يمكنك فتح هذا الملف في متصفح ويب لعرض الرسم البياني التفاعلي.
#### الخيار 2: استخدام سطر الأوامر
إذا كنت تعمل ضمن مشروع CrewAI منظم، يمكنك إنشاء رسم بياني باستخدام سطر الأوامر. هذا مفيد بشكل خاص للمشاريع الأكبر حيث تريد تصوير إعداد التدفق بالكامل.
```bash
crewai flow plot
```
سيُنشئ هذا الأمر ملف HTML مع الرسم البياني لتدفقك، مشابهًا لدالة `plot()`. سيتم حفظ الملف في مجلد مشروعك، ويمكنك فتحه في متصفح ويب لاستكشاف التدفق.
### فهم الرسم البياني
سيعرض الرسم البياني المولّد عُقدًا تمثل المهام في تدفقك، مع حواف موجّهة تشير إلى تدفق التنفيذ. الرسم البياني تفاعلي، مما يتيح لك التكبير والتصغير والتمرير فوق العقد لرؤية تفاصيل إضافية.
من خلال تصوير تدفقاتك، يمكنك الحصول على فهم أوضح لهيكل سير العمل، مما يسهّل تصحيح الأخطاء وتحسين عمليات الذكاء الاصطناعي والتواصل بشأنها مع الآخرين.
### الخلاصة
رسم تدفقاتك هو ميزة قوية في CrewAI تعزز قدرتك على تصميم وإدارة سير عمل الذكاء الاصطناعي المعقدة. سواء اخترت استخدام دالة `plot()` أو سطر الأوامر، فإن إنشاء الرسوم البيانية سيوفر لك تمثيلًا بصريًا لسير عملك، مما يساعد في التطوير والعرض.
## الخطوات التالية
إذا كنت مهتمًا باستكشاف أمثلة إضافية للتدفقات، لدينا مجموعة متنوعة من التوصيات في مستودع الأمثلة. إليك أربعة أمثلة تدفق محددة، كل منها يعرض حالات استخدام فريدة لمساعدتك في مطابقة نوع مشكلتك الحالية مع مثال محدد:
1. **تدفق الرد التلقائي على البريد الإلكتروني**: يوضح هذا المثال حلقة لا نهائية حيث تعمل مهمة خلفية باستمرار لأتمتة ردود البريد الإلكتروني. إنها حالة استخدام رائعة للمهام التي تحتاج إلى التنفيذ بشكل متكرر دون تدخل يدوي. [عرض المثال](https://github.com/crewAIInc/crewAI-examples/tree/main/email_auto_responder_flow)
2. **تدفق تقييم العملاء المحتملين**: يعرض هذا التدفق إضافة تغذية راجعة بشرية والتعامل مع فروع شرطية مختلفة باستخدام الموجّه. إنه مثال ممتاز لكيفية دمج اتخاذ القرارات الديناميكية والرقابة البشرية في سير عملك. [عرض المثال](https://github.com/crewAIInc/crewAI-examples/tree/main/lead-score-flow)
3. **تدفق كتابة كتاب**: يتفوق هذا المثال في ربط فرق Crew متعددة معًا، حيث تُستخدم مخرجات فريق واحد بواسطة فريق آخر. على وجه التحديد، يقوم فريق واحد بوضع مخطط لكتاب كامل، ويقوم فريق آخر بإنشاء فصول بناءً على المخطط. في النهاية، يتم ربط كل شيء لإنتاج كتاب كامل. هذا التدفق مثالي للعمليات المعقدة متعددة الخطوات التي تتطلب تنسيقًا بين مهام مختلفة. [عرض المثال](https://github.com/crewAIInc/crewAI-examples/tree/main/write_a_book_with_flows)
4. **تدفق مساعد الاجتماعات**: يوضح هذا التدفق كيفية بث حدث واحد لتشغيل إجراءات متابعة متعددة. على سبيل المثال، بعد اكتمال اجتماع، يمكن للتدفق تحديث لوحة Trello وإرسال رسالة Slack وحفظ النتائج. إنه مثال رائع للتعامل مع نتائج متعددة من حدث واحد، مما يجعله مثاليًا لإدارة المهام الشاملة وأنظمة الإشعارات. [عرض المثال](https://github.com/crewAIInc/crewAI-examples/tree/main/meeting_assistant_flow)
من خلال استكشاف هذه الأمثلة، يمكنك الحصول على رؤى حول كيفية الاستفادة من تدفقات CrewAI لحالات استخدام متنوعة، من أتمتة المهام المتكررة إلى إدارة العمليات المعقدة متعددة الخطوات مع اتخاذ القرارات الديناميكية والتغذية الراجعة البشرية.
أيضًا، شاهد فيديو YouTube الخاص بنا حول كيفية استخدام التدفقات في CrewAI أدناه!
<iframe
className="w-full aspect-video rounded-xl"
src="https://www.youtube.com/embed/MTb5my6VOT8"
title="CrewAI Flows overview"
frameBorder="0"
allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture; web-share"
referrerPolicy="strict-origin-when-cross-origin"
allowFullScreen
></iframe>
## تشغيل التدفقات
هناك طريقتان لتشغيل التدفق:
### استخدام واجهة Flow API
يمكنك تشغيل تدفق برمجيًا عن طريق إنشاء مثيل من فئة التدفق واستدعاء دالة `kickoff()`:
```python
flow = ExampleFlow()
result = flow.kickoff()
```
### بث تنفيذ التدفق
للحصول على رؤية فورية لتنفيذ التدفق، يمكنك تفعيل البث لتلقي المخرجات فور توليدها:
```python
class StreamingFlow(Flow):
stream = True # Enable streaming
@start()
def research(self):
# Your flow implementation
pass
# Iterate over streaming output
flow = StreamingFlow()
streaming = flow.kickoff()
for chunk in streaming:
print(chunk.content, end="", flush=True)
# Access final result
result = streaming.result
```
تعرّف على المزيد حول البث في دليل [بث تنفيذ التدفق](/ar/learn/streaming-flow-execution).
## الذاكرة في التدفقات
يتمتع كل تدفق تلقائيًا بإمكانية الوصول إلى نظام [الذاكرة](/concepts/memory) الموحد في CrewAI. يمكنك تخزين الذكريات واسترجاعها واستخراجها مباشرة داخل أي دالة تدفق باستخدام ثلاث دوال مساعدة مدمجة.
### الدوال المدمجة
| الدالة | الوصف |
| :--- | :--- |
| `self.remember(content, **kwargs)` | تخزين المحتوى في الذاكرة. تقبل `scope` و `categories` و `metadata` و `importance` اختياريًا. |
| `self.recall(query, **kwargs)` | استرجاع الذكريات ذات الصلة. تقبل `scope` و `categories` و `limit` و `depth` اختياريًا. |
| `self.extract_memories(content)` | تفكيك النص الخام إلى عبارات ذاكرة منفصلة ومستقلة. |
يتم إنشاء مثيل `Memory()` افتراضي تلقائيًا عند تهيئة التدفق. يمكنك أيضًا تمرير مثيل مخصص:
```python
from crewai.flow.flow import Flow
from crewai import Memory
custom_memory = Memory(
recency_weight=0.5,
recency_half_life_days=7,
embedder={"provider": "ollama", "config": {"model_name": "mxbai-embed-large"}},
)
flow = MyFlow(memory=custom_memory)
```
### مثال: تدفق البحث والتحليل
```python
from crewai.flow.flow import Flow, listen, start
class ResearchAnalysisFlow(Flow):
@start()
def gather_data(self):
# Simulate research findings
findings = (
"PostgreSQL handles 10k concurrent connections with connection pooling. "
"MySQL caps at around 5k. MongoDB scales horizontally but adds complexity."
)
# Extract atomic facts and remember each one
memories = self.extract_memories(findings)
for mem in memories:
self.remember(mem, scope="/research/databases")
return findings
@listen(gather_data)
def analyze(self, raw_findings):
# Recall relevant past research (from this run or previous runs)
past = self.recall("database performance and scaling", limit=10, depth="shallow")
context_lines = [f"- {m.record.content}" for m in past]
context = "\n".join(context_lines) if context_lines else "No prior context."
return {
"new_findings": raw_findings,
"prior_context": context,
"total_memories": len(past),
}
flow = ResearchAnalysisFlow()
result = flow.kickoff()
print(result)
```
نظرًا لأن الذاكرة تستمر عبر عمليات التشغيل (مدعومة بـ LanceDB على القرص)، فإن خطوة `analyze` ستستدعي النتائج من عمليات التنفيذ السابقة أيضًا -- مما يتيح تدفقات تتعلم وتراكم المعرفة بمرور الوقت.
انظر [وثائق الذاكرة](/concepts/memory) لمزيد من التفاصيل حول النطاقات والشرائح والتسجيل المركب وإعداد المُضمِّن والمزيد.
### استخدام CLI
بدءًا من الإصدار 0.103.0، يمكنك تشغيل التدفقات باستخدام أمر `crewai run`:
```shell
crewai run
```
يكتشف هذا الأمر تلقائيًا ما إذا كان مشروعك تدفقًا (بناءً على إعداد `type = "flow"` في pyproject.toml الخاص بك) ويشغّله وفقًا لذلك. هذه هي الطريقة الموصى بها لتشغيل التدفقات من سطر الأوامر.
للتوافق مع الإصدارات السابقة، يمكنك أيضًا استخدام:
```shell
crewai flow kickoff
```
ومع ذلك، فإن أمر `crewai run` هو الطريقة المفضلة الآن لأنه يعمل لكل من فرق Crew والتدفقات.