From cd2b9ee38a1d40eb64d811b967ccb23d3df58745 Mon Sep 17 00:00:00 2001 From: Tiago Freire Date: Fri, 1 May 2026 12:46:07 -0300 Subject: [PATCH] feat(flow): add `restore_from_state_id` kickoff parameter (#5674) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Summary - Reverts `b0e2fda` ("fix(flow): add execution_id separate from state.id", COR-48): removes `Flow.execution_id` and points `current_flow_id` / `current_flow_request_id` back at `flow_id` (i.e. `state.id`). The separate per-run tracking id was no longer the right abstraction once `restore_from_state_id` reshapes how `state.id` is assigned; - Adds an optional `restore_from_state_id` kwarg to `Flow.kickoff` / `Flow.kickoff_async` that hydrates state from a previously-persisted flow's latest snapshot - Reassigns `state.id` to a fresh value (or `inputs["id"]` if pinned) so the new run's `@persist` writes don't extend the source's history - Existing `inputs["id"]` resume, `@persist`, and `from_checkpoint` paths are unchanged ## Problem `@persist` only supports *resume* today: `kickoff(inputs={"id": <uuid>})` hydrates state and continues writing under the same `flow_uuid`. There's no way to **fork** — hydrate from a snapshot but persist under a separate key, leaving the source's history intact. This PR adds that. 
| | `state.id` after kickoff | `@persist` writes land under | |---|---|---| | `inputs["id"]` (resume) | supplied id | supplied id (extends history) | | `restore_from_state_id` (fork) | fresh id, or `inputs["id"]` if pinned | new id (source preserved) | ## Behavior | `inputs.id` | `restore_from_state_id` | Effect | |---|---|---| | — | — | Fresh kickoff | | set | — | Existing resume | | — | UUID | Fork — new `state.id`, hydrated from source | | set | UUID | Fork into a pinned `state.id`, hydrated from source | - Source not found → silent fallback (mirrors existing resume) - Both `from_checkpoint` and `restore_from_state_id` set → `ValueError` - `restore_from_state_id=None` → byte-identical to current main ## Design Fork hydration runs before the existing `inputs` block in `kickoff_async`. On a hit, it calls the same `_restore_state` primitive used by resume, then overwrites `state.id` with a fresh UUID (or `inputs["id"]`). A `fork_succeeded` flag gates the existing `inputs["id"]` path so we don't double-load. `_completed_methods` / `_is_execution_resuming` are intentionally untouched — skip-completed-methods remains the territory of `apply_checkpoint` and `from_pending`. 
## Test plan - [ ] `pytest tests/test_flow_persistence.py` — 5 new tests (four-row matrix, not-found fallback, default no-op, conflict raise) + 6 existing as regression - [ ] `pytest tests/test_flow.py` — broader flow suite - [ ] Manual end-to-end against an HITL `@persist` flow --- docs/ar/concepts/flows.mdx | 36 +++ docs/ar/concepts/production-architecture.mdx | 8 + docs/ar/guides/flows/mastering-flow-state.mdx | 42 +++ docs/en/concepts/flows.mdx | 36 +++ docs/en/concepts/production-architecture.mdx | 8 + docs/en/guides/flows/mastering-flow-state.mdx | 42 +++ docs/ko/concepts/flows.mdx | 36 +++ docs/ko/concepts/production-architecture.mdx | 8 + docs/ko/guides/flows/mastering-flow-state.mdx | 42 +++ docs/pt-BR/concepts/flows.mdx | 36 +++ .../concepts/production-architecture.mdx | 8 + .../guides/flows/mastering-flow-state.mdx | 42 +++ lib/crewai/src/crewai/flow/flow.py | 126 ++++++--- lib/crewai/tests/test_crew.py | 8 +- lib/crewai/tests/test_flow_execution_id.py | 127 --------- lib/crewai/tests/test_flow_persistence.py | 240 ++++++++++++++++++ 16 files changed, 683 insertions(+), 162 deletions(-) delete mode 100644 lib/crewai/tests/test_flow_execution_id.py diff --git a/docs/ar/concepts/flows.mdx b/docs/ar/concepts/flows.mdx index 8c01bdd97..2aa62c2d9 100644 --- a/docs/ar/concepts/flows.mdx +++ b/docs/ar/concepts/flows.mdx @@ -380,6 +380,42 @@ class AnotherFlow(Flow[dict]): print("Method-level persisted runs:", self.state["runs"]) ``` +### تفرع الحالة المستمرة + +يدعم `@persist` نمطين متميزين للترطيب في `kickoff` / `kickoff_async`: + +- `kickoff(inputs={"id": })` — **استئناف**: يحمّل أحدث لقطة لـ UUID المقدم ويستمر في الكتابة تحت نفس `flow_uuid`. يمتد التاريخ. +- `kickoff(restore_from_state_id=)` — **تفرع**: يحمّل أحدث لقطة لـ UUID المقدم، يرطّب حالة التشغيل الجديد منها، ثم يعيّن `state.id` جديدًا (مولّدًا تلقائيًا، أو `inputs["id"]` إذا تم تثبيته). تذهب كتابات `@persist` للتشغيل الجديد تحت `state.id` الجديد؛ يتم الحفاظ على تاريخ تدفق المصدر. 
+ +```python +from crewai.flow.flow import Flow, start +from crewai.flow.persistence import persist +from pydantic import BaseModel + +class CounterState(BaseModel): + id: str = "" + counter: int = 0 + +@persist +class CounterFlow(Flow[CounterState]): + @start() + def step(self): + self.state.counter += 1 + print(f"[id={self.state.id}] counter={self.state.counter}") + +# التشغيل 1: حالة جديدة، العداد 0 -> 1، محفوظ تحت flow_1.state.id +flow_1 = CounterFlow() +flow_1.kickoff() + +# التفرع: ترطيب من أحدث لقطة لـ flow_1، لكن باستخدام state.id جديد +flow_2 = CounterFlow() +flow_2.kickoff(restore_from_state_id=flow_1.state.id) +# يبدأ flow_2.state.counter بـ 1 (مرطّب)، ثم تزيده step() إلى 2. +# flow_2.state.id != flow_1.state.id؛ تاريخ flow_1 لم يتغيّر. +``` + +إذا لم يطابق `restore_from_state_id` المقدم أي حالة مستمرة، يعود kickoff بصمت إلى السلوك الافتراضي — نفس سلوك `inputs["id"]` عند عدم العثور عليه. الجمع بين `restore_from_state_id` و `from_checkpoint` يطلق `ValueError`؛ اختر مصدر ترطيب واحدًا. تثبيت `inputs["id"]` أثناء التفرع يشارك مفتاح الاستمرارية مع تدفق آخر — عادةً ما تريد استخدام `restore_from_state_id` فقط. + ### كيف تعمل 1. **تعريف الحالة الفريد** diff --git a/docs/ar/concepts/production-architecture.mdx b/docs/ar/concepts/production-architecture.mdx index 19ba0cecb..9dee3a734 100644 --- a/docs/ar/concepts/production-architecture.mdx +++ b/docs/ar/concepts/production-architecture.mdx @@ -146,6 +146,14 @@ class ProductionFlow(Flow[AppState]): # ... ``` +افتراضيًا، يستأنف `@persist` تدفقًا عند توفير `kickoff(inputs={"id": })`، مما يمدّ نفس تاريخ `flow_uuid`. لـ **تفرع** تدفق مستمر إلى نسبٍ جديد — ترطيب الحالة من تشغيل سابق ولكن الكتابة تحت `state.id` جديد — مرّر `restore_from_state_id`: + +```python +flow.kickoff(restore_from_state_id="") +``` + +يحصل التشغيل الجديد على `state.id` جديد (مولّد تلقائيًا، أو `inputs["id"]` إذا تم تثبيته) لذا لا تمتد كتابات `@persist` الخاصة به إلى تاريخ المصدر. 
الجمع مع `from_checkpoint` يطلق `ValueError`؛ اختر مصدر ترطيب واحدًا. + ## الخلاصة - **ابدأ بتدفق.** diff --git a/docs/ar/guides/flows/mastering-flow-state.mdx b/docs/ar/guides/flows/mastering-flow-state.mdx index 64874e39c..09e56c3df 100644 --- a/docs/ar/guides/flows/mastering-flow-state.mdx +++ b/docs/ar/guides/flows/mastering-flow-state.mdx @@ -116,6 +116,48 @@ class PersistentCounterFlow(Flow[CounterState]): return self.state.value ``` +#### تفرع الحالة المستمرة + +يدعم `@persist` نمطين متميزين للترطيب في `kickoff` / `kickoff_async`. استخدم **استئناف** (`inputs["id"]`) لمواصلة نفس النسب؛ استخدم **تفرع** (`restore_from_state_id`) لبدء نسبٍ جديد من لقطة: + +| | `state.id` بعد kickoff | كتابات `@persist` تذهب إلى | +|---|---|---| +| `inputs["id"]` (استئناف) | المعرّف المقدم | المعرّف المقدم (يمد التاريخ) | +| `restore_from_state_id` (تفرع) | معرّف جديد، أو `inputs["id"]` إذا ثُبّت | المعرّف الجديد (المصدر محفوظ) | + +```python +from crewai.flow.flow import Flow, start +from crewai.flow.persistence import persist +from pydantic import BaseModel + +class CounterState(BaseModel): + id: str = "" + counter: int = 0 + +@persist +class CounterFlow(Flow[CounterState]): + @start() + def step(self): + self.state.counter += 1 + +# التشغيل 1: حالة جديدة، العداد 0 -> 1 +flow_1 = CounterFlow() +flow_1.kickoff() + +# التفرع: الترطيب من أحدث لقطة لـ flow_1، لكن الكتابة تحت state.id جديد +flow_2 = CounterFlow() +flow_2.kickoff(restore_from_state_id=flow_1.state.id) +# يبدأ flow_2 بـ counter=1 (مرطّب)، ثم تزيده step() إلى 2. +# تاريخ flow_uuid لـ flow_1 لم يتغيّر. +``` + +ملاحظات السلوك: + +- `restore_from_state_id` غير موجود في الاستمرارية → يعود kickoff بصمت إلى السلوك الافتراضي (يعكس سلوك `inputs["id"]` عند عدم العثور عليه). لا يُطلق أي استثناء. +- الجمع بين `restore_from_state_id` و `from_checkpoint` يطلق `ValueError` — يستهدفان نظامي حالة مختلفين (`@persist` مقابل Checkpointing) ولا يمكن الجمع بينهما. 
+- `restore_from_state_id=None` (افتراضي) متطابق بايت ببايت مع kickoff بدون المعامل. +- تثبيت `inputs["id"]` أثناء التفرع يعني أن التشغيل الجديد يشارك مفتاح الاستمرارية مع تدفق آخر — عادةً ما تريد فقط `restore_from_state_id`. + ## أنماط حالة متقدمة ### المنطق الشرطي المبني على الحالة diff --git a/docs/en/concepts/flows.mdx b/docs/en/concepts/flows.mdx index defbd3e01..5aaeaf8ee 100644 --- a/docs/en/concepts/flows.mdx +++ b/docs/en/concepts/flows.mdx @@ -380,6 +380,42 @@ class AnotherFlow(Flow[dict]): print("Method-level persisted runs:", self.state["runs"]) ``` +### Forking Persisted State + +`@persist` supports two distinct hydration modes on `kickoff` / `kickoff_async`: + +- `kickoff(inputs={"id": })` — **resume**: load the latest snapshot for the supplied UUID and continue writing under the same `flow_uuid`. The history extends. +- `kickoff(restore_from_state_id=)` — **fork**: load the latest snapshot for the supplied UUID, hydrate the new run's state from it, and assign a fresh `state.id` (auto-generated, or `inputs["id"]` if pinned). The new run's `@persist` writes land under the new `state.id`; the source flow's history is preserved. + +```python +from crewai.flow.flow import Flow, start +from crewai.flow.persistence import persist +from pydantic import BaseModel + +class CounterState(BaseModel): + id: str = "" + counter: int = 0 + +@persist +class CounterFlow(Flow[CounterState]): + @start() + def step(self): + self.state.counter += 1 + print(f"[id={self.state.id}] counter={self.state.counter}") + +# Run 1: fresh state, counter 0 -> 1, persisted under flow_1.state.id +flow_1 = CounterFlow() +flow_1.kickoff() + +# Fork: hydrate from flow_1's latest snapshot, but use a NEW state.id +flow_2 = CounterFlow() +flow_2.kickoff(restore_from_state_id=flow_1.state.id) +# flow_2.state.counter starts at 1 (hydrated), then step() bumps it to 2. +# flow_2.state.id != flow_1.state.id; flow_1's history is unchanged. 
+``` + +If the supplied `restore_from_state_id` does not match any persisted state, the kickoff falls back silently — same as the existing `inputs["id"]` resume not-found behavior. Combining `restore_from_state_id` with `from_checkpoint` raises a `ValueError`; pick one hydration source. Pinning `inputs["id"]` while forking shares a persistence key with another flow — usually you want only `restore_from_state_id`. + ### How It Works 1. **Unique State Identification** diff --git a/docs/en/concepts/production-architecture.mdx b/docs/en/concepts/production-architecture.mdx index ad668056f..82f873860 100644 --- a/docs/en/concepts/production-architecture.mdx +++ b/docs/en/concepts/production-architecture.mdx @@ -146,6 +146,14 @@ class ProductionFlow(Flow[AppState]): # ... ``` +By default, `@persist` resumes a flow when `kickoff(inputs={"id": })` is supplied, extending the same `flow_uuid` history. To **fork** a persisted flow into a new lineage — hydrate state from a previous run but write under a fresh `state.id` — pass `restore_from_state_id`: + +```python +flow.kickoff(restore_from_state_id="") +``` + +The new run gets a fresh `state.id` (auto-generated, or `inputs["id"]` if pinned) so its `@persist` writes don't extend the source's history. Combining with `from_checkpoint` raises a `ValueError`; pick one hydration source. + ## Summary - **Start with a Flow.** diff --git a/docs/en/guides/flows/mastering-flow-state.mdx b/docs/en/guides/flows/mastering-flow-state.mdx index 8bf99f43e..e2df53f67 100644 --- a/docs/en/guides/flows/mastering-flow-state.mdx +++ b/docs/en/guides/flows/mastering-flow-state.mdx @@ -346,6 +346,48 @@ class SelectivePersistFlow(Flow): return f"Complete with count {self.state['count']}" ``` +#### Forking Persisted State + +`@persist` supports two distinct hydration modes on `kickoff` / `kickoff_async`. 
Use **resume** (`inputs["id"]`) to continue the same lineage; use **fork** (`restore_from_state_id`) to start a new lineage seeded from a snapshot: + +| | `state.id` after kickoff | `@persist` writes land under | +|---|---|---| +| `inputs["id"]` (resume) | supplied id | supplied id (extends history) | +| `restore_from_state_id` (fork) | fresh id, or `inputs["id"]` if pinned | new id (source preserved) | + +```python +from crewai.flow.flow import Flow, start +from crewai.flow.persistence import persist +from pydantic import BaseModel + +class CounterState(BaseModel): + id: str = "" + counter: int = 0 + +@persist +class CounterFlow(Flow[CounterState]): + @start() + def step(self): + self.state.counter += 1 + +# Run 1: fresh state, counter 0 -> 1 +flow_1 = CounterFlow() +flow_1.kickoff() + +# Fork: hydrate from flow_1's latest snapshot, but write under a NEW state.id +flow_2 = CounterFlow() +flow_2.kickoff(restore_from_state_id=flow_1.state.id) +# flow_2 starts with counter=1 (hydrated), then step() bumps it to 2. +# flow_1's flow_uuid history is unchanged. +``` + +Behavior notes: + +- `restore_from_state_id` not found in persistence → the kickoff falls back silently to default behavior (mirrors the existing `inputs["id"]` resume not-found behavior). No exception is raised. +- Combining `restore_from_state_id` with `from_checkpoint` raises a `ValueError` — they target different state systems (`@persist` vs. Checkpointing) and cannot be combined. +- `restore_from_state_id=None` (default) is byte-identical to a kickoff without the parameter. +- Pinning `inputs["id"]` while forking means the new run shares a persistence key with another flow — usually you want only `restore_from_state_id`. 
+ ## Advanced State Patterns diff --git a/docs/ko/concepts/flows.mdx b/docs/ko/concepts/flows.mdx index 13f7d6933..68ba7ec6b 100644 --- a/docs/ko/concepts/flows.mdx +++ b/docs/ko/concepts/flows.mdx @@ -373,6 +373,42 @@ class AnotherFlow(Flow[dict]): print("Method-level persisted runs:", self.state["runs"]) ``` +### 영속 상태 포크하기 + +`@persist`는 `kickoff` / `kickoff_async`에서 두 가지 별개의 하이드레이션 모드를 지원합니다: + +- `kickoff(inputs={"id": })` — **재개(resume)**: 제공된 UUID에 대한 최신 스냅샷을 로드하고 동일한 `flow_uuid` 아래에서 계속 기록합니다. 기록이 확장됩니다. +- `kickoff(restore_from_state_id=)` — **포크(fork)**: 제공된 UUID에 대한 최신 스냅샷을 로드하고 새 실행의 상태를 하이드레이트한 후, 새로운 `state.id`(자동 생성, 또는 `inputs["id"]`가 고정된 경우 그 값)를 할당합니다. 새 실행의 `@persist` 기록은 새로운 `state.id` 아래에 저장되며, 원본 플로우의 기록은 보존됩니다. + +```python +from crewai.flow.flow import Flow, start +from crewai.flow.persistence import persist +from pydantic import BaseModel + +class CounterState(BaseModel): + id: str = "" + counter: int = 0 + +@persist +class CounterFlow(Flow[CounterState]): + @start() + def step(self): + self.state.counter += 1 + print(f"[id={self.state.id}] counter={self.state.counter}") + +# 실행 1: 새 상태, counter 0 -> 1, flow_1.state.id 아래에 저장됨 +flow_1 = CounterFlow() +flow_1.kickoff() + +# 포크: flow_1의 최신 스냅샷에서 하이드레이트하지만, 새 state.id를 사용 +flow_2 = CounterFlow() +flow_2.kickoff(restore_from_state_id=flow_1.state.id) +# flow_2.state.counter는 1(하이드레이트)로 시작하고, step()이 2로 증가시킵니다. +# flow_2.state.id != flow_1.state.id; flow_1의 기록은 변경되지 않습니다. +``` + +제공된 `restore_from_state_id`가 어떤 영속 상태와도 일치하지 않으면, kickoff는 조용히 기본 동작으로 폴백됩니다 — 기존 `inputs["id"]`의 미발견 동작과 동일합니다. `restore_from_state_id`를 `from_checkpoint`와 결합하면 `ValueError`가 발생합니다; 하나의 하이드레이션 소스를 선택하세요. 포크 중 `inputs["id"]`를 고정하면 다른 플로우와 영속 키를 공유하게 됩니다 — 일반적으로 `restore_from_state_id`만 사용하는 것이 좋습니다. + ### 작동 방식 1. 
**고유 상태 식별** diff --git a/docs/ko/concepts/production-architecture.mdx b/docs/ko/concepts/production-architecture.mdx index d393874cc..112e744a9 100644 --- a/docs/ko/concepts/production-architecture.mdx +++ b/docs/ko/concepts/production-architecture.mdx @@ -146,6 +146,14 @@ class ProductionFlow(Flow[AppState]): # ... ``` +기본적으로, `@persist`는 `kickoff(inputs={"id": })`가 제공될 때 플로우를 재개하여 동일한 `flow_uuid` 기록을 확장합니다. 영속된 플로우를 새 계보로 **포크**하려면 — 이전 실행에서 상태를 하이드레이트하지만 새로운 `state.id` 아래에 기록 — `restore_from_state_id`를 전달하세요: + +```python +flow.kickoff(restore_from_state_id="") +``` + +새 실행은 새로운 `state.id`(자동 생성, 또는 `inputs["id"]`가 고정된 경우 그 값)를 받아 `@persist` 기록이 원본의 기록을 확장하지 않도록 합니다. `from_checkpoint`와 결합하면 `ValueError`가 발생합니다; 하나의 하이드레이션 소스를 선택하세요. + ## 요약 - **Flow로 시작하세요.** diff --git a/docs/ko/guides/flows/mastering-flow-state.mdx b/docs/ko/guides/flows/mastering-flow-state.mdx index 83b442f31..eafd24b29 100644 --- a/docs/ko/guides/flows/mastering-flow-state.mdx +++ b/docs/ko/guides/flows/mastering-flow-state.mdx @@ -346,6 +346,48 @@ class SelectivePersistFlow(Flow): return f"Complete with count {self.state['count']}" ``` +#### 영속 상태 포크하기 + +`@persist`는 `kickoff` / `kickoff_async`에서 두 가지 별개의 하이드레이션 모드를 지원합니다. 
동일한 계보를 계속하려면 **재개**(`inputs["id"]`)를 사용하고, 스냅샷에서 시작하는 새 계보를 시작하려면 **포크**(`restore_from_state_id`)를 사용하세요: + +| | kickoff 후 `state.id` | `@persist` 기록 위치 | +|---|---|---| +| `inputs["id"]` (재개) | 제공된 id | 제공된 id (기록 확장) | +| `restore_from_state_id` (포크) | 새 id, 또는 고정 시 `inputs["id"]` | 새 id (원본 보존) | + +```python +from crewai.flow.flow import Flow, start +from crewai.flow.persistence import persist +from pydantic import BaseModel + +class CounterState(BaseModel): + id: str = "" + counter: int = 0 + +@persist +class CounterFlow(Flow[CounterState]): + @start() + def step(self): + self.state.counter += 1 + +# 실행 1: 새 상태, counter 0 -> 1 +flow_1 = CounterFlow() +flow_1.kickoff() + +# 포크: flow_1의 최신 스냅샷에서 하이드레이트, 단 새 state.id에 기록 +flow_2 = CounterFlow() +flow_2.kickoff(restore_from_state_id=flow_1.state.id) +# flow_2는 counter=1(하이드레이트)로 시작하고, step()이 2로 증가시킵니다. +# flow_1의 flow_uuid 기록은 변경되지 않습니다. +``` + +동작 노트: + +- `restore_from_state_id`가 영속에서 발견되지 않음 → kickoff는 조용히 기본 동작으로 폴백됩니다 (기존 `inputs["id"]`의 미발견 동작 미러링). 예외는 발생하지 않습니다. +- `restore_from_state_id`를 `from_checkpoint`와 결합하면 `ValueError`가 발생합니다 — 서로 다른 상태 시스템(`@persist` 대 Checkpointing)을 대상으로 하므로 결합할 수 없습니다. +- `restore_from_state_id=None`(기본값)은 매개변수 없는 kickoff와 바이트 단위로 동일합니다. +- 포크 중 `inputs["id"]`를 고정하면 새 실행이 다른 플로우와 영속 키를 공유함을 의미합니다 — 일반적으로 `restore_from_state_id`만 사용하는 것이 좋습니다. + ## 고급 상태 패턴 ### 상태 기반 조건부 로직 diff --git a/docs/pt-BR/concepts/flows.mdx b/docs/pt-BR/concepts/flows.mdx index 2cac627b2..5cd5324f5 100644 --- a/docs/pt-BR/concepts/flows.mdx +++ b/docs/pt-BR/concepts/flows.mdx @@ -193,6 +193,42 @@ Para um controle mais granular, você pode aplicar @persist em métodos específ # (O código não é traduzido) ``` +### Forking de Estado Persistido + +`@persist` suporta dois modos distintos de hidratação em `kickoff` / `kickoff_async`: + +- `kickoff(inputs={"id": })` — **resume**: carrega o snapshot mais recente do UUID informado e continua escrevendo sob o mesmo `flow_uuid`. O histórico se estende. 
+- `kickoff(restore_from_state_id=)` — **fork**: carrega o snapshot mais recente do UUID informado, hidrata o estado da nova execução a partir dele, e atribui um novo `state.id` (auto-gerado, ou `inputs["id"]` se fixado). As escritas do `@persist` da nova execução vão para o novo `state.id`; o histórico do flow de origem é preservado. + +```python +from crewai.flow.flow import Flow, start +from crewai.flow.persistence import persist +from pydantic import BaseModel + +class CounterState(BaseModel): + id: str = "" + counter: int = 0 + +@persist +class CounterFlow(Flow[CounterState]): + @start() + def step(self): + self.state.counter += 1 + print(f"[id={self.state.id}] counter={self.state.counter}") + +# Execução 1: estado novo, counter 0 -> 1, persistido sob flow_1.state.id +flow_1 = CounterFlow() +flow_1.kickoff() + +# Fork: hidrata do snapshot mais recente de flow_1, mas usa um state.id NOVO +flow_2 = CounterFlow() +flow_2.kickoff(restore_from_state_id=flow_1.state.id) +# flow_2.state.counter começa em 1 (hidratado), e step() incrementa para 2. +# flow_2.state.id != flow_1.state.id; o histórico de flow_1 não é alterado. +``` + +Se o `restore_from_state_id` informado não corresponder a nenhum estado persistido, o kickoff retorna silenciosamente ao comportamento padrão — o mesmo comportamento do `inputs["id"]` quando não encontrado. Combinar `restore_from_state_id` com `from_checkpoint` lança um `ValueError`; escolha uma única fonte de hidratação. Fixar `inputs["id"]` durante o fork compartilha uma chave de persistência com outro flow — geralmente você quer apenas `restore_from_state_id`. + ### Como Funciona 1. **Identificação Única do Estado** diff --git a/docs/pt-BR/concepts/production-architecture.mdx b/docs/pt-BR/concepts/production-architecture.mdx index ac1e17801..87b001e97 100644 --- a/docs/pt-BR/concepts/production-architecture.mdx +++ b/docs/pt-BR/concepts/production-architecture.mdx @@ -146,6 +146,14 @@ class ProductionFlow(Flow[AppState]): # ... 
``` +Por padrão, `@persist` retoma um flow quando `kickoff(inputs={"id": })` é informado, estendendo o mesmo histórico do `flow_uuid`. Para **forkar** um flow persistido em uma nova linhagem — hidratar o estado a partir de uma execução anterior mas escrever sob um novo `state.id` — passe `restore_from_state_id`: + +```python +flow.kickoff(restore_from_state_id="") +``` + +A nova execução recebe um novo `state.id` (auto-gerado, ou `inputs["id"]` se fixado), então suas escritas do `@persist` não estendem o histórico da origem. Combinar com `from_checkpoint` lança um `ValueError`; escolha uma única fonte de hidratação. + ## Resumo - **Comece com um Flow.** diff --git a/docs/pt-BR/guides/flows/mastering-flow-state.mdx b/docs/pt-BR/guides/flows/mastering-flow-state.mdx index 442ab7dbb..9bc02d6f3 100644 --- a/docs/pt-BR/guides/flows/mastering-flow-state.mdx +++ b/docs/pt-BR/guides/flows/mastering-flow-state.mdx @@ -167,6 +167,48 @@ Para mais controle, você pode aplicar `@persist()` em métodos específicos: # código não traduzido ``` +#### Forking de Estado Persistido + +`@persist` suporta dois modos distintos de hidratação em `kickoff` / `kickoff_async`. 
Use **resume** (`inputs["id"]`) para continuar a mesma linhagem; use **fork** (`restore_from_state_id`) para iniciar uma nova linhagem a partir de um snapshot: + +| | `state.id` após o kickoff | Escritas do `@persist` vão para | +|---|---|---| +| `inputs["id"]` (resume) | id informado | id informado (estende o histórico) | +| `restore_from_state_id` (fork) | id novo, ou `inputs["id"]` se fixado | id novo (origem preservada) | + +```python +from crewai.flow.flow import Flow, start +from crewai.flow.persistence import persist +from pydantic import BaseModel + +class CounterState(BaseModel): + id: str = "" + counter: int = 0 + +@persist +class CounterFlow(Flow[CounterState]): + @start() + def step(self): + self.state.counter += 1 + +# Execução 1: estado novo, counter 0 -> 1 +flow_1 = CounterFlow() +flow_1.kickoff() + +# Fork: hidrata do snapshot mais recente de flow_1, mas escreve sob um state.id NOVO +flow_2 = CounterFlow() +flow_2.kickoff(restore_from_state_id=flow_1.state.id) +# flow_2 começa com counter=1 (hidratado), e step() incrementa para 2. +# O histórico do flow_uuid de flow_1 não é alterado. +``` + +Notas sobre o comportamento: + +- `restore_from_state_id` não encontrado na persistência → o kickoff retorna silenciosamente ao comportamento padrão (espelha o comportamento de `inputs["id"]` quando não encontrado). Nenhuma exceção é lançada. +- Combinar `restore_from_state_id` com `from_checkpoint` lança um `ValueError` — eles miram sistemas de estado diferentes (`@persist` vs. Checkpointing) e não podem ser combinados. +- `restore_from_state_id=None` (padrão) é byte-idêntico a um kickoff sem o parâmetro. +- Fixar `inputs["id"]` durante o fork significa que a nova execução compartilha uma chave de persistência com outro flow — geralmente você quer apenas `restore_from_state_id`. 
+ ## Padrões Avançados de Estado ### Lógica Condicional Baseada no Estado diff --git a/lib/crewai/src/crewai/flow/flow.py b/lib/crewai/src/crewai/flow/flow.py index 95e6a9a15..d22794873 100644 --- a/lib/crewai/src/crewai/flow/flow.py +++ b/lib/crewai/src/crewai/flow/flow.py @@ -1074,7 +1074,6 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): _human_feedback_method_outputs: dict[str, Any] = PrivateAttr(default_factory=dict) _input_history: list[InputHistoryEntry] = PrivateAttr(default_factory=list) _state: Any = PrivateAttr(default=None) - _execution_id: str = PrivateAttr(default_factory=lambda: str(uuid4())) def __class_getitem__(cls: type[Flow[T]], item: type[T]) -> type[Flow[T]]: # type: ignore[override] class _FlowGeneric(cls): # type: ignore[valid-type,misc] @@ -1865,27 +1864,6 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): except (AttributeError, TypeError): return "" # Safely handle any unexpected attribute access issues - @property - def execution_id(self) -> str: - """Stable identifier for this flow execution. - - Separate from ``flow_id`` / ``state.id``, which consumers may - override via ``kickoff(inputs={"id": ...})`` to resume a persisted - flow. ``execution_id`` is never affected by ``inputs`` and stays - stable for the lifetime of a single run, so it is the correct key - for telemetry, tracing, and any external correlation that must - uniquely identify a single execution even when callers pass an - ``id`` in ``inputs``. - - Defaults to a fresh ``uuid4`` per ``Flow`` instance; assign to - override when an outer system already has an execution identity. - """ - return self._execution_id - - @execution_id.setter - def execution_id(self, value: str) -> None: - self._execution_id = value - def _initialize_state(self, inputs: dict[str, Any]) -> None: """Initialize or update flow state with new inputs. 
@@ -2054,6 +2032,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): inputs: dict[str, Any] | None = None, input_files: dict[str, FileInput] | None = None, from_checkpoint: CheckpointConfig | None = None, + restore_from_state_id: str | None = None, ) -> Any | FlowStreamingOutput: """Start the flow execution in a synchronous context. @@ -2065,10 +2044,24 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): input_files: Optional dict of named file inputs for the flow. from_checkpoint: Optional checkpoint config. If ``restore_from`` is set, the flow resumes from that checkpoint. + restore_from_state_id: Optional UUID of a previously-persisted flow + whose latest snapshot should hydrate this run's state. The new + run is assigned a fresh ``state.id`` (or ``inputs["id"]`` if + pinned), so its ``@persist`` writes land under a separate + persistence key and the source flow's history is preserved. + If the referenced state is not found, the kickoff falls back + silently to baseline behavior. Cannot be combined with + ``from_checkpoint``; passing both raises ``ValueError``. Returns: The final output from the flow or FlowStreamingOutput if streaming. """ + if from_checkpoint is not None and restore_from_state_id is not None: + raise ValueError( + "Cannot combine `from_checkpoint` and `restore_from_state_id`. " + "These parameters target different state systems " + "(Checkpointing and @persist) and cannot be used together." 
+ ) restored = apply_checkpoint(self, from_checkpoint) if restored is not None: return restored.kickoff(inputs=inputs, input_files=input_files) @@ -2090,7 +2083,11 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): def run_flow() -> None: try: self.stream = False - result = self.kickoff(inputs=inputs, input_files=input_files) + result = self.kickoff( + inputs=inputs, + input_files=input_files, + restore_from_state_id=restore_from_state_id, + ) result_holder.append(result) except Exception as e: # HumanFeedbackPending is expected control flow, not an error @@ -2113,7 +2110,11 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): return streaming_output async def _run_flow() -> Any: - return await self.kickoff_async(inputs, input_files) + return await self.kickoff_async( + inputs, + input_files, + restore_from_state_id=restore_from_state_id, + ) try: asyncio.get_running_loop() @@ -2128,6 +2129,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): inputs: dict[str, Any] | None = None, input_files: dict[str, FileInput] | None = None, from_checkpoint: CheckpointConfig | None = None, + restore_from_state_id: str | None = None, ) -> Any | FlowStreamingOutput: """Start the flow execution asynchronously. @@ -2141,10 +2143,23 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): input_files: Optional dict of named file inputs for the flow. from_checkpoint: Optional checkpoint config. If ``restore_from`` is set, the flow resumes from that checkpoint. + restore_from_state_id: Optional UUID of a previously-persisted flow + whose latest snapshot should hydrate this run's state. The new + run is assigned a fresh ``state.id`` (or ``inputs["id"]`` if + pinned), so subsequent ``@persist`` writes land under a + separate persistence key. If the referenced state is not + found, falls back silently to baseline. Cannot be combined + with ``from_checkpoint``; passing both raises ``ValueError``. 
Returns: The final output from the flow, which is the result of the last executed method. """ + if from_checkpoint is not None and restore_from_state_id is not None: + raise ValueError( + "Cannot combine `from_checkpoint` and `restore_from_state_id`. " + "These parameters target different state systems " + "(Checkpointing and @persist) and cannot be used together." + ) restored = apply_checkpoint(self, from_checkpoint) if restored is not None: return await restored.kickoff_async(inputs=inputs, input_files=input_files) @@ -2167,7 +2182,9 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): try: self.stream = False result = await self.kickoff_async( - inputs=inputs, input_files=input_files + inputs=inputs, + input_files=input_files, + restore_from_state_id=restore_from_state_id, ) result_holder.append(result) except Exception as e: @@ -2199,9 +2216,9 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): flow_id_token = None request_id_token = None if current_flow_id.get() is None: - flow_id_token = current_flow_id.set(self.execution_id) + flow_id_token = current_flow_id.set(self.flow_id) if current_flow_request_id.get() is None: - request_id_token = current_flow_request_id.set(self.execution_id) + request_id_token = current_flow_request_id.set(self.flow_id) try: # Reset flow state for fresh execution unless restoring from persistence @@ -2224,16 +2241,54 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): if self._completed_methods: self._is_execution_resuming = True + # Fork hydration: when restore_from_state_id is set and persistence is + # available, hydrate self._state from the source UUID's latest snapshot + # and reassign state.id to a fresh value so subsequent @persist writes + # don't extend the source flow's history. If the source state is not + # found, fall through silently to the existing inputs handling. 
+ fork_succeeded = False + if restore_from_state_id is not None and self.persistence is not None: + stored_state = self.persistence.load_state(restore_from_state_id) + if stored_state: + self._log_flow_event( + f"Forking flow state from UUID: {restore_from_state_id}" + ) + self._restore_state(stored_state) + # Pin to inputs["id"] when provided, otherwise mint a fresh + # UUID. NOTE: pinning inputs.id while forking shares a + # persistence key with another flow — usually you want only + # restore_from_state_id. + new_state_id = (inputs.get("id") if inputs else None) or str( + uuid4() + ) + if isinstance(self._state, dict): + self._state["id"] = new_state_id + elif isinstance(self._state, BaseModel): + setattr(self._state, "id", new_state_id) # noqa: B010 + fork_succeeded = True + else: + self._log_flow_event( + "No flow state found for restore_from_state_id: " + f"{restore_from_state_id}; proceeding without hydration", + color="yellow", + ) + if inputs: - # Override the id in the state if it exists in inputs - if "id" in inputs: + # Override the id in the state if it exists in inputs. + # Skip when the fork already assigned state.id above. + if "id" in inputs and not fork_succeeded: if isinstance(self._state, dict): self._state["id"] = inputs["id"] elif isinstance(self._state, BaseModel): setattr(self._state, "id", inputs["id"]) # noqa: B010 # If persistence is enabled, attempt to restore the stored state using the provided id. - if "id" in inputs and self.persistence is not None: + # Skip when the fork already restored self._state above. 
+ if ( + "id" in inputs + and self.persistence is not None + and not fork_succeeded + ): restore_uuid = inputs["id"] stored_state = self.persistence.load_state(restore_uuid) if stored_state: @@ -2416,6 +2471,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): inputs: dict[str, Any] | None = None, input_files: dict[str, FileInput] | None = None, from_checkpoint: CheckpointConfig | None = None, + restore_from_state_id: str | None = None, ) -> Any | FlowStreamingOutput: """Native async method to start the flow execution. Alias for kickoff_async. @@ -2424,11 +2480,19 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): input_files: Optional dict of named file inputs for the flow. from_checkpoint: Optional checkpoint config. If ``restore_from`` is set, the flow resumes from that checkpoint. + restore_from_state_id: Optional UUID of a previously-persisted flow + whose latest snapshot should hydrate this run's state. See + ``kickoff_async`` for full semantics. Returns: The final output from the flow, which is the result of the last executed method. 
""" - return await self.kickoff_async(inputs, input_files, from_checkpoint) + return await self.kickoff_async( + inputs, + input_files, + from_checkpoint, + restore_from_state_id=restore_from_state_id, + ) async def _replay_recorded_events(self) -> None: """Dispatch recorded ``MethodExecution*`` events from the event record.""" diff --git a/lib/crewai/tests/test_crew.py b/lib/crewai/tests/test_crew.py index 071395c91..21c582620 100644 --- a/lib/crewai/tests/test_crew.py +++ b/lib/crewai/tests/test_crew.py @@ -4519,8 +4519,8 @@ def test_sets_flow_context_when_using_crewbase_pattern_inside_flow(): flow.kickoff() assert captured_crew is not None - assert captured_crew._flow_id == flow.execution_id # type: ignore[attr-defined] - assert captured_crew._request_id == flow.execution_id # type: ignore[attr-defined] + assert captured_crew._flow_id == flow.flow_id # type: ignore[attr-defined] + assert captured_crew._request_id == flow.flow_id # type: ignore[attr-defined] def test_sets_flow_context_when_outside_flow(researcher, writer): @@ -4554,8 +4554,8 @@ def test_sets_flow_context_when_inside_flow(researcher, writer): flow = MyFlow() result = flow.kickoff() - assert result._flow_id == flow.execution_id # type: ignore[attr-defined] - assert result._request_id == flow.execution_id # type: ignore[attr-defined] + assert result._flow_id == flow.flow_id # type: ignore[attr-defined] + assert result._request_id == flow.flow_id # type: ignore[attr-defined] def test_reset_knowledge_with_no_crew_knowledge(researcher, writer): diff --git a/lib/crewai/tests/test_flow_execution_id.py b/lib/crewai/tests/test_flow_execution_id.py deleted file mode 100644 index 95088d4b6..000000000 --- a/lib/crewai/tests/test_flow_execution_id.py +++ /dev/null @@ -1,127 +0,0 @@ -"""Regression tests for ``Flow.execution_id``. - -``execution_id`` is the stable tracking identifier for a single flow run. 
-It must stay independent of ``state.id`` so that consumers passing an -``id`` in ``inputs`` (used for persistence restore) cannot destabilize -the identity used by telemetry, tracing, and external correlation. -""" - -from __future__ import annotations - -from typing import Any - -import pytest -from crewai.flow.flow import Flow, FlowState, start -from crewai.flow.flow_context import current_flow_id, current_flow_request_id - - -class _CaptureState(FlowState): - captured_flow_id: str = "" - captured_state_id: str = "" - captured_current_flow_id: str = "" - captured_execution_id: str = "" - - -class _IdentityCaptureFlow(Flow[_CaptureState]): - initial_state = _CaptureState - - @start() - def capture(self) -> None: - self.state.captured_flow_id = self.flow_id - self.state.captured_state_id = self.state.id - self.state.captured_current_flow_id = current_flow_id.get() or "" - self.state.captured_execution_id = self.execution_id - - -def test_execution_id_defaults_to_fresh_uuid_per_instance() -> None: - a = _IdentityCaptureFlow() - b = _IdentityCaptureFlow() - - assert a.execution_id - assert b.execution_id - assert a.execution_id != b.execution_id - - -def test_execution_id_survives_consumer_id_in_inputs() -> None: - flow = _IdentityCaptureFlow() - original_execution_id = flow.execution_id - - flow.kickoff(inputs={"id": "consumer-supplied-id"}) - - assert flow.state.id == "consumer-supplied-id" - assert flow.flow_id == "consumer-supplied-id" - assert flow.execution_id == original_execution_id - assert flow.execution_id != "consumer-supplied-id" - - -def test_two_runs_with_same_consumer_id_have_distinct_execution_ids() -> None: - flow_a = _IdentityCaptureFlow() - flow_b = _IdentityCaptureFlow() - - colliding_id = "shared-consumer-id" - flow_a.kickoff(inputs={"id": colliding_id}) - flow_b.kickoff(inputs={"id": colliding_id}) - - assert flow_a.state.id == colliding_id - assert flow_b.state.id == colliding_id - assert flow_a.execution_id != flow_b.execution_id - - -def 
test_execution_id_is_writable() -> None: - flow = _IdentityCaptureFlow() - flow.execution_id = "external-task-id" - - assert flow.execution_id == "external-task-id" - - flow.kickoff(inputs={"id": "consumer-supplied-id"}) - assert flow.execution_id == "external-task-id" - assert flow.state.id == "consumer-supplied-id" - - -def test_current_flow_id_context_var_matches_execution_id() -> None: - flow = _IdentityCaptureFlow() - flow.execution_id = "external-task-id" - - flow.kickoff(inputs={"id": "consumer-supplied-id"}) - - assert flow.state.captured_current_flow_id == "external-task-id" - assert flow.state.captured_flow_id == "consumer-supplied-id" - assert flow.state.captured_execution_id == "external-task-id" - - -def test_execution_id_not_included_in_serialized_state() -> None: - flow = _IdentityCaptureFlow() - flow.execution_id = "external-task-id" - flow.kickoff() - - dumped = flow.state.model_dump() - assert "execution_id" not in dumped - assert "_execution_id" not in dumped - assert dumped["id"] == flow.state.id - - -def test_dict_state_flow_also_exposes_stable_execution_id() -> None: - class DictFlow(Flow[dict[str, Any]]): - initial_state = dict # type: ignore[assignment] - - @start() - def noop(self) -> None: - pass - - flow = DictFlow() - original = flow.execution_id - flow.kickoff(inputs={"id": "consumer-supplied-id"}) - - assert flow.state["id"] == "consumer-supplied-id" - assert flow.execution_id == original - - -@pytest.fixture(autouse=True) -def _reset_flow_context_vars(): - yield - for var in (current_flow_id, current_flow_request_id): - try: - var.set(None) - except LookupError: - # ContextVar was never set in this context; nothing to reset. 
- pass diff --git a/lib/crewai/tests/test_flow_persistence.py b/lib/crewai/tests/test_flow_persistence.py index 06bbf7231..65655f26b 100644 --- a/lib/crewai/tests/test_flow_persistence.py +++ b/lib/crewai/tests/test_flow_persistence.py @@ -3,6 +3,7 @@ import os from typing import Dict, List +import pytest from crewai.flow.flow import Flow, FlowState, listen, start from crewai.flow.persistence import persist from crewai.flow.persistence.sqlite import SQLiteFlowPersistence @@ -248,3 +249,242 @@ def test_persistence_with_base_model(tmp_path): assert message.type == "text" assert message.content == "Hello, World!" assert isinstance(flow.state._unwrap(), State) + + +def test_fork_with_restore_from_state_id(tmp_path): + """Fork: restore_from_state_id hydrates state from source flow_uuid; new run gets a + fresh state.id; source's history is preserved (the fork's @persist writes go under + the new state.id, not the source's).""" + db_path = os.path.join(tmp_path, "test_flows.db") + persistence = SQLiteFlowPersistence(db_path) + + class ForkableFlow(Flow[TestState]): + @start() + @persist(persistence) + def step(self): + self.state.counter += 1 + + # Run 1: build up source state. counter goes 0 -> 1. + flow1 = ForkableFlow(persistence=persistence) + flow1.kickoff() + source_uuid = flow1.state.id + assert flow1.state.counter == 1 + + # Resume on the same uuid bumps counter to 2 in the SAME flow_uuid history. + flow1b = ForkableFlow(persistence=persistence) + flow1b.kickoff(inputs={"id": source_uuid}) + assert flow1b.state.counter == 2 + assert persistence.load_state(source_uuid)["counter"] == 2 + + # Fork: hydrate from source, but persist under a fresh state.id. + flow2 = ForkableFlow(persistence=persistence) + flow2.kickoff(restore_from_state_id=source_uuid) + + # Fork has a different state.id from the source. + assert flow2.state.id != source_uuid + # Hydrated from source's latest snapshot (counter=2), then incremented to 3. 
+ assert flow2.state.counter == 3 + + # Source's history is unchanged after the fork. + assert persistence.load_state(source_uuid)["counter"] == 2 + + # Fork's writes landed under its own state.id. + assert persistence.load_state(flow2.state.id)["counter"] == 3 + + +def test_fork_with_pinned_state_id(tmp_path): + """Fork into a pinned state.id (inputs.id supplied alongside restore_from_state_id): + the new run uses inputs.id as state.id and hydrates from restore_from_state_id.""" + db_path = os.path.join(tmp_path, "test_flows.db") + persistence = SQLiteFlowPersistence(db_path) + + class PinnableFlow(Flow[TestState]): + @start() + @persist(persistence) + def step(self): + self.state.counter += 1 + + flow1 = PinnableFlow(persistence=persistence) + flow1.kickoff() + source_uuid = flow1.state.id + assert flow1.state.counter == 1 + + pinned_uuid = "pinned-fork-uuid-1234" + flow2 = PinnableFlow(persistence=persistence) + flow2.kickoff( + inputs={"id": pinned_uuid}, + restore_from_state_id=source_uuid, + ) + + # state.id pinned to inputs.id, NOT the source uuid. + assert flow2.state.id == pinned_uuid + # Hydrated from source: counter started at 1, step incremented to 2. + assert flow2.state.counter == 2 + # Source's history is unchanged. + assert persistence.load_state(source_uuid)["counter"] == 1 + # Fork's writes are under the pinned uuid. + assert persistence.load_state(pinned_uuid)["counter"] == 2 + + +def test_restore_from_state_id_not_found_silent_fallback(tmp_path): + """Lookup miss on restore_from_state_id silently falls through to default behavior.""" + db_path = os.path.join(tmp_path, "test_flows.db") + persistence = SQLiteFlowPersistence(db_path) + + class FallbackFlow(Flow[TestState]): + @start() + @persist(persistence) + def step(self): + self.state.counter += 1 + + flow = FallbackFlow(persistence=persistence) + # No source UUID exists — should not raise. 
+ flow.kickoff(restore_from_state_id="no-such-uuid") + + # Default state path: counter starts at 0 and step increments to 1. + assert flow.state.counter == 1 + # state.id is the auto-generated one, NOT the missing source. + assert flow.state.id != "no-such-uuid" + + +def test_restore_from_state_id_none_is_no_op(tmp_path): + """restore_from_state_id=None (default) preserves baseline kickoff behavior.""" + db_path = os.path.join(tmp_path, "test_flows.db") + persistence = SQLiteFlowPersistence(db_path) + + class BaselineFlow(Flow[TestState]): + @start() + @persist(persistence) + def step(self): + self.state.counter += 1 + + flow = BaselineFlow(persistence=persistence) + flow.kickoff(restore_from_state_id=None) + assert flow.state.counter == 1 + + +def test_fork_conflict_with_from_checkpoint_raises(): + """Passing both from_checkpoint and restore_from_state_id raises ValueError, naming + both parameters.""" + from crewai.state import CheckpointConfig + + class ConflictFlow(Flow[TestState]): + @start() + def step(self): + pass + + flow = ConflictFlow() + with pytest.raises(ValueError) as excinfo: + flow.kickoff( + from_checkpoint=CheckpointConfig(), + restore_from_state_id="some-uuid", + ) + msg = str(excinfo.value) + assert "from_checkpoint" in msg + assert "restore_from_state_id" in msg + + +@pytest.mark.asyncio +async def test_fork_via_kickoff_async(tmp_path): + """kickoff_async honors restore_from_state_id: hydrates from source, mints fresh + state.id, persists under the new id, source history preserved.""" + db_path = os.path.join(tmp_path, "test_flows.db") + persistence = SQLiteFlowPersistence(db_path) + + class AsyncForkableFlow(Flow[TestState]): + @start() + @persist(persistence) + def step(self): + self.state.counter += 1 + + flow1 = AsyncForkableFlow(persistence=persistence) + await flow1.kickoff_async() + source_uuid = flow1.state.id + assert flow1.state.counter == 1 + + flow2 = AsyncForkableFlow(persistence=persistence) + await 
flow2.kickoff_async(restore_from_state_id=source_uuid) + + assert flow2.state.id != source_uuid + assert flow2.state.counter == 2 + assert persistence.load_state(source_uuid)["counter"] == 1 + assert persistence.load_state(flow2.state.id)["counter"] == 2 + + +@pytest.mark.asyncio +async def test_fork_via_akickoff(tmp_path): + """akickoff is the public async alias and must accept restore_from_state_id with + the same semantics as kickoff_async.""" + db_path = os.path.join(tmp_path, "test_flows.db") + persistence = SQLiteFlowPersistence(db_path) + + class AkickoffForkableFlow(Flow[TestState]): + @start() + @persist(persistence) + def step(self): + self.state.counter += 1 + + flow1 = AkickoffForkableFlow(persistence=persistence) + await flow1.akickoff() + source_uuid = flow1.state.id + assert flow1.state.counter == 1 + + flow2 = AkickoffForkableFlow(persistence=persistence) + await flow2.akickoff(restore_from_state_id=source_uuid) + + assert flow2.state.id != source_uuid + assert flow2.state.counter == 2 + assert persistence.load_state(source_uuid)["counter"] == 1 + assert persistence.load_state(flow2.state.id)["counter"] == 2 + + +@pytest.mark.asyncio +async def test_akickoff_pinned_fork(tmp_path): + """akickoff with both inputs.id and restore_from_state_id pins state.id while + hydrating from the source.""" + db_path = os.path.join(tmp_path, "test_flows.db") + persistence = SQLiteFlowPersistence(db_path) + + class PinnableAsyncFlow(Flow[TestState]): + @start() + @persist(persistence) + def step(self): + self.state.counter += 1 + + flow1 = PinnableAsyncFlow(persistence=persistence) + await flow1.akickoff() + source_uuid = flow1.state.id + + pinned_uuid = "pinned-akickoff-fork-uuid" + flow2 = PinnableAsyncFlow(persistence=persistence) + await flow2.akickoff( + inputs={"id": pinned_uuid}, + restore_from_state_id=source_uuid, + ) + + assert flow2.state.id == pinned_uuid + assert flow2.state.counter == 2 + assert persistence.load_state(source_uuid)["counter"] == 1 + 
assert persistence.load_state(pinned_uuid)["counter"] == 2 + + +@pytest.mark.asyncio +async def test_akickoff_fork_conflict_with_from_checkpoint_raises(): + """akickoff must raise the same conflict ValueError as kickoff/kickoff_async when + both from_checkpoint and restore_from_state_id are set.""" + from crewai.state import CheckpointConfig + + class AsyncConflictFlow(Flow[TestState]): + @start() + def step(self): + pass + + flow = AsyncConflictFlow() + with pytest.raises(ValueError) as excinfo: + await flow.akickoff( + from_checkpoint=CheckpointConfig(), + restore_from_state_id="some-uuid", + ) + msg = str(excinfo.value) + assert "from_checkpoint" in msg + assert "restore_from_state_id" in msg