mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-05-06 01:32:36 +00:00
## Summary
- Reverts `b0e2fda` ("fix(flow): add execution_id separate from state.id", COR-48): removes `Flow.execution_id` and points `current_flow_id` / `current_flow_request_id` back at `flow_id` (i.e. `state.id`). The separate per-run tracking id was no longer the right abstraction once `restore_from_state_id` reshapes how `state.id` is assigned;
- Adds an optional `restore_from_state_id` kwarg to `Flow.kickoff` / `Flow.kickoff_async` that hydrates state from a previously-persisted flow's latest snapshot
- Reassigns `state.id` to a fresh value (or `inputs["id"]` if pinned) so the new run's `@persist` writes don't extend the source's history
- Existing `inputs["id"]` resume, `@persist`, and `from_checkpoint` paths are unchanged
## Problem
`@persist` only supports *resume* today: `kickoff(inputs={"id": <uuid>})` hydrates state and continues writing under the same `flow_uuid`. There's no way to **fork** — hydrate from a snapshot but persist under a separate key, leaving the source's history intact. This PR adds that.
| | `state.id` after kickoff | `@persist` writes land under |
|---|---|---|
| `inputs["id"]` (resume) | supplied id | supplied id (extends history) |
| `restore_from_state_id` (fork) | fresh id, or `inputs["id"]` if pinned | new id (source preserved) |
## Behavior
| `inputs.id` | `restore_from_state_id` | Effect |
|---|---|---|
| — | — | Fresh kickoff |
| set | — | Existing resume |
| — | UUID | Fork — new `state.id`, hydrated from source |
| set | UUID | Fork into a pinned `state.id`, hydrated from source |
- Source not found → silent fallback (mirrors existing resume)
- Both `from_checkpoint` and `restore_from_state_id` set → `ValueError`
- `restore_from_state_id=None` → byte-identical to current main
## Design
Fork hydration runs before the existing `inputs` block in `kickoff_async`. On a hit, it calls the same `_restore_state` primitive used by resume, then overwrites `state.id` with a fresh UUID (or `inputs["id"]`). A `fork_succeeded` flag gates the existing `inputs["id"]` path so we don't double-load. `_completed_methods` / `_is_execution_resuming` are intentionally untouched — skip-completed-methods remains the territory of `apply_checkpoint` and `from_pending`.
## Test plan
- [ ] `pytest tests/test_flow_persistence.py` — 5 new tests (four-row matrix, not-found fallback, default no-op, conflict raise) + 6 existing as regression
- [ ] `pytest tests/test_flow.py` — broader flow suite
- [ ] Manual end-to-end against an HITL `@persist` flow
163 lines
7.0 KiB
Plaintext
163 lines
7.0 KiB
Plaintext
---
|
|
title: بنية الإنتاج
|
|
description: أفضل الممارسات لبناء تطبيقات ذكاء اصطناعي جاهزة للإنتاج مع CrewAI
|
|
icon: server
|
|
mode: "wide"
|
|
---
|
|
|
|
# عقلية التدفق أولاً
|
|
|
|
عند بناء تطبيقات ذكاء اصطناعي إنتاجية مع CrewAI، **نوصي بالبدء بتدفق (Flow)**.
|
|
|
|
بينما يمكن تشغيل أطقم أو وكلاء فرديين، فإن تغليفهم في تدفق يوفر الهيكل اللازم لتطبيق متين وقابل للتوسع.
|
|
|
|
## لماذا التدفقات؟
|
|
|
|
1. **إدارة الحالة**: توفر التدفقات طريقة مدمجة لإدارة الحالة عبر مراحل مختلفة من تطبيقك. هذا ضروري لتمرير البيانات بين الأطقم والحفاظ على السياق ومعالجة مدخلات المستخدم.
|
|
2. **التحكم**: تتيح لك التدفقات تحديد مسارات تنفيذ دقيقة، بما في ذلك الحلقات والشرطيات ومنطق التفريع. هذا أساسي لمعالجة الحالات الاستثنائية وضمان سلوك تطبيقك بشكل متوقع.
|
|
3. **المراقبة**: توفر التدفقات هيكلًا واضحًا يسهّل تتبع التنفيذ وتصحيح الأخطاء ومراقبة الأداء. نوصي باستخدام [تتبع CrewAI](/ar/observability/tracing) للحصول على رؤى تفصيلية. ما عليك سوى تشغيل `crewai login` لتفعيل ميزات المراقبة المجانية.
|
|
|
|
## البنية
|
|
|
|
يبدو تطبيق CrewAI الإنتاجي النموذجي هكذا:
|
|
|
|
```mermaid
|
|
graph TD
|
|
Start((Start)) --> Flow[Flow Orchestrator]
|
|
Flow --> State{State Management}
|
|
State --> Step1[Step 1: Data Gathering]
|
|
Step1 --> Crew1[Research Crew]
|
|
Crew1 --> State
|
|
State --> Step2{Condition Check}
|
|
Step2 -- "Valid" --> Step3[Step 3: Execution]
|
|
Step3 --> Crew2[Action Crew]
|
|
Step2 -- "Invalid" --> End((End))
|
|
Crew2 --> End
|
|
```
|
|
|
|
### 1. فئة التدفق
|
|
فئة `Flow` هي نقطة الدخول. تحدد مخطط الحالة والطرق التي تنفذ منطقك.
|
|
|
|
```python
|
|
from crewai.flow.flow import Flow, listen, start
|
|
from pydantic import BaseModel
|
|
|
|
class AppState(BaseModel):
|
|
user_input: str = ""
|
|
research_results: str = ""
|
|
final_report: str = ""
|
|
|
|
class ProductionFlow(Flow[AppState]):
|
|
@start()
|
|
def gather_input(self):
|
|
# ... منطق الحصول على المدخلات ...
|
|
pass
|
|
|
|
@listen(gather_input)
|
|
def run_research_crew(self):
|
|
# ... تشغيل طاقم ...
|
|
pass
|
|
```
|
|
|
|
### 2. إدارة الحالة
|
|
استخدم نماذج Pydantic لتعريف حالتك. يضمن هذا أمان الأنواع ويوضح البيانات المتاحة في كل مرحلة.
|
|
|
|
- **اجعلها بسيطة**: خزّن فقط ما تحتاجه للاستمرار بين المراحل.
|
|
- **استخدم بيانات منظمة**: تجنب القواميس غير المنظمة قدر الإمكان.
|
|
|
|
### 3. الأطقم كوحدات عمل
|
|
فوّض المهام المعقدة إلى الأطقم. يجب أن يكون الطاقم مركّزًا على هدف محدد (مثل "البحث في موضوع"، "كتابة مقال مدونة").
|
|
|
|
- **لا تبالغ في هندسة الأطقم**: اجعلها مركّزة.
|
|
- **مرر الحالة بشكل صريح**: مرر البيانات الضرورية من حالة التدفق إلى مدخلات الطاقم.
|
|
|
|
```python
|
|
@listen(gather_input)
|
|
def run_research_crew(self):
|
|
crew = ResearchCrew()
|
|
result = crew.kickoff(inputs={"topic": self.state.user_input})
|
|
self.state.research_results = result.raw
|
|
```
|
|
|
|
## عناصر التحكم الأولية
|
|
|
|
استفد من عناصر التحكم الأولية في CrewAI لإضافة المتانة والتحكم إلى أطقمك.
|
|
|
|
### 1. حواجز المهام
|
|
استخدم [حواجز المهام](/ar/concepts/tasks#task-guardrails) للتحقق من مخرجات المهام قبل قبولها. يضمن هذا أن وكلاءك ينتجون نتائج عالية الجودة.
|
|
|
|
```python
|
|
def validate_content(result: TaskOutput) -> Tuple[bool, Any]:
|
|
if len(result.raw) < 100:
|
|
return (False, "Content is too short. Please expand.")
|
|
return (True, result.raw)
|
|
|
|
task = Task(
|
|
...,
|
|
guardrail=validate_content
|
|
)
|
|
```
|
|
|
|
### 2. المخرجات المنظمة
|
|
استخدم دائمًا المخرجات المنظمة (`output_pydantic` أو `output_json`) عند تمرير البيانات بين المهام أو إلى تطبيقك. يمنع هذا أخطاء التحليل ويضمن أمان الأنواع.
|
|
|
|
```python
|
|
class ResearchResult(BaseModel):
|
|
summary: str
|
|
sources: List[str]
|
|
|
|
task = Task(
|
|
...,
|
|
output_pydantic=ResearchResult
|
|
)
|
|
```
|
|
|
|
### 3. خطافات LLM
|
|
استخدم [خطافات LLM](/ar/learn/llm-hooks) لفحص أو تعديل الرسائل قبل إرسالها إلى LLM، أو لتنقية الاستجابات.
|
|
|
|
```python
|
|
@before_llm_call
|
|
def log_request(context):
|
|
print(f"Agent {context.agent.role} is calling the LLM...")
|
|
```
|
|
|
|
## أنماط النشر
|
|
|
|
عند نشر تدفقك، ضع في اعتبارك ما يلي:
|
|
|
|
### CrewAI Enterprise
|
|
أسهل طريقة لنشر تدفقك هي استخدام CrewAI Enterprise. تتعامل مع البنية التحتية والمصادقة والمراقبة نيابة عنك.
|
|
|
|
راجع [دليل النشر](/ar/enterprise/guides/deploy-to-amp) للبدء.
|
|
|
|
```bash
|
|
crewai deploy create
|
|
```
|
|
|
|
### التنفيذ غير المتزامن
|
|
للمهام طويلة التشغيل، استخدم `kickoff_async` لتجنب حظر واجهتك البرمجية.
|
|
|
|
### الاستمرارية
|
|
استخدم مزيّن `@persist` لحفظ حالة تدفقك في قاعدة بيانات. يتيح لك هذا استئناف التنفيذ إذا تعطلت العملية أو إذا كنت بحاجة لانتظار مدخلات بشرية.
|
|
|
|
```python
|
|
@persist
|
|
class ProductionFlow(Flow[AppState]):
|
|
# ...
|
|
```
|
|
|
|
افتراضيًا، يستأنف `@persist` تدفقًا عند توفير `kickoff(inputs={"id": <uuid>})`، مما يمدّ نفس تاريخ `flow_uuid`. لـ **تفرع** تدفق مستمر إلى نسبٍ جديد — ترطيب الحالة من تشغيل سابق ولكن الكتابة تحت `state.id` جديد — مرّر `restore_from_state_id`:
|
|
|
|
```python
|
|
flow.kickoff(restore_from_state_id="<previous-run-state-id>")
|
|
```
|
|
|
|
يحصل التشغيل الجديد على `state.id` جديد (مولّد تلقائيًا، أو `inputs["id"]` إذا تم تثبيته) لذا لا تمتد كتابات `@persist` الخاصة به إلى تاريخ المصدر. الجمع مع `from_checkpoint` يطلق `ValueError`؛ اختر مصدر ترطيب واحدًا.
|
|
|
|
## الخلاصة
|
|
|
|
- **ابدأ بتدفق.**
|
|
- **حدد حالة واضحة.**
|
|
- **استخدم الأطقم للمهام المعقدة.**
|
|
- **انشر مع API واستمرارية.**
|