fix: restore Flow execution state from checkpoint fields

Flow.from_checkpoint deserialized the checkpoint_* fields but never
copied them back into the private execution attrs (_completed_methods,
_method_outputs, _method_execution_counts, _state). Calling kickoff()
after from_checkpoint would restart from scratch.

Add _restore_from_checkpoint that copies the checkpoint fields into the
private attrs, using the existing _restore_state method for state
reconstruction.
This commit is contained in:
Greyson LaLonde
2026-04-07 03:08:29 +08:00
parent 3ced490ca0
commit 4db05c3d2a

View File

@@ -941,6 +941,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
if isinstance(entity, cls):
if entity.execution_context is not None:
apply_execution_context(entity.execution_context)
entity._restore_from_checkpoint()
return entity
raise ValueError(f"No {cls.__name__} found in checkpoint: {path}")
@@ -949,6 +950,21 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
checkpoint_method_counts: dict[str, int] | None = Field(default=None)
checkpoint_state: dict[str, Any] | None = Field(default=None)
def _restore_from_checkpoint(self) -> None:
"""Restore private execution state from checkpoint fields."""
if self.checkpoint_completed_methods is not None:
self._completed_methods = {
FlowMethodName(m) for m in self.checkpoint_completed_methods
}
if self.checkpoint_method_outputs is not None:
self._method_outputs = list(self.checkpoint_method_outputs)
if self.checkpoint_method_counts is not None:
self._method_execution_counts = {
FlowMethodName(k): v for k, v in self.checkpoint_method_counts.items()
}
if self.checkpoint_state is not None:
self._restore_state(self.checkpoint_state)
_methods: dict[FlowMethodName, FlowMethod[Any, Any]] = PrivateAttr(
default_factory=dict
)