From fc4890be8bc4297636b6f011c72e89bbfb75ef4f Mon Sep 17 00:00:00 2001 From: Vinicius Brasil Date: Fri, 12 Jun 2026 09:49:53 -0700 Subject: [PATCH] Address code review comment --- .../src/crewai/flow/runtime/__init__.py | 15 ++++--- lib/crewai/tests/test_flow_from_definition.py | 42 +++++++++++++++++++ 2 files changed, 51 insertions(+), 6 deletions(-) diff --git a/lib/crewai/src/crewai/flow/runtime/__init__.py b/lib/crewai/src/crewai/flow/runtime/__init__.py index c1b4adfee..0180cf428 100644 --- a/lib/crewai/src/crewai/flow/runtime/__init__.py +++ b/lib/crewai/src/crewai/flow/runtime/__init__.py @@ -1009,7 +1009,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): _flow_match_id: str | None = PrivateAttr(default=None) _usage_aggregation_handler: Callable[..., Any] | None = PrivateAttr(default=None) _persist_backends: dict[str, FlowPersistence] = PrivateAttr(default_factory=dict) - _persistence_from_definition: bool = PrivateAttr(default=False) + _persistence_supplied: bool = PrivateAttr(default=False) def __class_getitem__(cls: type[Flow[T]], item: type[T]) -> type[Flow[T]]: # type: ignore[override] class _FlowGeneric(cls): # type: ignore[valid-type,misc] @@ -1048,6 +1048,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): else self._class_bound_methods() ) + self._persistence_supplied = self.persistence is not None flow_persist = self._definition.persist if ( self.persistence is None @@ -1055,7 +1056,6 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): and flow_persist.enabled ): self.persistence = self._resolve_persist_backend(flow_persist) - self._persistence_from_definition = True if self._state is None: self._state = self._create_initial_state() @@ -1472,6 +1472,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): result = flow.resume("looks good!") ``` """ + persistence_supplied = persistence is not None if persistence is None: from crewai.flow.persistence.factory import default_flow_persistence @@ -1488,6 +1489,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): if definition is not None else cls(persistence=persistence, **kwargs) ) + instance._persistence_supplied = persistence_supplied instance._initialize_state(state_data) instance._pending_feedback_context = pending_context instance._is_execution_resuming = True @@ -2917,14 +2919,15 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): from crewai.flow.persistence.decorators import PersistenceDecorator - # Backend precedence: user-supplied instance persistence, then the - # method's own declared backend, then the flow-level backend (folded - # into self.persistence from the definition), then the default. + # Backend precedence: persistence the caller explicitly supplied, then + # the method's own declared backend, then whatever the runtime folded + # into self.persistence (flow-level backend, pause-created default), + # then the default. method_declares_backend = ( method_definition.persist is not None and method_definition.persist.persistence is not None ) - if method_declares_backend and self._persistence_from_definition: + if method_declares_backend and not self._persistence_supplied: backend = self._persist_backends.get(str(method_name)) if backend is None: backend = self._resolve_persist_backend(persist_definition) diff --git a/lib/crewai/tests/test_flow_from_definition.py b/lib/crewai/tests/test_flow_from_definition.py index 9f381977b..21b7dd5ba 100644 --- a/lib/crewai/tests/test_flow_from_definition.py +++ b/lib/crewai/tests/test_flow_from_definition.py @@ -23,6 +23,7 @@ from crewai.flow.flow_config import flow_config from crewai.flow.flow_definition import FlowConfigDefinition, FlowDefinition from crewai.flow.persistence import persist from crewai.flow.persistence.base import FlowPersistence +from crewai.flow.persistence.factory import set_flow_persistence_factory from crewai.state.checkpoint_config import CheckpointConfig from crewai.types.streaming import FlowStreamingOutput @@ -1162,6 +1163,47 @@ def test_human_feedback_pending_and_resume_from_yaml(): assert flow_id not in DefinitionStoreBackend.pending +def test_resume_with_default_persistence_uses_method_declared_backend(): + yaml_str = f""" +schema: crewai.flow/v1 +name: DefaultResumeFlow +methods: + generate: + do: + ref: {__name__}:_pending_generate + start: true + human_feedback: + message: "Review:" + provider: {__name__}:PausingProvider + process: + do: + ref: {__name__}:_pending_process + listen: generate + persist: + enabled: true + persistence: + persistence_type: DefinitionStoreBackend + store: hitl-method-backend +""" + definition = FlowDefinition.from_yaml(yaml_str) + set_flow_persistence_factory( + lambda: DefinitionStoreBackend(store="hitl-default-fallback") + ) + try: + flow = Flow.from_definition(definition) + pending = flow.kickoff() + assert isinstance(pending, HumanFeedbackPending) + + resumed = Flow.from_pending(pending.context.flow_id, definition=definition) + result = resumed.resume("ship it") + finally: + set_flow_persistence_factory(None) + + assert result == "resumed:ship it" + assert _saved_methods("hitl-method-backend") == ["process"] + assert _saved_methods("hitl-default-fallback") == [] + + def test_flow_config_provider_fallback_from_yaml(): yaml_str = f""" schema: crewai.flow/v1