Address code review comment

This commit is contained in:
Vinicius Brasil
2026-06-12 09:49:53 -07:00
parent b0a71bb861
commit fc4890be8b
2 changed files with 51 additions and 6 deletions

View File

@@ -1009,7 +1009,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
_flow_match_id: str | None = PrivateAttr(default=None)
_usage_aggregation_handler: Callable[..., Any] | None = PrivateAttr(default=None)
_persist_backends: dict[str, FlowPersistence] = PrivateAttr(default_factory=dict)
_persistence_from_definition: bool = PrivateAttr(default=False)
_persistence_supplied: bool = PrivateAttr(default=False)
def __class_getitem__(cls: type[Flow[T]], item: type[T]) -> type[Flow[T]]: # type: ignore[override]
class _FlowGeneric(cls): # type: ignore[valid-type,misc]
@@ -1048,6 +1048,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
else self._class_bound_methods()
)
self._persistence_supplied = self.persistence is not None
flow_persist = self._definition.persist
if (
self.persistence is None
@@ -1055,7 +1056,6 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
and flow_persist.enabled
):
self.persistence = self._resolve_persist_backend(flow_persist)
self._persistence_from_definition = True
if self._state is None:
self._state = self._create_initial_state()
@@ -1472,6 +1472,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
result = flow.resume("looks good!")
```
"""
persistence_supplied = persistence is not None
if persistence is None:
from crewai.flow.persistence.factory import default_flow_persistence
@@ -1488,6 +1489,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
if definition is not None
else cls(persistence=persistence, **kwargs)
)
instance._persistence_supplied = persistence_supplied
instance._initialize_state(state_data)
instance._pending_feedback_context = pending_context
instance._is_execution_resuming = True
@@ -2917,14 +2919,15 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
from crewai.flow.persistence.decorators import PersistenceDecorator
# Backend precedence: user-supplied instance persistence, then the
# method's own declared backend, then the flow-level backend (folded
# into self.persistence from the definition), then the default.
# Backend precedence: persistence the caller explicitly supplied, then
# the method's own declared backend, then whatever the runtime folded
# into self.persistence (flow-level backend, pause-created default),
# then the default.
method_declares_backend = (
method_definition.persist is not None
and method_definition.persist.persistence is not None
)
if method_declares_backend and self._persistence_from_definition:
if method_declares_backend and not self._persistence_supplied:
backend = self._persist_backends.get(str(method_name))
if backend is None:
backend = self._resolve_persist_backend(persist_definition)

View File

@@ -23,6 +23,7 @@ from crewai.flow.flow_config import flow_config
from crewai.flow.flow_definition import FlowConfigDefinition, FlowDefinition
from crewai.flow.persistence import persist
from crewai.flow.persistence.base import FlowPersistence
from crewai.flow.persistence.factory import set_flow_persistence_factory
from crewai.state.checkpoint_config import CheckpointConfig
from crewai.types.streaming import FlowStreamingOutput
@@ -1162,6 +1163,47 @@ def test_human_feedback_pending_and_resume_from_yaml():
assert flow_id not in DefinitionStoreBackend.pending
def test_resume_with_default_persistence_uses_method_declared_backend():
yaml_str = f"""
schema: crewai.flow/v1
name: DefaultResumeFlow
methods:
generate:
do:
ref: {__name__}:_pending_generate
start: true
human_feedback:
message: "Review:"
provider: {__name__}:PausingProvider
process:
do:
ref: {__name__}:_pending_process
listen: generate
persist:
enabled: true
persistence:
persistence_type: DefinitionStoreBackend
store: hitl-method-backend
"""
definition = FlowDefinition.from_yaml(yaml_str)
set_flow_persistence_factory(
lambda: DefinitionStoreBackend(store="hitl-default-fallback")
)
try:
flow = Flow.from_definition(definition)
pending = flow.kickoff()
assert isinstance(pending, HumanFeedbackPending)
resumed = Flow.from_pending(pending.context.flow_id, definition=definition)
result = resumed.resume("ship it")
finally:
set_flow_persistence_factory(None)
assert result == "resumed:ship it"
assert _saved_methods("hitl-method-backend") == ["process"]
assert _saved_methods("hitl-default-fallback") == []
def test_flow_config_provider_fallback_from_yaml():
yaml_str = f"""
schema: crewai.flow/v1