feat(flow): add restore_from_state_id kickoff parameter (#5674)

## Summary

- Reverts `b0e2fda` ("fix(flow): add execution_id separate from state.id", COR-48): removes `Flow.execution_id` and points `current_flow_id` / `current_flow_request_id` back at `flow_id` (i.e. `state.id`). The separate per-run tracking id was no longer the right abstraction once `restore_from_state_id` reshapes how `state.id` is assigned.

- Adds an optional `restore_from_state_id` kwarg to `Flow.kickoff` / `Flow.kickoff_async` that hydrates state from a previously-persisted flow's latest snapshot

- Reassigns `state.id` to a fresh value (or `inputs["id"]` if pinned) so the new run's `@persist` writes don't extend the source's history

- Existing `inputs["id"]` resume, `@persist`, and `from_checkpoint` paths are unchanged

## Problem
`@persist` only supports *resume* today: `kickoff(inputs={"id": <uuid>})` hydrates state and continues writing under the same `flow_uuid`. There's no way to **fork** — hydrate from a snapshot but persist under a separate key, leaving the source's history intact. This PR adds that.

| | `state.id` after kickoff | `@persist` writes land under |
|---|---|---|
| `inputs["id"]` (resume) | supplied id | supplied id (extends history) |
| `restore_from_state_id` (fork) | fresh id, or `inputs["id"]` if pinned | new id (source preserved) |
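The resume/fork split in the table can be sketched with a toy in-memory store. This is a standalone illustration with hypothetical helper names, not the crewai API:

```python
import uuid

# Toy persistence keyed by state id; each key maps to a history of snapshots.
store: dict[str, list[dict]] = {}

def persist_write(state: dict) -> None:
    store.setdefault(state["id"], []).append(dict(state))

def kickoff(inputs=None, restore_from_state_id=None) -> dict:
    state = {"id": (inputs or {}).get("id") or str(uuid.uuid4()), "counter": 0}
    if restore_from_state_id and restore_from_state_id in store:
        # Fork: hydrate from the source's latest snapshot, then mint a new id.
        state.update(store[restore_from_state_id][-1])
        state["id"] = (inputs or {}).get("id") or str(uuid.uuid4())
    elif inputs and inputs.get("id") in store:
        # Resume: hydrate and keep writing under the supplied id.
        state.update(store[inputs["id"]][-1])
    state["counter"] += 1  # stand-in for the flow's work
    persist_write(state)
    return state

run1 = kickoff()                                  # fresh run
fork = kickoff(restore_from_state_id=run1["id"])  # hydrated, new key
resume = kickoff(inputs={"id": run1["id"]})       # hydrated, same key

assert fork["id"] != run1["id"] and resume["id"] == run1["id"]
assert len(store[run1["id"]]) == 2  # resume extended the source history
assert len(store[fork["id"]]) == 1  # fork wrote under its own key
```

The two hydration paths read the same snapshot; only the key that subsequent writes land under differs.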

## Behavior

| `inputs.id` | `restore_from_state_id` | Effect |
|---|---|---|
| — | — | Fresh kickoff |
| set | — | Existing resume |
| — | UUID | Fork — new `state.id`, hydrated from source |
| set | UUID | Fork into a pinned `state.id`, hydrated from source |

- Source not found → silent fallback (mirrors existing resume)
- Both `from_checkpoint` and `restore_from_state_id` set → `ValueError`
- `restore_from_state_id=None` → byte-identical to current main

## Design
Fork hydration runs before the existing `inputs` block in `kickoff_async`. On a hit, it calls the same `_restore_state` primitive used by resume, then overwrites `state.id` with a fresh UUID (or `inputs["id"]`). A `fork_succeeded` flag gates the existing `inputs["id"]` path so we don't double-load. `_completed_methods` / `_is_execution_resuming` are intentionally untouched — skip-completed-methods remains the territory of `apply_checkpoint` and `from_pending`.
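A minimal standalone sketch of that ordering, assuming a dict-backed store and simplified signatures (this is not the real `kickoff_async` body):

```python
def hydrate(state, inputs, restore_from_state_id, persistence, restore_state, new_id):
    """Fork hydration runs first; fork_succeeded gates the resume path."""
    fork_succeeded = False
    if restore_from_state_id is not None and persistence is not None:
        snapshot = persistence.get(restore_from_state_id)
        if snapshot is not None:
            restore_state(state, snapshot)  # same primitive resume uses
            # Reassign state.id: pinned inputs["id"], else a fresh value.
            state["id"] = (inputs or {}).get("id") or new_id
            fork_succeeded = True  # gate: don't double-load below
    if inputs and "id" in inputs and not fork_succeeded:
        state["id"] = inputs["id"]
        snapshot = persistence.get(inputs["id"]) if persistence else None
        if snapshot is not None:
            restore_state(state, snapshot)  # resume: same key, history extends
            state["id"] = inputs["id"]
    return fork_succeeded

# Demo with a dict store and a trivial restore primitive.
store = {"src": {"id": "src", "counter": 3}}
state = {"id": "x", "counter": 0}
hydrate(state, inputs=None, restore_from_state_id="src",
        persistence=store, restore_state=lambda s, snap: s.update(snap),
        new_id="fresh-uuid")
# state is now {"id": "fresh-uuid", "counter": 3}: hydrated, new lineage
```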

## Test plan
- [ ] `pytest tests/test_flow_persistence.py` — 5 new tests (four-row matrix, not-found fallback, default no-op, conflict raise) + 6 existing as regression
- [ ] `pytest tests/test_flow.py` — broader flow suite
- [ ] Manual end-to-end against an HITL `@persist` flow

Authored by Tiago Freire on 2026-05-01 12:46:07 -03:00; committed by GitHub. Parent `07c4a30f2e`, commit `cd2b9ee38a`. 16 changed files with 683 additions and 162 deletions.


@@ -380,6 +380,42 @@ class AnotherFlow(Flow[dict]):
print("Method-level persisted runs:", self.state["runs"])
```
### Forking Persisted State
`@persist` supports two distinct hydration modes on `kickoff` / `kickoff_async`:
- `kickoff(inputs={"id": <uuid>})` — **resume**: load the latest snapshot for the supplied UUID and continue writing under the same `flow_uuid`. The history extends.
- `kickoff(restore_from_state_id=<uuid>)` — **fork**: load the latest snapshot for the supplied UUID, hydrate the new run's state from it, then assign a fresh `state.id` (auto-generated, or `inputs["id"]` if pinned). The new run's `@persist` writes land under the new `state.id`; the source flow's history is preserved.
```python
from crewai.flow.flow import Flow, start
from crewai.flow.persistence import persist
from pydantic import BaseModel


class CounterState(BaseModel):
    id: str = ""
    counter: int = 0


@persist
class CounterFlow(Flow[CounterState]):
    @start()
    def step(self):
        self.state.counter += 1
        print(f"[id={self.state.id}] counter={self.state.counter}")


# Run 1: fresh state, counter 0 -> 1, persisted under flow_1.state.id
flow_1 = CounterFlow()
flow_1.kickoff()

# Fork: hydrate from flow_1's latest snapshot, but use a NEW state.id
flow_2 = CounterFlow()
flow_2.kickoff(restore_from_state_id=flow_1.state.id)
# flow_2.state.counter starts at 1 (hydrated), then step() bumps it to 2.
# flow_2.state.id != flow_1.state.id; flow_1's history is unchanged.
```
If the supplied `restore_from_state_id` does not match any persisted state, the kickoff falls back silently to default behavior — the same as the existing `inputs["id"]` not-found behavior. Combining `restore_from_state_id` with `from_checkpoint` raises a `ValueError`; pick one hydration source. Pinning `inputs["id"]` while forking shares a persistence key with another flow — usually you want only `restore_from_state_id`.
### How It Works
1. **Unique State Identification**


@@ -146,6 +146,14 @@ class ProductionFlow(Flow[AppState]):
# ...
```
By default, `@persist` resumes a flow when `kickoff(inputs={"id": <uuid>})` is supplied, extending the same `flow_uuid` history. To **fork** a persisted flow into a new lineage — hydrating state from a previous run but writing under a fresh `state.id` — pass `restore_from_state_id`:
```python
flow.kickoff(restore_from_state_id="<previous-run-state-id>")
```
The new run gets a fresh `state.id` (auto-generated, or `inputs["id"]` if pinned) so its `@persist` writes don't extend the source's history. Combining with `from_checkpoint` raises a `ValueError`; pick one hydration source.
## Summary
- **Start with a Flow.**


@@ -116,6 +116,48 @@ class PersistentCounterFlow(Flow[CounterState]):
return self.state.value
```
#### Forking Persisted State
`@persist` supports two distinct hydration modes on `kickoff` / `kickoff_async`. Use **resume** (`inputs["id"]`) to continue the same lineage; use **fork** (`restore_from_state_id`) to start a new lineage seeded from a snapshot:

| | `state.id` after kickoff | `@persist` writes land under |
|---|---|---|
| `inputs["id"]` (resume) | supplied id | supplied id (extends history) |
| `restore_from_state_id` (fork) | fresh id, or `inputs["id"]` if pinned | new id (source preserved) |

```python
from crewai.flow.flow import Flow, start
from crewai.flow.persistence import persist
from pydantic import BaseModel


class CounterState(BaseModel):
    id: str = ""
    counter: int = 0


@persist
class CounterFlow(Flow[CounterState]):
    @start()
    def step(self):
        self.state.counter += 1


# Run 1: fresh state, counter 0 -> 1
flow_1 = CounterFlow()
flow_1.kickoff()

# Fork: hydrate from flow_1's latest snapshot, but write under a NEW state.id
flow_2 = CounterFlow()
flow_2.kickoff(restore_from_state_id=flow_1.state.id)
# flow_2 starts with counter=1 (hydrated), then step() bumps it to 2.
# flow_1's flow_uuid history is unchanged.
```
Behavior notes:
- `restore_from_state_id` not found in persistence → the kickoff falls back silently to default behavior (mirrors the existing `inputs["id"]` not-found behavior). No exception is raised.
- Combining `restore_from_state_id` with `from_checkpoint` raises a `ValueError` — they target different state systems (`@persist` vs. Checkpointing) and cannot be combined.
- `restore_from_state_id=None` (default) is byte-identical to a kickoff without the parameter.
- Pinning `inputs["id"]` while forking means the new run shares a persistence key with another flow — usually you want only `restore_from_state_id`.
## Advanced State Patterns
### State-Based Conditional Logic


@@ -380,6 +380,42 @@ class AnotherFlow(Flow[dict]):
print("Method-level persisted runs:", self.state["runs"])
```
### Forking Persisted State
`@persist` supports two distinct hydration modes on `kickoff` / `kickoff_async`:
- `kickoff(inputs={"id": <uuid>})` — **resume**: load the latest snapshot for the supplied UUID and continue writing under the same `flow_uuid`. The history extends.
- `kickoff(restore_from_state_id=<uuid>)` — **fork**: load the latest snapshot for the supplied UUID, hydrate the new run's state from it, and assign a fresh `state.id` (auto-generated, or `inputs["id"]` if pinned). The new run's `@persist` writes land under the new `state.id`; the source flow's history is preserved.
```python
from crewai.flow.flow import Flow, start
from crewai.flow.persistence import persist
from pydantic import BaseModel


class CounterState(BaseModel):
    id: str = ""
    counter: int = 0


@persist
class CounterFlow(Flow[CounterState]):
    @start()
    def step(self):
        self.state.counter += 1
        print(f"[id={self.state.id}] counter={self.state.counter}")


# Run 1: fresh state, counter 0 -> 1, persisted under flow_1.state.id
flow_1 = CounterFlow()
flow_1.kickoff()

# Fork: hydrate from flow_1's latest snapshot, but use a NEW state.id
flow_2 = CounterFlow()
flow_2.kickoff(restore_from_state_id=flow_1.state.id)
# flow_2.state.counter starts at 1 (hydrated), then step() bumps it to 2.
# flow_2.state.id != flow_1.state.id; flow_1's history is unchanged.
```
If the supplied `restore_from_state_id` does not match any persisted state, the kickoff falls back silently — same as the existing `inputs["id"]` resume not-found behavior. Combining `restore_from_state_id` with `from_checkpoint` raises a `ValueError`; pick one hydration source. Pinning `inputs["id"]` while forking shares a persistence key with another flow — usually you want only `restore_from_state_id`.
### How It Works
1. **Unique State Identification**


@@ -146,6 +146,14 @@ class ProductionFlow(Flow[AppState]):
# ...
```
By default, `@persist` resumes a flow when `kickoff(inputs={"id": <uuid>})` is supplied, extending the same `flow_uuid` history. To **fork** a persisted flow into a new lineage — hydrate state from a previous run but write under a fresh `state.id` — pass `restore_from_state_id`:
```python
flow.kickoff(restore_from_state_id="<previous-run-state-id>")
```
The new run gets a fresh `state.id` (auto-generated, or `inputs["id"]` if pinned) so its `@persist` writes don't extend the source's history. Combining with `from_checkpoint` raises a `ValueError`; pick one hydration source.
## Summary
- **Start with a Flow.**


@@ -346,6 +346,48 @@ class SelectivePersistFlow(Flow):
return f"Complete with count {self.state['count']}"
```
#### Forking Persisted State
`@persist` supports two distinct hydration modes on `kickoff` / `kickoff_async`. Use **resume** (`inputs["id"]`) to continue the same lineage; use **fork** (`restore_from_state_id`) to start a new lineage seeded from a snapshot:
| | `state.id` after kickoff | `@persist` writes land under |
|---|---|---|
| `inputs["id"]` (resume) | supplied id | supplied id (extends history) |
| `restore_from_state_id` (fork) | fresh id, or `inputs["id"]` if pinned | new id (source preserved) |
```python
from crewai.flow.flow import Flow, start
from crewai.flow.persistence import persist
from pydantic import BaseModel


class CounterState(BaseModel):
    id: str = ""
    counter: int = 0


@persist
class CounterFlow(Flow[CounterState]):
    @start()
    def step(self):
        self.state.counter += 1


# Run 1: fresh state, counter 0 -> 1
flow_1 = CounterFlow()
flow_1.kickoff()

# Fork: hydrate from flow_1's latest snapshot, but write under a NEW state.id
flow_2 = CounterFlow()
flow_2.kickoff(restore_from_state_id=flow_1.state.id)
# flow_2 starts with counter=1 (hydrated), then step() bumps it to 2.
# flow_1's flow_uuid history is unchanged.
```
Behavior notes:
- `restore_from_state_id` not found in persistence → the kickoff falls back silently to default behavior (mirrors the existing `inputs["id"]` resume not-found behavior). No exception is raised.
- Combining `restore_from_state_id` with `from_checkpoint` raises a `ValueError` — they target different state systems (`@persist` vs. Checkpointing) and cannot be combined.
- `restore_from_state_id=None` (default) is byte-identical to a kickoff without the parameter.
- Pinning `inputs["id"]` while forking means the new run shares a persistence key with another flow — usually you want only `restore_from_state_id`.
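The `from_checkpoint` conflict in the second note is a plain mutual-exclusion guard. A minimal standalone sketch of the same check, mirroring the error message in this PR (simplified function name):

```python
def check_hydration_sources(from_checkpoint, restore_from_state_id):
    """Reject two hydration sources at once, as kickoff does."""
    if from_checkpoint is not None and restore_from_state_id is not None:
        raise ValueError(
            "Cannot combine `from_checkpoint` and `restore_from_state_id`. "
            "These parameters target different state systems "
            "(Checkpointing and @persist) and cannot be used together."
        )

check_hydration_sources(None, "some-uuid")  # ok: one source
check_hydration_sources("ckpt", None)       # ok: one source
try:
    check_hydration_sources("ckpt", "some-uuid")
except ValueError:
    pass  # both supplied: rejected before any checkpoint restore runs
```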
## Advanced State Patterns


@@ -373,6 +373,42 @@ class AnotherFlow(Flow[dict]):
print("Method-level persisted runs:", self.state["runs"])
```
### Forking Persisted State
`@persist` supports two distinct hydration modes on `kickoff` / `kickoff_async`:
- `kickoff(inputs={"id": <uuid>})` — **resume**: load the latest snapshot for the supplied UUID and continue writing under the same `flow_uuid`. The history extends.
- `kickoff(restore_from_state_id=<uuid>)` — **fork**: load the latest snapshot for the supplied UUID, hydrate the new run's state from it, then assign a fresh `state.id` (auto-generated, or `inputs["id"]` if pinned). The new run's `@persist` writes land under the new `state.id`; the source flow's history is preserved.
```python
from crewai.flow.flow import Flow, start
from crewai.flow.persistence import persist
from pydantic import BaseModel


class CounterState(BaseModel):
    id: str = ""
    counter: int = 0


@persist
class CounterFlow(Flow[CounterState]):
    @start()
    def step(self):
        self.state.counter += 1
        print(f"[id={self.state.id}] counter={self.state.counter}")


# Run 1: fresh state, counter 0 -> 1, persisted under flow_1.state.id
flow_1 = CounterFlow()
flow_1.kickoff()

# Fork: hydrate from flow_1's latest snapshot, but use a NEW state.id
flow_2 = CounterFlow()
flow_2.kickoff(restore_from_state_id=flow_1.state.id)
# flow_2.state.counter starts at 1 (hydrated), then step() bumps it to 2.
# flow_2.state.id != flow_1.state.id; flow_1's history is unchanged.
```
If the supplied `restore_from_state_id` does not match any persisted state, the kickoff falls back silently to default behavior — the same as the existing `inputs["id"]` not-found behavior. Combining `restore_from_state_id` with `from_checkpoint` raises a `ValueError`; pick one hydration source. Pinning `inputs["id"]` while forking shares a persistence key with another flow — usually you want only `restore_from_state_id`.
### How It Works
1. **Unique State Identification**


@@ -146,6 +146,14 @@ class ProductionFlow(Flow[AppState]):
# ...
```
By default, `@persist` resumes a flow when `kickoff(inputs={"id": <uuid>})` is supplied, extending the same `flow_uuid` history. To **fork** a persisted flow into a new lineage — hydrating state from a previous run but writing under a fresh `state.id` — pass `restore_from_state_id`:
```python
flow.kickoff(restore_from_state_id="<previous-run-state-id>")
```
The new run gets a fresh `state.id` (auto-generated, or `inputs["id"]` if pinned) so its `@persist` writes don't extend the source's history. Combining with `from_checkpoint` raises a `ValueError`; pick one hydration source.
## Summary
- **Start with a Flow.**


@@ -346,6 +346,48 @@ class SelectivePersistFlow(Flow):
return f"Complete with count {self.state['count']}"
```
#### Forking Persisted State
`@persist` supports two distinct hydration modes on `kickoff` / `kickoff_async`. Use **resume** (`inputs["id"]`) to continue the same lineage; use **fork** (`restore_from_state_id`) to start a new lineage seeded from a snapshot:

| | `state.id` after kickoff | `@persist` writes land under |
|---|---|---|
| `inputs["id"]` (resume) | supplied id | supplied id (extends history) |
| `restore_from_state_id` (fork) | fresh id, or `inputs["id"]` if pinned | new id (source preserved) |

```python
from crewai.flow.flow import Flow, start
from crewai.flow.persistence import persist
from pydantic import BaseModel


class CounterState(BaseModel):
    id: str = ""
    counter: int = 0


@persist
class CounterFlow(Flow[CounterState]):
    @start()
    def step(self):
        self.state.counter += 1


# Run 1: fresh state, counter 0 -> 1
flow_1 = CounterFlow()
flow_1.kickoff()

# Fork: hydrate from flow_1's latest snapshot, but write under a NEW state.id
flow_2 = CounterFlow()
flow_2.kickoff(restore_from_state_id=flow_1.state.id)
# flow_2 starts with counter=1 (hydrated), then step() bumps it to 2.
# flow_1's flow_uuid history is unchanged.
```
Behavior notes:
- `restore_from_state_id` not found in persistence → the kickoff falls back silently to default behavior (mirrors the existing `inputs["id"]` not-found behavior). No exception is raised.
- Combining `restore_from_state_id` with `from_checkpoint` raises a `ValueError` — they target different state systems (`@persist` vs. Checkpointing) and cannot be combined.
- `restore_from_state_id=None` (default) is byte-identical to a kickoff without the parameter.
- Pinning `inputs["id"]` while forking means the new run shares a persistence key with another flow — usually you want only `restore_from_state_id`.
## Advanced State Patterns
### State-Based Conditional Logic


@@ -193,6 +193,42 @@ For more granular control, you can apply @persist to specific methods
# (Code is not translated)
```
### Forking Persisted State
`@persist` supports two distinct hydration modes on `kickoff` / `kickoff_async`:
- `kickoff(inputs={"id": <uuid>})` — **resume**: load the latest snapshot for the supplied UUID and continue writing under the same `flow_uuid`. The history extends.
- `kickoff(restore_from_state_id=<uuid>)` — **fork**: load the latest snapshot for the supplied UUID, hydrate the new run's state from it, then assign a fresh `state.id` (auto-generated, or `inputs["id"]` if pinned). The new run's `@persist` writes land under the new `state.id`; the source flow's history is preserved.
```python
from crewai.flow.flow import Flow, start
from crewai.flow.persistence import persist
from pydantic import BaseModel


class CounterState(BaseModel):
    id: str = ""
    counter: int = 0


@persist
class CounterFlow(Flow[CounterState]):
    @start()
    def step(self):
        self.state.counter += 1
        print(f"[id={self.state.id}] counter={self.state.counter}")


# Run 1: fresh state, counter 0 -> 1, persisted under flow_1.state.id
flow_1 = CounterFlow()
flow_1.kickoff()

# Fork: hydrate from flow_1's latest snapshot, but use a NEW state.id
flow_2 = CounterFlow()
flow_2.kickoff(restore_from_state_id=flow_1.state.id)
# flow_2.state.counter starts at 1 (hydrated), then step() bumps it to 2.
# flow_2.state.id != flow_1.state.id; flow_1's history is unchanged.
```
If the supplied `restore_from_state_id` does not match any persisted state, the kickoff falls back silently to default behavior — the same as the existing `inputs["id"]` not-found behavior. Combining `restore_from_state_id` with `from_checkpoint` raises a `ValueError`; pick one hydration source. Pinning `inputs["id"]` while forking shares a persistence key with another flow — usually you want only `restore_from_state_id`.
### How It Works
1. **Unique State Identification**


@@ -146,6 +146,14 @@ class ProductionFlow(Flow[AppState]):
# ...
```
By default, `@persist` resumes a flow when `kickoff(inputs={"id": <uuid>})` is supplied, extending the same `flow_uuid` history. To **fork** a persisted flow into a new lineage — hydrating state from a previous run but writing under a fresh `state.id` — pass `restore_from_state_id`:
```python
flow.kickoff(restore_from_state_id="<previous-run-state-id>")
```
The new run gets a fresh `state.id` (auto-generated, or `inputs["id"]` if pinned) so its `@persist` writes don't extend the source's history. Combining with `from_checkpoint` raises a `ValueError`; pick one hydration source.
## Summary
- **Start with a Flow.**


@@ -167,6 +167,48 @@ For more control, you can apply `@persist()` to specific methods:
# code not translated
```
#### Forking Persisted State
`@persist` supports two distinct hydration modes on `kickoff` / `kickoff_async`. Use **resume** (`inputs["id"]`) to continue the same lineage; use **fork** (`restore_from_state_id`) to start a new lineage seeded from a snapshot:

| | `state.id` after kickoff | `@persist` writes land under |
|---|---|---|
| `inputs["id"]` (resume) | supplied id | supplied id (extends history) |
| `restore_from_state_id` (fork) | fresh id, or `inputs["id"]` if pinned | new id (source preserved) |

```python
from crewai.flow.flow import Flow, start
from crewai.flow.persistence import persist
from pydantic import BaseModel


class CounterState(BaseModel):
    id: str = ""
    counter: int = 0


@persist
class CounterFlow(Flow[CounterState]):
    @start()
    def step(self):
        self.state.counter += 1


# Run 1: fresh state, counter 0 -> 1
flow_1 = CounterFlow()
flow_1.kickoff()

# Fork: hydrate from flow_1's latest snapshot, but write under a NEW state.id
flow_2 = CounterFlow()
flow_2.kickoff(restore_from_state_id=flow_1.state.id)
# flow_2 starts with counter=1 (hydrated), then step() bumps it to 2.
# flow_1's flow_uuid history is unchanged.
```
Behavior notes:
- `restore_from_state_id` not found in persistence → the kickoff falls back silently to default behavior (mirrors the existing `inputs["id"]` not-found behavior). No exception is raised.
- Combining `restore_from_state_id` with `from_checkpoint` raises a `ValueError` — they target different state systems (`@persist` vs. Checkpointing) and cannot be combined.
- `restore_from_state_id=None` (default) is byte-identical to a kickoff without the parameter.
- Pinning `inputs["id"]` while forking means the new run shares a persistence key with another flow — usually you want only `restore_from_state_id`.
## Advanced State Patterns
### State-Based Conditional Logic


@@ -1074,7 +1074,6 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
_human_feedback_method_outputs: dict[str, Any] = PrivateAttr(default_factory=dict)
_input_history: list[InputHistoryEntry] = PrivateAttr(default_factory=list)
_state: Any = PrivateAttr(default=None)
_execution_id: str = PrivateAttr(default_factory=lambda: str(uuid4()))
def __class_getitem__(cls: type[Flow[T]], item: type[T]) -> type[Flow[T]]: # type: ignore[override]
class _FlowGeneric(cls): # type: ignore[valid-type,misc]
@@ -1865,27 +1864,6 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
except (AttributeError, TypeError):
return "" # Safely handle any unexpected attribute access issues
@property
def execution_id(self) -> str:
"""Stable identifier for this flow execution.
Separate from ``flow_id`` / ``state.id``, which consumers may
override via ``kickoff(inputs={"id": ...})`` to resume a persisted
flow. ``execution_id`` is never affected by ``inputs`` and stays
stable for the lifetime of a single run, so it is the correct key
for telemetry, tracing, and any external correlation that must
uniquely identify a single execution even when callers pass an
``id`` in ``inputs``.
Defaults to a fresh ``uuid4`` per ``Flow`` instance; assign to
override when an outer system already has an execution identity.
"""
return self._execution_id
@execution_id.setter
def execution_id(self, value: str) -> None:
self._execution_id = value
def _initialize_state(self, inputs: dict[str, Any]) -> None:
"""Initialize or update flow state with new inputs.
@@ -2054,6 +2032,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
inputs: dict[str, Any] | None = None,
input_files: dict[str, FileInput] | None = None,
from_checkpoint: CheckpointConfig | None = None,
restore_from_state_id: str | None = None,
) -> Any | FlowStreamingOutput:
"""Start the flow execution in a synchronous context.
@@ -2065,10 +2044,24 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
input_files: Optional dict of named file inputs for the flow.
from_checkpoint: Optional checkpoint config. If ``restore_from``
is set, the flow resumes from that checkpoint.
restore_from_state_id: Optional UUID of a previously-persisted flow
whose latest snapshot should hydrate this run's state. The new
run is assigned a fresh ``state.id`` (or ``inputs["id"]`` if
pinned), so its ``@persist`` writes land under a separate
persistence key and the source flow's history is preserved.
If the referenced state is not found, the kickoff falls back
silently to baseline behavior. Cannot be combined with
``from_checkpoint``; passing both raises ``ValueError``.
Returns:
The final output from the flow or FlowStreamingOutput if streaming.
"""
if from_checkpoint is not None and restore_from_state_id is not None:
raise ValueError(
"Cannot combine `from_checkpoint` and `restore_from_state_id`. "
"These parameters target different state systems "
"(Checkpointing and @persist) and cannot be used together."
)
restored = apply_checkpoint(self, from_checkpoint)
if restored is not None:
return restored.kickoff(inputs=inputs, input_files=input_files)
@@ -2090,7 +2083,11 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
def run_flow() -> None:
try:
self.stream = False
result = self.kickoff(inputs=inputs, input_files=input_files)
result = self.kickoff(
inputs=inputs,
input_files=input_files,
restore_from_state_id=restore_from_state_id,
)
result_holder.append(result)
except Exception as e:
# HumanFeedbackPending is expected control flow, not an error
@@ -2113,7 +2110,11 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
return streaming_output
async def _run_flow() -> Any:
return await self.kickoff_async(inputs, input_files)
return await self.kickoff_async(
inputs,
input_files,
restore_from_state_id=restore_from_state_id,
)
try:
asyncio.get_running_loop()
@@ -2128,6 +2129,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
inputs: dict[str, Any] | None = None,
input_files: dict[str, FileInput] | None = None,
from_checkpoint: CheckpointConfig | None = None,
restore_from_state_id: str | None = None,
) -> Any | FlowStreamingOutput:
"""Start the flow execution asynchronously.
@@ -2141,10 +2143,23 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
input_files: Optional dict of named file inputs for the flow.
from_checkpoint: Optional checkpoint config. If ``restore_from``
is set, the flow resumes from that checkpoint.
restore_from_state_id: Optional UUID of a previously-persisted flow
whose latest snapshot should hydrate this run's state. The new
run is assigned a fresh ``state.id`` (or ``inputs["id"]`` if
pinned), so subsequent ``@persist`` writes land under a
separate persistence key. If the referenced state is not
found, falls back silently to baseline. Cannot be combined
with ``from_checkpoint``; passing both raises ``ValueError``.
Returns:
The final output from the flow, which is the result of the last executed method.
"""
if from_checkpoint is not None and restore_from_state_id is not None:
raise ValueError(
"Cannot combine `from_checkpoint` and `restore_from_state_id`. "
"These parameters target different state systems "
"(Checkpointing and @persist) and cannot be used together."
)
restored = apply_checkpoint(self, from_checkpoint)
if restored is not None:
return await restored.kickoff_async(inputs=inputs, input_files=input_files)
@@ -2167,7 +2182,9 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
try:
self.stream = False
result = await self.kickoff_async(
inputs=inputs, input_files=input_files
inputs=inputs,
input_files=input_files,
restore_from_state_id=restore_from_state_id,
)
result_holder.append(result)
except Exception as e:
@@ -2199,9 +2216,9 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
flow_id_token = None
request_id_token = None
if current_flow_id.get() is None:
flow_id_token = current_flow_id.set(self.execution_id)
flow_id_token = current_flow_id.set(self.flow_id)
if current_flow_request_id.get() is None:
request_id_token = current_flow_request_id.set(self.execution_id)
request_id_token = current_flow_request_id.set(self.flow_id)
try:
# Reset flow state for fresh execution unless restoring from persistence
@@ -2224,16 +2241,54 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
if self._completed_methods:
self._is_execution_resuming = True
# Fork hydration: when restore_from_state_id is set and persistence is
# available, hydrate self._state from the source UUID's latest snapshot
# and reassign state.id to a fresh value so subsequent @persist writes
# don't extend the source flow's history. If the source state is not
# found, fall through silently to the existing inputs handling.
fork_succeeded = False
if restore_from_state_id is not None and self.persistence is not None:
stored_state = self.persistence.load_state(restore_from_state_id)
if stored_state:
self._log_flow_event(
f"Forking flow state from UUID: {restore_from_state_id}"
)
self._restore_state(stored_state)
# Pin to inputs["id"] when provided, otherwise mint a fresh
# UUID. NOTE: pinning inputs.id while forking shares a
# persistence key with another flow — usually you want only
# restore_from_state_id.
new_state_id = (inputs.get("id") if inputs else None) or str(
uuid4()
)
if isinstance(self._state, dict):
self._state["id"] = new_state_id
elif isinstance(self._state, BaseModel):
setattr(self._state, "id", new_state_id) # noqa: B010
fork_succeeded = True
else:
self._log_flow_event(
"No flow state found for restore_from_state_id: "
f"{restore_from_state_id}; proceeding without hydration",
color="yellow",
)
if inputs:
# Override the id in the state if it exists in inputs
if "id" in inputs:
# Override the id in the state if it exists in inputs.
# Skip when the fork already assigned state.id above.
if "id" in inputs and not fork_succeeded:
if isinstance(self._state, dict):
self._state["id"] = inputs["id"]
elif isinstance(self._state, BaseModel):
setattr(self._state, "id", inputs["id"]) # noqa: B010
# If persistence is enabled, attempt to restore the stored state using the provided id.
if "id" in inputs and self.persistence is not None:
# Skip when the fork already restored self._state above.
if (
"id" in inputs
and self.persistence is not None
and not fork_succeeded
):
restore_uuid = inputs["id"]
stored_state = self.persistence.load_state(restore_uuid)
if stored_state:
@@ -2416,6 +2471,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
inputs: dict[str, Any] | None = None,
input_files: dict[str, FileInput] | None = None,
from_checkpoint: CheckpointConfig | None = None,
restore_from_state_id: str | None = None,
) -> Any | FlowStreamingOutput:
"""Native async method to start the flow execution. Alias for kickoff_async.
@@ -2424,11 +2480,19 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
input_files: Optional dict of named file inputs for the flow.
from_checkpoint: Optional checkpoint config. If ``restore_from``
is set, the flow resumes from that checkpoint.
restore_from_state_id: Optional UUID of a previously-persisted flow
whose latest snapshot should hydrate this run's state. See
``kickoff_async`` for full semantics.
Returns:
The final output from the flow, which is the result of the last executed method.
"""
return await self.kickoff_async(inputs, input_files, from_checkpoint)
return await self.kickoff_async(
inputs,
input_files,
from_checkpoint,
restore_from_state_id=restore_from_state_id,
)
async def _replay_recorded_events(self) -> None:
"""Dispatch recorded ``MethodExecution*`` events from the event record."""


@@ -4519,8 +4519,8 @@ def test_sets_flow_context_when_using_crewbase_pattern_inside_flow():
flow.kickoff()
assert captured_crew is not None
assert captured_crew._flow_id == flow.execution_id # type: ignore[attr-defined]
assert captured_crew._request_id == flow.execution_id # type: ignore[attr-defined]
assert captured_crew._flow_id == flow.flow_id # type: ignore[attr-defined]
assert captured_crew._request_id == flow.flow_id # type: ignore[attr-defined]
def test_sets_flow_context_when_outside_flow(researcher, writer):
@@ -4554,8 +4554,8 @@ def test_sets_flow_context_when_inside_flow(researcher, writer):
flow = MyFlow()
result = flow.kickoff()
assert result._flow_id == flow.execution_id # type: ignore[attr-defined]
assert result._request_id == flow.execution_id # type: ignore[attr-defined]
assert result._flow_id == flow.flow_id # type: ignore[attr-defined]
assert result._request_id == flow.flow_id # type: ignore[attr-defined]
def test_reset_knowledge_with_no_crew_knowledge(researcher, writer):


@@ -1,127 +0,0 @@
"""Regression tests for ``Flow.execution_id``.
``execution_id`` is the stable tracking identifier for a single flow run.
It must stay independent of ``state.id`` so that consumers passing an
``id`` in ``inputs`` (used for persistence restore) cannot destabilize
the identity used by telemetry, tracing, and external correlation.
"""
from __future__ import annotations
from typing import Any
import pytest
from crewai.flow.flow import Flow, FlowState, start
from crewai.flow.flow_context import current_flow_id, current_flow_request_id
class _CaptureState(FlowState):
captured_flow_id: str = ""
captured_state_id: str = ""
captured_current_flow_id: str = ""
captured_execution_id: str = ""
class _IdentityCaptureFlow(Flow[_CaptureState]):
initial_state = _CaptureState
@start()
def capture(self) -> None:
self.state.captured_flow_id = self.flow_id
self.state.captured_state_id = self.state.id
self.state.captured_current_flow_id = current_flow_id.get() or ""
self.state.captured_execution_id = self.execution_id
def test_execution_id_defaults_to_fresh_uuid_per_instance() -> None:
a = _IdentityCaptureFlow()
b = _IdentityCaptureFlow()
assert a.execution_id
assert b.execution_id
assert a.execution_id != b.execution_id
def test_execution_id_survives_consumer_id_in_inputs() -> None:
flow = _IdentityCaptureFlow()
original_execution_id = flow.execution_id
flow.kickoff(inputs={"id": "consumer-supplied-id"})
assert flow.state.id == "consumer-supplied-id"
assert flow.flow_id == "consumer-supplied-id"
assert flow.execution_id == original_execution_id
assert flow.execution_id != "consumer-supplied-id"
def test_two_runs_with_same_consumer_id_have_distinct_execution_ids() -> None:
flow_a = _IdentityCaptureFlow()
flow_b = _IdentityCaptureFlow()
colliding_id = "shared-consumer-id"
flow_a.kickoff(inputs={"id": colliding_id})
flow_b.kickoff(inputs={"id": colliding_id})
assert flow_a.state.id == colliding_id
assert flow_b.state.id == colliding_id
assert flow_a.execution_id != flow_b.execution_id
def test_execution_id_is_writable() -> None:
flow = _IdentityCaptureFlow()
flow.execution_id = "external-task-id"
assert flow.execution_id == "external-task-id"
flow.kickoff(inputs={"id": "consumer-supplied-id"})
assert flow.execution_id == "external-task-id"
assert flow.state.id == "consumer-supplied-id"
def test_current_flow_id_context_var_matches_execution_id() -> None:
flow = _IdentityCaptureFlow()
flow.execution_id = "external-task-id"
flow.kickoff(inputs={"id": "consumer-supplied-id"})
assert flow.state.captured_current_flow_id == "external-task-id"
assert flow.state.captured_flow_id == "consumer-supplied-id"
assert flow.state.captured_execution_id == "external-task-id"
def test_execution_id_not_included_in_serialized_state() -> None:
flow = _IdentityCaptureFlow()
flow.execution_id = "external-task-id"
flow.kickoff()
dumped = flow.state.model_dump()
assert "execution_id" not in dumped
assert "_execution_id" not in dumped
assert dumped["id"] == flow.state.id
def test_dict_state_flow_also_exposes_stable_execution_id() -> None:
class DictFlow(Flow[dict[str, Any]]):
initial_state = dict # type: ignore[assignment]
@start()
def noop(self) -> None:
pass
flow = DictFlow()
original = flow.execution_id
flow.kickoff(inputs={"id": "consumer-supplied-id"})
assert flow.state["id"] == "consumer-supplied-id"
assert flow.execution_id == original
@pytest.fixture(autouse=True)
def _reset_flow_context_vars():
yield
for var in (current_flow_id, current_flow_request_id):
try:
var.set(None)
except LookupError:
# ContextVar was never set in this context; nothing to reset.
pass

View File

@@ -3,6 +3,7 @@
import os
from typing import Dict, List
import pytest
from crewai.flow.flow import Flow, FlowState, listen, start
from crewai.flow.persistence import persist
from crewai.flow.persistence.sqlite import SQLiteFlowPersistence
@@ -248,3 +249,242 @@ def test_persistence_with_base_model(tmp_path):
assert message.type == "text"
assert message.content == "Hello, World!"
assert isinstance(flow.state._unwrap(), State)
def test_fork_with_restore_from_state_id(tmp_path):
"""Fork: restore_from_state_id hydrates state from source flow_uuid; new run gets a
fresh state.id; source's history is preserved (the fork's @persist writes go under
the new state.id, not the source's)."""
db_path = os.path.join(tmp_path, "test_flows.db")
persistence = SQLiteFlowPersistence(db_path)
class ForkableFlow(Flow[TestState]):
@start()
@persist(persistence)
def step(self):
self.state.counter += 1
# Run 1: build up source state. counter goes 0 -> 1.
flow1 = ForkableFlow(persistence=persistence)
flow1.kickoff()
source_uuid = flow1.state.id
assert flow1.state.counter == 1
# Resume on the same uuid bumps counter to 2 in the SAME flow_uuid history.
flow1b = ForkableFlow(persistence=persistence)
flow1b.kickoff(inputs={"id": source_uuid})
assert flow1b.state.counter == 2
assert persistence.load_state(source_uuid)["counter"] == 2
# Fork: hydrate from source, but persist under a fresh state.id.
flow2 = ForkableFlow(persistence=persistence)
flow2.kickoff(restore_from_state_id=source_uuid)
# Fork has a different state.id from the source.
assert flow2.state.id != source_uuid
# Hydrated from source's latest snapshot (counter=2), then incremented to 3.
assert flow2.state.counter == 3
# Source's history is unchanged after the fork.
assert persistence.load_state(source_uuid)["counter"] == 2
# Fork's writes landed under its own state.id.
assert persistence.load_state(flow2.state.id)["counter"] == 3
def test_fork_with_pinned_state_id(tmp_path):
"""Fork into a pinned state.id (inputs.id supplied alongside restore_from_state_id):
the new run uses inputs.id as state.id and hydrates from restore_from_state_id."""
db_path = os.path.join(tmp_path, "test_flows.db")
persistence = SQLiteFlowPersistence(db_path)
class PinnableFlow(Flow[TestState]):
@start()
@persist(persistence)
def step(self):
self.state.counter += 1
flow1 = PinnableFlow(persistence=persistence)
flow1.kickoff()
source_uuid = flow1.state.id
assert flow1.state.counter == 1
pinned_uuid = "pinned-fork-uuid-1234"
flow2 = PinnableFlow(persistence=persistence)
flow2.kickoff(
inputs={"id": pinned_uuid},
restore_from_state_id=source_uuid,
)
# state.id pinned to inputs.id, NOT the source uuid.
assert flow2.state.id == pinned_uuid
# Hydrated from source: counter started at 1, step incremented to 2.
assert flow2.state.counter == 2
# Source's history is unchanged.
assert persistence.load_state(source_uuid)["counter"] == 1
# Fork's writes are under the pinned uuid.
assert persistence.load_state(pinned_uuid)["counter"] == 2
def test_restore_from_state_id_not_found_silent_fallback(tmp_path):
"""Lookup miss on restore_from_state_id silently falls through to default behavior."""
db_path = os.path.join(tmp_path, "test_flows.db")
persistence = SQLiteFlowPersistence(db_path)
class FallbackFlow(Flow[TestState]):
@start()
@persist(persistence)
def step(self):
self.state.counter += 1
flow = FallbackFlow(persistence=persistence)
# No source UUID exists — should not raise.
flow.kickoff(restore_from_state_id="no-such-uuid")
# Default state path: counter starts at 0 and step increments to 1.
assert flow.state.counter == 1
# state.id is the auto-generated one, NOT the missing source.
assert flow.state.id != "no-such-uuid"
def test_restore_from_state_id_none_is_no_op(tmp_path):
"""restore_from_state_id=None (default) preserves baseline kickoff behavior."""
db_path = os.path.join(tmp_path, "test_flows.db")
persistence = SQLiteFlowPersistence(db_path)
class BaselineFlow(Flow[TestState]):
@start()
@persist(persistence)
def step(self):
self.state.counter += 1
flow = BaselineFlow(persistence=persistence)
flow.kickoff(restore_from_state_id=None)
assert flow.state.counter == 1
def test_fork_conflict_with_from_checkpoint_raises():
"""Passing both from_checkpoint and restore_from_state_id raises ValueError, naming
both parameters."""
from crewai.state import CheckpointConfig
class ConflictFlow(Flow[TestState]):
@start()
def step(self):
pass
flow = ConflictFlow()
with pytest.raises(ValueError) as excinfo:
flow.kickoff(
from_checkpoint=CheckpointConfig(),
restore_from_state_id="some-uuid",
)
msg = str(excinfo.value)
assert "from_checkpoint" in msg
assert "restore_from_state_id" in msg
@pytest.mark.asyncio
async def test_fork_via_kickoff_async(tmp_path):
"""kickoff_async honors restore_from_state_id: hydrates from source, mints fresh
state.id, persists under the new id, source history preserved."""
db_path = os.path.join(tmp_path, "test_flows.db")
persistence = SQLiteFlowPersistence(db_path)
class AsyncForkableFlow(Flow[TestState]):
@start()
@persist(persistence)
def step(self):
self.state.counter += 1
flow1 = AsyncForkableFlow(persistence=persistence)
await flow1.kickoff_async()
source_uuid = flow1.state.id
assert flow1.state.counter == 1
flow2 = AsyncForkableFlow(persistence=persistence)
await flow2.kickoff_async(restore_from_state_id=source_uuid)
assert flow2.state.id != source_uuid
assert flow2.state.counter == 2
assert persistence.load_state(source_uuid)["counter"] == 1
assert persistence.load_state(flow2.state.id)["counter"] == 2
@pytest.mark.asyncio
async def test_fork_via_akickoff(tmp_path):
"""akickoff is the public async alias and must accept restore_from_state_id with
the same semantics as kickoff_async."""
db_path = os.path.join(tmp_path, "test_flows.db")
persistence = SQLiteFlowPersistence(db_path)
class AkickoffForkableFlow(Flow[TestState]):
@start()
@persist(persistence)
def step(self):
self.state.counter += 1
flow1 = AkickoffForkableFlow(persistence=persistence)
await flow1.akickoff()
source_uuid = flow1.state.id
assert flow1.state.counter == 1
flow2 = AkickoffForkableFlow(persistence=persistence)
await flow2.akickoff(restore_from_state_id=source_uuid)
assert flow2.state.id != source_uuid
assert flow2.state.counter == 2
assert persistence.load_state(source_uuid)["counter"] == 1
assert persistence.load_state(flow2.state.id)["counter"] == 2
@pytest.mark.asyncio
async def test_akickoff_pinned_fork(tmp_path):
"""akickoff with both inputs.id and restore_from_state_id pins state.id while
hydrating from the source."""
db_path = os.path.join(tmp_path, "test_flows.db")
persistence = SQLiteFlowPersistence(db_path)
class PinnableAsyncFlow(Flow[TestState]):
@start()
@persist(persistence)
def step(self):
self.state.counter += 1
flow1 = PinnableAsyncFlow(persistence=persistence)
await flow1.akickoff()
source_uuid = flow1.state.id
pinned_uuid = "pinned-akickoff-fork-uuid"
flow2 = PinnableAsyncFlow(persistence=persistence)
await flow2.akickoff(
inputs={"id": pinned_uuid},
restore_from_state_id=source_uuid,
)
assert flow2.state.id == pinned_uuid
assert flow2.state.counter == 2
assert persistence.load_state(source_uuid)["counter"] == 1
assert persistence.load_state(pinned_uuid)["counter"] == 2
@pytest.mark.asyncio
async def test_akickoff_fork_conflict_with_from_checkpoint_raises():
"""akickoff must raise the same conflict ValueError as kickoff/kickoff_async when
both from_checkpoint and restore_from_state_id are set."""
from crewai.state import CheckpointConfig
class AsyncConflictFlow(Flow[TestState]):
@start()
def step(self):
pass
flow = AsyncConflictFlow()
with pytest.raises(ValueError) as excinfo:
await flow.akickoff(
from_checkpoint=CheckpointConfig(),
restore_from_state_id="some-uuid",
)
msg = str(excinfo.value)
assert "from_checkpoint" in msg
assert "restore_from_state_id" in msg