Fix resumption flag logic in Flow class and add regression test for cyclic flow persistence

- Updated the logic for setting the `_is_execution_resuming` flag to ensure it only activates when there are completed methods to replay, preventing incorrect suppression of cyclic re-execution during state reloads.
- Added a regression test to validate that cyclic router flows complete all iterations when persistence is enabled and an 'id' is passed in inputs, ensuring robust handling of flow execution in these scenarios.
This commit is contained in:
Joao Moura
2026-02-16 14:19:31 -08:00
parent 8ba416a16e
commit ae94dbb99a
2 changed files with 78 additions and 2 deletions

View File

@@ -1804,8 +1804,13 @@ class Flow(Generic[T], metaclass=FlowMeta):
self._pending_and_listeners.clear()
self._clear_or_listeners()
else:
# We're restoring from persistence, set the flag
self._is_execution_resuming = True
# Only enter resumption mode if there are completed methods to
# replay. When _completed_methods is empty (e.g. a pure
# state-reload via kickoff(inputs={"id": ...})), the flow
# executes from scratch and the flag would incorrectly
# suppress cyclic re-execution on the second iteration.
if self._completed_methods:
self._is_execution_resuming = True
if inputs:
# Override the id in the state if it exists in inputs

View File

@@ -1772,3 +1772,74 @@ def test_cyclic_flow_multiple_or_listeners_fire_every_iteration():
f"'{method}' should fire every iteration, "
f"got {len(events)} fires: {execution_order}"
)
def test_cyclic_flow_works_with_persist_and_id_input():
"""Cyclic router flows must complete all iterations when persistence is
enabled and 'id' is passed in inputs.
Regression test: passing ``inputs={"id": ...}`` with a persistence backend
previously caused ``_is_execution_resuming`` to be set even though
``_completed_methods`` was empty. The flag was never cleared during
execution, so on the second cycle iteration the resumption path in
``_execute_single_listener`` short-circuited the router with ``(None, None)``
and the flow silently terminated after a single iteration.
"""
from uuid import uuid4
from crewai.flow.persistence import SQLiteFlowPersistence
execution_order: list[str] = []
class PersistCyclicFlow(Flow):
iteration: int = 0
max_iterations: int = 3
@start()
def begin(self):
execution_order.append("begin")
@router(or_(begin, "capture"))
def classify(self):
self.iteration += 1
execution_order.append(f"classify_{self.iteration}")
if self.iteration <= self.max_iterations:
return "type_a"
return "exit"
@listen("type_a")
def handle(self):
execution_order.append(f"handle_{self.iteration}")
@listen(or_(handle,))
def send(self):
execution_order.append(f"send_{self.iteration}")
@listen("send")
def capture(self):
execution_order.append(f"capture_{self.iteration}")
@listen("exit")
def finish(self):
execution_order.append("finish")
persistence = SQLiteFlowPersistence()
flow = PersistCyclicFlow(persistence=persistence)
flow.kickoff(inputs={"id": str(uuid4())})
assert "finish" in execution_order, (
f"Flow should have reached 'finish', got: {execution_order}"
)
# The router fires max_iterations+1 times (3 cycles + the final "exit")
classify_events = [e for e in execution_order if e.startswith("classify_")]
assert len(classify_events) == 4, (
f"'classify' should fire 4 times (3 cycles + exit), "
f"got {len(classify_events)}: {execution_order}"
)
# The other methods fire once per "type_a" cycle
for method in ["handle", "send", "capture"]:
events = [e for e in execution_order if e.startswith(f"{method}_")]
assert len(events) == 3, (
f"'{method}' should fire 3 times, "
f"got {len(events)}: {execution_order}"
)