mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-05-05 17:22:36 +00:00
## Summary
- Reverts `b0e2fda` ("fix(flow): add execution_id separate from state.id", COR-48): removes `Flow.execution_id` and points `current_flow_id` / `current_flow_request_id` back at `flow_id` (i.e. `state.id`). The separate per-run tracking id was no longer the right abstraction once `restore_from_state_id` reshapes how `state.id` is assigned;
- Adds an optional `restore_from_state_id` kwarg to `Flow.kickoff` / `Flow.kickoff_async` that hydrates state from a previously-persisted flow's latest snapshot
- Reassigns `state.id` to a fresh value (or `inputs["id"]` if pinned) so the new run's `@persist` writes don't extend the source's history
- Existing `inputs["id"]` resume, `@persist`, and `from_checkpoint` paths are unchanged
## Problem
`@persist` only supports *resume* today: `kickoff(inputs={"id": <uuid>})` hydrates state and continues writing under the same `flow_uuid`. There's no way to **fork** — hydrate from a snapshot but persist under a separate key, leaving the source's history intact. This PR adds that.
| | `state.id` after kickoff | `@persist` writes land under |
|---|---|---|
| `inputs["id"]` (resume) | supplied id | supplied id (extends history) |
| `restore_from_state_id` (fork) | fresh id, or `inputs["id"]` if pinned | new id (source preserved) |
## Behavior
| `inputs.id` | `restore_from_state_id` | Effect |
|---|---|---|
| — | — | Fresh kickoff |
| set | — | Existing resume |
| — | UUID | Fork — new `state.id`, hydrated from source |
| set | UUID | Fork into a pinned `state.id`, hydrated from source |
- Source not found → silent fallback (mirrors existing resume)
- Both `from_checkpoint` and `restore_from_state_id` set → `ValueError`
- `restore_from_state_id=None` → byte-identical to current main
## Design
Fork hydration runs before the existing `inputs` block in `kickoff_async`. On a hit, it calls the same `_restore_state` primitive used by resume, then overwrites `state.id` with a fresh UUID (or `inputs["id"]`). A `fork_succeeded` flag gates the existing `inputs["id"]` path so we don't double-load. `_completed_methods` / `_is_execution_resuming` are intentionally untouched — skip-completed-methods remains the territory of `apply_checkpoint` and `from_pending`.
## Test plan
- [ ] `pytest tests/test_flow_persistence.py` — 5 new tests (four-row matrix, not-found fallback, default no-op, conflict raise) + 6 existing as regression
- [ ] `pytest tests/test_flow.py` — broader flow suite
- [ ] Manual end-to-end against an HITL `@persist` flow
227 lines
9.6 KiB
Plaintext
227 lines
9.6 KiB
Plaintext
---
|
|
title: إتقان إدارة حالة Flow
|
|
description: دليل شامل لإدارة الحالة وحفظها والاستفادة منها في CrewAI Flows لبناء تطبيقات AI قوية.
|
|
icon: diagram-project
|
|
mode: "wide"
|
|
---
|
|
|
|
## فهم قوة الحالة في Flows
|
|
|
|
إدارة الحالة هي العمود الفقري لأي سير عمل AI متطور. في CrewAI Flows، يتيح لك نظام الحالة الحفاظ على السياق ومشاركة البيانات بين الخطوات وبناء منطق تطبيق معقد. إتقان إدارة الحالة ضروري لإنشاء تطبيقات AI موثوقة وقابلة للصيانة وقوية.
|
|
|
|
### لماذا تهم إدارة الحالة
|
|
|
|
تمكّنك إدارة الحالة الفعّالة من:
|
|
|
|
1. **الحفاظ على السياق عبر خطوات التنفيذ** - تمرير المعلومات بسلاسة بين مراحل سير العمل المختلفة
|
|
2. **بناء منطق شرطي معقد** - اتخاذ قرارات بناءً على البيانات المتراكمة
|
|
3. **إنشاء تطبيقات مستمرة** - حفظ واستعادة تقدم سير العمل
|
|
4. **معالجة الأخطاء بلطف** - تنفيذ أنماط استرداد لتطبيقات أكثر قوة
|
|
5. **توسيع تطبيقاتك** - دعم سير العمل المعقدة بتنظيم بيانات مناسب
|
|
6. **تمكين التطبيقات الحوارية** - تخزين والوصول إلى سجل المحادثات للتفاعلات الواعية بالسياق
|
|
|
|
## أساسيات إدارة الحالة
|
|
|
|
### نهجان لإدارة الحالة
|
|
|
|
يوفر CrewAI طريقتين لإدارة الحالة في Flows:
|
|
|
|
1. **الحالة غير المنظمة** - استخدام كائنات شبيهة بالقاموس للمرونة
|
|
2. **الحالة المنظمة** - استخدام نماذج Pydantic لسلامة الأنواع والتحقق
|
|
|
|
### مثال الحالة غير المنظمة
|
|
|
|
```python
|
|
from crewai.flow.flow import Flow, listen, start
|
|
|
|
class UnstructuredStateFlow(Flow):
|
|
@start()
|
|
def initialize_data(self):
|
|
self.state["user_name"] = "Alex"
|
|
self.state["preferences"] = {"theme": "dark", "language": "English"}
|
|
self.state["items"] = []
|
|
return "Initialized"
|
|
|
|
@listen(initialize_data)
|
|
def process_data(self, previous_result):
|
|
user = self.state["user_name"]
|
|
self.state["items"].append("item1")
|
|
self.state["processed"] = True
|
|
return "Processed"
|
|
|
|
flow = UnstructuredStateFlow()
|
|
result = flow.kickoff()
|
|
```
|
|
|
|
### مثال الحالة المنظمة
|
|
|
|
```python
|
|
from crewai.flow.flow import Flow, listen, start
|
|
from pydantic import BaseModel, Field
|
|
from typing import List, Dict, Optional
|
|
|
|
class AppState(BaseModel):
|
|
user_name: str = ""
|
|
items: List[str] = []
|
|
processed: bool = False
|
|
completion_percentage: float = 0.0
|
|
|
|
class StructuredStateFlow(Flow[AppState]):
|
|
@start()
|
|
def initialize_data(self):
|
|
self.state.user_name = "Taylor"
|
|
return "Initialized"
|
|
|
|
@listen(initialize_data)
|
|
def process_data(self, previous_result):
|
|
self.state.items.append("item1")
|
|
self.state.processed = True
|
|
self.state.completion_percentage = 50.0
|
|
return "Processed"
|
|
|
|
flow = StructuredStateFlow()
|
|
result = flow.kickoff()
|
|
```
|
|
|
|
### فوائد الحالة المنظمة
|
|
|
|
1. **سلامة الأنواع** - اكتشاف أخطاء الأنواع في وقت التطوير
|
|
2. **توثيق ذاتي** - نموذج الحالة يوثّق بوضوح البيانات المتاحة
|
|
3. **التحقق** - التحقق التلقائي من أنواع البيانات والقيود
|
|
4. **دعم IDE** - إكمال تلقائي وتوثيق مضمّن
|
|
5. **قيم افتراضية** - تعريف بدائل سهلة للبيانات المفقودة
|
|
|
|
## حفظ حالة Flow
|
|
|
|
يوفر مزخرف `@persist()` حفظ حالة تلقائي عند نقاط رئيسية في التنفيذ.
|
|
|
|
```python
|
|
from crewai.flow.flow import Flow, listen, start
|
|
from crewai.flow.persistence import persist
|
|
from pydantic import BaseModel
|
|
|
|
class CounterState(BaseModel):
|
|
value: int = 0
|
|
|
|
@persist()
|
|
class PersistentCounterFlow(Flow[CounterState]):
|
|
@start()
|
|
def increment(self):
|
|
self.state.value += 1
|
|
return self.state.value
|
|
|
|
@listen(increment)
|
|
def double(self, value):
|
|
self.state.value = value * 2
|
|
return self.state.value
|
|
```
|
|
|
|
#### تفرع الحالة المستمرة
|
|
|
|
يدعم `@persist` نمطين متميزين للترطيب في `kickoff` / `kickoff_async`. استخدم **استئناف** (`inputs["id"]`) لمواصلة نفس النسب؛ استخدم **تفرع** (`restore_from_state_id`) لبدء نسبٍ جديد من لقطة:
|
|
|
|
| | `state.id` بعد kickoff | كتابات `@persist` تذهب إلى |
|
|
|---|---|---|
|
|
| `inputs["id"]` (استئناف) | المعرّف المقدم | المعرّف المقدم (يمد التاريخ) |
|
|
| `restore_from_state_id` (تفرع) | معرّف جديد، أو `inputs["id"]` إذا ثُبّت | المعرّف الجديد (المصدر محفوظ) |
|
|
|
|
```python
|
|
from crewai.flow.flow import Flow, start
|
|
from crewai.flow.persistence import persist
|
|
from pydantic import BaseModel
|
|
|
|
class CounterState(BaseModel):
|
|
id: str = ""
|
|
counter: int = 0
|
|
|
|
@persist
|
|
class CounterFlow(Flow[CounterState]):
|
|
@start()
|
|
def step(self):
|
|
self.state.counter += 1
|
|
|
|
# التشغيل 1: حالة جديدة، العداد 0 -> 1
|
|
flow_1 = CounterFlow()
|
|
flow_1.kickoff()
|
|
|
|
# التفرع: الترطيب من أحدث لقطة لـ flow_1، لكن الكتابة تحت state.id جديد
|
|
flow_2 = CounterFlow()
|
|
flow_2.kickoff(restore_from_state_id=flow_1.state.id)
|
|
# يبدأ flow_2 بـ counter=1 (مرطّب)، ثم تزيده step() إلى 2.
|
|
# تاريخ flow_uuid لـ flow_1 لم يتغيّر.
|
|
```
|
|
|
|
ملاحظات السلوك:
|
|
|
|
- `restore_from_state_id` غير موجود في الاستمرارية → يعود kickoff بصمت إلى السلوك الافتراضي (يعكس سلوك `inputs["id"]` عند عدم العثور عليه). لا يُطلق أي استثناء.
|
|
- الجمع بين `restore_from_state_id` و `from_checkpoint` يطلق `ValueError` — يستهدفان نظامي حالة مختلفين (`@persist` مقابل Checkpointing) ولا يمكن الجمع بينهما.
|
|
- `restore_from_state_id=None` (افتراضي) متطابق بايت ببايت مع kickoff بدون المعامل.
|
|
- تثبيت `inputs["id"]` أثناء التفرع يعني أن التشغيل الجديد يشارك مفتاح الاستمرارية مع تدفق آخر — عادةً ما تريد فقط `restore_from_state_id`.
|
|
|
|
## أنماط حالة متقدمة
|
|
|
|
### المنطق الشرطي المبني على الحالة
|
|
|
|
```python
|
|
from crewai.flow.flow import Flow, listen, router, start
|
|
from pydantic import BaseModel
|
|
|
|
class PaymentState(BaseModel):
|
|
amount: float = 0.0
|
|
is_approved: bool = False
|
|
retry_count: int = 0
|
|
|
|
class PaymentFlow(Flow[PaymentState]):
|
|
@start()
|
|
def process_payment(self):
|
|
self.state.amount = 100.0
|
|
self.state.is_approved = self.state.amount < 1000
|
|
return "Payment processed"
|
|
|
|
@router(process_payment)
|
|
def check_approval(self, previous_result):
|
|
if self.state.is_approved:
|
|
return "approved"
|
|
elif self.state.retry_count < 3:
|
|
return "retry"
|
|
else:
|
|
return "rejected"
|
|
|
|
@listen("approved")
|
|
def handle_approval(self):
|
|
return f"Payment of ${self.state.amount} approved!"
|
|
|
|
@listen("retry")
|
|
def handle_retry(self):
|
|
self.state.retry_count += 1
|
|
return "Retry initiated"
|
|
|
|
@listen("rejected")
|
|
def handle_rejection(self):
|
|
return f"Payment of ${self.state.amount} rejected after {self.state.retry_count} retries."
|
|
```
|
|
|
|
## أفضل الممارسات لإدارة الحالة
|
|
|
|
1. **اجعل الحالة مركّزة** - صمم الحالة لتحتوي فقط على ما هو ضروري
|
|
2. **استخدم الحالة المنظمة للـ Flows المعقدة** - مع نمو التعقيد تصبح الحالة المنظمة أكثر قيمة
|
|
3. **وثّق انتقالات الحالة** - للـ Flows المعقدة، وثّق كيف تتغير الحالة عبر التنفيذ
|
|
4. **عالج أخطاء الحالة بلطف** - طبّق معالجة أخطاء للوصول إلى الحالة
|
|
5. **استخدم الحالة لتتبع التقدم** - استفد من الحالة لتتبع التقدم في Flows طويلة التشغيل
|
|
6. **استخدم العمليات غير المتغيرة عند الإمكان** - خاصة مع الحالة المنظمة
|
|
|
|
## الخلاصة
|
|
|
|
إتقان إدارة الحالة في CrewAI Flows يمنحك القدرة على بناء تطبيقات AI متطورة وقوية تحافظ على السياق وتتخذ قرارات معقدة وتقدم نتائج متسقة.
|
|
|
|
<Check>
|
|
لقد أتقنت الآن مفاهيم وممارسات إدارة الحالة في CrewAI Flows! بهذه المعرفة، يمكنك إنشاء سير عمل AI قوية تحافظ على السياق بفعالية وتشارك البيانات بين الخطوات وتبني منطق تطبيق متطور.
|
|
</Check>
|
|
|
|
## الخطوات التالية
|
|
|
|
- جرّب الحالة المنظمة وغير المنظمة في Flows
|
|
- جرّب تطبيق حفظ الحالة لسير العمل طويلة التشغيل
|
|
- استكشف [بناء أول Crew](/ar/guides/crews/first-crew) لمعرفة كيف تعمل Crews وFlows معًا
|
|
- اطلع على [توثيق مرجع Flow](/ar/concepts/flows) لمزيد من الميزات المتقدمة
|