diff --git a/docs/ar/concepts/flows.mdx b/docs/ar/concepts/flows.mdx index 8c01bdd97..1b23c5177 100644 --- a/docs/ar/concepts/flows.mdx +++ b/docs/ar/concepts/flows.mdx @@ -380,6 +380,33 @@ class AnotherFlow(Flow[dict]): print("Method-level persisted runs:", self.state["runs"]) ``` +### مفتاح استمرارية مخصص + +افتراضيًا، يستخدم `@persist` الحقل `state.id` المُولّد تلقائيًا كمفتاح للاستمرارية. إذا كان لتدفقك معرّف خاص به — مثل `conversation_id` مشترك بين عدة جلسات — يمكنك تمرير الوسيط `key` ليستخدم `@persist` تلك السمة كـ UUID للتدفق: + +```python +from crewai.flow.flow import Flow, listen, start +from crewai.flow.persistence import persist +from pydantic import BaseModel + +class ConversationState(BaseModel): + conversation_id: str + turn: int = 0 + +@persist(key="conversation_id") # استخدام حقل مخصص كمفتاح للاستمرارية +class ConversationFlow(Flow[ConversationState]): + @start() + def begin(self): + self.state.turn += 1 + print(f"Conversation {self.state.conversation_id} turn {self.state.turn}") + +# إعادة تشغيل المحادثة بنفس conversation_id يُعيد تحميل الحالة السابقة +flow = ConversationFlow(conversation_id="user-42") +flow.kickoff() +``` + +يقرأ المزخرف القيمة من `state[key]` للحالات من نوع dict، ومن `getattr(state, key)` للحالات من نوع Pydantic / كائن. إذا كانت السمة المحددة غير موجودة أو قيمتها falsy عند الحفظ، يُطلق `@persist` خطأ `ValueError` مثل `Flow state is missing required persistence key 'conversation_id'`. عند حذف `key`، يظل السلوك الأصلي قائمًا ويُستخدم `state.id`. + ### كيف تعمل 1. **تعريف الحالة الفريد** diff --git a/docs/ar/concepts/production-architecture.mdx b/docs/ar/concepts/production-architecture.mdx index 19ba0cecb..ff69cfd1f 100644 --- a/docs/ar/concepts/production-architecture.mdx +++ b/docs/ar/concepts/production-architecture.mdx @@ -146,6 +146,15 @@ class ProductionFlow(Flow[AppState]): # ... ``` +افتراضيًا، يستخدم `@persist` الحقل `state.id` المُولّد تلقائيًا كمفتاح للحالة المحفوظة. إذا كان تطبيقك يمتلك معرّفًا طبيعيًا بالفعل — مثل `conversation_id` يربط عدة تشغيلات بنفس جلسة المستخدم — مرّره كـ `key` ليستخدمه المزخرف كـ UUID للتدفق. يُطلق `ValueError` إذا كانت السمة المحددة غير موجودة أو قيمتها falsy عند الحفظ. + +```python +@persist(key="conversation_id") +class ProductionFlow(Flow[AppState]): + # يجب أن يحتوي AppState على conversation_id؛ استئناف الجلسة يُعيد تحميل الحالة السابقة + ... +``` + ## الخلاصة - **ابدأ بتدفق.** diff --git a/docs/ar/guides/flows/mastering-flow-state.mdx b/docs/ar/guides/flows/mastering-flow-state.mdx index 64874e39c..7a8c6f931 100644 --- a/docs/ar/guides/flows/mastering-flow-state.mdx +++ b/docs/ar/guides/flows/mastering-flow-state.mdx @@ -116,6 +116,33 @@ class PersistentCounterFlow(Flow[CounterState]): return self.state.value ``` +### استخدام مفتاح استمرارية مخصص + +افتراضيًا، يستخدم `@persist()` الحقل `state.id` المُولّد تلقائيًا كمفتاح للحالة المحفوظة. عندما يكون لمجالك معرّف طبيعي بالفعل — مثل `conversation_id` يربط عدة تشغيلات للتدفق بنفس جلسة المستخدم — مرّره كوسيط `key` ليستخدمه `@persist` كـ UUID للتدفق بدلًا من `id`: + +```python +from crewai.flow.flow import Flow, listen, start +from crewai.flow.persistence import persist +from pydantic import BaseModel + +class ConversationState(BaseModel): + conversation_id: str + history: list[str] = [] + +@persist(key="conversation_id") +class ConversationFlow(Flow[ConversationState]): + @start() + def greet(self): + self.state.history.append("hello") + return self.state.history + +# تشغيل ثانٍ بنفس conversation_id يُعيد تحميل الحالة السابقة +flow = ConversationFlow(conversation_id="user-42") +flow.kickoff() +``` + +بالنسبة للحالات من نوع dict يقرأ `@persist` القيمة من `state[key]`، ولحالات Pydantic / الكائنات يقرأها من `getattr(state, key)`. إذا كانت السمة المحددة غير موجودة أو قيمتها falsy عند حفظ الحالة، يُطلق `@persist` خطأ `ValueError` مثل `Flow state is missing required persistence key 'conversation_id'`، فيظهر الفشل فورًا بدلًا من فقد بيانات الاستمرارية بصمت. استدعاء `@persist()` بدون `key` يحافظ على السلوك الأصلي ويستخدم `state.id`. + ## أنماط حالة متقدمة ### المنطق الشرطي المبني على الحالة diff --git a/docs/en/concepts/flows.mdx b/docs/en/concepts/flows.mdx index defbd3e01..6f4c53b6a 100644 --- a/docs/en/concepts/flows.mdx +++ b/docs/en/concepts/flows.mdx @@ -380,6 +380,33 @@ class AnotherFlow(Flow[dict]): print("Method-level persisted runs:", self.state["runs"]) ``` +### Custom Persistence Key + +By default, `@persist` uses the auto-generated `state.id` field as the persistence key. If your flow models its own identifier — for example a `conversation_id` shared across sessions — you can pass a `key` argument and `@persist` will use that attribute as the flow UUID instead: + +```python +from crewai.flow.flow import Flow, listen, start +from crewai.flow.persistence import persist +from pydantic import BaseModel + +class ConversationState(BaseModel): + conversation_id: str + turn: int = 0 + +@persist(key="conversation_id") # Use a custom field as the persistence key +class ConversationFlow(Flow[ConversationState]): + @start() + def begin(self): + self.state.turn += 1 + print(f"Conversation {self.state.conversation_id} turn {self.state.turn}") + +# Resuming the same conversation reloads its prior state by conversation_id +flow = ConversationFlow(conversation_id="user-42") +flow.kickoff() +``` + +The decorator reads the value at `state[key]` for dict states, or `getattr(state, key)` for Pydantic / object states. If the named attribute is missing or falsy at save time, `@persist` raises a `ValueError` such as `Flow state is missing required persistence key 'conversation_id'`. When `key` is omitted, the existing behavior is preserved and `state.id` is used. + ### How It Works 1. **Unique State Identification** diff --git a/docs/en/concepts/production-architecture.mdx b/docs/en/concepts/production-architecture.mdx index ad668056f..669a1693d 100644 --- a/docs/en/concepts/production-architecture.mdx +++ b/docs/en/concepts/production-architecture.mdx @@ -146,6 +146,15 @@ class ProductionFlow(Flow[AppState]): # ... ``` +By default `@persist` keys saved state by the auto-generated `state.id`. If your application already has a natural identifier — for example a `conversation_id` that ties multiple runs to the same user session — pass it as `key` and the decorator will use that attribute as the flow UUID. A `ValueError` is raised if the named attribute is missing or falsy at save time. + +```python +@persist(key="conversation_id") +class ProductionFlow(Flow[AppState]): + # AppState must expose conversation_id; resuming a session reloads its prior state + ... +``` + ## Summary - **Start with a Flow.** diff --git a/docs/en/guides/flows/mastering-flow-state.mdx b/docs/en/guides/flows/mastering-flow-state.mdx index 8bf99f43e..3325fad1e 100644 --- a/docs/en/guides/flows/mastering-flow-state.mdx +++ b/docs/en/guides/flows/mastering-flow-state.mdx @@ -346,6 +346,33 @@ class SelectivePersistFlow(Flow): return f"Complete with count {self.state['count']}" ``` +#### Using a Custom Persistence Key + +By default, `@persist()` keys persisted state by the flow's auto-generated `state.id`. When your domain already has a natural identifier — for example a `conversation_id` that ties multiple flow runs to the same user session — pass it as the `key` argument and `@persist` will use that attribute as the flow UUID instead of `id`: + +```python +from crewai.flow.flow import Flow, listen, start +from crewai.flow.persistence import persist +from pydantic import BaseModel + +class ConversationState(BaseModel): + conversation_id: str + history: list[str] = [] + +@persist(key="conversation_id") +class ConversationFlow(Flow[ConversationState]): + @start() + def greet(self): + self.state.history.append("hello") + return self.state.history + +# A second run with the same conversation_id reloads the prior state +flow = ConversationFlow(conversation_id="user-42") +flow.kickoff() +``` + +For dict-based states `@persist` reads `state[key]`, and for Pydantic / object states it reads `getattr(state, key)`. If the named attribute is missing or falsy when state is being saved, `@persist` raises a `ValueError` like `Flow state is missing required persistence key 'conversation_id'`, so the failure surfaces immediately rather than silently dropping persisted data. Calling `@persist()` without `key` keeps the original behavior of using `state.id`. + ## Advanced State Patterns diff --git a/docs/ko/concepts/flows.mdx b/docs/ko/concepts/flows.mdx index 13f7d6933..6f9c32465 100644 --- a/docs/ko/concepts/flows.mdx +++ b/docs/ko/concepts/flows.mdx @@ -373,6 +373,33 @@ class AnotherFlow(Flow[dict]): print("Method-level persisted runs:", self.state["runs"]) ``` +### 사용자 지정 영속성 키 + +기본적으로 `@persist`는 자동 생성된 `state.id` 필드를 영속성 키로 사용합니다. 여러 세션에 걸쳐 공유되는 `conversation_id`처럼 플로우에 자체 식별자가 있는 경우, `key` 인자를 전달하면 `@persist`가 해당 속성을 플로우 UUID로 사용합니다: + +```python +from crewai.flow.flow import Flow, listen, start +from crewai.flow.persistence import persist +from pydantic import BaseModel + +class ConversationState(BaseModel): + conversation_id: str + turn: int = 0 + +@persist(key="conversation_id") # 사용자 지정 필드를 영속성 키로 사용 +class ConversationFlow(Flow[ConversationState]): + @start() + def begin(self): + self.state.turn += 1 + print(f"Conversation {self.state.conversation_id} turn {self.state.turn}") + +# 동일한 conversation_id로 다시 실행하면 이전 상태가 다시 로드됩니다 +flow = ConversationFlow(conversation_id="user-42") +flow.kickoff() +``` + +이 데코레이터는 dict 상태의 경우 `state[key]`에서, Pydantic / 객체 상태의 경우 `getattr(state, key)`에서 값을 읽습니다. 저장 시점에 지정된 속성이 없거나 falsy 값이면, `@persist`는 `Flow state is missing required persistence key 'conversation_id'`와 같은 `ValueError`를 발생시킵니다. `key`를 생략하면 기존 동작이 유지되어 `state.id`가 사용됩니다. + ### 작동 방식 1. **고유 상태 식별** diff --git a/docs/ko/concepts/production-architecture.mdx b/docs/ko/concepts/production-architecture.mdx index d393874cc..59c18fdbe 100644 --- a/docs/ko/concepts/production-architecture.mdx +++ b/docs/ko/concepts/production-architecture.mdx @@ -146,6 +146,15 @@ class ProductionFlow(Flow[AppState]): # ... ``` +기본적으로 `@persist`는 자동 생성된 `state.id`를 저장된 상태의 키로 사용합니다. 애플리케이션에 이미 자연스러운 식별자가 있는 경우 — 예를 들어 같은 사용자 세션에 속한 여러 실행을 묶는 `conversation_id` — `key`로 전달하면 데코레이터가 해당 속성을 플로우 UUID로 사용합니다. 저장 시점에 지정된 속성이 없거나 falsy 값이면 `ValueError`가 발생합니다. + +```python +@persist(key="conversation_id") +class ProductionFlow(Flow[AppState]): + # AppState는 conversation_id를 노출해야 합니다; 세션을 재개하면 이전 상태가 다시 로드됩니다 + ... +``` + ## 요약 - **Flow로 시작하세요.** diff --git a/docs/ko/guides/flows/mastering-flow-state.mdx b/docs/ko/guides/flows/mastering-flow-state.mdx index 83b442f31..9b34b8e1e 100644 --- a/docs/ko/guides/flows/mastering-flow-state.mdx +++ b/docs/ko/guides/flows/mastering-flow-state.mdx @@ -346,6 +346,33 @@ class SelectivePersistFlow(Flow): return f"Complete with count {self.state['count']}" ``` +#### 사용자 지정 영속성 키 사용하기 + +기본적으로 `@persist()`는 자동 생성된 `state.id`를 영속 상태의 키로 사용합니다. 도메인에 이미 자연스러운 식별자가 있는 경우 — 예를 들어 같은 사용자 세션에 속한 여러 플로우 실행을 묶는 `conversation_id` — `key` 인자로 전달하면 `@persist`는 `id` 대신 해당 속성을 플로우 UUID로 사용합니다: + +```python +from crewai.flow.flow import Flow, listen, start +from crewai.flow.persistence import persist +from pydantic import BaseModel + +class ConversationState(BaseModel): + conversation_id: str + history: list[str] = [] + +@persist(key="conversation_id") +class ConversationFlow(Flow[ConversationState]): + @start() + def greet(self): + self.state.history.append("hello") + return self.state.history + +# 동일한 conversation_id로 두 번째 실행 시 이전 상태가 다시 로드됩니다 +flow = ConversationFlow(conversation_id="user-42") +flow.kickoff() +``` + +dict 기반 상태의 경우 `@persist`는 `state[key]`를 읽고, Pydantic / 객체 상태의 경우 `getattr(state, key)`를 읽습니다. 상태가 저장될 때 지정된 속성이 없거나 falsy 값이면 `@persist`는 `Flow state is missing required persistence key 'conversation_id'`와 같은 `ValueError`를 발생시켜, 영속 데이터가 조용히 손실되는 대신 즉시 실패가 드러나도록 합니다. `key` 없이 `@persist()`를 호출하면 기존 동작대로 `state.id`가 사용됩니다. + ## 고급 상태 패턴 ### 상태 기반 조건부 로직 diff --git a/docs/pt-BR/concepts/flows.mdx b/docs/pt-BR/concepts/flows.mdx index 2cac627b2..b23d24c0d 100644 --- a/docs/pt-BR/concepts/flows.mdx +++ b/docs/pt-BR/concepts/flows.mdx @@ -193,6 +193,33 @@ Para um controle mais granular, você pode aplicar @persist em métodos específ # (O código não é traduzido) ``` +### Chave de Persistência Personalizada + +Por padrão, `@persist` usa o campo `state.id` gerado automaticamente como chave de persistência. Se o seu flow já possui um identificador natural — por exemplo um `conversation_id` compartilhado entre sessões — você pode passar o argumento `key` e `@persist` usará esse atributo como UUID do flow: + +```python +from crewai.flow.flow import Flow, listen, start +from crewai.flow.persistence import persist +from pydantic import BaseModel + +class ConversationState(BaseModel): + conversation_id: str + turn: int = 0 + +@persist(key="conversation_id") # Usa um campo personalizado como chave de persistência +class ConversationFlow(Flow[ConversationState]): + @start() + def begin(self): + self.state.turn += 1 + print(f"Conversa {self.state.conversation_id} turno {self.state.turn}") + +# Retomar a mesma conversa recarrega o estado anterior pelo conversation_id +flow = ConversationFlow(conversation_id="user-42") +flow.kickoff() +``` + +O decorador lê o valor em `state[key]` para estados do tipo dicionário ou `getattr(state, key)` para estados Pydantic / objetos. Se o atributo informado estiver ausente ou for *falsy* no momento de salvar, `@persist` lança um `ValueError` como `Flow state is missing required persistence key 'conversation_id'`. Quando `key` é omitido, o comportamento original é preservado e `state.id` continua sendo usado. + ### Como Funciona 1. **Identificação Única do Estado** diff --git a/docs/pt-BR/concepts/production-architecture.mdx b/docs/pt-BR/concepts/production-architecture.mdx index ac1e17801..fbd71cdf2 100644 --- a/docs/pt-BR/concepts/production-architecture.mdx +++ b/docs/pt-BR/concepts/production-architecture.mdx @@ -146,6 +146,15 @@ class ProductionFlow(Flow[AppState]): # ... ``` +Por padrão, `@persist` usa o `state.id` gerado automaticamente como chave do estado salvo. Se a sua aplicação já tem um identificador natural — por exemplo um `conversation_id` que liga várias execuções à mesma sessão de usuário — passe-o como `key` e o decorador usará esse atributo como UUID do flow. Um `ValueError` é lançado se o atributo informado estiver ausente ou for *falsy* no momento de salvar. + +```python +@persist(key="conversation_id") +class ProductionFlow(Flow[AppState]): + # AppState precisa expor conversation_id; retomar a sessão recarrega o estado anterior + ... +``` + ## Resumo - **Comece com um Flow.** diff --git a/docs/pt-BR/guides/flows/mastering-flow-state.mdx b/docs/pt-BR/guides/flows/mastering-flow-state.mdx index 442ab7dbb..0dcebc34c 100644 --- a/docs/pt-BR/guides/flows/mastering-flow-state.mdx +++ b/docs/pt-BR/guides/flows/mastering-flow-state.mdx @@ -167,6 +167,33 @@ Para mais controle, você pode aplicar `@persist()` em métodos específicos: # código não traduzido ``` +#### Usando uma Chave de Persistência Personalizada + +Por padrão, `@persist()` usa o `state.id` gerado automaticamente como chave do estado persistido. Quando seu domínio já possui um identificador natural — por exemplo um `conversation_id` que liga várias execuções do flow à mesma sessão de usuário — passe-o como argumento `key` e `@persist` usará esse atributo como UUID do flow em vez de `id`: + +```python +from crewai.flow.flow import Flow, listen, start +from crewai.flow.persistence import persist +from pydantic import BaseModel + +class ConversationState(BaseModel): + conversation_id: str + history: list[str] = [] + +@persist(key="conversation_id") +class ConversationFlow(Flow[ConversationState]): + @start() + def greet(self): + self.state.history.append("hello") + return self.state.history + +# Uma segunda execução com o mesmo conversation_id recarrega o estado anterior +flow = ConversationFlow(conversation_id="user-42") +flow.kickoff() +``` + +Para estados baseados em dicionário `@persist` lê `state[key]`, e para estados Pydantic / objetos lê `getattr(state, key)`. Se o atributo informado estiver ausente ou for *falsy* no momento em que o estado for salvo, `@persist` lança um `ValueError` como `Flow state is missing required persistence key 'conversation_id'`, fazendo com que a falha apareça imediatamente em vez de descartar silenciosamente os dados persistidos. Chamar `@persist()` sem `key` mantém o comportamento original de usar `state.id`. + ## Padrões Avançados de Estado ### Lógica Condicional Baseada no Estado diff --git a/lib/crewai/src/crewai/flow/persistence/decorators.py b/lib/crewai/src/crewai/flow/persistence/decorators.py index 937b557f4..a5304d87f 100644 --- a/lib/crewai/src/crewai/flow/persistence/decorators.py +++ b/lib/crewai/src/crewai/flow/persistence/decorators.py @@ -50,6 +50,7 @@ LOG_MESSAGES: Final[dict[str, str]] = { "save_error": "Failed to persist state for method {}: {}", "state_missing": "Flow instance has no state", "id_missing": "Flow state must have an 'id' field for persistence", + "key_missing": "Flow state is missing required persistence key '{}'", } @@ -63,6 +64,7 @@ class PersistenceDecorator: method_name: str, persistence_instance: FlowPersistence, verbose: bool = False, + key: str | None = None, ) -> None: """Persist flow state with proper error handling and logging. @@ -74,9 +76,12 @@ class PersistenceDecorator: method_name: Name of the method that triggered persistence persistence_instance: The persistence backend to use verbose: Whether to log persistence operations + key: Optional state attribute/key to use as the persistence key. + When None, falls back to ``state.id``. Raises: - ValueError: If flow has no state or state lacks an ID + ValueError: If flow has no state, state lacks an ID, or the + requested ``key`` is missing or falsy on state. RuntimeError: If state persistence fails AttributeError: If flow instance lacks required state attributes """ @@ -85,19 +90,22 @@ class PersistenceDecorator: if state is None: raise ValueError("Flow instance has no state") + lookup_key = key if key is not None else "id" flow_uuid: str | None = None if isinstance(state, dict): - flow_uuid = state.get("id") + flow_uuid = state.get(lookup_key) elif hasattr(state, "_unwrap"): unwrapped = state._unwrap() if isinstance(unwrapped, dict): - flow_uuid = unwrapped.get("id") + flow_uuid = unwrapped.get(lookup_key) else: - flow_uuid = getattr(unwrapped, "id", None) - elif isinstance(state, BaseModel) or hasattr(state, "id"): - flow_uuid = getattr(state, "id", None) + flow_uuid = getattr(unwrapped, lookup_key, None) + elif isinstance(state, BaseModel) or hasattr(state, lookup_key): + flow_uuid = getattr(state, lookup_key, None) if not flow_uuid: + if key is not None: + raise ValueError(LOG_MESSAGES["key_missing"].format(key)) raise ValueError("Flow state must have an 'id' field for persistence") # Log state saving only if verbose is True @@ -127,7 +135,7 @@ class PersistenceDecorator: logger.error(error_msg) raise ValueError(error_msg) from e except (TypeError, ValueError) as e: - error_msg = LOG_MESSAGES["id_missing"] + error_msg = str(e) or LOG_MESSAGES["id_missing"] if verbose: PRINTER.print(error_msg, color="red") logger.error(error_msg) @@ -135,7 +143,9 @@ class PersistenceDecorator: def persist( - persistence: FlowPersistence | None = None, verbose: bool = False + persistence: FlowPersistence | None = None, + verbose: bool = False, + key: str | None = None, ) -> Callable[[type | Callable[..., T]], type | Callable[..., T]]: """Decorator to persist flow state. @@ -148,12 +158,16 @@ def persist( persistence: Optional FlowPersistence implementation to use. If not provided, uses SQLiteFlowPersistence. verbose: Whether to log persistence operations. Defaults to False. + key: Optional name of the state attribute (for Pydantic/object states) + or dict key (for dict states) to use as the persistence key. When + ``None`` (default) the decorator falls back to ``state.id``. Returns: A decorator that can be applied to either a class or method Raises: - ValueError: If the flow state doesn't have an 'id' field + ValueError: If the flow state doesn't have an 'id' field, or the + specified ``key`` is missing or falsy on state. RuntimeError: If state persistence fails Example: @@ -162,6 +176,10 @@ def persist( @start() def begin(self): pass + + @persist(key="conversation_id") # Custom persistence key + class MyFlow(Flow[MyState]): + ... """ def decorator(target: type | Callable[..., T]) -> type | Callable[..., T]: @@ -207,7 +225,7 @@ def persist( ) -> Any: result = await original_method(self, *args, **kwargs) PersistenceDecorator.persist_state( - self, method_name, actual_persistence, verbose + self, method_name, actual_persistence, verbose, key ) return result @@ -237,7 +255,7 @@ def persist( def method_wrapper(self: Any, *args: Any, **kwargs: Any) -> Any: result = original_method(self, *args, **kwargs) PersistenceDecorator.persist_state( - self, method_name, actual_persistence, verbose + self, method_name, actual_persistence, verbose, key ) return result @@ -276,7 +294,7 @@ def persist( else: result = method_coro PersistenceDecorator.persist_state( - flow_instance, method.__name__, actual_persistence, verbose + flow_instance, method.__name__, actual_persistence, verbose, key ) return cast(T, result) @@ -295,7 +313,7 @@ def persist( def method_sync_wrapper(flow_instance: Any, *args: Any, **kwargs: Any) -> T: result = method(flow_instance, *args, **kwargs) PersistenceDecorator.persist_state( - flow_instance, method.__name__, actual_persistence, verbose + flow_instance, method.__name__, actual_persistence, verbose, key ) return result diff --git a/lib/crewai/tests/test_flow_persistence.py b/lib/crewai/tests/test_flow_persistence.py index 06bbf7231..681061a3d 100644 --- a/lib/crewai/tests/test_flow_persistence.py +++ b/lib/crewai/tests/test_flow_persistence.py @@ -3,6 +3,7 @@ import os from typing import Dict, List +import pytest from crewai.flow.flow import Flow, FlowState, listen, start from crewai.flow.persistence import persist from crewai.flow.persistence.sqlite import SQLiteFlowPersistence @@ -248,3 +249,69 @@ def test_persistence_with_base_model(tmp_path): assert message.type == "text" assert message.content == "Hello, World!" assert isinstance(flow.state._unwrap(), State) + + +def test_persist_custom_key_with_pydantic_state(tmp_path): + """`@persist(key=...)` uses the named attribute on a Pydantic state.""" + db_path = os.path.join(tmp_path, "test_flows.db") + persistence = SQLiteFlowPersistence(db_path) + + class KeyedState(FlowState): + conversation_id: str = "conv-42" + message: str = "" + + class KeyedFlow(Flow[KeyedState]): + @start() + @persist(persistence, key="conversation_id") + def init_step(self): + self.state.message = "hello" + + flow = KeyedFlow(persistence=persistence) + flow.kickoff() + + saved_state = persistence.load_state("conv-42") + assert saved_state is not None + assert saved_state["message"] == "hello" + # The default `state.id` lookup must NOT have been used as the key. + assert persistence.load_state(flow.state.id) is None + + +def test_persist_custom_key_with_dict_state(tmp_path): + """`@persist(key=...)` uses the named key on a dict state.""" + db_path = os.path.join(tmp_path, "test_flows.db") + persistence = SQLiteFlowPersistence(db_path) + + class DictKeyedFlow(Flow[Dict[str, str]]): + initial_state = dict() + + @start() + @persist(persistence, key="conversation_id") + def init_step(self): + self.state["conversation_id"] = "conv-dict-7" + self.state["message"] = "hi from dict" + + flow = DictKeyedFlow(persistence=persistence) + flow.kickoff() + + saved_state = persistence.load_state("conv-dict-7") + assert saved_state is not None + assert saved_state["message"] == "hi from dict" + + +def test_persist_custom_key_missing_raises(tmp_path): + """A missing/falsy custom key must raise a clear ValueError.""" + db_path = os.path.join(tmp_path, "test_flows.db") + persistence = SQLiteFlowPersistence(db_path) + + class MissingKeyFlow(Flow[Dict[str, str]]): + initial_state = dict() + + @start() + @persist(persistence, key="conversation_id") + def init_step(self): + # Intentionally do NOT set "conversation_id" on state. + self.state["message"] = "no key here" + + flow = MissingKeyFlow(persistence=persistence) + with pytest.raises(ValueError, match="conversation_id"): + flow.kickoff()