diff --git a/lib/crewai/src/crewai/cli/checkpoint_tui.py b/lib/crewai/src/crewai/cli/checkpoint_tui.py index e0d10f813..00f548728 100644 --- a/lib/crewai/src/crewai/cli/checkpoint_tui.py +++ b/lib/crewai/src/crewai/cli/checkpoint_tui.py @@ -78,15 +78,16 @@ def _build_entity_header(ent: dict[str, Any]) -> str: return "\n".join(lines) -# Return type: (location, action, inputs, task_output_overrides) -_TuiResult = tuple[str, str, dict[str, Any] | None, dict[int, str] | None] | None +# Return type: (location, action, inputs, task_output_overrides, is_flow) +_TuiResult = tuple[str, str, dict[str, Any] | None, dict[int, str] | None, bool] | None class CheckpointTUI(App[_TuiResult]): """TUI to browse and inspect checkpoints. - Returns ``(location, action, inputs)`` where action is ``"resume"`` or - ``"fork"`` and inputs is a parsed dict or ``None``, + Returns ``(location, action, inputs, task_overrides, is_flow)`` where + action is ``"resume"`` or ``"fork"``, inputs is a parsed dict or + ``None``, and *is_flow* indicates the checkpoint contains a Flow entity, or ``None`` if the user quit without selecting. """ @@ -520,16 +521,25 @@ class CheckpointTUI(App[_TuiResult]): if event.node.data is not None: await self._show_detail(event.node.data) + @staticmethod + def _is_flow_checkpoint(entry: dict[str, Any]) -> bool: + """Return True if the checkpoint entry contains a Flow entity.""" + for ent in entry.get("entities", []): + if ent.get("type") == "flow": + return True + return False + def on_button_pressed(self, event: Button.Pressed) -> None: if self._selected_entry is None: return inputs = self._collect_inputs() overrides = self._collect_task_overrides() loc = self._resolve_location(self._selected_entry) + is_flow = self._is_flow_checkpoint(self._selected_entry) if event.button.id == "btn-resume": - self.exit((loc, "resume", inputs, overrides)) + self.exit((loc, "resume", inputs, overrides, is_flow)) elif event.button.id == "btn-fork": - self.exit((loc, "fork", inputs, overrides)) + self.exit((loc, "fork", inputs, overrides, is_flow)) def action_refresh(self) -> None: self._refresh_tree() @@ -545,37 +555,48 @@ async def _run_checkpoint_tui_async(location: str) -> None: if selection is None: return - selected, action, inputs, task_overrides = selection + selected, action, inputs, task_overrides, is_flow = selection - from crewai.crew import Crew from crewai.state.checkpoint_config import CheckpointConfig config = CheckpointConfig(restore_from=selected) - if action == "fork": - click.echo(f"\nForking from: {selected}\n") - crew = Crew.fork(config) - else: - click.echo(f"\nResuming from: {selected}\n") - crew = Crew.from_checkpoint(config) + if is_flow: + from crewai.flow.flow import Flow - if task_overrides: + if action == "fork": + click.echo(f"\nForking Flow from: {selected}\n") + entity = Flow.fork(config) + else: + click.echo(f"\nResuming Flow from: {selected}\n") + entity = Flow.from_checkpoint(config) + else: + from crewai.crew import Crew + + if action == "fork": + click.echo(f"\nForking from: {selected}\n") + entity = Crew.fork(config) + else: + click.echo(f"\nResuming from: {selected}\n") + entity = Crew.from_checkpoint(config) + + if task_overrides and not is_flow: click.echo("Modifications:") overridden_agents: set[int] = set() for task_idx, new_output in task_overrides.items(): - if task_idx < len(crew.tasks) and crew.tasks[task_idx].output is not None: - desc = crew.tasks[task_idx].description or f"Task {task_idx + 1}" + if task_idx < len(entity.tasks) and entity.tasks[task_idx].output is not None: + desc = entity.tasks[task_idx].description or f"Task {task_idx + 1}" if len(desc) > 60: desc = desc[:57] + "..." - crew.tasks[task_idx].output.raw = new_output # type: ignore[union-attr] + entity.tasks[task_idx].output.raw = new_output # type: ignore[union-attr] preview = new_output.replace("\n", " ") if len(preview) > 80: preview = preview[:77] + "..." click.echo(f" Task {task_idx + 1}: {desc}") click.echo(f" -> {preview}") - agent = crew.tasks[task_idx].agent + agent = entity.tasks[task_idx].agent if agent and agent.agent_executor: - nth = sum(1 for t in crew.tasks[:task_idx] if t.agent is agent) + nth = sum(1 for t in entity.tasks[:task_idx] if t.agent is agent) messages = agent.agent_executor.messages system_positions = [ i for i, m in enumerate(messages) if m.get("role") == "system" @@ -595,7 +616,7 @@ async def _run_checkpoint_tui_async(location: str) -> None: earliest = min(task_overrides) for offset, subsequent in enumerate( - crew.tasks[earliest + 1 :], start=earliest + 1 + entity.tasks[earliest + 1 :], start=earliest + 1 ): if subsequent.output and offset not in task_overrides: subsequent.output = None @@ -611,7 +632,7 @@ async def _run_checkpoint_tui_async(location: str) -> None: click.echo(f" {k}: {v}") click.echo() - result = await crew.akickoff(inputs=inputs) + result = await entity.akickoff(inputs=inputs) click.echo(f"\nResult: {getattr(result, 'raw', result)}") diff --git a/lib/crewai/tests/test_checkpoint.py b/lib/crewai/tests/test_checkpoint.py index f645541a4..c3a3642b2 100644 --- a/lib/crewai/tests/test_checkpoint.py +++ b/lib/crewai/tests/test_checkpoint.py @@ -8,12 +8,13 @@ import sqlite3 import tempfile import time from typing import Any -from unittest.mock import MagicMock, patch +from unittest.mock import AsyncMock, MagicMock, patch import pytest from crewai.agent.core import Agent from crewai.agents.agent_builder.base_agent import BaseAgent +from crewai.cli.checkpoint_tui import CheckpointTUI, _run_checkpoint_tui_async from crewai.crew import Crew from crewai.flow.flow import Flow, start from crewai.state.checkpoint_config import CheckpointConfig @@ -537,3 +538,147 @@ class TestKickoffFromCheckpoint: ) assert mock_restored.checkpoint.restore_from is None assert result == "flow_result" + + +# ---------- Checkpoint TUI Flow dispatch ---------- + + +class TestCheckpointTUIFlowDetection: + """Tests for _is_flow_checkpoint and TUI entity-type dispatch.""" + + def test_is_flow_checkpoint_true(self) -> None: + entry = {"entities": [{"type": "flow", "name": "MyFlow"}]} + assert CheckpointTUI._is_flow_checkpoint(entry) is True + + def test_is_flow_checkpoint_false_for_crew(self) -> None: + entry = {"entities": [{"type": "crew", "name": "MyCrew"}]} + assert CheckpointTUI._is_flow_checkpoint(entry) is False + + def test_is_flow_checkpoint_false_when_no_entities(self) -> None: + assert CheckpointTUI._is_flow_checkpoint({}) is False + assert CheckpointTUI._is_flow_checkpoint({"entities": []}) is False + + def test_is_flow_checkpoint_mixed_entities(self) -> None: + entry = { + "entities": [ + {"type": "crew", "name": "InnerCrew"}, + {"type": "flow", "name": "MyFlow"}, + ] + } + assert CheckpointTUI._is_flow_checkpoint(entry) is True + + def test_is_flow_checkpoint_agent_only(self) -> None: + entry = {"entities": [{"type": "agent", "name": "AgentX"}]} + assert CheckpointTUI._is_flow_checkpoint(entry) is False + + +class TestCheckpointTUIAsyncDispatch: + """Tests for _run_checkpoint_tui_async dispatching to Flow vs Crew.""" + + @pytest.mark.asyncio + async def test_flow_resume_dispatches_to_flow(self) -> None: + mock_flow = MagicMock(spec=Flow) + mock_flow.akickoff = AsyncMock(return_value="flow_ok") + + with ( + patch.object( + CheckpointTUI, "run_async", return_value=("/cp/path", "resume", None, None, True) + ), + patch.object(Flow, "from_checkpoint", return_value=mock_flow) as mock_from_cp, + ): + await _run_checkpoint_tui_async("./.checkpoints") + + mock_from_cp.assert_called_once() + mock_flow.akickoff.assert_awaited_once_with(inputs=None) + + @pytest.mark.asyncio + async def test_flow_fork_dispatches_to_flow(self) -> None: + mock_flow = MagicMock(spec=Flow) + mock_flow.akickoff = AsyncMock(return_value="flow_forked") + + with ( + patch.object( + CheckpointTUI, "run_async", return_value=("/cp/path", "fork", None, None, True) + ), + patch.object(Flow, "fork", return_value=mock_flow) as mock_fork, + ): + await _run_checkpoint_tui_async("./.checkpoints") + + mock_fork.assert_called_once() + mock_flow.akickoff.assert_awaited_once_with(inputs=None) + + @pytest.mark.asyncio + async def test_crew_resume_dispatches_to_crew(self) -> None: + mock_crew = MagicMock(spec=Crew) + mock_crew.akickoff = AsyncMock(return_value="crew_ok") + + with ( + patch.object( + CheckpointTUI, "run_async", return_value=("/cp/path", "resume", None, None, False) + ), + patch.object(Crew, "from_checkpoint", return_value=mock_crew) as mock_from_cp, + ): + await _run_checkpoint_tui_async("./.checkpoints") + + mock_from_cp.assert_called_once() + mock_crew.akickoff.assert_awaited_once_with(inputs=None) + + @pytest.mark.asyncio + async def test_crew_fork_dispatches_to_crew(self) -> None: + mock_crew = MagicMock(spec=Crew) + mock_crew.akickoff = AsyncMock(return_value="crew_forked") + + with ( + patch.object( + CheckpointTUI, "run_async", return_value=("/cp/path", "fork", None, None, False) + ), + patch.object(Crew, "fork", return_value=mock_crew) as mock_fork, + ): + await _run_checkpoint_tui_async("./.checkpoints") + + mock_fork.assert_called_once() + mock_crew.akickoff.assert_awaited_once_with(inputs=None) + + @pytest.mark.asyncio + async def test_flow_resume_with_inputs(self) -> None: + mock_flow = MagicMock(spec=Flow) + mock_flow.akickoff = AsyncMock(return_value="flow_with_inputs") + + with ( + patch.object( + CheckpointTUI, + "run_async", + return_value=("/cp/path", "resume", {"topic": "AI"}, None, True), + ), + patch.object(Flow, "from_checkpoint", return_value=mock_flow), + ): + await _run_checkpoint_tui_async("./.checkpoints") + + mock_flow.akickoff.assert_awaited_once_with(inputs={"topic": "AI"}) + + @pytest.mark.asyncio + async def test_flow_skips_task_overrides(self) -> None: + """Task overrides should be ignored for Flow checkpoints.""" + mock_flow = MagicMock(spec=Flow) + mock_flow.akickoff = AsyncMock(return_value="flow_no_overrides") + + overrides = {0: "new output"} + with ( + patch.object( + CheckpointTUI, + "run_async", + return_value=("/cp/path", "resume", None, overrides, True), + ), + patch.object(Flow, "from_checkpoint", return_value=mock_flow), + ): + await _run_checkpoint_tui_async("./.checkpoints") + + # Flow should not have tasks attribute accessed for overrides + mock_flow.akickoff.assert_awaited_once_with(inputs=None) + + @pytest.mark.asyncio + async def test_tui_quit_returns_none(self) -> None: + """If the user quits the TUI, nothing should be called.""" + with patch.object(CheckpointTUI, "run_async", return_value=None): + # Should not raise + await _run_checkpoint_tui_async("./.checkpoints")