fix: dispatch Flow checkpoints through Flow APIs in TUI (#5492)

The checkpoint TUI was hardcoded to use Crew.from_checkpoint() and
Crew.fork() for all checkpoints. Flow checkpoints now dispatch through
Flow.from_checkpoint() / Flow.fork() instead.

Changes:
- Add _is_flow_checkpoint() to detect flow entities in checkpoint data
- Modify _run_checkpoint_tui_async to dispatch to Flow or Crew based
  on entity type
- Skip task_overrides for Flow checkpoints (not applicable)
- Update _TuiResult type to include is_flow flag
- Add 12 tests covering flow detection and dispatch paths

Co-Authored-By: João <joao@crewai.com>
This commit is contained in:
Devin AI
2026-04-16 08:33:07 +00:00
parent 0bb6faa9d3
commit ce468aad70
2 changed files with 189 additions and 23 deletions

View File

@@ -78,15 +78,16 @@ def _build_entity_header(ent: dict[str, Any]) -> str:
return "\n".join(lines)
# Return type: (location, action, inputs, task_output_overrides)
_TuiResult = tuple[str, str, dict[str, Any] | None, dict[int, str] | None] | None
# Return type: (location, action, inputs, task_output_overrides, is_flow)
_TuiResult = tuple[str, str, dict[str, Any] | None, dict[int, str] | None, bool] | None
class CheckpointTUI(App[_TuiResult]):
"""TUI to browse and inspect checkpoints.
Returns ``(location, action, inputs)`` where action is ``"resume"`` or
``"fork"`` and inputs is a parsed dict or ``None``,
Returns ``(location, action, inputs, task_overrides, is_flow)`` where
action is ``"resume"`` or ``"fork"``, inputs is a parsed dict or
``None``, and *is_flow* indicates the checkpoint contains a Flow entity,
or ``None`` if the user quit without selecting.
"""
@@ -520,16 +521,25 @@ class CheckpointTUI(App[_TuiResult]):
if event.node.data is not None:
await self._show_detail(event.node.data)
@staticmethod
def _is_flow_checkpoint(entry: dict[str, Any]) -> bool:
"""Return True if the checkpoint entry contains a Flow entity."""
for ent in entry.get("entities", []):
if ent.get("type") == "flow":
return True
return False
def on_button_pressed(self, event: Button.Pressed) -> None:
if self._selected_entry is None:
return
inputs = self._collect_inputs()
overrides = self._collect_task_overrides()
loc = self._resolve_location(self._selected_entry)
is_flow = self._is_flow_checkpoint(self._selected_entry)
if event.button.id == "btn-resume":
self.exit((loc, "resume", inputs, overrides))
self.exit((loc, "resume", inputs, overrides, is_flow))
elif event.button.id == "btn-fork":
self.exit((loc, "fork", inputs, overrides))
self.exit((loc, "fork", inputs, overrides, is_flow))
def action_refresh(self) -> None:
self._refresh_tree()
@@ -545,37 +555,48 @@ async def _run_checkpoint_tui_async(location: str) -> None:
if selection is None:
return
selected, action, inputs, task_overrides = selection
selected, action, inputs, task_overrides, is_flow = selection
from crewai.crew import Crew
from crewai.state.checkpoint_config import CheckpointConfig
config = CheckpointConfig(restore_from=selected)
if action == "fork":
click.echo(f"\nForking from: {selected}\n")
crew = Crew.fork(config)
else:
click.echo(f"\nResuming from: {selected}\n")
crew = Crew.from_checkpoint(config)
if is_flow:
from crewai.flow.flow import Flow
if task_overrides:
if action == "fork":
click.echo(f"\nForking Flow from: {selected}\n")
entity = Flow.fork(config)
else:
click.echo(f"\nResuming Flow from: {selected}\n")
entity = Flow.from_checkpoint(config)
else:
from crewai.crew import Crew
if action == "fork":
click.echo(f"\nForking from: {selected}\n")
entity = Crew.fork(config)
else:
click.echo(f"\nResuming from: {selected}\n")
entity = Crew.from_checkpoint(config)
if task_overrides and not is_flow:
click.echo("Modifications:")
overridden_agents: set[int] = set()
for task_idx, new_output in task_overrides.items():
if task_idx < len(crew.tasks) and crew.tasks[task_idx].output is not None:
desc = crew.tasks[task_idx].description or f"Task {task_idx + 1}"
if task_idx < len(entity.tasks) and entity.tasks[task_idx].output is not None:
desc = entity.tasks[task_idx].description or f"Task {task_idx + 1}"
if len(desc) > 60:
desc = desc[:57] + "..."
crew.tasks[task_idx].output.raw = new_output # type: ignore[union-attr]
entity.tasks[task_idx].output.raw = new_output # type: ignore[union-attr]
preview = new_output.replace("\n", " ")
if len(preview) > 80:
preview = preview[:77] + "..."
click.echo(f" Task {task_idx + 1}: {desc}")
click.echo(f" -> {preview}")
agent = crew.tasks[task_idx].agent
agent = entity.tasks[task_idx].agent
if agent and agent.agent_executor:
nth = sum(1 for t in crew.tasks[:task_idx] if t.agent is agent)
nth = sum(1 for t in entity.tasks[:task_idx] if t.agent is agent)
messages = agent.agent_executor.messages
system_positions = [
i for i, m in enumerate(messages) if m.get("role") == "system"
@@ -595,7 +616,7 @@ async def _run_checkpoint_tui_async(location: str) -> None:
earliest = min(task_overrides)
for offset, subsequent in enumerate(
crew.tasks[earliest + 1 :], start=earliest + 1
entity.tasks[earliest + 1 :], start=earliest + 1
):
if subsequent.output and offset not in task_overrides:
subsequent.output = None
@@ -611,7 +632,7 @@ async def _run_checkpoint_tui_async(location: str) -> None:
click.echo(f" {k}: {v}")
click.echo()
result = await crew.akickoff(inputs=inputs)
result = await entity.akickoff(inputs=inputs)
click.echo(f"\nResult: {getattr(result, 'raw', result)}")

View File

@@ -8,12 +8,13 @@ import sqlite3
import tempfile
import time
from typing import Any
from unittest.mock import MagicMock, patch
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from crewai.agent.core import Agent
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.cli.checkpoint_tui import CheckpointTUI, _run_checkpoint_tui_async
from crewai.crew import Crew
from crewai.flow.flow import Flow, start
from crewai.state.checkpoint_config import CheckpointConfig
@@ -537,3 +538,147 @@ class TestKickoffFromCheckpoint:
)
assert mock_restored.checkpoint.restore_from is None
assert result == "flow_result"
# ---------- Checkpoint TUI Flow dispatch ----------
class TestCheckpointTUIFlowDetection:
"""Tests for _is_flow_checkpoint and TUI entity-type dispatch."""
def test_is_flow_checkpoint_true(self) -> None:
entry = {"entities": [{"type": "flow", "name": "MyFlow"}]}
assert CheckpointTUI._is_flow_checkpoint(entry) is True
def test_is_flow_checkpoint_false_for_crew(self) -> None:
entry = {"entities": [{"type": "crew", "name": "MyCrew"}]}
assert CheckpointTUI._is_flow_checkpoint(entry) is False
def test_is_flow_checkpoint_false_when_no_entities(self) -> None:
assert CheckpointTUI._is_flow_checkpoint({}) is False
assert CheckpointTUI._is_flow_checkpoint({"entities": []}) is False
def test_is_flow_checkpoint_mixed_entities(self) -> None:
entry = {
"entities": [
{"type": "crew", "name": "InnerCrew"},
{"type": "flow", "name": "MyFlow"},
]
}
assert CheckpointTUI._is_flow_checkpoint(entry) is True
def test_is_flow_checkpoint_agent_only(self) -> None:
entry = {"entities": [{"type": "agent", "name": "AgentX"}]}
assert CheckpointTUI._is_flow_checkpoint(entry) is False
class TestCheckpointTUIAsyncDispatch:
"""Tests for _run_checkpoint_tui_async dispatching to Flow vs Crew."""
@pytest.mark.asyncio
async def test_flow_resume_dispatches_to_flow(self) -> None:
mock_flow = MagicMock(spec=Flow)
mock_flow.akickoff = AsyncMock(return_value="flow_ok")
with (
patch.object(
CheckpointTUI, "run_async", return_value=("/cp/path", "resume", None, None, True)
),
patch.object(Flow, "from_checkpoint", return_value=mock_flow) as mock_from_cp,
):
await _run_checkpoint_tui_async("./.checkpoints")
mock_from_cp.assert_called_once()
mock_flow.akickoff.assert_awaited_once_with(inputs=None)
@pytest.mark.asyncio
async def test_flow_fork_dispatches_to_flow(self) -> None:
mock_flow = MagicMock(spec=Flow)
mock_flow.akickoff = AsyncMock(return_value="flow_forked")
with (
patch.object(
CheckpointTUI, "run_async", return_value=("/cp/path", "fork", None, None, True)
),
patch.object(Flow, "fork", return_value=mock_flow) as mock_fork,
):
await _run_checkpoint_tui_async("./.checkpoints")
mock_fork.assert_called_once()
mock_flow.akickoff.assert_awaited_once_with(inputs=None)
@pytest.mark.asyncio
async def test_crew_resume_dispatches_to_crew(self) -> None:
mock_crew = MagicMock(spec=Crew)
mock_crew.akickoff = AsyncMock(return_value="crew_ok")
with (
patch.object(
CheckpointTUI, "run_async", return_value=("/cp/path", "resume", None, None, False)
),
patch.object(Crew, "from_checkpoint", return_value=mock_crew) as mock_from_cp,
):
await _run_checkpoint_tui_async("./.checkpoints")
mock_from_cp.assert_called_once()
mock_crew.akickoff.assert_awaited_once_with(inputs=None)
@pytest.mark.asyncio
async def test_crew_fork_dispatches_to_crew(self) -> None:
mock_crew = MagicMock(spec=Crew)
mock_crew.akickoff = AsyncMock(return_value="crew_forked")
with (
patch.object(
CheckpointTUI, "run_async", return_value=("/cp/path", "fork", None, None, False)
),
patch.object(Crew, "fork", return_value=mock_crew) as mock_fork,
):
await _run_checkpoint_tui_async("./.checkpoints")
mock_fork.assert_called_once()
mock_crew.akickoff.assert_awaited_once_with(inputs=None)
@pytest.mark.asyncio
async def test_flow_resume_with_inputs(self) -> None:
mock_flow = MagicMock(spec=Flow)
mock_flow.akickoff = AsyncMock(return_value="flow_with_inputs")
with (
patch.object(
CheckpointTUI,
"run_async",
return_value=("/cp/path", "resume", {"topic": "AI"}, None, True),
),
patch.object(Flow, "from_checkpoint", return_value=mock_flow),
):
await _run_checkpoint_tui_async("./.checkpoints")
mock_flow.akickoff.assert_awaited_once_with(inputs={"topic": "AI"})
@pytest.mark.asyncio
async def test_flow_skips_task_overrides(self) -> None:
"""Task overrides should be ignored for Flow checkpoints."""
mock_flow = MagicMock(spec=Flow)
mock_flow.akickoff = AsyncMock(return_value="flow_no_overrides")
overrides = {0: "new output"}
with (
patch.object(
CheckpointTUI,
"run_async",
return_value=("/cp/path", "resume", None, overrides, True),
),
patch.object(Flow, "from_checkpoint", return_value=mock_flow),
):
await _run_checkpoint_tui_async("./.checkpoints")
# Flow should not have tasks attribute accessed for overrides
mock_flow.akickoff.assert_awaited_once_with(inputs=None)
@pytest.mark.asyncio
async def test_tui_quit_returns_none(self) -> None:
"""If the user quits the TUI, nothing should be called."""
with patch.object(CheckpointTUI, "run_async", return_value=None):
# Should not raise
await _run_checkpoint_tui_async("./.checkpoints")