From ae94dbb99ae1b380f6402b7c67a9018f5b0ec4ba Mon Sep 17 00:00:00 2001 From: Joao Moura Date: Mon, 16 Feb 2026 14:19:31 -0800 Subject: [PATCH] Fix resumption flag logic in Flow class and add regression test for cyclic flow persistence - Updated the logic for setting the `_is_execution_resuming` flag to ensure it only activates when there are completed methods to replay, preventing incorrect suppression of cyclic re-execution during state reloads. - Added a regression test to validate that cyclic router flows complete all iterations when persistence is enabled and an 'id' is passed in inputs, ensuring robust handling of flow execution in these scenarios. --- lib/crewai/src/crewai/flow/flow.py | 9 +++- lib/crewai/tests/test_flow.py | 71 ++++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+), 2 deletions(-) diff --git a/lib/crewai/src/crewai/flow/flow.py b/lib/crewai/src/crewai/flow/flow.py index d8e74fc08..58a59debe 100644 --- a/lib/crewai/src/crewai/flow/flow.py +++ b/lib/crewai/src/crewai/flow/flow.py @@ -1804,8 +1804,13 @@ class Flow(Generic[T], metaclass=FlowMeta): self._pending_and_listeners.clear() self._clear_or_listeners() else: - # We're restoring from persistence, set the flag - self._is_execution_resuming = True + # Only enter resumption mode if there are completed methods to + # replay. When _completed_methods is empty (e.g. a pure + # state-reload via kickoff(inputs={"id": ...})), the flow + # executes from scratch and the flag would incorrectly + # suppress cyclic re-execution on the second iteration. + if self._completed_methods: + self._is_execution_resuming = True if inputs: # Override the id in the state if it exists in inputs diff --git a/lib/crewai/tests/test_flow.py b/lib/crewai/tests/test_flow.py index 0ec4b3063..585b6881e 100644 --- a/lib/crewai/tests/test_flow.py +++ b/lib/crewai/tests/test_flow.py @@ -1772,3 +1772,74 @@ def test_cyclic_flow_multiple_or_listeners_fire_every_iteration(): f"'{method}' should fire every iteration, " f"got {len(events)} fires: {execution_order}" ) + + +def test_cyclic_flow_works_with_persist_and_id_input(): + """Cyclic router flows must complete all iterations when persistence is + enabled and 'id' is passed in inputs. + + Regression test: passing ``inputs={"id": ...}`` with a persistence backend + previously caused ``_is_execution_resuming`` to be set even though + ``_completed_methods`` was empty. The flag was never cleared during + execution, so on the second cycle iteration the resumption path in + ``_execute_single_listener`` short-circuited the router with ``(None, None)`` + and the flow silently terminated after a single iteration. + """ + from uuid import uuid4 + + from crewai.flow.persistence import SQLiteFlowPersistence + + execution_order: list[str] = [] + + class PersistCyclicFlow(Flow): + iteration: int = 0 + max_iterations: int = 3 + + @start() + def begin(self): + execution_order.append("begin") + + @router(or_(begin, "capture")) + def classify(self): + self.iteration += 1 + execution_order.append(f"classify_{self.iteration}") + if self.iteration <= self.max_iterations: + return "type_a" + return "exit" + + @listen("type_a") + def handle(self): + execution_order.append(f"handle_{self.iteration}") + + @listen(or_(handle,)) + def send(self): + execution_order.append(f"send_{self.iteration}") + + @listen("send") + def capture(self): + execution_order.append(f"capture_{self.iteration}") + + @listen("exit") + def finish(self): + execution_order.append("finish") + + persistence = SQLiteFlowPersistence() + flow = PersistCyclicFlow(persistence=persistence) + flow.kickoff(inputs={"id": str(uuid4())}) + + assert "finish" in execution_order, ( + f"Flow should have reached 'finish', got: {execution_order}" + ) + # The router fires max_iterations+1 times (3 cycles + the final "exit") + classify_events = [e for e in execution_order if e.startswith("classify_")] + assert len(classify_events) == 4, ( + f"'classify' should fire 4 times (3 cycles + exit), " + f"got {len(classify_events)}: {execution_order}" + ) + # The other methods fire once per "type_a" cycle + for method in ["handle", "send", "capture"]: + events = [e for e in execution_order if e.startswith(f"{method}_")] + assert len(events) == 3, ( + f"'{method}' should fire 3 times, " + f"got {len(events)}: {execution_order}" + )