Compare commits

...

3 Commits

Author SHA1 Message Date
Devin AI
1a7d0af602 fix: use separate flow/crew variables to satisfy type-checker
Co-Authored-By: João <joao@crewai.com>
2026-04-16 08:39:47 +00:00
Devin AI
97df2b2fcf style: fix ruff formatting in checkpoint_tui.py
Co-Authored-By: João <joao@crewai.com>
2026-04-16 08:35:36 +00:00
Devin AI
ce468aad70 fix: dispatch Flow checkpoints through Flow APIs in TUI (#5492)
The checkpoint TUI was hardcoded to use Crew.from_checkpoint() and
Crew.fork() for all checkpoints. Flow checkpoints now dispatch through
Flow.from_checkpoint() / Flow.fork() instead.

Changes:
- Add _is_flow_checkpoint() to detect flow entities in checkpoint data
- Modify _run_checkpoint_tui_async to dispatch to Flow or Crew based
  on entity type
- Skip task_overrides for Flow checkpoints (not applicable)
- Update _TuiResult type to include is_flow flag
- Add 12 tests covering flow detection and dispatch paths

Co-Authored-By: João <joao@crewai.com>
2026-04-16 08:33:07 +00:00
2 changed files with 185 additions and 9 deletions

View File

@@ -78,15 +78,16 @@ def _build_entity_header(ent: dict[str, Any]) -> str:
return "\n".join(lines)
# Return type: (location, action, inputs, task_output_overrides)
_TuiResult = tuple[str, str, dict[str, Any] | None, dict[int, str] | None] | None
# Return type: (location, action, inputs, task_output_overrides, is_flow)
_TuiResult = tuple[str, str, dict[str, Any] | None, dict[int, str] | None, bool] | None
class CheckpointTUI(App[_TuiResult]):
"""TUI to browse and inspect checkpoints.
Returns ``(location, action, inputs)`` where action is ``"resume"`` or
``"fork"`` and inputs is a parsed dict or ``None``,
Returns ``(location, action, inputs, task_overrides, is_flow)`` where
action is ``"resume"`` or ``"fork"``, inputs is a parsed dict or
``None``, and *is_flow* indicates the checkpoint contains a Flow entity,
or ``None`` if the user quit without selecting.
"""
@@ -520,16 +521,25 @@ class CheckpointTUI(App[_TuiResult]):
if event.node.data is not None:
await self._show_detail(event.node.data)
@staticmethod
def _is_flow_checkpoint(entry: dict[str, Any]) -> bool:
"""Return True if the checkpoint entry contains a Flow entity."""
for ent in entry.get("entities", []):
if ent.get("type") == "flow":
return True
return False
def on_button_pressed(self, event: Button.Pressed) -> None:
if self._selected_entry is None:
return
inputs = self._collect_inputs()
overrides = self._collect_task_overrides()
loc = self._resolve_location(self._selected_entry)
is_flow = self._is_flow_checkpoint(self._selected_entry)
if event.button.id == "btn-resume":
self.exit((loc, "resume", inputs, overrides))
self.exit((loc, "resume", inputs, overrides, is_flow))
elif event.button.id == "btn-fork":
self.exit((loc, "fork", inputs, overrides))
self.exit((loc, "fork", inputs, overrides, is_flow))
def action_refresh(self) -> None:
self._refresh_tree()
@@ -545,13 +555,34 @@ async def _run_checkpoint_tui_async(location: str) -> None:
if selection is None:
return
selected, action, inputs, task_overrides = selection
selected, action, inputs, task_overrides, is_flow = selection
from crewai.crew import Crew
from crewai.state.checkpoint_config import CheckpointConfig
config = CheckpointConfig(restore_from=selected)
if is_flow:
from crewai.flow.flow import Flow
if action == "fork":
click.echo(f"\nForking Flow from: {selected}\n")
flow = Flow.fork(config)
else:
click.echo(f"\nResuming Flow from: {selected}\n")
flow = Flow.from_checkpoint(config)
if inputs:
click.echo("Inputs:")
for k, v in inputs.items():
click.echo(f" {k}: {v}")
click.echo()
result = await flow.akickoff(inputs=inputs)
click.echo(f"\nResult: {getattr(result, 'raw', result)}")
return
from crewai.crew import Crew
if action == "fork":
click.echo(f"\nForking from: {selected}\n")
crew = Crew.fork(config)

View File

@@ -8,12 +8,13 @@ import sqlite3
import tempfile
import time
from typing import Any
from unittest.mock import MagicMock, patch
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from crewai.agent.core import Agent
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.cli.checkpoint_tui import CheckpointTUI, _run_checkpoint_tui_async
from crewai.crew import Crew
from crewai.flow.flow import Flow, start
from crewai.state.checkpoint_config import CheckpointConfig
@@ -537,3 +538,147 @@ class TestKickoffFromCheckpoint:
)
assert mock_restored.checkpoint.restore_from is None
assert result == "flow_result"
# ---------- Checkpoint TUI Flow dispatch ----------
class TestCheckpointTUIFlowDetection:
"""Tests for _is_flow_checkpoint and TUI entity-type dispatch."""
def test_is_flow_checkpoint_true(self) -> None:
entry = {"entities": [{"type": "flow", "name": "MyFlow"}]}
assert CheckpointTUI._is_flow_checkpoint(entry) is True
def test_is_flow_checkpoint_false_for_crew(self) -> None:
entry = {"entities": [{"type": "crew", "name": "MyCrew"}]}
assert CheckpointTUI._is_flow_checkpoint(entry) is False
def test_is_flow_checkpoint_false_when_no_entities(self) -> None:
assert CheckpointTUI._is_flow_checkpoint({}) is False
assert CheckpointTUI._is_flow_checkpoint({"entities": []}) is False
def test_is_flow_checkpoint_mixed_entities(self) -> None:
entry = {
"entities": [
{"type": "crew", "name": "InnerCrew"},
{"type": "flow", "name": "MyFlow"},
]
}
assert CheckpointTUI._is_flow_checkpoint(entry) is True
def test_is_flow_checkpoint_agent_only(self) -> None:
entry = {"entities": [{"type": "agent", "name": "AgentX"}]}
assert CheckpointTUI._is_flow_checkpoint(entry) is False
class TestCheckpointTUIAsyncDispatch:
"""Tests for _run_checkpoint_tui_async dispatching to Flow vs Crew."""
@pytest.mark.asyncio
async def test_flow_resume_dispatches_to_flow(self) -> None:
mock_flow = MagicMock(spec=Flow)
mock_flow.akickoff = AsyncMock(return_value="flow_ok")
with (
patch.object(
CheckpointTUI, "run_async", return_value=("/cp/path", "resume", None, None, True)
),
patch.object(Flow, "from_checkpoint", return_value=mock_flow) as mock_from_cp,
):
await _run_checkpoint_tui_async("./.checkpoints")
mock_from_cp.assert_called_once()
mock_flow.akickoff.assert_awaited_once_with(inputs=None)
@pytest.mark.asyncio
async def test_flow_fork_dispatches_to_flow(self) -> None:
mock_flow = MagicMock(spec=Flow)
mock_flow.akickoff = AsyncMock(return_value="flow_forked")
with (
patch.object(
CheckpointTUI, "run_async", return_value=("/cp/path", "fork", None, None, True)
),
patch.object(Flow, "fork", return_value=mock_flow) as mock_fork,
):
await _run_checkpoint_tui_async("./.checkpoints")
mock_fork.assert_called_once()
mock_flow.akickoff.assert_awaited_once_with(inputs=None)
@pytest.mark.asyncio
async def test_crew_resume_dispatches_to_crew(self) -> None:
mock_crew = MagicMock(spec=Crew)
mock_crew.akickoff = AsyncMock(return_value="crew_ok")
with (
patch.object(
CheckpointTUI, "run_async", return_value=("/cp/path", "resume", None, None, False)
),
patch.object(Crew, "from_checkpoint", return_value=mock_crew) as mock_from_cp,
):
await _run_checkpoint_tui_async("./.checkpoints")
mock_from_cp.assert_called_once()
mock_crew.akickoff.assert_awaited_once_with(inputs=None)
@pytest.mark.asyncio
async def test_crew_fork_dispatches_to_crew(self) -> None:
mock_crew = MagicMock(spec=Crew)
mock_crew.akickoff = AsyncMock(return_value="crew_forked")
with (
patch.object(
CheckpointTUI, "run_async", return_value=("/cp/path", "fork", None, None, False)
),
patch.object(Crew, "fork", return_value=mock_crew) as mock_fork,
):
await _run_checkpoint_tui_async("./.checkpoints")
mock_fork.assert_called_once()
mock_crew.akickoff.assert_awaited_once_with(inputs=None)
@pytest.mark.asyncio
async def test_flow_resume_with_inputs(self) -> None:
mock_flow = MagicMock(spec=Flow)
mock_flow.akickoff = AsyncMock(return_value="flow_with_inputs")
with (
patch.object(
CheckpointTUI,
"run_async",
return_value=("/cp/path", "resume", {"topic": "AI"}, None, True),
),
patch.object(Flow, "from_checkpoint", return_value=mock_flow),
):
await _run_checkpoint_tui_async("./.checkpoints")
mock_flow.akickoff.assert_awaited_once_with(inputs={"topic": "AI"})
@pytest.mark.asyncio
async def test_flow_skips_task_overrides(self) -> None:
"""Task overrides should be ignored for Flow checkpoints."""
mock_flow = MagicMock(spec=Flow)
mock_flow.akickoff = AsyncMock(return_value="flow_no_overrides")
overrides = {0: "new output"}
with (
patch.object(
CheckpointTUI,
"run_async",
return_value=("/cp/path", "resume", None, overrides, True),
),
patch.object(Flow, "from_checkpoint", return_value=mock_flow),
):
await _run_checkpoint_tui_async("./.checkpoints")
# Flow should not have tasks attribute accessed for overrides
mock_flow.akickoff.assert_awaited_once_with(inputs=None)
@pytest.mark.asyncio
async def test_tui_quit_returns_none(self) -> None:
"""If the user quits the TUI, nothing should be called."""
with patch.object(CheckpointTUI, "run_async", return_value=None):
# Should not raise
await _run_checkpoint_tui_async("./.checkpoints")