Files
crewAI/docs/ar/guides/flows/mastering-flow-state.mdx
Tiago Freire cd2b9ee38a feat(flow): add restore_from_state_id kickoff parameter (#5674)
## Summary

- Reverts `b0e2fda` ("fix(flow): add execution_id separate from state.id", COR-48): removes `Flow.execution_id` and points `current_flow_id` / `current_flow_request_id` back at `flow_id` (i.e. `state.id`). The separate per-run tracking id was no longer the right abstraction once `restore_from_state_id` reshapes how `state.id` is assigned;

- Adds an optional `restore_from_state_id` kwarg to `Flow.kickoff` / `Flow.kickoff_async` that hydrates state from a previously-persisted flow's latest snapshot

- Reassigns `state.id` to a fresh value (or `inputs["id"]` if pinned) so the new run's `@persist` writes don't extend the source's history

- Existing `inputs["id"]` resume, `@persist`, and `from_checkpoint` paths are unchanged

## Problem
`@persist` only supports *resume* today: `kickoff(inputs={"id": <uuid>})` hydrates state and continues writing under the same `flow_uuid`. There's no way to **fork** — hydrate from a snapshot but persist under a separate key, leaving the source's history intact. This PR adds that.

| | `state.id` after kickoff | `@persist` writes land under |
|---|---|---|
| `inputs["id"]` (resume) | supplied id | supplied id (extends history) |
| `restore_from_state_id` (fork) | fresh id, or `inputs["id"]` if pinned | new id (source preserved) |

## Behavior

| `inputs.id` | `restore_from_state_id` | Effect |
|---|---|---|
| — | — | Fresh kickoff |
| set | — | Existing resume |
| — | UUID | Fork — new `state.id`, hydrated from source |
| set | UUID | Fork into a pinned `state.id`, hydrated from source |

- Source not found → silent fallback (mirrors existing resume)
- Both `from_checkpoint` and `restore_from_state_id` set → `ValueError`
- `restore_from_state_id=None` → byte-identical to current main

## Design
Fork hydration runs before the existing `inputs` block in `kickoff_async`. On a hit, it calls the same `_restore_state` primitive used by resume, then overwrites `state.id` with a fresh UUID (or `inputs["id"]`). A `fork_succeeded` flag gates the existing `inputs["id"]` path so we don't double-load. `_completed_methods` / `_is_execution_resuming` are intentionally untouched — skip-completed-methods remains the territory of `apply_checkpoint` and `from_pending`.

## Test plan
- [ ] `pytest tests/test_flow_persistence.py` — 5 new tests (four-row matrix, not-found fallback, default no-op, conflict raise) + 6 existing as regression
- [ ] `pytest tests/test_flow.py` — broader flow suite
- [ ] Manual end-to-end against an HITL `@persist` flow
2026-05-01 11:46:07 -04:00

227 lines
9.6 KiB
Plaintext

---
title: إتقان إدارة حالة Flow
description: دليل شامل لإدارة الحالة وحفظها والاستفادة منها في CrewAI Flows لبناء تطبيقات AI قوية.
icon: diagram-project
mode: "wide"
---
## فهم قوة الحالة في Flows
إدارة الحالة هي العمود الفقري لأي سير عمل AI متطور. في CrewAI Flows، يتيح لك نظام الحالة الحفاظ على السياق ومشاركة البيانات بين الخطوات وبناء منطق تطبيق معقد. إتقان إدارة الحالة ضروري لإنشاء تطبيقات AI موثوقة وقابلة للصيانة وقوية.
### لماذا تهم إدارة الحالة
تمكّنك إدارة الحالة الفعّالة من:
1. **الحفاظ على السياق عبر خطوات التنفيذ** - تمرير المعلومات بسلاسة بين مراحل سير العمل المختلفة
2. **بناء منطق شرطي معقد** - اتخاذ قرارات بناءً على البيانات المتراكمة
3. **إنشاء تطبيقات مستمرة** - حفظ واستعادة تقدم سير العمل
4. **معالجة الأخطاء بلطف** - تنفيذ أنماط استرداد لتطبيقات أكثر قوة
5. **توسيع تطبيقاتك** - دعم سير العمل المعقدة بتنظيم بيانات مناسب
6. **تمكين التطبيقات الحوارية** - تخزين والوصول إلى سجل المحادثات للتفاعلات الواعية بالسياق
## أساسيات إدارة الحالة
### نهجان لإدارة الحالة
يوفر CrewAI طريقتين لإدارة الحالة في Flows:
1. **الحالة غير المنظمة** - استخدام كائنات شبيهة بالقاموس للمرونة
2. **الحالة المنظمة** - استخدام نماذج Pydantic لسلامة الأنواع والتحقق
### مثال الحالة غير المنظمة
```python
from crewai.flow.flow import Flow, listen, start
class UnstructuredStateFlow(Flow):
@start()
def initialize_data(self):
self.state["user_name"] = "Alex"
self.state["preferences"] = {"theme": "dark", "language": "English"}
self.state["items"] = []
return "Initialized"
@listen(initialize_data)
def process_data(self, previous_result):
user = self.state["user_name"]
self.state["items"].append("item1")
self.state["processed"] = True
return "Processed"
flow = UnstructuredStateFlow()
result = flow.kickoff()
```
### مثال الحالة المنظمة
```python
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel, Field
from typing import List, Dict, Optional
class AppState(BaseModel):
user_name: str = ""
items: List[str] = []
processed: bool = False
completion_percentage: float = 0.0
class StructuredStateFlow(Flow[AppState]):
@start()
def initialize_data(self):
self.state.user_name = "Taylor"
return "Initialized"
@listen(initialize_data)
def process_data(self, previous_result):
self.state.items.append("item1")
self.state.processed = True
self.state.completion_percentage = 50.0
return "Processed"
flow = StructuredStateFlow()
result = flow.kickoff()
```
### فوائد الحالة المنظمة
1. **سلامة الأنواع** - اكتشاف أخطاء الأنواع في وقت التطوير
2. **توثيق ذاتي** - نموذج الحالة يوثّق بوضوح البيانات المتاحة
3. **التحقق** - التحقق التلقائي من أنواع البيانات والقيود
4. **دعم IDE** - إكمال تلقائي وتوثيق مضمّن
5. **قيم افتراضية** - تعريف بدائل سهلة للبيانات المفقودة
## حفظ حالة Flow
يوفر مزخرف `@persist()` حفظ حالة تلقائي عند نقاط رئيسية في التنفيذ.
```python
from crewai.flow.flow import Flow, listen, start
from crewai.flow.persistence import persist
from pydantic import BaseModel
class CounterState(BaseModel):
value: int = 0
@persist()
class PersistentCounterFlow(Flow[CounterState]):
@start()
def increment(self):
self.state.value += 1
return self.state.value
@listen(increment)
def double(self, value):
self.state.value = value * 2
return self.state.value
```
#### تفرع الحالة المستمرة
يدعم `@persist` نمطين متميزين للترطيب في `kickoff` / `kickoff_async`. استخدم **استئناف** (`inputs["id"]`) لمواصلة نفس النسب؛ استخدم **تفرع** (`restore_from_state_id`) لبدء نسبٍ جديد من لقطة:
| | `state.id` بعد kickoff | كتابات `@persist` تذهب إلى |
|---|---|---|
| `inputs["id"]` (استئناف) | المعرّف المقدم | المعرّف المقدم (يمد التاريخ) |
| `restore_from_state_id` (تفرع) | معرّف جديد، أو `inputs["id"]` إذا ثُبّت | المعرّف الجديد (المصدر محفوظ) |
```python
from crewai.flow.flow import Flow, start
from crewai.flow.persistence import persist
from pydantic import BaseModel
class CounterState(BaseModel):
id: str = ""
counter: int = 0
@persist
class CounterFlow(Flow[CounterState]):
@start()
def step(self):
self.state.counter += 1
# التشغيل 1: حالة جديدة، العداد 0 -> 1
flow_1 = CounterFlow()
flow_1.kickoff()
# التفرع: الترطيب من أحدث لقطة لـ flow_1، لكن الكتابة تحت state.id جديد
flow_2 = CounterFlow()
flow_2.kickoff(restore_from_state_id=flow_1.state.id)
# يبدأ flow_2 بـ counter=1 (مرطّب)، ثم تزيده step() إلى 2.
# تاريخ flow_uuid لـ flow_1 لم يتغيّر.
```
ملاحظات السلوك:
- `restore_from_state_id` غير موجود في الاستمرارية → يعود kickoff بصمت إلى السلوك الافتراضي (يعكس سلوك `inputs["id"]` عند عدم العثور عليه). لا يُطلق أي استثناء.
- الجمع بين `restore_from_state_id` و `from_checkpoint` يطلق `ValueError` — يستهدفان نظامي حالة مختلفين (`@persist` مقابل Checkpointing) ولا يمكن الجمع بينهما.
- `restore_from_state_id=None` (افتراضي) متطابق بايت ببايت مع kickoff بدون المعامل.
- تثبيت `inputs["id"]` أثناء التفرع يعني أن التشغيل الجديد يشارك مفتاح الاستمرارية مع تدفق آخر — عادةً ما تريد فقط `restore_from_state_id`.
## أنماط حالة متقدمة
### المنطق الشرطي المبني على الحالة
```python
from crewai.flow.flow import Flow, listen, router, start
from pydantic import BaseModel
class PaymentState(BaseModel):
amount: float = 0.0
is_approved: bool = False
retry_count: int = 0
class PaymentFlow(Flow[PaymentState]):
@start()
def process_payment(self):
self.state.amount = 100.0
self.state.is_approved = self.state.amount < 1000
return "Payment processed"
@router(process_payment)
def check_approval(self, previous_result):
if self.state.is_approved:
return "approved"
elif self.state.retry_count < 3:
return "retry"
else:
return "rejected"
@listen("approved")
def handle_approval(self):
return f"Payment of ${self.state.amount} approved!"
@listen("retry")
def handle_retry(self):
self.state.retry_count += 1
return "Retry initiated"
@listen("rejected")
def handle_rejection(self):
return f"Payment of ${self.state.amount} rejected after {self.state.retry_count} retries."
```
## أفضل الممارسات لإدارة الحالة
1. **اجعل الحالة مركّزة** - صمم الحالة لتحتوي فقط على ما هو ضروري
2. **استخدم الحالة المنظمة للـ Flows المعقدة** - مع نمو التعقيد تصبح الحالة المنظمة أكثر قيمة
3. **وثّق انتقالات الحالة** - للـ Flows المعقدة، وثّق كيف تتغير الحالة عبر التنفيذ
4. **عالج أخطاء الحالة بلطف** - طبّق معالجة أخطاء للوصول إلى الحالة
5. **استخدم الحالة لتتبع التقدم** - استفد من الحالة لتتبع التقدم في Flows طويلة التشغيل
6. **استخدم العمليات غير المتغيرة عند الإمكان** - خاصة مع الحالة المنظمة
## الخلاصة
إتقان إدارة الحالة في CrewAI Flows يمنحك القدرة على بناء تطبيقات AI متطورة وقوية تحافظ على السياق وتتخذ قرارات معقدة وتقدم نتائج متسقة.
<Check>
لقد أتقنت الآن مفاهيم وممارسات إدارة الحالة في CrewAI Flows! بهذه المعرفة، يمكنك إنشاء سير عمل AI قوية تحافظ على السياق بفعالية وتشارك البيانات بين الخطوات وتبني منطق تطبيق متطور.
</Check>
## الخطوات التالية
- جرّب الحالة المنظمة وغير المنظمة في Flows
- جرّب تطبيق حفظ الحالة لسير العمل طويلة التشغيل
- استكشف [بناء أول Crew](/ar/guides/crews/first-crew) لمعرفة كيف تعمل Crews وFlows معًا
- اطلع على [توثيق مرجع Flow](/ar/concepts/flows) لمزيد من الميزات المتقدمة