fix: emit task_started on fork resume, redesign checkpoint TUI
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Vulnerability Scan / pip-audit (push) Has been cancelled
Nightly Canary Release / Check for new commits (push) Has been cancelled
Nightly Canary Release / Build nightly packages (push) Has been cancelled
Nightly Canary Release / Publish nightly to PyPI (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled

Redesign checkpoint TUI with tabbed detail panel, collapsible
agent rosters, keybinding actions, and human-readable timestamps.
This commit is contained in:
Greyson LaLonde
2026-04-18 04:19:31 +08:00
committed by GitHub
parent c9b0004d0e
commit f879909526
3 changed files with 448 additions and 221 deletions

View File

@@ -106,17 +106,50 @@ def _parse_checkpoint_json(raw: str, source: str) -> dict[str, Any]:
"name": entity.get("name"),
"id": entity.get("id"),
}
raw_agents = entity.get("agents", [])
agents_by_id: dict[str, dict[str, Any]] = {}
parsed_agents: list[dict[str, Any]] = []
for ag in raw_agents:
agent_info: dict[str, Any] = {
"id": ag.get("id", ""),
"role": ag.get("role", ""),
"goal": ag.get("goal", ""),
}
parsed_agents.append(agent_info)
if ag.get("id"):
agents_by_id[str(ag["id"])] = agent_info
if parsed_agents:
info["agents"] = parsed_agents
if tasks:
info["tasks_completed"] = completed
info["tasks_total"] = len(tasks)
info["tasks"] = [
{
parsed_tasks: list[dict[str, Any]] = []
for t in tasks:
task_info: dict[str, Any] = {
"description": t.get("description", ""),
"completed": t.get("output") is not None,
"output": (t.get("output") or {}).get("raw", ""),
}
for t in tasks
]
task_agent = t.get("agent")
if isinstance(task_agent, dict):
task_info["agent_role"] = task_agent.get("role", "")
task_info["agent_id"] = task_agent.get("id", "")
elif isinstance(task_agent, str) and task_agent in agents_by_id:
task_info["agent_role"] = agents_by_id[task_agent].get("role", "")
task_info["agent_id"] = task_agent
parsed_tasks.append(task_info)
info["tasks"] = parsed_tasks
if entity.get("entity_type") == "flow":
completed_methods = entity.get("checkpoint_completed_methods")
if completed_methods:
info["completed_methods"] = sorted(completed_methods)
state = entity.get("checkpoint_state")
if isinstance(state, dict):
info["flow_state"] = state
parsed_entities.append(info)
inputs: dict[str, Any] = {}

View File

@@ -3,17 +3,20 @@
from __future__ import annotations
from collections import defaultdict
from datetime import datetime
from typing import Any, ClassVar, Literal
from textual.app import App, ComposeResult
from textual.binding import Binding
from textual.containers import Horizontal, Vertical, VerticalScroll
from textual.widgets import (
Button,
Collapsible,
Footer,
Header,
Input,
Static,
TabPane,
TabbedContent,
TextArea,
Tree,
)
@@ -32,6 +35,22 @@ _TERTIARY = "#ffffff"
_DIM = "#888888"
_BG_DARK = "#0d1117"
_BG_PANEL = "#161b22"
_ACCENT = "#c9a227"
_SUCCESS = "#3fb950"
_PENDING = "#e3b341"
_ENTITY_ICONS: dict[str, str] = {
"flow": "",
"crew": "",
"agent": "",
"unknown": "",
}
_ENTITY_COLORS: dict[str, str] = {
"flow": _ACCENT,
"crew": _SECONDARY,
"agent": _PRIMARY,
"unknown": _DIM,
}
def _load_entries(location: str) -> list[dict[str, Any]]:
@@ -40,8 +59,27 @@ def _load_entries(location: str) -> list[dict[str, Any]]:
return _list_json(location)
def _human_ts(ts: str) -> str:
"""Turn '2026-04-17 17:05:00' into a short relative label."""
try:
dt = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S")
except ValueError:
return ts
now = datetime.now()
delta = now.date() - dt.date()
hour = dt.hour % 12 or 12
ampm = "am" if dt.hour < 12 else "pm"
time_str = f"{hour}:{dt.minute:02d}{ampm}"
if delta.days == 0:
return time_str
if delta.days == 1:
return f"yest {time_str}"
if delta.days < 7:
return f"{dt.strftime('%a').lower()} {time_str}"
return f"{dt.strftime('%b')} {dt.day}"
def _short_id(name: str) -> str:
"""Shorten a checkpoint name for tree display."""
if len(name) > 30:
return name[:27] + "..."
return name
@@ -63,22 +101,22 @@ def _entry_id(entry: dict[str, Any]) -> str:
return name
def _build_entity_header(ent: dict[str, Any]) -> str:
"""Build rich text header for an entity (progress bar only)."""
lines: list[str] = []
tasks = ent.get("tasks")
if isinstance(tasks, list):
completed = ent.get("tasks_completed", 0)
total = ent.get("tasks_total", 0)
pct = int(completed / total * 100) if total else 0
bar_len = 20
filled = int(bar_len * completed / total) if total else 0
bar = f"[{_PRIMARY}]{'' * filled}[/][{_DIM}]{'' * (bar_len - filled)}[/]"
lines.append(f"{bar} {completed}/{total} tasks ({pct}%)")
return "\n".join(lines)
def _build_progress_bar(completed: int, total: int, width: int = 20) -> str:
if total == 0:
return f"[{_DIM}]{'' * width}[/] 0/0"
pct = int(completed / total * 100)
filled = int(width * completed / total)
color = _SUCCESS if completed == total else _PRIMARY
bar = f"[{color}]{'' * filled}[/][{_DIM}]{'' * (width - filled)}[/]"
return f"{bar} {completed}/{total} ({pct}%)"
def _entity_icon(etype: str) -> str:
icon = _ENTITY_ICONS.get(etype, _ENTITY_ICONS["unknown"])
color = _ENTITY_COLORS.get(etype, _DIM)
return f"[{color}]{icon}[/]"
# Return type: (location, action, inputs, task_output_overrides, entity_type)
_TuiResult = (
tuple[
str,
@@ -122,7 +160,7 @@ class CheckpointTUI(App[_TuiResult]):
height: 1fr;
}}
#tree-panel {{
width: 45%;
width: 40%;
background: {_BG_PANEL};
border: round {_SECONDARY};
padding: 0 1;
@@ -132,41 +170,81 @@ class CheckpointTUI(App[_TuiResult]):
border: round {_PRIMARY};
}}
#detail-container {{
width: 55%;
width: 60%;
height: 1fr;
}}
#detail-scroll {{
height: 1fr;
background: {_BG_PANEL};
border: round {_SECONDARY};
padding: 1 2;
scrollbar-color: {_PRIMARY};
}}
#detail-scroll:focus-within {{
border: round {_PRIMARY};
}}
#detail-header {{
margin-bottom: 1;
}}
#status {{
height: 1;
padding: 0 2;
color: {_DIM};
}}
#inputs-section {{
display: none;
height: auto;
max-height: 8;
padding: 0 1;
#detail-tabs {{
height: 1fr;
}}
#inputs-section.visible {{
display: block;
TabbedContent > ContentSwitcher {{
background: {_BG_PANEL};
height: 1fr;
}}
#inputs-label {{
height: 1;
TabPane {{
padding: 0;
}}
Tabs {{
background: {_BG_DARK};
}}
Tab {{
background: {_BG_DARK};
color: {_DIM};
padding: 0 2;
}}
Tab.-active {{
background: {_BG_PANEL};
color: {_PRIMARY};
}}
Tab:hover {{
color: {_TERTIARY};
}}
Underline > .underline--bar {{
color: {_SECONDARY};
background: {_BG_DARK};
}}
.tab-scroll {{
background: {_BG_PANEL};
height: 1fr;
padding: 1 2;
scrollbar-color: {_PRIMARY};
}}
.section-header {{
padding: 0 0 0 1;
margin: 1 0 0 0;
}}
.detail-line {{
padding: 0 0 0 1;
}}
.task-label {{
padding: 0 1;
}}
.task-output-editor {{
height: auto;
max-height: 10;
margin: 0 1 1 3;
border: round {_DIM};
}}
.task-output-editor:focus {{
border: round {_PRIMARY};
}}
Collapsible {{
background: {_BG_PANEL};
padding: 0;
margin: 0 0 1 1;
}}
CollapsibleTitle {{
background: {_BG_DARK};
color: {_TERTIARY};
padding: 0 1;
}}
CollapsibleTitle:hover {{
background: {_SECONDARY};
}}
.input-row {{
height: 3;
padding: 0 1;
@@ -180,55 +258,9 @@ class CheckpointTUI(App[_TuiResult]):
.input-row Input {{
width: 1fr;
}}
#no-inputs-label {{
height: 1;
.empty-state {{
color: {_DIM};
padding: 0 1;
}}
#action-buttons {{
height: 3;
align: right middle;
padding: 0 1;
display: none;
}}
#action-buttons.visible {{
display: block;
}}
#action-buttons Button {{
margin: 0 0 0 1;
min-width: 10;
}}
#btn-resume {{
background: {_SECONDARY};
color: {_TERTIARY};
}}
#btn-resume:hover {{
background: {_PRIMARY};
}}
#btn-fork {{
background: {_PRIMARY};
color: {_TERTIARY};
}}
#btn-fork:hover {{
background: {_SECONDARY};
}}
.entity-title {{
padding: 1 1 0 1;
}}
.entity-detail {{
padding: 0 1;
}}
.task-output-editor {{
height: auto;
max-height: 10;
margin: 0 1 1 1;
border: round {_DIM};
}}
.task-output-editor:focus {{
border: round {_PRIMARY};
}}
.task-label {{
padding: 0 1;
padding: 1;
}}
Tree {{
background: {_BG_PANEL};
@@ -242,6 +274,8 @@ class CheckpointTUI(App[_TuiResult]):
BINDINGS: ClassVar[list[Binding | tuple[str, str] | tuple[str, str, str]]] = [
("q", "quit", "Quit"),
("r", "refresh", "Refresh"),
("e", "resume", "Resume"),
("f", "fork", "Fork"),
]
def __init__(self, location: str = "./.checkpoints") -> None:
@@ -256,27 +290,49 @@ class CheckpointTUI(App[_TuiResult]):
yield Header(show_clock=False)
with Horizontal(id="main-layout"):
tree: Tree[dict[str, Any]] = Tree("Checkpoints", id="tree-panel")
tree.show_root = True
tree.show_root = False
tree.guide_depth = 3
yield tree
with Vertical(id="detail-container"):
yield Static("", id="status")
with VerticalScroll(id="detail-scroll"):
yield Static(
f"[{_DIM}]Select a checkpoint from the tree[/]", # noqa: S608
id="detail-header",
)
with Vertical(id="inputs-section"):
yield Static("Inputs", id="inputs-label")
with Horizontal(id="action-buttons"):
yield Button("Resume", id="btn-resume")
yield Button("Fork", id="btn-fork")
with TabbedContent(id="detail-tabs"):
with TabPane("Overview", id="tab-overview"):
with VerticalScroll(classes="tab-scroll"):
yield Static(
f"[{_DIM}]Select a checkpoint from the tree[/]", # noqa: S608
id="overview-empty",
)
with TabPane("Tasks", id="tab-tasks"):
with VerticalScroll(classes="tab-scroll"):
yield Static(
f"[{_DIM}]Select a checkpoint to view tasks[/]",
id="tasks-empty",
)
with TabPane("Inputs", id="tab-inputs"):
with VerticalScroll(classes="tab-scroll"):
yield Static(
f"[{_DIM}]Select a checkpoint to view inputs[/]",
id="inputs-empty",
)
yield Footer()
async def on_mount(self) -> None:
self._refresh_tree()
self.query_one("#tree-panel", Tree).root.expand()
# ── Tree building ──────────────────────────────────────────────
@staticmethod
def _top_level_entity(entry: dict[str, Any]) -> tuple[str, str]:
etype, ename = "unknown", ""
for ent in entry.get("entities", []):
t = ent.get("type", "unknown")
if t == "flow":
return "flow", ent.get("name") or ""
if t == "crew" and etype != "crew":
etype, ename = "crew", ent.get("name") or ""
return etype, ename
def _refresh_tree(self) -> None:
self._entries = _load_entries(self._location)
self._selected_entry = None
@@ -285,45 +341,57 @@ class CheckpointTUI(App[_TuiResult]):
tree.clear()
if not self._entries:
self.query_one("#detail-header", Static).update(
f"[{_DIM}]No checkpoints in {self._location}[/]"
)
self.query_one("#status", Static).update("")
self.sub_title = self._location
self.query_one("#status", Static).update("")
return
# Group by branch
branches: dict[str, list[dict[str, Any]]] = defaultdict(list)
grouped: dict[tuple[str, str], dict[str, list[dict[str, Any]]]] = defaultdict(
lambda: defaultdict(list)
)
for entry in self._entries:
key = self._top_level_entity(entry)
branch = entry.get("branch", "main")
branches[branch].append(entry)
# Index checkpoint names to tree nodes so forks can attach
node_by_name: dict[str, Any] = {}
grouped[key][branch].append(entry)
def _make_label(e: dict[str, Any]) -> str:
name = e.get("name", "")
ts = e.get("ts") or ""
trigger = e.get("trigger") or ""
parts = [f"[bold]{_short_id(name)}[/]"]
if ts:
time_part = ts.split(" ")[-1] if " " in ts else ts
time_part = ts.split(" ")[-1] if " " in ts else ts
total_c, total_t = 0, 0
for ent in e.get("entities", []):
c = ent.get("tasks_completed")
t = ent.get("tasks_total")
if c is not None and t is not None:
total_c += c
total_t += t
parts: list[str] = []
if time_part:
parts.append(f"[{_DIM}]{time_part}[/]")
if trigger:
parts.append(f"[{_PRIMARY}]{trigger}[/]")
return " ".join(parts)
if total_t:
display_c = total_c
if trigger == "task_started" and total_c < total_t:
display_c = total_c + 1
color = _SUCCESS if total_c == total_t else _DIM
parts.append(f"[{color}]{display_c}/{total_t}[/]")
return " ".join(parts) if parts else _short_id(e.get("name", ""))
fork_parents: set[str] = set()
for branch_name, entries in branches.items():
if branch_name == "main" or not entries:
continue
oldest = min(entries, key=lambda e: str(e.get("name", "")))
first_parent = oldest.get("parent_id")
if first_parent:
fork_parents.add(str(first_parent))
for branches in grouped.values():
for branch_name, entries in branches.items():
if branch_name == "main" or not entries:
continue
oldest = min(entries, key=lambda e: str(e.get("name", "")))
first_parent = oldest.get("parent_id")
if first_parent:
fork_parents.add(str(first_parent))
node_by_name: dict[str, Any] = {}
def _add_checkpoint(parent_node: Any, e: dict[str, Any]) -> None:
"""Add a checkpoint node — expandable only if a fork attaches to it."""
cp_id = _entry_id(e)
if cp_id in fork_parents:
node = parent_node.add(
@@ -333,67 +401,97 @@ class CheckpointTUI(App[_TuiResult]):
node = parent_node.add_leaf(_make_label(e), data=e)
node_by_name[cp_id] = node
if "main" in branches:
for entry in reversed(branches["main"]):
_add_checkpoint(tree.root, entry)
type_order = {"flow": 0, "crew": 1}
sorted_keys = sorted(
grouped.keys(), key=lambda k: (type_order.get(k[0], 9), k[1])
)
for etype, ename in sorted_keys:
branches = grouped[(etype, ename)]
icon = _entity_icon(etype)
color = _ENTITY_COLORS.get(etype, _DIM)
total = sum(len(v) for v in branches.values())
label_parts = [f"{icon} [bold {color}]{etype.upper()}[/]"]
if ename:
label_parts.append(f"[bold]{ename}[/]")
label_parts.append(f"[{_DIM}]({total})[/]")
all_entries = [e for bl in branches.values() for e in bl]
timestamps = [str(e.get("ts", "")) for e in all_entries if e.get("ts")]
if timestamps:
latest = max(timestamps)
label_parts.append(f"[{_DIM}]{_human_ts(latest)}[/]")
entity_label = " ".join(label_parts)
entity_node = tree.root.add(entity_label, expand=True)
if "main" in branches:
for entry in reversed(branches["main"]):
_add_checkpoint(entity_node, entry)
fork_branches = [
(name, sorted(entries, key=lambda e: str(e.get("name", ""))))
for name, entries in branches.items()
if name != "main"
]
remaining = fork_branches
max_passes = len(remaining) + 1
while remaining and max_passes > 0:
max_passes -= 1
deferred = []
made_progress = False
for branch_name, entries in remaining:
first_parent = entries[0].get("parent_id") if entries else None
if first_parent and str(first_parent) not in node_by_name:
deferred.append((branch_name, entries))
continue
attach_to: Any = entity_node
if first_parent:
attach_to = node_by_name.get(str(first_parent), entity_node)
branch_label = (
f"[bold {_SECONDARY}]{branch_name}[/] "
f"[{_DIM}]({len(entries)})[/]"
)
branch_node = attach_to.add(branch_label, expand=False)
for entry in entries:
_add_checkpoint(branch_node, entry)
made_progress = True
remaining = deferred
if not made_progress:
break
fork_branches = [
(name, sorted(entries, key=lambda e: str(e.get("name", ""))))
for name, entries in branches.items()
if name != "main"
]
remaining = fork_branches
max_passes = len(remaining) + 1
while remaining and max_passes > 0:
max_passes -= 1
deferred = []
made_progress = False
for branch_name, entries in remaining:
first_parent = entries[0].get("parent_id") if entries else None
if first_parent and str(first_parent) not in node_by_name:
deferred.append((branch_name, entries))
continue
attach_to: Any = tree.root
if first_parent:
attach_to = node_by_name.get(str(first_parent), tree.root)
branch_label = (
f"[bold {_SECONDARY}]{branch_name}[/] [{_DIM}]({len(entries)})[/]"
f"[bold {_SECONDARY}]{branch_name}[/] "
f"[{_DIM}]({len(entries)})[/] [{_DIM}](orphaned)[/]"
)
branch_node = attach_to.add(branch_label, expand=False)
branch_node = entity_node.add(branch_label, expand=False)
for entry in entries:
_add_checkpoint(branch_node, entry)
made_progress = True
remaining = deferred
if not made_progress:
break
for branch_name, entries in remaining:
branch_label = (
f"[bold {_SECONDARY}]{branch_name}[/] "
f"[{_DIM}]({len(entries)})[/] [{_DIM}](orphaned)[/]"
)
branch_node = tree.root.add(branch_label, expand=False)
for entry in entries:
_add_checkpoint(branch_node, entry)
count = len(self._entries)
storage = "SQLite" if _is_sqlite(self._location) else "JSON"
self.sub_title = self._location
self.query_one("#status", Static).update(f" {count} checkpoint(s) | {storage}")
async def _show_detail(self, entry: dict[str, Any]) -> None:
"""Update the detail panel for a checkpoint entry."""
self._selected_entry = entry
self.query_one("#action-buttons").add_class("visible")
# ── Detail panel ───────────────────────────────────────────────
detail_scroll = self.query_one("#detail-scroll", VerticalScroll)
# Remove all dynamic children except the header — await so IDs are freed
to_remove = [c for c in detail_scroll.children if c.id != "detail-header"]
for child in to_remove:
async def _clear_scroll(self, tab_id: str) -> VerticalScroll:
tab = self.query_one(f"#{tab_id}", TabPane)
scroll = tab.query_one(VerticalScroll)
for child in list(scroll.children):
await child.remove()
return scroll
async def _show_detail(self, entry: dict[str, Any]) -> None:
self._selected_entry = entry
await self._render_overview(entry)
await self._render_tasks(entry)
await self._render_inputs(entry.get("inputs", {}))
async def _render_overview(self, entry: dict[str, Any]) -> None:
scroll = await self._clear_scroll("tab-overview")
# Header
name = entry.get("name", "")
ts = entry.get("ts") or "unknown"
trigger = entry.get("trigger") or ""
@@ -414,42 +512,115 @@ class CheckpointTUI(App[_TuiResult]):
header_lines.append(f" [bold]Branch[/] [{_SECONDARY}]{branch}[/]")
if parent_id:
header_lines.append(f" [bold]Parent[/] [{_DIM}]{parent_id}[/]")
if "path" in entry:
header_lines.append(f" [bold]Path[/] [{_DIM}]{entry['path']}[/]")
if "db" in entry:
header_lines.append(f" [bold]Database[/] [{_DIM}]{entry['db']}[/]")
self.query_one("#detail-header", Static).update("\n".join(header_lines))
await scroll.mount(Static("\n".join(header_lines)))
for ent in entry.get("entities", []):
etype = ent.get("type", "unknown")
ename = ent.get("name", "unnamed")
icon = _entity_icon(etype)
color = _ENTITY_COLORS.get(etype, _DIM)
eid = str(ent.get("id", ""))[:8]
entity_title = (
f"\n{icon} [bold {color}]{etype.upper()}[/] [bold]{ename}[/]"
)
if eid:
entity_title += f" [{_DIM}]{eid}…[/]"
await scroll.mount(Static(entity_title, classes="section-header"))
await scroll.mount(Static(f"[{_DIM}]{'' * 46}[/]", classes="detail-line"))
if etype == "flow":
methods = ent.get("completed_methods", [])
if methods:
method_list = ", ".join(f"[{_SUCCESS}]{m}[/]" for m in methods)
await scroll.mount(
Static(
f" [bold]Methods[/] {method_list}",
classes="detail-line",
)
)
flow_state = ent.get("flow_state")
if isinstance(flow_state, dict) and flow_state:
state_parts: list[str] = []
for k, v in list(flow_state.items())[:5]:
sv = str(v)
if len(sv) > 40:
sv = sv[:37] + "..."
state_parts.append(f"[{_DIM}]{k}[/]={sv}")
await scroll.mount(
Static(
f" [bold]State[/] {', '.join(state_parts)}",
classes="detail-line",
)
)
agents = ent.get("agents", [])
if agents:
agent_lines: list[Static] = []
for ag in agents:
role = ag.get("role", "unnamed")
goal = ag.get("goal", "")
if len(goal) > 60:
goal = goal[:57] + "..."
agent_line = f" {_entity_icon('agent')} [bold]{role}[/]"
if goal:
agent_line += f"\n [{_DIM}]{goal}[/]"
agent_lines.append(Static(agent_line))
collapsible = Collapsible(
*agent_lines,
title=f"Agents ({len(agents)})",
collapsed=len(agents) > 3,
)
await scroll.mount(collapsible)
async def _render_tasks(self, entry: dict[str, Any]) -> None:
scroll = await self._clear_scroll("tab-tasks")
# Entity details and editable task outputs — mounted flat for scrolling
self._task_output_ids = []
flat_task_idx = 0
has_tasks = False
for ent_idx, ent in enumerate(entry.get("entities", [])):
etype = ent.get("type", "unknown")
ename = ent.get("name", "unnamed")
completed = ent.get("tasks_completed")
total = ent.get("tasks_total")
entity_title = f"[bold {_SECONDARY}]{etype}: {ename}[/]"
if completed is not None and total is not None:
entity_title += f" [{_DIM}]{completed}/{total} tasks[/]"
await detail_scroll.mount(Static(entity_title, classes="entity-title"))
await detail_scroll.mount(
Static(_build_entity_header(ent), classes="entity-detail")
)
icon = _entity_icon(etype)
color = _ENTITY_COLORS.get(etype, _DIM)
tasks = ent.get("tasks", [])
if not tasks:
continue
has_tasks = True
completed = ent.get("tasks_completed", 0)
total = ent.get("tasks_total", 0)
await scroll.mount(
Static(
f"{icon} [bold {color}]{ename}[/] "
f"{_build_progress_bar(completed, total, width=16)}",
classes="section-header",
)
)
for i, task in enumerate(tasks):
desc = str(task.get("description", ""))
if len(desc) > 55:
desc = desc[:52] + "..."
if len(desc) > 50:
desc = desc[:47] + "..."
agent_role = task.get("agent_role", "")
if task.get("completed"):
icon = "[green]✓[/]"
await detail_scroll.mount(
Static(f" {icon} {i + 1}. {desc}", classes="task-label")
)
status_icon = f"[{_SUCCESS}]✓[/]"
task_line = f" {status_icon} {i + 1}. {desc}"
if agent_role:
task_line += (
f" [{_DIM}]→ {_entity_icon('agent')} {agent_role}[/]"
)
await scroll.mount(Static(task_line, classes="task-label"))
output_text = task.get("output", "")
editor_id = f"task-output-{ent_idx}-{i}"
await detail_scroll.mount(
await scroll.mount(
TextArea(
str(output_text),
classes="task-output-editor",
@@ -460,28 +631,25 @@ class CheckpointTUI(App[_TuiResult]):
(flat_task_idx, editor_id, str(output_text))
)
else:
icon = "[yellow]○[/]"
await detail_scroll.mount(
Static(f" {icon} {i + 1}. {desc}", classes="task-label")
)
status_icon = f"[{_PENDING}]○[/]"
task_line = f" {status_icon} {i + 1}. {desc}"
if agent_role:
task_line += (
f" [{_DIM}]→ {_entity_icon('agent')} {agent_role}[/]"
)
await scroll.mount(Static(task_line, classes="task-label"))
flat_task_idx += 1
# Build input fields
await self._build_input_fields(entry.get("inputs", {}))
if not has_tasks:
await scroll.mount(Static(f"[{_DIM}]No tasks[/]", classes="empty-state"))
async def _build_input_fields(self, inputs: dict[str, Any]) -> None:
"""Rebuild the inputs section with one field per input key."""
section = self.query_one("#inputs-section")
# Remove old dynamic children — await so IDs are freed
for widget in list(section.query(".input-row, .no-inputs")):
await widget.remove()
async def _render_inputs(self, inputs: dict[str, Any]) -> None:
scroll = await self._clear_scroll("tab-inputs")
self._input_keys = []
if not inputs:
await section.mount(Static(f"[{_DIM}]No inputs[/]", classes="no-inputs"))
section.add_class("visible")
await scroll.mount(Static(f"[{_DIM}]No inputs[/]", classes="empty-state"))
return
for key, value in inputs.items():
@@ -491,12 +659,11 @@ class CheckpointTUI(App[_TuiResult]):
row.compose_add_child(
Input(value=str(value), placeholder=key, id=f"input-{key}")
)
await section.mount(row)
await scroll.mount(row)
section.add_class("visible")
# ── Data collection ────────────────────────────────────────────
def _collect_inputs(self) -> dict[str, Any] | None:
"""Collect current values from input fields."""
if not self._input_keys:
return None
result: dict[str, Any] = {}
@@ -506,7 +673,6 @@ class CheckpointTUI(App[_TuiResult]):
return result
def _collect_task_overrides(self) -> dict[int, str] | None:
"""Collect edited task outputs. Returns only changed values."""
if not self._task_output_ids or self._selected_entry is None:
return None
overrides: dict[int, str] = {}
@@ -517,37 +683,43 @@ class CheckpointTUI(App[_TuiResult]):
return overrides or None
def _detect_entity_type(self, entry: dict[str, Any]) -> Literal["crew", "flow"]:
"""Infer the top-level entity type from checkpoint entities."""
for ent in entry.get("entities", []):
if ent.get("type") == "flow":
return "flow"
return "crew"
def _resolve_location(self, entry: dict[str, Any]) -> str:
"""Get the restore location string for a checkpoint entry."""
if "path" in entry:
return str(entry["path"])
if _is_sqlite(self._location):
return f"{self._location}#{entry['name']}"
return str(entry.get("name", ""))
# ── Events ─────────────────────────────────────────────────────
async def on_tree_node_highlighted(
self, event: Tree.NodeHighlighted[dict[str, Any]]
) -> None:
if event.node.data is not None:
await self._show_detail(event.node.data)
def on_button_pressed(self, event: Button.Pressed) -> None:
def _exit_with_action(self, action: str) -> None:
if self._selected_entry is None:
self.notify("No checkpoint selected", severity="warning")
return
inputs = self._collect_inputs()
overrides = self._collect_task_overrides()
loc = self._resolve_location(self._selected_entry)
etype = self._detect_entity_type(self._selected_entry)
if event.button.id == "btn-resume":
self.exit((loc, "resume", inputs, overrides, etype))
elif event.button.id == "btn-fork":
self.exit((loc, "fork", inputs, overrides, etype))
name = self._selected_entry.get("name", "")[:30]
self.notify(f"{action.title()}: {name}")
self.exit((loc, action, inputs, overrides, etype))
def action_resume(self) -> None:
self._exit_with_action("resume")
def action_fork(self) -> None:
self._exit_with_action("fork")
def action_refresh(self) -> None:
self._refresh_tree()

View File

@@ -419,10 +419,32 @@ class Crew(FlowTrackable, BaseModel):
def _restore_runtime(self) -> None:
"""Re-create runtime objects after restoring from a checkpoint."""
from crewai.events.event_bus import crewai_event_bus
started_task_ids: set[str] = set()
state = crewai_event_bus._runtime_state
if state is not None:
for node in state.event_record.nodes.values():
if node.event.type == "task_started" and node.event.task_id:
started_task_ids.add(node.event.task_id)
resuming_task_agent_roles: set[str] = set()
for task in self.tasks:
if (
task.output is None
and task.agent is not None
and str(task.id) in started_task_ids
):
resuming_task_agent_roles.add(task.agent.role)
for agent in self.agents:
agent.crew = self
executor = agent.agent_executor
if executor and executor.messages:
if (
executor
and executor.messages
and agent.role in resuming_task_agent_roles
):
executor.crew = self
executor.agent = agent
executor._resuming = True