chore: merge main into gl/chore/refactor-cli

Resolve conflicts from origin/main: relocate new CLI additions
(checkpoint_tui, deploy/validate, remote_template, content_crew
templates) into lib/cli, rewrite imports for the standalone
crewai-cli package, port main's trained_agents_file param and
predeploy validation, and bump python-dotenv/pydantic in
crewai-cli to match crewai's constraints. Add the new
mark_ephemeral_trace_batch_as_failed method to the relocated
crewai.plus_api. Update tests for the new payload field, deploy
--skip-validate kwarg, and crewai_cli import paths.
This commit is contained in:
Greyson Lalonde
2026-05-05 01:50:34 +08:00
1014 changed files with 176672 additions and 17289 deletions

View File

@@ -1,6 +1,7 @@
from pathlib import Path
import click
from crewai.utilities.printer import PRINTER
from crewai_cli.utils import copy_template
@@ -9,8 +10,8 @@ def add_crew_to_flow(crew_name: str) -> None:
"""Add a new crew to the current flow."""
# Check if pyproject.toml exists in the current directory
if not Path("pyproject.toml").exists():
click.secho(
"This command must be run from the root of a flow project.", fg="red"
PRINTER.print(
"This command must be run from the root of a flow project.", color="red"
)
raise click.ClickException(
"This command must be run from the root of a flow project."
@@ -21,7 +22,7 @@ def add_crew_to_flow(crew_name: str) -> None:
crews_folder = flow_folder / "src" / flow_folder.name / "crews"
if not crews_folder.exists():
click.secho("Crews folder does not exist in the current flow.", fg="red")
PRINTER.print("Crews folder does not exist in the current flow.", color="red")
raise click.ClickException("Crews folder does not exist in the current flow.")
# Create the crew within the flow's crews directory

View File

@@ -0,0 +1,732 @@
"""CLI commands for inspecting checkpoint files."""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
import glob
import json
import os
import re
import sqlite3
from typing import Any
import click
# Matches "{placeholder}" tokens in task/agent text; group 1 captures the
# placeholder name (letters, digits, underscore, hyphen).
_PLACEHOLDER_RE = re.compile(r"\{([A-Za-z_][A-Za-z0-9_\-]*)}")
# Fixed 16-byte header present at the start of every SQLite database file.
_SQLITE_MAGIC = b"SQLite format 3\x00"
# All checkpoint rows, newest insertion first.
_SELECT_ALL = """
SELECT id, created_at, json(data)
FROM checkpoints
ORDER BY rowid DESC
"""
# Single checkpoint row by exact id.
_SELECT_ONE = """
SELECT id, created_at, json(data)
FROM checkpoints
WHERE id = ?
"""
# Most recently inserted checkpoint row.
_SELECT_LATEST = """
SELECT id, created_at, json(data)
FROM checkpoints
ORDER BY rowid DESC
LIMIT 1
"""
# Age-based pruning: drop rows created before a cutoff stamp.
_DELETE_OLDER_THAN = """
DELETE FROM checkpoints
WHERE created_at < ?
"""
# Count-based pruning: keep only the N newest rows (by insertion order).
_DELETE_KEEP_N = """
DELETE FROM checkpoints WHERE rowid NOT IN (
SELECT rowid FROM checkpoints ORDER BY rowid DESC LIMIT ?
)
"""
_COUNT_CHECKPOINTS = "SELECT COUNT(*) FROM checkpoints"
# Substring search on checkpoint id, newest first.
_SELECT_LIKE = """
SELECT id, created_at, json(data)
FROM checkpoints
WHERE id LIKE ?
ORDER BY rowid DESC
"""
# Conventional default locations for the JSON-directory provider and the
# SQLite provider, respectively (see _detect_location).
_DEFAULT_DIR = "./.checkpoints"
_DEFAULT_DB = "./.checkpoints.db"
def _detect_location(location: str) -> str:
    """Resolve the default checkpoint location.

    If the caller passed the default directory path, but only the
    conventional SQLite database file exists, prefer the database.
    Any explicitly chosen location is returned unchanged.
    """
    if location != _DEFAULT_DIR:
        return location
    if os.path.exists(_DEFAULT_DIR) or not os.path.exists(_DEFAULT_DB):
        return location
    return _DEFAULT_DB
def _is_sqlite(path: str) -> bool:
"""Check if a file is a SQLite database by reading its magic bytes."""
if not os.path.isfile(path):
return False
try:
with open(path, "rb") as f:
return f.read(16) == _SQLITE_MAGIC
except OSError:
return False
def _parse_checkpoint_json(raw: str, source: str) -> dict[str, Any]:
    """Parse checkpoint JSON into metadata dict.

    Args:
        raw: Checkpoint payload as a JSON string.
        source: Label recorded in the result (filename or database row id).

    Returns:
        Dict with ``source``, ``event_count``, ``trigger``, ``entities``,
        ``branch``, ``parent_id`` and ``inputs`` keys.
    """
    data = json.loads(raw)
    entities = data.get("entities", [])
    # Event-record nodes approximate how far execution got.
    nodes = data.get("event_record", {}).get("nodes", {})
    event_count = len(nodes)
    trigger_event = data.get("trigger")
    parsed_entities: list[dict[str, Any]] = []
    for entity in entities:
        tasks = entity.get("tasks", [])
        # A task counts as completed once it has a recorded output.
        completed = sum(1 for t in tasks if t.get("output") is not None)
        info: dict[str, Any] = {
            "type": entity.get("entity_type", "unknown"),
            "name": entity.get("name"),
            "id": entity.get("id"),
        }
        raw_agents = entity.get("agents", [])
        # Index agents by id so string agent references on tasks can resolve.
        agents_by_id: dict[str, dict[str, Any]] = {}
        parsed_agents: list[dict[str, Any]] = []
        for ag in raw_agents:
            agent_info: dict[str, Any] = {
                "id": ag.get("id", ""),
                "role": ag.get("role", ""),
                "goal": ag.get("goal", ""),
            }
            parsed_agents.append(agent_info)
            if ag.get("id"):
                agents_by_id[str(ag["id"])] = agent_info
        if parsed_agents:
            info["agents"] = parsed_agents
        if tasks:
            info["tasks_completed"] = completed
            info["tasks_total"] = len(tasks)
            parsed_tasks: list[dict[str, Any]] = []
            for t in tasks:
                task_info: dict[str, Any] = {
                    "description": t.get("description", ""),
                    "completed": t.get("output") is not None,
                    "output": (t.get("output") or {}).get("raw", ""),
                }
                # The task's agent may be embedded as a dict or referenced
                # by id string; resolve the latter via agents_by_id.
                task_agent = t.get("agent")
                if isinstance(task_agent, dict):
                    task_info["agent_role"] = task_agent.get("role", "")
                    task_info["agent_id"] = task_agent.get("id", "")
                elif isinstance(task_agent, str) and task_agent in agents_by_id:
                    task_info["agent_role"] = agents_by_id[task_agent].get("role", "")
                    task_info["agent_id"] = task_agent
                parsed_tasks.append(task_info)
            info["tasks"] = parsed_tasks
        if entity.get("entity_type") == "flow":
            completed_methods = entity.get("checkpoint_completed_methods")
            if completed_methods:
                info["completed_methods"] = sorted(completed_methods)
            state = entity.get("checkpoint_state")
            if isinstance(state, dict):
                info["flow_state"] = state
        parsed_entities.append(info)
    # First entity with non-empty checkpoint_inputs wins.
    inputs: dict[str, Any] = {}
    for entity in entities:
        cp_inputs = entity.get("checkpoint_inputs")
        if isinstance(cp_inputs, dict) and cp_inputs:
            inputs = dict(cp_inputs)
            break
    # Surface every "{placeholder}" referenced in task/agent text as an
    # (empty) input key so callers know which inputs can be supplied.
    for entity in entities:
        for task in entity.get("tasks", []):
            for field in (
                "checkpoint_original_description",
                "checkpoint_original_expected_output",
            ):
                text = task.get(field) or ""
                for match in _PLACEHOLDER_RE.findall(text):
                    if match not in inputs:
                        inputs[match] = ""
        for agent in entity.get("agents", []):
            for field in ("role", "goal", "backstory"):
                text = agent.get(field) or ""
                for match in _PLACEHOLDER_RE.findall(text):
                    if match not in inputs:
                        inputs[match] = ""
    branch = data.get("branch", "main")
    parent_id = data.get("parent_id")
    return {
        "source": source,
        "event_count": event_count,
        "trigger": trigger_event,
        "entities": parsed_entities,
        "branch": branch,
        "parent_id": parent_id,
        "inputs": inputs,
    }
def _format_size(size: int) -> str:
if size < 1024:
return f"{size}B"
if size < 1024 * 1024:
return f"{size / 1024:.1f}KB"
return f"{size / 1024 / 1024:.1f}MB"
def _ts_from_name(name: str) -> str | None:
"""Extract timestamp from checkpoint ID or filename."""
stem = os.path.basename(name).split("_")[0].removesuffix(".json")
try:
dt = datetime.strptime(stem, "%Y%m%dT%H%M%S")
except ValueError:
return None
return dt.strftime("%Y-%m-%d %H:%M:%S")
def _entity_summary(entities: list[dict[str, Any]]) -> str:
parts = []
for ent in entities:
etype = ent.get("type", "unknown")
ename = ent.get("name", "")
completed = ent.get("tasks_completed")
total = ent.get("tasks_total")
if completed is not None and total is not None:
parts.append(f"{etype}:{ename} [{completed}/{total} tasks]")
else:
parts.append(f"{etype}:{ename}")
return ", ".join(parts) if parts else "empty"
# --- JSON directory ---
def _list_json(location: str) -> list[dict[str, Any]]:
    """Load metadata for every JSON checkpoint under *location*, newest first."""
    pattern = os.path.join(location, "**", "*.json")
    paths = glob.glob(pattern, recursive=True)
    paths.sort(key=os.path.getmtime, reverse=True)
    entries: list[dict[str, Any]] = []
    for path in paths:
        name = os.path.basename(path)
        try:
            with open(path) as handle:
                meta = _parse_checkpoint_json(handle.read(), source=name)
            meta["name"] = name
            meta["ts"] = _ts_from_name(name)
            meta["size"] = os.path.getsize(path)
            meta["path"] = path
        except Exception:
            # Unreadable/corrupt checkpoints still show up in listings.
            meta = {"name": name, "ts": None, "size": 0, "entities": [], "source": name}
        entries.append(meta)
    return entries
def _info_json_latest(location: str) -> dict[str, Any] | None:
    """Metadata for the most recently modified JSON checkpoint, or None."""
    pattern = os.path.join(location, "**", "*.json")
    candidates = glob.glob(pattern, recursive=True)
    if not candidates:
        return None
    # Newest by mtime; _info_json_file performs the identical parse/annotate
    # steps this function previously duplicated.
    newest = max(candidates, key=os.path.getmtime)
    return _info_json_file(newest)
def _info_json_file(path: str) -> dict[str, Any]:
    """Parse one JSON checkpoint file into its metadata dict."""
    basename = os.path.basename(path)
    with open(path) as handle:
        meta = _parse_checkpoint_json(handle.read(), source=basename)
    meta.update(
        name=basename,
        ts=_ts_from_name(path),
        size=os.path.getsize(path),
        path=path,
    )
    return meta
# --- SQLite ---
def _list_sqlite(db_path: str) -> list[dict[str, Any]]:
    """Load metadata for every checkpoint row in a SQLite database, newest first."""
    entries: list[dict[str, Any]] = []
    with sqlite3.connect(db_path) as conn:
        for checkpoint_id, created_at, raw in conn.execute(_SELECT_ALL):
            try:
                meta = _parse_checkpoint_json(raw, source=checkpoint_id)
                meta["name"] = checkpoint_id
                meta["ts"] = _ts_from_name(checkpoint_id) or created_at
            except Exception:
                # Corrupt rows stay visible in listings rather than aborting.
                meta = {
                    "name": checkpoint_id,
                    "ts": created_at,
                    "entities": [],
                    "source": checkpoint_id,
                }
            meta["db"] = db_path
            entries.append(meta)
    return entries
def _info_sqlite_latest(db_path: str) -> dict[str, Any] | None:
    """Metadata for the newest checkpoint row, or None for an empty database."""
    with sqlite3.connect(db_path) as conn:
        row = conn.execute(_SELECT_LATEST).fetchone()
    if row is None:
        return None
    checkpoint_id, created_at, raw = row
    meta = _parse_checkpoint_json(raw, source=checkpoint_id)
    meta["name"] = checkpoint_id
    meta["ts"] = _ts_from_name(checkpoint_id) or created_at
    meta["db"] = db_path
    return meta
def _info_sqlite_id(db_path: str, checkpoint_id: str) -> dict[str, Any] | None:
    """Look up a checkpoint by exact id, falling back to substring match."""
    with sqlite3.connect(db_path) as conn:
        row = conn.execute(_SELECT_ONE, (checkpoint_id,)).fetchone()
        if row is None:
            row = conn.execute(_SELECT_LIKE, (f"%{checkpoint_id}%",)).fetchone()
    if row is None:
        return None
    cid, created_at, raw = row
    meta = _parse_checkpoint_json(raw, source=cid)
    meta["name"] = cid
    meta["ts"] = _ts_from_name(cid) or created_at
    meta["db"] = db_path
    return meta
# --- Public API ---
def list_checkpoints(location: str) -> None:
    """List all checkpoints at a location (JSON directory or SQLite file)."""
    if _is_sqlite(location):
        entries = _list_sqlite(location)
        label = f"SQLite: {location}"
    elif os.path.isdir(location):
        entries = _list_json(location)
        label = location
    else:
        click.echo(f"Not a directory or SQLite database: {location}")
        return
    if not entries:
        click.echo(f"No checkpoints found in {label}")
        return
    click.echo(f"Found {len(entries)} checkpoint(s) in {label}\n")
    for entry in entries:
        # Column order: name, timestamp, size, trigger, entity summary.
        columns = [entry.get("name", ""), entry.get("ts") or "unknown"]
        if "size" in entry:
            columns.append(_format_size(entry["size"]))
        trigger = entry.get("trigger") or ""
        if trigger:
            columns.append(trigger)
        columns.append(_entity_summary(entry.get("entities", [])))
        click.echo(f" {' '.join(columns)}")
def info_checkpoint(path: str) -> None:
    """Show details of a single checkpoint.

    *path* may be a ``db_path#checkpoint_id`` pair, a SQLite database
    (latest row shown), a directory of JSON checkpoints (latest file
    shown), or a specific JSON checkpoint file.
    """
    meta: dict[str, Any] | None = None
    # db_path#checkpoint_id format
    if "#" in path:
        db_path, checkpoint_id = path.rsplit("#", 1)
        if _is_sqlite(db_path):
            meta = _info_sqlite_id(db_path, checkpoint_id)
            if not meta:
                click.echo(f"Checkpoint not found: {checkpoint_id}")
                return
    # SQLite file — show latest
    if meta is None and _is_sqlite(path):
        meta = _info_sqlite_latest(path)
        if not meta:
            click.echo(f"No checkpoints in database: {path}")
            return
        click.echo(f"Latest checkpoint: {meta['name']}\n")
    # Directory — show latest JSON
    if meta is None and os.path.isdir(path):
        meta = _info_json_latest(path)
        if not meta:
            click.echo(f"No checkpoints found in {path}")
            return
        click.echo(f"Latest checkpoint: {meta['name']}\n")
    # Specific JSON file
    if meta is None and os.path.isfile(path):
        try:
            meta = _info_json_file(path)
        except Exception as exc:
            click.echo(f"Failed to read checkpoint: {exc}")
            return
    if meta is None:
        click.echo(f"Not found: {path}")
        return
    _print_info(meta)
def _print_info(meta: dict[str, Any]) -> None:
    """Pretty-print one checkpoint's metadata to stdout."""
    source = meta.get("path") or meta.get("db") or meta.get("source", "")
    click.echo(f"Source: {source}")
    click.echo(f"Name: {meta.get('name', '')}")
    click.echo(f"Time: {meta.get('ts') or 'unknown'}")
    if "size" in meta:
        click.echo(f"Size: {_format_size(meta['size'])}")
    click.echo(f"Events: {meta.get('event_count', 0)}")
    if meta.get("trigger"):
        click.echo(f"Trigger: {meta['trigger']}")
    click.echo(f"Branch: {meta.get('branch', 'main')}")
    if meta.get("parent_id"):
        click.echo(f"Parent: {meta['parent_id']}")
    for ent in meta.get("entities", []):
        eid = str(ent.get("id", ""))[:8]
        click.echo(f"\n {ent['type']}: {ent.get('name', 'unnamed')} ({eid}...)")
        tasks = ent.get("tasks")
        if not isinstance(tasks, list):
            continue
        click.echo(
            f" Tasks: {ent['tasks_completed']}/{ent['tasks_total']} completed"
        )
        for number, task in enumerate(tasks, start=1):
            desc = str(task.get("description", ""))
            # Truncate long descriptions to a 70-char display budget.
            if len(desc) > 70:
                desc = desc[:67] + "..."
            state = "done" if task.get("completed") else "pending"
            click.echo(f" {number}. [{state}] {desc}")
def _resolve_checkpoint(
    location: str, checkpoint_id: str | None
) -> dict[str, Any] | None:
    """Resolve *checkpoint_id* (or the latest checkpoint) at *location*.

    Handles SQLite databases, JSON checkpoint directories, and direct
    JSON file paths.  Returns the parsed metadata dict, or ``None`` when
    nothing matches.
    """
    if _is_sqlite(location):
        if checkpoint_id:
            return _info_sqlite_id(location, checkpoint_id)
        return _info_sqlite_latest(location)
    if os.path.isdir(location):
        if checkpoint_id:
            # Substring-match against the provider-extracted checkpoint id,
            # preferring the most recently modified file.
            from crewai.state.provider.json_provider import JsonProvider
            _json_provider: JsonProvider = JsonProvider()
            pattern: str = os.path.join(location, "**", "*.json")
            all_files: list[str] = glob.glob(pattern, recursive=True)
            matches: list[str] = [
                f for f in all_files if checkpoint_id in _json_provider.extract_id(f)
            ]
            matches.sort(key=os.path.getmtime, reverse=True)
            if matches:
                return _info_json_file(matches[0])
            return None
        return _info_json_latest(location)
    if os.path.isfile(location):
        return _info_json_file(location)
    return None
def _entity_type_from_meta(meta: dict[str, Any]) -> str:
for ent in meta.get("entities", []):
if ent.get("type") == "flow":
return "flow"
if ent.get("type") == "agent":
return "agent"
return "crew"
def resume_checkpoint(location: str, checkpoint_id: str | None) -> None:
    """Resume execution from a checkpoint at *location*.

    Restores the flow/agent/crew recorded in the checkpoint and kicks it
    off again, passing along the checkpoint's recorded inputs (if any).
    """
    import asyncio
    meta: dict[str, Any] | None = _resolve_checkpoint(location, checkpoint_id)
    if meta is None:
        if checkpoint_id:
            click.echo(f"Checkpoint not found: {checkpoint_id}")
        else:
            click.echo(f"No checkpoints found in {location}")
        return
    restore_path: str = meta.get("path") or meta.get("source", "")
    # SQLite checkpoints are addressed as "db_path#checkpoint_id".
    if meta.get("db"):
        restore_path = f"{meta['db']}#{meta['name']}"
    click.echo(f"Resuming from: {meta.get('name', restore_path)}")
    _print_info(meta)
    click.echo()
    # crewai imports are deferred: they are heavy and only needed here.
    from crewai.state.checkpoint_config import CheckpointConfig
    config: CheckpointConfig = CheckpointConfig(restore_from=restore_path)
    entity_type: str = _entity_type_from_meta(meta)
    inputs: dict[str, Any] | None = meta.get("inputs") or None
    if entity_type == "flow":
        from crewai.flow.flow import Flow
        flow = Flow.from_checkpoint(config)
        result = asyncio.run(flow.kickoff_async(inputs=inputs))
    elif entity_type == "agent":
        from crewai.agent import Agent
        agent = Agent.from_checkpoint(config)
        result = asyncio.run(agent.akickoff(messages="Resume execution."))
    else:
        from crewai.crew import Crew
        crew = Crew.from_checkpoint(config)
        result = asyncio.run(crew.akickoff(inputs=inputs))
    click.echo(f"\nResult: {getattr(result, 'raw', result)}")
def _task_list_from_meta(meta: dict[str, Any]) -> list[dict[str, Any]]:
tasks: list[dict[str, Any]] = []
for ent in meta.get("entities", []):
tasks.extend(
{
"entity": ent.get("name", "unnamed"),
"description": t.get("description", ""),
"completed": t.get("completed", False),
"output": t.get("output", ""),
}
for t in ent.get("tasks", [])
)
return tasks
def diff_checkpoints(location: str, id1: str, id2: str) -> None:
    """Print a unified-diff-style comparison of two checkpoints.

    Compares scalar header fields, input values, and positionally-aligned
    task status/output between the checkpoints resolved by *id1* and *id2*.
    """
    meta1: dict[str, Any] | None = _resolve_checkpoint(location, id1)
    meta2: dict[str, Any] | None = _resolve_checkpoint(location, id2)
    if meta1 is None:
        click.echo(f"Checkpoint not found: {id1}")
        return
    if meta2 is None:
        click.echo(f"Checkpoint not found: {id2}")
        return
    name1: str = meta1.get("name", id1)
    name2: str = meta2.get("name", id2)
    click.echo(f"--- {name1}")
    click.echo(f"+++ {name2}")
    click.echo()
    # Scalar header fields shown only when they differ.
    fields: list[tuple[str, str]] = [
        ("Time", "ts"),
        ("Branch", "branch"),
        ("Trigger", "trigger"),
        ("Events", "event_count"),
    ]
    for label, key in fields:
        v1: str = str(meta1.get(key, ""))
        v2: str = str(meta2.get(key, ""))
        if v1 != v2:
            click.echo(f" {label}:")
            click.echo(f" - {v1}")
            click.echo(f" + {v2}")
    # Input keys from either side whose values differ.
    inputs1: dict[str, Any] = meta1.get("inputs", {})
    inputs2: dict[str, Any] = meta2.get("inputs", {})
    all_keys: list[str] = sorted(set(list(inputs1.keys()) + list(inputs2.keys())))
    changed_inputs: list[tuple[str, Any, Any]] = [
        (k, inputs1.get(k, ""), inputs2.get(k, ""))
        for k in all_keys
        if inputs1.get(k) != inputs2.get(k)
    ]
    if changed_inputs:
        click.echo("\n Inputs:")
        for key, v1, v2 in changed_inputs:
            click.echo(f" {key}:")
            click.echo(f" - {v1}")
            click.echo(f" + {v2}")
    # Tasks are compared by position, not identity.
    tasks1: list[dict[str, Any]] = _task_list_from_meta(meta1)
    tasks2: list[dict[str, Any]] = _task_list_from_meta(meta2)
    max_tasks: int = max(len(tasks1), len(tasks2))
    if max_tasks == 0:
        return
    click.echo("\n Tasks:")
    for i in range(max_tasks):
        t1: dict[str, Any] | None = tasks1[i] if i < len(tasks1) else None
        t2: dict[str, Any] | None = tasks2[i] if i < len(tasks2) else None
        if t1 is None:
            desc: str = t2["description"][:60] if t2 else ""
            click.echo(f" + {i + 1}. [new] {desc}")
            continue
        if t2 is None:
            desc = t1["description"][:60]
            click.echo(f" - {i + 1}. [removed] {desc}")
            continue
        desc = str(t1["description"][:60])
        s1: str = "done" if t1["completed"] else "pending"
        s2: str = "done" if t2["completed"] else "pending"
        if s1 != s2:
            click.echo(f" {i + 1}. {desc}")
            click.echo(f" status: {s1} -> {s2}")
        out1: str = (t1.get("output") or "").strip()
        out2: str = (t2.get("output") or "").strip()
        if out1 != out2:
            # Only print the task header here if the status block didn't.
            if s1 == s2:
                click.echo(f" {i + 1}. {desc}")
            preview1: str = (
                out1[:80] + ("..." if len(out1) > 80 else "") if out1 else "(empty)"
            )
            preview2: str = (
                out2[:80] + ("..." if len(out2) > 80 else "") if out2 else "(empty)"
            )
            click.echo(" output:")
            click.echo(f" - {preview1}")
            click.echo(f" + {preview2}")
def _parse_duration(value: str) -> timedelta:
match: re.Match[str] | None = re.match(r"^(\d+)([dhm])$", value.strip())
if not match:
raise click.BadParameter(
f"Invalid duration: {value!r}. Use format like '7d', '24h', or '30m'."
)
amount: int = int(match.group(1))
unit: str = match.group(2)
if unit == "d":
return timedelta(days=amount)
if unit == "h":
return timedelta(hours=amount)
return timedelta(minutes=amount)
def _prune_json(location: str, keep: int | None, older_than: timedelta | None) -> int:
pattern: str = os.path.join(location, "**", "*.json")
files: list[str] = sorted(
glob.glob(pattern, recursive=True), key=os.path.getmtime, reverse=True
)
if not files:
return 0
to_delete: set[str] = set()
if keep is not None and len(files) > keep:
to_delete.update(files[keep:])
if older_than is not None:
cutoff: datetime = datetime.now(timezone.utc) - older_than
for path in files:
mtime: datetime = datetime.fromtimestamp(
os.path.getmtime(path), tz=timezone.utc
)
if mtime < cutoff:
to_delete.add(path)
deleted: int = 0
for path in to_delete:
try:
os.remove(path)
deleted += 1
except OSError: # noqa: PERF203
pass
for dirpath, dirnames, filenames in os.walk(location, topdown=False):
if dirpath != location and not filenames and not dirnames:
try:
os.rmdir(dirpath)
except OSError:
pass
return deleted
def _prune_sqlite(db_path: str, keep: int | None, older_than: timedelta | None) -> int:
    """Delete checkpoint rows older than a cutoff and/or beyond *keep* newest.

    Returns the number of rows removed.
    """
    removed: int = 0
    with sqlite3.connect(db_path) as conn:
        if older_than is not None:
            # Cutoff uses the same YYYYMMDDTHHMMSS stamp format as
            # checkpoint ids; assumes created_at stores that format —
            # TODO confirm against the writer side.
            cutoff: str = (datetime.now(timezone.utc) - older_than).strftime(
                "%Y%m%dT%H%M%S"
            )
            removed += conn.execute(_DELETE_OLDER_THAN, (cutoff,)).rowcount
        if keep is not None:
            removed += conn.execute(_DELETE_KEEP_N, (keep,)).rowcount
        conn.commit()
    return removed
def prune_checkpoints(
    location: str, keep: int | None, older_than: str | None, dry_run: bool = False
) -> None:
    """Prune checkpoints at *location* by count and/or age.

    With *dry_run*, only report how many checkpoints would be considered.
    At least one of *keep* / *older_than* must be given.
    """
    if keep is None and older_than is None:
        click.echo("Specify --keep N and/or --older-than DURATION (e.g. 7d, 24h)")
        return
    duration: timedelta | None = _parse_duration(older_than) if older_than else None
    if _is_sqlite(location):
        if dry_run:
            with sqlite3.connect(location) as conn:
                (total,) = conn.execute(_COUNT_CHECKPOINTS).fetchone()
            click.echo(f"Would prune from {total} checkpoint(s) in {location}")
            return
        removed = _prune_sqlite(location, keep, duration)
    elif os.path.isdir(location):
        if dry_run:
            pattern = os.path.join(location, "**", "*.json")
            total = len(glob.glob(pattern, recursive=True))
            click.echo(f"Would prune from {total} checkpoint(s) in {location}")
            return
        removed = _prune_json(location, keep, duration)
    else:
        click.echo(f"Not a directory or SQLite database: {location}")
        return
    click.echo(f"Pruned {removed} checkpoint(s) from {location}")

View File

@@ -0,0 +1,877 @@
"""Textual TUI for browsing checkpoint files."""
from __future__ import annotations
from collections import defaultdict
from datetime import datetime
from typing import Any, ClassVar, Literal
from textual.app import App, ComposeResult
from textual.binding import Binding
from textual.containers import Horizontal, Vertical, VerticalScroll
from textual.widgets import (
Collapsible,
Footer,
Header,
Input,
Static,
TabPane,
TabbedContent,
TextArea,
Tree,
)
from crewai_cli.checkpoint_cli import (
_format_size,
_is_sqlite,
_list_json,
_list_sqlite,
)
# Color palette used throughout the TUI stylesheet and Rich markup.
_PRIMARY = "#eb6658"
_SECONDARY = "#1F7982"
_TERTIARY = "#ffffff"
_DIM = "#888888"
_BG_DARK = "#0d1117"
_BG_PANEL = "#161b22"
_ACCENT = "#c9a227"
_SUCCESS = "#3fb950"
_PENDING = "#e3b341"
# Tree glyph per entity type.  NOTE(review): the glyph values appear empty
# here — possibly non-ASCII icon characters lost in transit; confirm
# against the original source.
_ENTITY_ICONS: dict[str, str] = {
    "flow": "",
    "crew": "",
    "agent": "",
    "unknown": "",
}
# Accent color per entity type, matching the icons above.
_ENTITY_COLORS: dict[str, str] = {
    "flow": _ACCENT,
    "crew": _SECONDARY,
    "agent": _PRIMARY,
    "unknown": _DIM,
}
def _load_entries(location: str) -> list[dict[str, Any]]:
    """Load checkpoint entries from a SQLite database or a JSON directory."""
    loader = _list_sqlite if _is_sqlite(location) else _list_json
    return loader(location)
def _human_ts(ts: str) -> str:
"""Turn '2026-04-17 17:05:00' into a short relative label."""
try:
dt = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S")
except ValueError:
return ts
now = datetime.now()
delta = now.date() - dt.date()
hour = dt.hour % 12 or 12
ampm = "am" if dt.hour < 12 else "pm"
time_str = f"{hour}:{dt.minute:02d}{ampm}"
if delta.days == 0:
return time_str
if delta.days == 1:
return f"yest {time_str}"
if delta.days < 7:
return f"{dt.strftime('%a').lower()} {time_str}"
return f"{dt.strftime('%b')} {dt.day}"
def _short_id(name: str) -> str:
if len(name) > 30:
return name[:27] + "..."
return name
def _entry_id(entry: dict[str, Any]) -> str:
"""Normalize an entry's name into its checkpoint ID.
JSON filenames are ``{ts}_{uuid}_p-{parent}.json``; SQLite IDs
are already ``{ts}_{uuid}``. This strips the JSON suffix so
fork-parent lookups work in both providers.
"""
name = str(entry.get("name", ""))
if name.endswith(".json"):
name = name[: -len(".json")]
idx = name.find("_p-")
if idx != -1:
name = name[:idx]
return name
def _build_progress_bar(completed: int, total: int, width: int = 20) -> str:
    """Render a Rich-markup progress bar plus a ``done/total (pct%)`` suffix."""
    if total == 0:
        return f"[{_DIM}]{'' * width}[/] 0/0"
    filled = int(width * completed / total)
    # Green once everything is done, brand color while in progress.
    tone = _SUCCESS if completed == total else _PRIMARY
    bar = f"[{tone}]{'' * filled}[/][{_DIM}]{'' * (width - filled)}[/]"
    pct = int(completed / total * 100)
    return f"{bar} {completed}/{total} ({pct}%)"
def _entity_icon(etype: str) -> str:
    """Colored icon markup for an entity type, defaulting to 'unknown'."""
    glyph = _ENTITY_ICONS.get(etype, _ENTITY_ICONS["unknown"])
    tint = _ENTITY_COLORS.get(etype, _DIM)
    return f"[{tint}]{glyph}[/]"
# Result type of CheckpointTUI.run():
# (location, action, inputs, task_output_overrides, entity_type), where
# action is "resume" or "fork" and inputs is a parsed dict or None;
# the whole result is None when the user quit without selecting.
_TuiResult = (
    tuple[
        str,
        str,
        dict[str, Any] | None,
        dict[int, str] | None,
        Literal["crew", "flow", "agent"],
    ]
    | None
)
class CheckpointTUI(App[_TuiResult]):
"""TUI to browse and inspect checkpoints.
Returns ``(location, action, inputs, task_overrides, entity_type)``
where action is ``"resume"`` or ``"fork"``, inputs is a parsed dict
or ``None``, and entity_type is ``"crew"`` or ``"flow"``;
or ``None`` if the user quit without selecting.
"""
TITLE = "CrewAI Checkpoints"
CSS = f"""
Screen {{
background: {_BG_DARK};
}}
Header {{
background: {_PRIMARY};
color: {_TERTIARY};
}}
Footer {{
background: {_SECONDARY};
color: {_TERTIARY};
}}
Footer > .footer-key--key {{
background: {_PRIMARY};
color: {_TERTIARY};
}}
#main-layout {{
height: 1fr;
}}
#tree-panel {{
width: 40%;
background: {_BG_PANEL};
border: round {_SECONDARY};
padding: 0 1;
scrollbar-color: {_PRIMARY};
}}
#tree-panel:focus-within {{
border: round {_PRIMARY};
}}
#detail-container {{
width: 60%;
height: 1fr;
}}
#status {{
height: 1;
padding: 0 2;
color: {_DIM};
}}
#detail-tabs {{
height: 1fr;
}}
TabbedContent > ContentSwitcher {{
background: {_BG_PANEL};
height: 1fr;
}}
TabPane {{
padding: 0;
}}
Tabs {{
background: {_BG_DARK};
}}
Tab {{
background: {_BG_DARK};
color: {_DIM};
padding: 0 2;
}}
Tab.-active {{
background: {_BG_PANEL};
color: {_PRIMARY};
}}
Tab:hover {{
color: {_TERTIARY};
}}
Underline > .underline--bar {{
color: {_SECONDARY};
background: {_BG_DARK};
}}
.tab-scroll {{
background: {_BG_PANEL};
height: 1fr;
padding: 1 2;
scrollbar-color: {_PRIMARY};
}}
.section-header {{
padding: 0 0 0 1;
margin: 1 0 0 0;
}}
.detail-line {{
padding: 0 0 0 1;
}}
.task-label {{
padding: 0 1;
}}
.task-output-editor {{
height: auto;
max-height: 10;
margin: 0 1 1 3;
border: round {_DIM};
}}
.task-output-editor:focus {{
border: round {_PRIMARY};
}}
Collapsible {{
background: {_BG_PANEL};
padding: 0;
margin: 0 0 1 1;
}}
CollapsibleTitle {{
background: {_BG_DARK};
color: {_TERTIARY};
padding: 0 1;
}}
CollapsibleTitle:hover {{
background: {_SECONDARY};
}}
.input-row {{
height: 3;
padding: 0 1;
}}
.input-row Static {{
width: auto;
min-width: 12;
padding: 1 1 0 0;
color: {_TERTIARY};
}}
.input-row Input {{
width: 1fr;
}}
.empty-state {{
color: {_DIM};
padding: 1;
}}
Tree {{
background: {_BG_PANEL};
}}
Tree > .tree--cursor {{
background: {_SECONDARY};
color: {_TERTIARY};
}}
"""
BINDINGS: ClassVar[list[Binding | tuple[str, str] | tuple[str, str, str]]] = [
("q", "quit", "Quit"),
("r", "refresh", "Refresh"),
("e", "resume", "Resume"),
("f", "fork", "Fork"),
]
def __init__(self, location: str = "./.checkpoints") -> None:
super().__init__()
self._location = location
self._entries: list[dict[str, Any]] = []
self._selected_entry: dict[str, Any] | None = None
self._input_keys: list[str] = []
self._task_output_ids: list[tuple[int, str, str]] = []
def compose(self) -> ComposeResult:
yield Header(show_clock=False)
with Horizontal(id="main-layout"):
tree: Tree[dict[str, Any]] = Tree("Checkpoints", id="tree-panel")
tree.show_root = False
tree.guide_depth = 3
yield tree
with Vertical(id="detail-container"):
yield Static("", id="status")
with TabbedContent(id="detail-tabs"):
with TabPane("Overview", id="tab-overview"):
with VerticalScroll(classes="tab-scroll"):
yield Static(
f"[{_DIM}]Select a checkpoint from the tree[/]", # noqa: S608
id="overview-empty",
)
with TabPane("Tasks", id="tab-tasks"):
with VerticalScroll(classes="tab-scroll"):
yield Static(
f"[{_DIM}]Select a checkpoint to view tasks[/]",
id="tasks-empty",
)
with TabPane("Inputs", id="tab-inputs"):
with VerticalScroll(classes="tab-scroll"):
yield Static(
f"[{_DIM}]Select a checkpoint to view inputs[/]",
id="inputs-empty",
)
yield Footer()
async def on_mount(self) -> None:
self._refresh_tree()
self.query_one("#tree-panel", Tree).root.expand()
# ── Tree building ──────────────────────────────────────────────
@staticmethod
def _top_level_entity(entry: dict[str, Any]) -> tuple[str, str]:
etype, ename = "unknown", ""
for ent in entry.get("entities", []):
t = ent.get("type", "unknown")
if t == "flow":
return "flow", ent.get("name") or ""
if t == "crew" and etype != "crew":
etype, ename = "crew", ent.get("name") or ""
return etype, ename
def _refresh_tree(self) -> None:
self._entries = _load_entries(self._location)
self._selected_entry = None
tree = self.query_one("#tree-panel", Tree)
tree.clear()
if not self._entries:
self.sub_title = self._location
self.query_one("#status", Static).update("")
return
grouped: dict[tuple[str, str], dict[str, list[dict[str, Any]]]] = defaultdict(
lambda: defaultdict(list)
)
for entry in self._entries:
key = self._top_level_entity(entry)
branch = entry.get("branch", "main")
grouped[key][branch].append(entry)
def _make_label(e: dict[str, Any]) -> str:
ts = e.get("ts") or ""
trigger = e.get("trigger") or ""
time_part = ts.split(" ")[-1] if " " in ts else ts
total_c, total_t = 0, 0
for ent in e.get("entities", []):
c = ent.get("tasks_completed")
t = ent.get("tasks_total")
if c is not None and t is not None:
total_c += c
total_t += t
parts: list[str] = []
if time_part:
parts.append(f"[{_DIM}]{time_part}[/]")
if trigger:
parts.append(f"[{_PRIMARY}]{trigger}[/]")
if total_t:
display_c = total_c
if trigger == "task_started" and total_c < total_t:
display_c = total_c + 1
color = _SUCCESS if total_c == total_t else _DIM
parts.append(f"[{color}]{display_c}/{total_t}[/]")
return " ".join(parts) if parts else _short_id(e.get("name", ""))
fork_parents: set[str] = set()
for branches in grouped.values():
for branch_name, entries in branches.items():
if branch_name == "main" or not entries:
continue
oldest = min(entries, key=lambda e: str(e.get("name", "")))
first_parent = oldest.get("parent_id")
if first_parent:
fork_parents.add(str(first_parent))
node_by_name: dict[str, Any] = {}
def _add_checkpoint(parent_node: Any, e: dict[str, Any]) -> None:
cp_id = _entry_id(e)
if cp_id in fork_parents:
node = parent_node.add(
_make_label(e), data=e, expand=False, allow_expand=True
)
else:
node = parent_node.add_leaf(_make_label(e), data=e)
node_by_name[cp_id] = node
type_order = {"flow": 0, "crew": 1}
sorted_keys = sorted(
grouped.keys(), key=lambda k: (type_order.get(k[0], 9), k[1])
)
for etype, ename in sorted_keys:
branches = grouped[(etype, ename)]
icon = _entity_icon(etype)
color = _ENTITY_COLORS.get(etype, _DIM)
total = sum(len(v) for v in branches.values())
label_parts = [f"{icon} [bold {color}]{etype.upper()}[/]"]
if ename:
label_parts.append(f"[bold]{ename}[/]")
label_parts.append(f"[{_DIM}]({total})[/]")
all_entries = [e for bl in branches.values() for e in bl]
timestamps = [str(e.get("ts", "")) for e in all_entries if e.get("ts")]
if timestamps:
latest = max(timestamps)
label_parts.append(f"[{_DIM}]{_human_ts(latest)}[/]")
entity_label = " ".join(label_parts)
entity_node = tree.root.add(entity_label, expand=True)
if "main" in branches:
for entry in reversed(branches["main"]):
_add_checkpoint(entity_node, entry)
fork_branches = [
(name, sorted(entries, key=lambda e: str(e.get("name", ""))))
for name, entries in branches.items()
if name != "main"
]
remaining = fork_branches
max_passes = len(remaining) + 1
while remaining and max_passes > 0:
max_passes -= 1
deferred = []
made_progress = False
for branch_name, entries in remaining:
first_parent = entries[0].get("parent_id") if entries else None
if first_parent and str(first_parent) not in node_by_name:
deferred.append((branch_name, entries))
continue
attach_to: Any = entity_node
if first_parent:
attach_to = node_by_name.get(str(first_parent), entity_node)
branch_label = (
f"[bold {_SECONDARY}]{branch_name}[/] "
f"[{_DIM}]({len(entries)})[/]"
)
branch_node = attach_to.add(branch_label, expand=False)
for entry in entries:
_add_checkpoint(branch_node, entry)
made_progress = True
remaining = deferred
if not made_progress:
break
for branch_name, entries in remaining:
branch_label = (
f"[bold {_SECONDARY}]{branch_name}[/] "
f"[{_DIM}]({len(entries)})[/] [{_DIM}](orphaned)[/]"
)
branch_node = entity_node.add(branch_label, expand=False)
for entry in entries:
_add_checkpoint(branch_node, entry)
count = len(self._entries)
storage = "SQLite" if _is_sqlite(self._location) else "JSON"
self.sub_title = self._location
self.query_one("#status", Static).update(f" {count} checkpoint(s) | {storage}")
# ── Detail panel ───────────────────────────────────────────────
async def _clear_scroll(self, tab_id: str) -> VerticalScroll:
tab = self.query_one(f"#{tab_id}", TabPane)
scroll = tab.query_one(VerticalScroll)
for child in list(scroll.children):
await child.remove()
return scroll
async def _show_detail(self, entry: dict[str, Any]) -> None:
    """Render all three detail tabs for the newly selected checkpoint entry."""
    self._selected_entry = entry
    checkpoint_inputs = entry.get("inputs", {})
    # Refresh each tab in turn from the selected entry.
    await self._render_overview(entry)
    await self._render_tasks(entry)
    await self._render_inputs(checkpoint_inputs)
async def _render_overview(self, entry: dict[str, Any]) -> None:
    """Populate the Overview tab with checkpoint metadata and entity summaries.

    Args:
        entry: Checkpoint index entry carrying name/ts/trigger/branch/parent_id,
            optional ``size`` and ``event_count``, and an ``entities`` list of
            per-crew/flow/agent summaries.
    """
    scroll = await self._clear_scroll("tab-overview")
    name = entry.get("name", "")
    ts = entry.get("ts") or "unknown"
    trigger = entry.get("trigger") or ""
    branch = entry.get("branch", "main")
    parent_id = entry.get("parent_id")
    # Header block: checkpoint name, a dim separator rule, then metadata rows.
    header_lines = [
        f"[bold {_PRIMARY}]{name}[/]",
        f"[{_DIM}]{'' * 50}[/]",
        "",
        f"  [bold]Time[/]     {ts}",
    ]
    # Size/trigger/parent rows are optional; only shown when present.
    if "size" in entry:
        header_lines.append(f"  [bold]Size[/]     {_format_size(entry['size'])}")
    header_lines.append(f"  [bold]Events[/]   {entry.get('event_count', 0)}")
    if trigger:
        header_lines.append(f"  [bold]Trigger[/]  [{_PRIMARY}]{trigger}[/]")
    header_lines.append(f"  [bold]Branch[/]   [{_SECONDARY}]{branch}[/]")
    if parent_id:
        header_lines.append(f"  [bold]Parent[/]   [{_DIM}]{parent_id}[/]")
    await scroll.mount(Static("\n".join(header_lines)))
    # One section per checkpointed entity (crew / flow / agent).
    for ent in entry.get("entities", []):
        etype = ent.get("type", "unknown")
        ename = ent.get("name", "unnamed")
        icon = _entity_icon(etype)
        color = _ENTITY_COLORS.get(etype, _DIM)
        # Short 8-char id prefix for display only.
        eid = str(ent.get("id", ""))[:8]
        entity_title = (
            f"\n{icon} [bold {color}]{etype.upper()}[/] [bold]{ename}[/]"
        )
        if eid:
            entity_title += f" [{_DIM}]{eid}…[/]"
        await scroll.mount(Static(entity_title, classes="section-header"))
        await scroll.mount(Static(f"[{_DIM}]{'' * 46}[/]", classes="detail-line"))
        if etype == "flow":
            # Flows additionally show completed methods and a state preview.
            methods = ent.get("completed_methods", [])
            if methods:
                method_list = ", ".join(f"[{_SUCCESS}]{m}[/]" for m in methods)
                await scroll.mount(
                    Static(
                        f"  [bold]Methods[/]  {method_list}",
                        classes="detail-line",
                    )
                )
            flow_state = ent.get("flow_state")
            if isinstance(flow_state, dict) and flow_state:
                # Preview at most 5 state keys, truncating long values.
                state_parts: list[str] = []
                for k, v in list(flow_state.items())[:5]:
                    sv = str(v)
                    if len(sv) > 40:
                        sv = sv[:37] + "..."
                    state_parts.append(f"[{_DIM}]{k}[/]={sv}")
                await scroll.mount(
                    Static(
                        f"  [bold]State[/]    {', '.join(state_parts)}",
                        classes="detail-line",
                    )
                )
        agents = ent.get("agents", [])
        if agents:
            agent_lines: list[Static] = []
            for ag in agents:
                role = ag.get("role", "unnamed")
                goal = ag.get("goal", "")
                # Truncate long goals so each agent row stays compact.
                if len(goal) > 60:
                    goal = goal[:57] + "..."
                agent_line = f"  {_entity_icon('agent')} [bold]{role}[/]"
                if goal:
                    agent_line += f"\n     [{_DIM}]{goal}[/]"
                agent_lines.append(Static(agent_line))
            # Collapse long agent lists by default to keep the tab readable.
            collapsible = Collapsible(
                *agent_lines,
                title=f"Agents ({len(agents)})",
                collapsed=len(agents) > 3,
            )
            await scroll.mount(collapsible)
async def _render_tasks(self, entry: dict[str, Any]) -> None:
    """Populate the Tasks tab: one progress header per entity, one row per
    task, and an editable output area under each completed task."""
    scroll = await self._clear_scroll("tab-tasks")
    self._task_output_ids = []
    flat_task_idx = 0
    has_tasks = False
    for ent_idx, ent in enumerate(entry.get("entities", [])):
        tasks = ent.get("tasks", [])
        if not tasks:
            continue
        has_tasks = True
        etype = ent.get("type", "unknown")
        ename = ent.get("name", "unnamed")
        icon = _entity_icon(etype)
        color = _ENTITY_COLORS.get(etype, _DIM)
        done_count = ent.get("tasks_completed", 0)
        total_count = ent.get("tasks_total", 0)
        header = (
            f"{icon} [bold {color}]{ename}[/] "
            f"{_build_progress_bar(done_count, total_count, width=16)}"
        )
        await scroll.mount(Static(header, classes="section-header"))
        for i, task in enumerate(tasks):
            desc = str(task.get("description", ""))
            if len(desc) > 50:
                desc = desc[:47] + "..."
            agent_role = task.get("agent_role", "")
            is_completed = bool(task.get("completed"))
            if is_completed:
                status_icon = f"[{_SUCCESS}]✓[/]"
            else:
                status_icon = f"[{_PENDING}]○[/]"
            task_line = f"  {status_icon} {i + 1}. {desc}"
            if agent_role:
                task_line += (
                    f" [{_DIM}]→ {_entity_icon('agent')} {agent_role}[/]"
                )
            await scroll.mount(Static(task_line, classes="task-label"))
            if is_completed:
                # Completed tasks get an editable copy of their output.
                output_text = str(task.get("output", ""))
                editor_id = f"task-output-{ent_idx}-{i}"
                await scroll.mount(
                    TextArea(
                        output_text,
                        classes="task-output-editor",
                        id=editor_id,
                    )
                )
                # Remember (flat index, widget id, original text) so edits
                # can be diffed against the original later.
                self._task_output_ids.append(
                    (flat_task_idx, editor_id, output_text)
                )
            flat_task_idx += 1
    if not has_tasks:
        await scroll.mount(Static(f"[{_DIM}]No tasks[/]", classes="empty-state"))
async def _render_inputs(self, inputs: dict[str, Any]) -> None:
    """Populate the Inputs tab with one label + editable Input row per key."""
    scroll = await self._clear_scroll("tab-inputs")
    self._input_keys = []
    if not inputs:
        await scroll.mount(Static(f"[{_DIM}]No inputs[/]", classes="empty-state"))
        return
    for key, value in inputs.items():
        # Track rendered keys so _collect_inputs can read the widgets back.
        self._input_keys.append(key)
        label = Static(f"[bold]{key}[/]")
        editor = Input(value=str(value), placeholder=key, id=f"input-{key}")
        row = Horizontal(classes="input-row")
        row.compose_add_child(label)
        row.compose_add_child(editor)
        await scroll.mount(row)
# ── Data collection ────────────────────────────────────────────
def _collect_inputs(self) -> dict[str, Any] | None:
    """Read the current value of every rendered input widget.

    Returns:
        Mapping of input key to its (possibly user-edited) value, or
        None when no inputs were rendered.
    """
    if not self._input_keys:
        return None
    return {
        key: self.query_one(f"#input-{key}", Input).value
        for key in self._input_keys
    }
def _collect_task_overrides(self) -> dict[int, str] | None:
    """Diff each task-output editor against its original text.

    Returns:
        Mapping of flat task index to edited output text, or None when
        nothing was changed (or no editors / no selection exist).
    """
    if self._selected_entry is None or not self._task_output_ids:
        return None
    changed: dict[int, str] = {}
    for task_idx, editor_id, original_text in self._task_output_ids:
        current = self.query_one(f"#{editor_id}", TextArea).text
        if current != original_text:
            changed[task_idx] = current
    return changed if changed else None
def _detect_entity_type(
    self, entry: dict[str, Any]
) -> Literal["crew", "flow", "agent"]:
    """Classify a checkpoint entry by its first flow/agent entity.

    Entities are scanned in order; the first one typed "flow" or "agent"
    decides the result. Anything else defaults to "crew".
    """
    for ent in entry.get("entities", []):
        etype = ent.get("type")
        if etype == "flow":
            return "flow"
        if etype == "agent":
            return "agent"
    return "crew"
def _resolve_location(self, entry: dict[str, Any]) -> str:
    """Map a tree entry to the restore-location string for CheckpointConfig.

    Prefers an explicit per-entry path; SQLite stores are addressed as
    ``<db-location>#<checkpoint-name>``; otherwise the bare name is used.
    """
    if "path" in entry:
        return str(entry["path"])
    if _is_sqlite(self._location):
        return f"{self._location}#{entry['name']}"
    return str(entry.get("name", ""))
# ── Events ─────────────────────────────────────────────────────
async def on_tree_node_highlighted(
    self, event: Tree.NodeHighlighted[dict[str, Any]]
) -> None:
    """Show checkpoint details when a data-bearing tree node is highlighted."""
    entry = event.node.data
    if entry is None:
        # Branch/entity grouping nodes carry no checkpoint data.
        return
    await self._show_detail(entry)
def _exit_with_action(self, action: str) -> None:
    """Collect edits for the selected checkpoint and exit the app with them.

    The app's exit value is the 5-tuple
    ``(location, action, inputs, overrides, entity_type)`` consumed by
    the async runner after the TUI closes.
    """
    entry = self._selected_entry
    if entry is None:
        self.notify("No checkpoint selected", severity="warning")
        return
    collected_inputs = self._collect_inputs()
    collected_overrides = self._collect_task_overrides()
    location = self._resolve_location(entry)
    entity_type = self._detect_entity_type(entry)
    short_name = entry.get("name", "")[:30]
    self.notify(f"{action.title()}: {short_name}")
    self.exit((location, action, collected_inputs, collected_overrides, entity_type))
def action_resume(self) -> None:
    """Key binding: exit the TUI, resuming from the selected checkpoint."""
    self._exit_with_action("resume")
def action_fork(self) -> None:
    """Key binding: exit the TUI, forking from the selected checkpoint."""
    self._exit_with_action("fork")
def action_refresh(self) -> None:
    """Key binding: rebuild the checkpoint tree view."""
    self._refresh_tree()
def _apply_task_overrides(crew: Any, task_overrides: dict[int, str]) -> None:
    """Apply task output overrides to a restored Crew and print modifications.

    Args:
        crew: The restored Crew whose tasks are edited in place.
        task_overrides: Mapping of task index (into ``crew.tasks``) to the
            replacement raw output text.

    Side effects:
        - Rewrites ``task.output.raw`` for each overridden completed task.
        - Patches the matching assistant message in the owning agent's
          executor history so resumed runs see the edited output.
        - Clears outputs and resumption state of tasks after the earliest
          override so they re-run against the new upstream outputs.
        - Echoes a human-readable summary via click.
    """
    import click
    click.echo("Modifications:")
    overridden_agents: set[int] = set()
    for task_idx, new_output in task_overrides.items():
        # Only in-range, completed tasks (with an existing output) can be overridden.
        if task_idx < len(crew.tasks) and crew.tasks[task_idx].output is not None:
            desc = crew.tasks[task_idx].description or f"Task {task_idx + 1}"
            if len(desc) > 60:
                desc = desc[:57] + "..."
            crew.tasks[task_idx].output.raw = new_output
            # Single-line preview for the console summary.
            preview = new_output.replace("\n", " ")
            if len(preview) > 80:
                preview = preview[:77] + "..."
            click.echo(f"  Task {task_idx + 1}: {desc}")
            click.echo(f"    -> {preview}")
            agent = crew.tasks[task_idx].agent
            if agent and agent.agent_executor:
                # nth = how many earlier tasks this same agent executed; each
                # run appears to open a new segment starting at a "system"
                # message in the executor history.
                nth = sum(1 for t in crew.tasks[:task_idx] if t.agent is agent)
                messages = agent.agent_executor.messages
                system_positions = [
                    i for i, m in enumerate(messages) if m.get("role") == "system"
                ]
                if nth < len(system_positions):
                    # This task's segment: [its system msg, next system msg or end).
                    seg_start = system_positions[nth]
                    seg_end = (
                        system_positions[nth + 1]
                        if nth + 1 < len(system_positions)
                        else len(messages)
                    )
                    # Replace the last assistant message in the segment
                    # (the final answer) with the edited output.
                    for j in range(seg_end - 1, seg_start, -1):
                        if messages[j].get("role") == "assistant":
                            messages[j]["content"] = new_output
                            break
                # Mark this agent so its (patched) history is preserved below.
                overridden_agents.add(id(agent))
    # Invalidate completed tasks downstream of the earliest edit so they
    # re-execute with the new upstream outputs.
    earliest = min(task_overrides)
    for offset, subsequent in enumerate(crew.tasks[earliest + 1 :], start=earliest + 1):
        if subsequent.output and offset not in task_overrides:
            subsequent.output = None
            if subsequent.agent and subsequent.agent.agent_executor:
                subsequent.agent.agent_executor._resuming = False
                # Only wipe history for agents whose output was not patched above.
                if id(subsequent.agent) not in overridden_agents:
                    subsequent.agent.agent_executor.messages = []
    click.echo()
async def _run_checkpoint_tui_async(location: str) -> None:
    """Async implementation of the checkpoint TUI flow.

    Runs the browser TUI and, if the user picks resume/fork, restores the
    selected flow/agent/crew from its checkpoint, applies any edited inputs
    and task outputs, then kicks execution off again.

    Args:
        location: Checkpoint directory or SQLite database file to browse.
    """
    import click
    app = CheckpointTUI(location=location)
    selection = await app.run_async()
    if selection is None:
        # User quit the TUI without choosing an action.
        return
    selected, action, inputs, task_overrides, entity_type = selection
    from crewai.state.checkpoint_config import CheckpointConfig
    config = CheckpointConfig(restore_from=selected)
    if entity_type == "flow":
        from crewai.events.event_bus import crewai_event_bus
        from crewai.flow.flow import Flow
        if action == "fork":
            click.echo(f"\nForking flow from: {selected}\n")
            flow = Flow.fork(config)
        else:
            click.echo(f"\nResuming flow from: {selected}\n")
            flow = Flow.from_checkpoint(config)
        if task_overrides:
            from crewai.crew import Crew as CrewCls
            # TUI task indices are flat across all crews in the flow;
            # translate them into per-crew local indices before applying.
            state = crewai_event_bus._runtime_state
            if state is not None:
                flat_offset = 0
                for entity in state.root:
                    if not isinstance(entity, CrewCls) or not entity.tasks:
                        continue
                    n = len(entity.tasks)
                    local = {
                        idx - flat_offset: out
                        for idx, out in task_overrides.items()
                        if flat_offset <= idx < flat_offset + n
                    }
                    if local:
                        _apply_task_overrides(entity, local)
                    flat_offset += n
        if inputs:
            click.echo("Inputs:")
            for k, v in inputs.items():
                click.echo(f"  {k}: {v}")
            click.echo()
        result = await flow.kickoff_async(inputs=inputs)
        click.echo(f"\nResult: {getattr(result, 'raw', result)}")
        return
    if entity_type == "agent":
        from crewai.agent import Agent
        if action == "fork":
            click.echo(f"\nForking agent from: {selected}\n")
            agent = Agent.fork(config)
        else:
            click.echo(f"\nResuming agent from: {selected}\n")
            agent = Agent.from_checkpoint(config)
        click.echo()
        # Agents take a message prompt rather than structured inputs.
        result = await agent.akickoff(messages="Resume execution.")
        click.echo(f"\nResult: {getattr(result, 'raw', result)}")
        return
    # Default: plain crew restore.
    from crewai.crew import Crew
    if action == "fork":
        click.echo(f"\nForking from: {selected}\n")
        crew = Crew.fork(config)
    else:
        click.echo(f"\nResuming from: {selected}\n")
        crew = Crew.from_checkpoint(config)
    if task_overrides:
        _apply_task_overrides(crew, task_overrides)
    if inputs:
        click.echo("Inputs:")
        for k, v in inputs.items():
            click.echo(f"  {k}: {v}")
        click.echo()
    result = await crew.akickoff(inputs=inputs)
    click.echo(f"\nResult: {getattr(result, 'raw', result)}")
def run_checkpoint_tui(location: str = "./.checkpoints") -> None:
    """Launch the checkpoint browser TUI.

    Args:
        location: Checkpoint directory or SQLite file to browse; defaults
            to ``./.checkpoints``.
    """
    import asyncio

    coroutine = _run_checkpoint_tui_async(location)
    asyncio.run(coroutine)

View File

@@ -20,10 +20,12 @@ from crewai_cli.install_crew import install_crew
from crewai_cli.kickoff_flow import kickoff_flow
from crewai_cli.organization.main import OrganizationCommand
from crewai_cli.plot_flow import plot_flow
from crewai_cli.remote_template.main import TemplateCommand
from crewai_cli.replay_from_task import replay_task_command
from crewai_cli.reset_memories_command import reset_memories_command
from crewai_cli.run_crew import run_crew
from crewai_cli.settings.main import SettingsCommand
from crewai_cli.shared.token_manager import TokenManager
from crewai_cli.task_outputs import load_task_outputs
from crewai_cli.tools.main import ToolCommand
from crewai_cli.train_crew import train_crew
@@ -34,7 +36,7 @@ from crewai_cli.user_data import (
_save_user_data,
is_tracing_enabled,
)
from crewai_cli.utils import build_env_with_tool_repository_credentials, read_toml
from crewai_cli.utils import build_env_with_all_tool_credentials, read_toml
def _get_cli_version() -> str:
@@ -52,7 +54,7 @@ def _get_cli_version() -> str:
@click.group()
@click.version_option(_get_cli_version())
def crewai():
def crewai() -> None:
"""Top-level command group for crewai."""
@@ -61,26 +63,20 @@ def crewai():
context_settings={"ignore_unknown_options": True},
)
@click.argument("uv_args", nargs=-1, type=click.UNPROCESSED)
def uv(uv_args):
def uv(uv_args: tuple[str, ...]) -> None:
"""A wrapper around uv commands that adds custom tool authentication through env vars."""
env = os.environ.copy()
try:
pyproject_data = read_toml()
sources = pyproject_data.get("tool", {}).get("uv", {}).get("sources", {})
for source_config in sources.values():
if isinstance(source_config, dict):
index = source_config.get("index")
if index:
index_env = build_env_with_tool_repository_credentials(index)
env.update(index_env)
except (FileNotFoundError, KeyError) as e:
# Verify pyproject.toml exists first
read_toml()
except FileNotFoundError as e:
raise SystemExit(
"Error. A valid pyproject.toml file is required. Check that a valid pyproject.toml file exists in the current directory."
) from e
except Exception as e:
raise SystemExit(f"Error: {e}") from e
env = build_env_with_all_tool_credentials()
try:
subprocess.run( # noqa: S603
["uv", *uv_args], # noqa: S607
@@ -99,7 +95,9 @@ def uv(uv_args):
@click.argument("name")
@click.option("--provider", type=str, help="The provider to use for the crew")
@click.option("--skip_provider", is_flag=True, help="Skip provider validation")
def create(type, name, provider, skip_provider=False):
def create(
type: str, name: str, provider: str | None, skip_provider: bool = False
) -> None:
"""Create a new crew, or flow."""
if type == "crew":
create_crew(name, provider, skip_provider)
@@ -113,7 +111,7 @@ def create(type, name, provider, skip_provider=False):
@click.option(
"--tools", is_flag=True, help="Show the installed version of crewai tools"
)
def version(tools):
def version(tools: bool) -> None:
"""Show the installed version of crewai."""
try:
crewai_version = get_version("crewai")
@@ -144,7 +142,7 @@ def version(tools):
default="trained_agents_data.pkl",
help="Path to a custom file for training",
)
def train(n_iterations: int, filename: str):
def train(n_iterations: int, filename: str) -> None:
"""Train the crew."""
click.echo(f"Training the Crew for {n_iterations} iterations")
train_crew(n_iterations, filename)
@@ -157,11 +155,29 @@ def train(n_iterations: int, filename: str):
type=str,
help="Replay the crew from this task ID, including all subsequent tasks.",
)
def replay(task_id: str) -> None:
"""Replay the crew execution from a specific task."""
@click.option(
"-f",
"--filename",
"trained_agents_file",
type=str,
default=None,
help=(
"Path to a trained-agents pickle (produced by `crewai train -f`). "
"When set, agents load suggestions from this file instead of the "
"default trained_agents_data.pkl. Equivalent to setting "
"CREWAI_TRAINED_AGENTS_FILE."
),
)
def replay(task_id: str, trained_agents_file: str | None) -> None:
"""Replay the crew execution from a specific task.
Args:
task_id: The ID of the task to replay from.
trained_agents_file: Optional trained-agents pickle path.
"""
try:
click.echo(f"Replaying the crew from task {task_id}")
replay_task_command(task_id)
replay_task_command(task_id, trained_agents_file=trained_agents_file)
except Exception as e:
click.echo(f"An error occurred while replaying: {e}", err=True)
@@ -339,10 +355,23 @@ def memory(
default="gpt-4o-mini",
help="LLM Model to run the tests on the Crew. For now only accepting only OpenAI models.",
)
def test(n_iterations: int, model: str):
@click.option(
"-f",
"--filename",
"trained_agents_file",
type=str,
default=None,
help=(
"Path to a trained-agents pickle (produced by `crewai train -f`). "
"When set, agents load suggestions from this file instead of the "
"default trained_agents_data.pkl. Equivalent to setting "
"CREWAI_TRAINED_AGENTS_FILE."
),
)
def test(n_iterations: int, model: str, trained_agents_file: str | None) -> None:
"""Test the crew and evaluate the results."""
click.echo(f"Testing the crew for {n_iterations} iterations with model {model}")
evaluate_crew(n_iterations, model)
evaluate_crew(n_iterations, model, trained_agents_file=trained_agents_file)
@crewai.command(
@@ -352,46 +381,80 @@ def test(n_iterations: int, model: str):
}
)
@click.pass_context
def install(context):
def install(context: click.Context) -> None:
"""Install the Crew."""
install_crew(context.args)
@crewai.command()
def run():
@click.option(
"-f",
"--filename",
"trained_agents_file",
type=str,
default=None,
help=(
"Path to a trained-agents pickle (produced by `crewai train -f`). "
"When set, agents load suggestions from this file instead of the "
"default trained_agents_data.pkl. Equivalent to setting "
"CREWAI_TRAINED_AGENTS_FILE."
),
)
def run(trained_agents_file: str | None) -> None:
"""Run the Crew."""
run_crew()
run_crew(trained_agents_file=trained_agents_file)
@crewai.command()
def update():
def update() -> None:
"""Update the pyproject.toml of the Crew project to use uv."""
update_crew()
@crewai.command()
def login():
def login() -> None:
"""Sign Up/Login to CrewAI AMP."""
Settings().clear_user_settings()
AuthenticationCommand().login()
@crewai.command()
@click.option(
"--reset", is_flag=True, help="Also reset all CLI configuration to defaults"
)
def logout(reset: bool) -> None:
"""Logout from CrewAI AMP."""
settings = Settings()
if reset:
settings.reset()
click.echo("Successfully logged out and reset all CLI configuration.")
else:
TokenManager().clear_tokens()
settings.clear_user_settings()
click.echo("Successfully logged out from CrewAI AMP.")
# DEPLOY CREWAI+ COMMANDS
@crewai.group()
def deploy():
def deploy() -> None:
"""Deploy the Crew CLI group."""
@deploy.command(name="create")
@click.option("-y", "--yes", is_flag=True, help="Skip the confirmation prompt")
def deploy_create(yes: bool):
@click.option(
"--skip-validate",
is_flag=True,
help="Skip the pre-deploy validation checks.",
)
def deploy_create(yes: bool, skip_validate: bool) -> None:
"""Create a Crew deployment."""
deploy_cmd = DeployCommand()
deploy_cmd.create_crew(yes)
deploy_cmd.create_crew(yes, skip_validate=skip_validate)
@deploy.command(name="list")
def deploy_list():
def deploy_list() -> None:
"""List all deployments."""
deploy_cmd = DeployCommand()
deploy_cmd.list_crews()
@@ -399,15 +462,33 @@ def deploy_list():
@deploy.command(name="push")
@click.option("-u", "--uuid", type=str, help="Crew UUID parameter")
def deploy_push(uuid: str | None):
@click.option(
"--skip-validate",
is_flag=True,
help="Skip the pre-deploy validation checks.",
)
def deploy_push(uuid: str | None, skip_validate: bool) -> None:
"""Deploy the Crew."""
deploy_cmd = DeployCommand()
deploy_cmd.deploy(uuid=uuid)
deploy_cmd.deploy(uuid=uuid, skip_validate=skip_validate)
@deploy.command(name="validate")
def deploy_validate() -> None:
"""Validate the current project against common deployment failures.
Runs the same pre-deploy checks that `crewai deploy create` and
`crewai deploy push` run automatically, without contacting the platform.
Exits non-zero if any blocking issues are found.
"""
from crewai_cli.deploy.validate import run_validate_command
run_validate_command()
@deploy.command(name="status")
@click.option("-u", "--uuid", type=str, help="Crew UUID parameter")
def deply_status(uuid: str | None):
def deply_status(uuid: str | None) -> None:
"""Get the status of a deployment."""
deploy_cmd = DeployCommand()
deploy_cmd.get_crew_status(uuid=uuid)
@@ -415,7 +496,7 @@ def deply_status(uuid: str | None):
@deploy.command(name="logs")
@click.option("-u", "--uuid", type=str, help="Crew UUID parameter")
def deploy_logs(uuid: str | None):
def deploy_logs(uuid: str | None) -> None:
"""Get the logs of a deployment."""
deploy_cmd = DeployCommand()
deploy_cmd.get_crew_logs(uuid=uuid)
@@ -423,27 +504,27 @@ def deploy_logs(uuid: str | None):
@deploy.command(name="remove")
@click.option("-u", "--uuid", type=str, help="Crew UUID parameter")
def deploy_remove(uuid: str | None):
def deploy_remove(uuid: str | None) -> None:
"""Remove a deployment."""
deploy_cmd = DeployCommand()
deploy_cmd.remove_crew(uuid=uuid)
@crewai.group()
def tool():
def tool() -> None:
"""Tool Repository related commands."""
@tool.command(name="create")
@click.argument("handle")
def tool_create(handle: str):
def tool_create(handle: str) -> None:
tool_cmd = ToolCommand()
tool_cmd.create(handle)
@tool.command(name="install")
@click.argument("handle")
def tool_install(handle: str):
def tool_install(handle: str) -> None:
tool_cmd = ToolCommand()
tool_cmd.login()
tool_cmd.install(handle)
@@ -459,26 +540,53 @@ def tool_install(handle: str):
)
@click.option("--public", "is_public", flag_value=True, default=False)
@click.option("--private", "is_public", flag_value=False)
def tool_publish(is_public: bool, force: bool):
def tool_publish(is_public: bool, force: bool) -> None:
tool_cmd = ToolCommand()
tool_cmd.login()
tool_cmd.publish(is_public, force)
@crewai.group()
def flow():
def template() -> None:
"""Browse and install project templates."""
@template.command(name="list")
def template_list() -> None:
"""List available templates and select one to install."""
template_cmd = TemplateCommand()
template_cmd.list_templates()
@template.command(name="add")
@click.argument("name")
@click.option(
"-o",
"--output-dir",
type=str,
default=None,
help="Directory name for the template (defaults to template name)",
)
def template_add(name: str, output_dir: str | None) -> None:
"""Add a template to the current directory."""
template_cmd = TemplateCommand()
template_cmd.add_template(name, output_dir)
@crewai.group()
def flow() -> None:
"""Flow related commands."""
@flow.command(name="kickoff")
def flow_run():
def flow_run() -> None:
"""Kickoff the Flow."""
click.echo("Running the Flow")
kickoff_flow()
@flow.command(name="plot")
def flow_plot():
def flow_plot() -> None:
"""Plot the Flow."""
click.echo("Plotting the Flow")
plot_flow()
@@ -486,19 +594,19 @@ def flow_plot():
@flow.command(name="add-crew")
@click.argument("crew_name")
def flow_add_crew(crew_name):
def flow_add_crew(crew_name: str) -> None:
"""Add a crew to an existing flow."""
click.echo(f"Adding crew {crew_name} to the flow")
add_crew_to_flow(crew_name)
@crewai.group()
def triggers():
def triggers() -> None:
"""Trigger related commands. Use 'crewai triggers list' to see available triggers, or 'crewai triggers run app_slug/trigger_slug' to execute."""
@triggers.command(name="list")
def triggers_list():
def triggers_list() -> None:
"""List all available triggers from integrations."""
triggers_cmd = TriggersCommand()
triggers_cmd.list_triggers()
@@ -506,14 +614,14 @@ def triggers_list():
@triggers.command(name="run")
@click.argument("trigger_path")
def triggers_run(trigger_path: str):
def triggers_run(trigger_path: str) -> None:
"""Execute crew with trigger payload. Format: app_slug/trigger_slug"""
triggers_cmd = TriggersCommand()
triggers_cmd.execute_with_trigger(trigger_path)
@crewai.command()
def chat():
def chat() -> None:
"""Start a conversation with the Crew, collecting user-supplied inputs,
and using the Chat LLM to generate responses.
"""
@@ -524,12 +632,12 @@ def chat():
@crewai.group(invoke_without_command=True)
def org():
def org() -> None:
"""Organization management commands."""
@org.command("list")
def org_list():
def org_list() -> None:
"""List available organizations."""
org_command = OrganizationCommand()
org_command.list()
@@ -537,39 +645,39 @@ def org_list():
@org.command()
@click.argument("id")
def switch(id):
def switch(id: str) -> None:
"""Switch to a specific organization."""
org_command = OrganizationCommand()
org_command.switch(id)
@org.command()
def current():
def current() -> None:
"""Show current organization when 'crewai org' is called without subcommands."""
org_command = OrganizationCommand()
org_command.current()
@crewai.group()
def enterprise():
def enterprise() -> None:
"""Enterprise Configuration commands."""
@enterprise.command("configure")
@click.argument("enterprise_url")
def enterprise_configure(enterprise_url: str):
def enterprise_configure(enterprise_url: str) -> None:
"""Configure CrewAI AMP OAuth2 settings from the provided Enterprise URL."""
enterprise_command = EnterpriseConfigureCommand()
enterprise_command.configure(enterprise_url)
@crewai.group()
def config():
def config() -> None:
"""CLI Configuration commands."""
@config.command("list")
def config_list():
def config_list() -> None:
"""List all CLI configuration parameters."""
config_command = SettingsCommand()
config_command.list()
@@ -578,28 +686,27 @@ def config_list():
@config.command("set")
@click.argument("key")
@click.argument("value")
def config_set(key: str, value: str):
def config_set(key: str, value: str) -> None:
"""Set a CLI configuration parameter."""
config_command = SettingsCommand()
config_command.set(key, value)
@config.command("reset")
def config_reset():
def config_reset() -> None:
"""Reset all CLI configuration parameters to default values."""
config_command = SettingsCommand()
config_command.reset_all_settings()
@crewai.group()
def env():
def env() -> None:
"""Environment variable commands."""
@env.command("view")
def env_view():
def env_view() -> None:
"""View tracing-related environment variables."""
import os
from pathlib import Path
from rich.console import Console
@@ -675,12 +782,12 @@ def env_view():
@crewai.group()
def traces():
def traces() -> None:
"""Trace collection management commands."""
@traces.command("enable")
def traces_enable():
def traces_enable() -> None:
"""Enable trace collection for crew/flow executions."""
from rich.console import Console
from rich.panel import Panel
@@ -705,7 +812,7 @@ def traces_enable():
@traces.command("disable")
def traces_disable():
def traces_disable() -> None:
"""Disable trace collection for crew/flow executions."""
from rich.console import Console
from rich.panel import Panel
@@ -730,9 +837,8 @@ def traces_disable():
@traces.command("status")
def traces_status():
def traces_status() -> None:
"""Show current trace collection status."""
import os
from rich.console import Console
from rich.panel import Panel
@@ -777,5 +883,84 @@ def traces_status():
console.print(panel)
@crewai.group(invoke_without_command=True)
@click.option(
"--location", default="./.checkpoints", help="Checkpoint directory or SQLite file."
)
@click.pass_context
def checkpoint(ctx: click.Context, location: str) -> None:
"""Browse and inspect checkpoints. Launches a TUI when called without a subcommand."""
from crewai_cli.checkpoint_cli import _detect_location
location = _detect_location(location)
ctx.ensure_object(dict)
ctx.obj["location"] = location
if ctx.invoked_subcommand is None:
from crewai_cli.checkpoint_tui import run_checkpoint_tui
run_checkpoint_tui(location)
@checkpoint.command("list")
@click.argument("location", default="./.checkpoints")
def checkpoint_list(location: str) -> None:
"""List checkpoints in a directory."""
from crewai_cli.checkpoint_cli import _detect_location, list_checkpoints
list_checkpoints(_detect_location(location))
@checkpoint.command("info")
@click.argument("path", default="./.checkpoints")
def checkpoint_info(path: str) -> None:
"""Show details of a checkpoint. Pass a file or directory for latest."""
from crewai_cli.checkpoint_cli import _detect_location, info_checkpoint
info_checkpoint(_detect_location(path))
@checkpoint.command("resume")
@click.argument("checkpoint_id", required=False, default=None)
@click.pass_context
def checkpoint_resume(ctx: click.Context, checkpoint_id: str | None) -> None:
"""Resume from a checkpoint. Defaults to the most recent."""
from crewai_cli.checkpoint_cli import resume_checkpoint
resume_checkpoint(ctx.obj["location"], checkpoint_id)
@checkpoint.command("diff")
@click.argument("id1")
@click.argument("id2")
@click.pass_context
def checkpoint_diff(ctx: click.Context, id1: str, id2: str) -> None:
"""Compare two checkpoints side-by-side."""
from crewai_cli.checkpoint_cli import diff_checkpoints
diff_checkpoints(ctx.obj["location"], id1, id2)
@checkpoint.command("prune")
@click.option(
"--keep", type=int, default=None, help="Keep the N most recent checkpoints."
)
@click.option(
"--older-than",
default=None,
help="Remove checkpoints older than duration (e.g. 7d, 24h, 30m).",
)
@click.option(
"--dry-run", is_flag=True, help="Show what would be pruned without deleting."
)
@click.pass_context
def checkpoint_prune(
ctx: click.Context, keep: int | None, older_than: str | None, dry_run: bool
) -> None:
"""Remove old checkpoints."""
from crewai_cli.checkpoint_cli import prune_checkpoints
prune_checkpoints(ctx.obj["location"], keep, older_than, dry_run)
if __name__ == "__main__":
crewai()

View File

@@ -77,7 +77,7 @@ CLI_SETTINGS_KEYS = [
]
# Default values for CLI settings
DEFAULT_CLI_SETTINGS = {
DEFAULT_CLI_SETTINGS: dict[str, Any] = {
"enterprise_base_url": DEFAULT_CREWAI_ENTERPRISE_URL,
"oauth2_provider": CREWAI_ENTERPRISE_DEFAULT_OAUTH2_PROVIDER,
"oauth2_audience": CREWAI_ENTERPRISE_DEFAULT_OAUTH2_AUDIENCE,

View File

@@ -4,7 +4,7 @@ import shutil
import click
def create_flow(name):
def create_flow(name: str) -> None:
"""Create a new flow."""
folder_name = name.replace(" ", "_").replace("-", "_").lower()
class_name = name.replace("_", " ").replace("-", " ").title().replace(" ", "")
@@ -40,10 +40,10 @@ def create_flow(name):
tools_template_files = ["tools/__init__.py", "tools/custom_tool.py"]
crew_folders = [
"poem_crew",
"content_crew",
]
def process_file(src_file, dst_file):
def process_file(src_file: Path, dst_file: Path) -> None:
if src_file.suffix in [".pyc", ".pyo", ".pyd"]:
return

View File

@@ -5,18 +5,41 @@ from rich.console import Console
from crewai_cli import git
from crewai_cli.command import BaseCommand, PlusAPIMixin
from crewai_cli.deploy.validate import validate_project
from crewai_cli.utils import fetch_and_json_env_file, get_project_name
console = Console()
def _run_predeploy_validation(skip_validate: bool) -> bool:
"""Run pre-deploy validation unless skipped.
Returns True if deployment should proceed, False if it should abort.
"""
if skip_validate:
console.print(
"[yellow]Skipping pre-deploy validation (--skip-validate).[/yellow]"
)
return True
console.print("Running pre-deploy validation...", style="bold blue")
validator = validate_project()
if not validator.ok:
console.print(
"\n[bold red]Pre-deploy validation failed. "
"Fix the issues above or re-run with --skip-validate.[/bold red]"
)
return False
return True
class DeployCommand(BaseCommand, PlusAPIMixin):
"""
A class to handle deployment-related operations for CrewAI projects.
"""
def __init__(self):
def __init__(self) -> None:
"""
Initialize the DeployCommand with project name and API client.
"""
@@ -96,13 +119,16 @@ class DeployCommand(BaseCommand, PlusAPIMixin):
f"{log_message['timestamp']} - {log_message['level']}: {log_message['message']}"
)
def deploy(self, uuid: str | None = None) -> None:
def deploy(self, uuid: str | None = None, skip_validate: bool = False) -> None:
"""
Deploy a crew using either UUID or project name.
Args:
uuid (Optional[str]): The UUID of the crew to deploy.
skip_validate (bool): Skip pre-deploy validation checks.
"""
if not _run_predeploy_validation(skip_validate):
return
console.print("Starting deployment...", style="bold blue")
if uuid:
response = self.plus_api_client.deploy_by_uuid(uuid)
@@ -115,10 +141,16 @@ class DeployCommand(BaseCommand, PlusAPIMixin):
self._validate_response(response)
self._display_deployment_info(response.json())
def create_crew(self, confirm: bool = False) -> None:
def create_crew(self, confirm: bool = False, skip_validate: bool = False) -> None:
"""
Create a new crew deployment.
Args:
confirm (bool): Whether to skip the interactive confirmation prompt.
skip_validate (bool): Skip pre-deploy validation checks.
"""
if not _run_predeploy_validation(skip_validate):
return
console.print("Creating deployment...", style="bold blue")
env_vars = fetch_and_json_env_file()

View File

@@ -0,0 +1,845 @@
"""Pre-deploy validation for CrewAI projects.
Catches locally what a deploy would reject at build or runtime so users
don't burn deployment attempts on fixable project-structure problems.
Each check is grouped into one of:
- ERROR: will block a deployment; validator exits non-zero.
- WARNING: may still deploy but is almost always a deployment bug; printed
but does not block.
The individual checks mirror the categories observed in production
deployment-failure logs:
1. pyproject.toml present with ``[project].name``
2. lockfile (``uv.lock`` or ``poetry.lock``) present and not stale
3. package directory at ``src/<package>/`` exists (no empty name, no egg-info)
4. standard crew files: ``crew.py``, ``config/agents.yaml``, ``config/tasks.yaml``
5. flow entrypoint: ``main.py`` with a Flow subclass
6. hatch wheel target resolves (packages = [...] or default dir matches name)
7. crew/flow module imports cleanly (catches ``@CrewBase not found``,
``No Flow subclass found``, provider import errors)
8. environment variables referenced in code vs ``.env`` / deployment env
9. installed crewai vs lockfile pin (catches missing-attribute failures from
stale pins)
"""
from __future__ import annotations
from dataclasses import dataclass
from enum import Enum
import json
import logging
import os
from pathlib import Path
import re
import shutil
import subprocess
import sys
from typing import Any
from rich.console import Console
from crewai_cli.utils import parse_toml
console = Console()
logger = logging.getLogger(__name__)
class Severity(str, Enum):
"""Severity of a validation finding."""
ERROR = "error"
WARNING = "warning"
@dataclass
class ValidationResult:
"""A single finding from a validation check.
Attributes:
severity: whether this blocks deploy or is advisory.
code: stable short identifier, used in tests and docs
(e.g. ``missing_pyproject``, ``stale_lockfile``).
title: one-line summary shown to the user.
detail: optional multi-line explanation.
hint: optional remediation suggestion.
"""
severity: Severity
code: str
title: str
detail: str = ""
hint: str = ""
# Maps known provider env var names → label used in hint messages.
_KNOWN_API_KEY_HINTS: dict[str, str] = {
"OPENAI_API_KEY": "OpenAI",
"ANTHROPIC_API_KEY": "Anthropic",
"GOOGLE_API_KEY": "Google",
"GEMINI_API_KEY": "Gemini",
"AZURE_OPENAI_API_KEY": "Azure OpenAI",
"AZURE_API_KEY": "Azure",
"AWS_ACCESS_KEY_ID": "AWS",
"AWS_SECRET_ACCESS_KEY": "AWS",
"COHERE_API_KEY": "Cohere",
"GROQ_API_KEY": "Groq",
"MISTRAL_API_KEY": "Mistral",
"TAVILY_API_KEY": "Tavily",
"SERPER_API_KEY": "Serper",
"SERPLY_API_KEY": "Serply",
"PERPLEXITY_API_KEY": "Perplexity",
"DEEPSEEK_API_KEY": "DeepSeek",
"OPENROUTER_API_KEY": "OpenRouter",
"FIRECRAWL_API_KEY": "Firecrawl",
"EXA_API_KEY": "Exa",
"BROWSERBASE_API_KEY": "Browserbase",
}
def normalize_package_name(project_name: str) -> str:
"""Normalize a pyproject project.name into a Python package directory name.
Mirrors the rules in ``crewai.cli.create_crew.create_crew`` so the
validator agrees with the scaffolder about where ``src/<pkg>/`` should
live.
"""
folder = project_name.replace(" ", "_").replace("-", "_").lower()
return re.sub(r"[^a-zA-Z0-9_]", "", folder)
class DeployValidator:
"""Runs the full pre-deploy validation suite against a project directory."""
def __init__(self, project_root: Path | None = None) -> None:
self.project_root: Path = (project_root or Path.cwd()).resolve()
self.results: list[ValidationResult] = []
self._pyproject: dict[str, Any] | None = None
self._project_name: str | None = None
self._package_name: str | None = None
self._package_dir: Path | None = None
self._is_flow: bool = False
def _add(
self,
severity: Severity,
code: str,
title: str,
detail: str = "",
hint: str = "",
) -> None:
self.results.append(
ValidationResult(
severity=severity,
code=code,
title=title,
detail=detail,
hint=hint,
)
)
@property
def errors(self) -> list[ValidationResult]:
return [r for r in self.results if r.severity is Severity.ERROR]
@property
def warnings(self) -> list[ValidationResult]:
return [r for r in self.results if r.severity is Severity.WARNING]
@property
def ok(self) -> bool:
return not self.errors
def run(self) -> list[ValidationResult]:
"""Run all checks. Later checks are skipped when earlier ones make
them impossible (e.g. no pyproject.toml → no lockfile check)."""
if not self._check_pyproject():
return self.results
self._check_lockfile()
if not self._check_package_dir():
self._check_hatch_wheel_target()
return self.results
if self._is_flow:
self._check_flow_entrypoint()
else:
self._check_crew_entrypoint()
self._check_config_yamls()
self._check_hatch_wheel_target()
self._check_module_imports()
self._check_env_vars()
self._check_version_vs_lockfile()
return self.results
def _check_pyproject(self) -> bool:
pyproject_path = self.project_root / "pyproject.toml"
if not pyproject_path.exists():
self._add(
Severity.ERROR,
"missing_pyproject",
"Cannot find pyproject.toml",
detail=(
f"Expected pyproject.toml at {pyproject_path}. "
"CrewAI projects must be installable Python packages."
),
hint="Run `crewai create crew <name>` to scaffold a valid project layout.",
)
return False
try:
self._pyproject = parse_toml(pyproject_path.read_text())
except Exception as e:
self._add(
Severity.ERROR,
"invalid_pyproject",
"pyproject.toml is not valid TOML",
detail=str(e),
)
return False
project = self._pyproject.get("project") or {}
name = project.get("name")
if not isinstance(name, str) or not name.strip():
self._add(
Severity.ERROR,
"missing_project_name",
"pyproject.toml is missing [project].name",
detail=(
"Without a project name the platform cannot resolve your "
"package directory (this produces errors like "
"'Cannot find src//crew.py')."
),
hint='Set a `name = "..."` field under `[project]` in pyproject.toml.',
)
return False
self._project_name = name
self._package_name = normalize_package_name(name)
self._is_flow = (self._pyproject.get("tool") or {}).get("crewai", {}).get(
"type"
) == "flow"
return True
def _check_lockfile(self) -> None:
uv_lock = self.project_root / "uv.lock"
poetry_lock = self.project_root / "poetry.lock"
pyproject = self.project_root / "pyproject.toml"
if not uv_lock.exists() and not poetry_lock.exists():
self._add(
Severity.ERROR,
"missing_lockfile",
"Expected to find at least one of these files: uv.lock or poetry.lock",
hint=(
"Run `uv lock` (recommended) or `poetry lock` in your project "
"directory, commit the lockfile, then redeploy."
),
)
return
lockfile = uv_lock if uv_lock.exists() else poetry_lock
try:
if lockfile.stat().st_mtime < pyproject.stat().st_mtime:
self._add(
Severity.WARNING,
"stale_lockfile",
f"{lockfile.name} is older than pyproject.toml",
detail=(
"Your lockfile may not reflect recent dependency changes. "
"The platform resolves from the lockfile, so deployed "
"dependencies may differ from local."
),
hint="Run `uv lock` (or `poetry lock`) and commit the result.",
)
except OSError:
pass
def _check_package_dir(self) -> bool:
if self._package_name is None:
return False
src_dir = self.project_root / "src"
if not src_dir.is_dir():
self._add(
Severity.ERROR,
"missing_src_dir",
"Missing src/ directory",
detail=(
"CrewAI deployments expect a src-layout project: "
f"src/{self._package_name}/crew.py (or main.py for flows)."
),
hint="Run `crewai create crew <name>` to see the expected layout.",
)
return False
package_dir = src_dir / self._package_name
if not package_dir.is_dir():
siblings = [
p.name
for p in src_dir.iterdir()
if p.is_dir() and not p.name.endswith(".egg-info")
]
egg_info = [
p.name for p in src_dir.iterdir() if p.name.endswith(".egg-info")
]
hint_parts = [
f'Create src/{self._package_name}/ to match [project].name = "{self._project_name}".'
]
if siblings:
hint_parts.append(
f"Found other package directories: {', '.join(siblings)}. "
f"Either rename one to '{self._package_name}' or update [project].name."
)
if egg_info:
hint_parts.append(
f"Delete stale build artifacts: {', '.join(egg_info)} "
"(these confuse the platform's package discovery)."
)
self._add(
Severity.ERROR,
"missing_package_dir",
f"Cannot find src/{self._package_name}/",
detail=(
"The platform looks for your crew source under "
"src/<package_name>/, derived from [project].name."
),
hint=" ".join(hint_parts),
)
return False
for p in src_dir.iterdir():
if p.name.endswith(".egg-info"):
self._add(
Severity.WARNING,
"stale_egg_info",
f"Stale build artifact in src/: {p.name}",
detail=(
".egg-info directories can be mistaken for your package "
"and cause 'Cannot find src/<name>.egg-info/crew.py' errors."
),
hint=f"Delete {p} and add `*.egg-info/` to .gitignore.",
)
self._package_dir = package_dir
return True
def _check_crew_entrypoint(self) -> None:
if self._package_dir is None:
return
crew_py = self._package_dir / "crew.py"
if not crew_py.is_file():
self._add(
Severity.ERROR,
"missing_crew_py",
f"Cannot find {crew_py.relative_to(self.project_root)}",
detail=(
"Standard crew projects must define a Crew class decorated "
"with @CrewBase inside crew.py."
),
hint=(
"Create crew.py with an @CrewBase-annotated class, or set "
'`[tool.crewai] type = "flow"` in pyproject.toml if this is a flow.'
),
)
def _check_config_yamls(self) -> None:
if self._package_dir is None:
return
config_dir = self._package_dir / "config"
if not config_dir.is_dir():
self._add(
Severity.ERROR,
"missing_config_dir",
f"Cannot find {config_dir.relative_to(self.project_root)}",
hint="Create a config/ directory with agents.yaml and tasks.yaml.",
)
return
for yaml_name in ("agents.yaml", "tasks.yaml"):
yaml_path = config_dir / yaml_name
if not yaml_path.is_file():
self._add(
Severity.ERROR,
f"missing_{yaml_name.replace('.', '_')}",
f"Cannot find {yaml_path.relative_to(self.project_root)}",
detail=(
"CrewAI loads agent and task config from these files; "
"missing them causes empty-config warnings and runtime crashes."
),
)
def _check_flow_entrypoint(self) -> None:
if self._package_dir is None:
return
main_py = self._package_dir / "main.py"
if not main_py.is_file():
self._add(
Severity.ERROR,
"missing_flow_main",
f"Cannot find {main_py.relative_to(self.project_root)}",
detail=(
"Flow projects must define a Flow subclass in main.py. "
'This project has `[tool.crewai] type = "flow"` set.'
),
hint="Create main.py with a `class MyFlow(Flow[...])`.",
)
def _check_hatch_wheel_target(self) -> None:
if not self._pyproject:
return
build_system = self._pyproject.get("build-system") or {}
backend = build_system.get("build-backend", "")
if "hatchling" not in backend:
return
hatch_wheel = (
(self._pyproject.get("tool") or {})
.get("hatch", {})
.get("build", {})
.get("targets", {})
.get("wheel", {})
)
if hatch_wheel.get("packages") or hatch_wheel.get("only-include"):
return
if self._package_dir and self._package_dir.is_dir():
return
self._add(
Severity.ERROR,
"hatch_wheel_target_missing",
"Hatchling cannot determine which files to ship",
detail=(
"Your pyproject uses hatchling but has no "
"[tool.hatch.build.targets.wheel] configuration and no "
"directory matching your project name."
),
hint=(
"Add:\n"
" [tool.hatch.build.targets.wheel]\n"
f' packages = ["src/{self._package_name}"]'
),
)
def _check_module_imports(self) -> None:
"""Import the user's crew/flow via `uv run` so the check sees the same
package versions as `crewai run` would. Result is reported as JSON on
the subprocess's stdout."""
script = (
"import json, sys, traceback, os\n"
"os.chdir(sys.argv[1])\n"
"try:\n"
" from crewai.utilities.project_utils import get_crews, get_flows\n"
" is_flow = sys.argv[2] == 'flow'\n"
" if is_flow:\n"
" instances = get_flows()\n"
" kind = 'flow'\n"
" else:\n"
" instances = get_crews()\n"
" kind = 'crew'\n"
" print(json.dumps({'ok': True, 'kind': kind, 'count': len(instances)}))\n"
"except BaseException as e:\n"
" print(json.dumps({\n"
" 'ok': False,\n"
" 'error_type': type(e).__name__,\n"
" 'error': str(e),\n"
" 'traceback': traceback.format_exc(),\n"
" }))\n"
)
uv_path = shutil.which("uv")
if uv_path is None:
self._add(
Severity.WARNING,
"uv_not_found",
"Skipping import check: `uv` not installed",
hint="Install uv: https://docs.astral.sh/uv/",
)
return
try:
proc = subprocess.run( # noqa: S603 - args constructed from trusted inputs
[
uv_path,
"run",
"python",
"-c",
script,
str(self.project_root),
"flow" if self._is_flow else "crew",
],
cwd=self.project_root,
capture_output=True,
text=True,
timeout=120,
check=False,
)
except subprocess.TimeoutExpired:
self._add(
Severity.ERROR,
"import_timeout",
"Importing your crew/flow module timed out after 120s",
detail=(
"User code may be making network calls or doing heavy work "
"at import time. Move that work into agent methods."
),
)
return
# The payload is the last JSON object on stdout; user code may print
# other lines before it.
payload: dict[str, Any] | None = None
for line in reversed(proc.stdout.splitlines()):
line = line.strip()
if line.startswith("{") and line.endswith("}"):
try:
payload = json.loads(line)
break
except json.JSONDecodeError:
continue
if payload is None:
self._add(
Severity.ERROR,
"import_failed",
"Could not import your crew/flow module",
detail=(proc.stderr or proc.stdout or "").strip()[:1500],
hint="Run `crewai run` locally first to reproduce the error.",
)
return
if payload.get("ok"):
if payload.get("count", 0) == 0:
kind = payload.get("kind", "crew")
if kind == "flow":
self._add(
Severity.ERROR,
"no_flow_subclass",
"No Flow subclass found in the module",
hint=(
"main.py must define a class extending "
"`crewai.flow.Flow`, instantiable with no arguments."
),
)
else:
self._add(
Severity.ERROR,
"no_crewbase_class",
"Crew class annotated with @CrewBase not found",
hint=(
"Decorate your crew class with @CrewBase from "
"crewai.project (see `crewai create crew` template)."
),
)
return
err_msg = str(payload.get("error", ""))
err_type = str(payload.get("error_type", "Exception"))
tb = str(payload.get("traceback", ""))
self._classify_import_error(err_type, err_msg, tb)
def _classify_import_error(self, err_type: str, err_msg: str, tb: str) -> None:
"""Turn a raw import-time exception into a user-actionable finding."""
# Must be checked before the generic "native provider" branch below:
# the extras-missing message contains the same phrase. Providers
# format the install command as plain text (`to install: uv add
# "crewai[extra]"`); also tolerate backtick-delimited variants.
m = re.search(
r"(?P<pkg>[A-Za-z0-9_ -]+?)\s+native provider not available"
r".*?to install:\s*`?(?P<cmd>uv add [\"']crewai\[[^\]]+\][\"'])`?",
err_msg,
)
if m:
self._add(
Severity.ERROR,
"missing_provider_extra",
f"{m.group('pkg').strip()} provider extra not installed",
hint=f"Run: {m.group('cmd')}",
)
return
# crewai.llm.LLM.__new__ wraps provider init errors as
# ImportError("Error importing native provider: ...").
if "Error importing native provider" in err_msg or "native provider" in err_msg:
missing_key = self._extract_missing_api_key(err_msg)
if missing_key:
provider = _KNOWN_API_KEY_HINTS.get(missing_key, missing_key)
self._add(
Severity.WARNING,
"llm_init_missing_key",
f"LLM is constructed at import time but {missing_key} is not set",
detail=(
f"Your crew instantiates a {provider} LLM during module "
"load (e.g. in a class field default or @crew method). "
f"The {provider} provider currently requires {missing_key} "
"at construction time, so this will fail on the platform "
"unless the key is set in your deployment environment."
),
hint=(
f"Add {missing_key} to your deployment's Environment "
"Variables before deploying, or move LLM construction "
"inside agent methods so it runs lazily."
),
)
return
self._add(
Severity.ERROR,
"llm_provider_init_failed",
"LLM native provider failed to initialize",
detail=err_msg,
hint=(
"Check your LLM(model=...) configuration and provider-specific "
"extras (e.g. `uv add 'crewai[azure-ai-inference]'` for Azure)."
),
)
return
if err_type == "KeyError":
key = err_msg.strip("'\"")
if key in _KNOWN_API_KEY_HINTS or key.endswith("_API_KEY"):
self._add(
Severity.WARNING,
"env_var_read_at_import",
f"{key} is read at import time via os.environ[...]",
detail=(
"Using os.environ[...] (rather than os.getenv(...)) "
"at module scope crashes the build if the key isn't set."
),
hint=(
f"Either add {key} as a deployment env var, or switch "
"to os.getenv() and move the access inside agent methods."
),
)
return
if "Crew class annotated with @CrewBase not found" in err_msg:
self._add(
Severity.ERROR,
"no_crewbase_class",
"Crew class annotated with @CrewBase not found",
detail=err_msg,
)
return
if "No Flow subclass found" in err_msg:
self._add(
Severity.ERROR,
"no_flow_subclass",
"No Flow subclass found in the module",
detail=err_msg,
)
return
if (
err_type == "AttributeError"
and "has no attribute '_load_response_format'" in err_msg
):
self._add(
Severity.ERROR,
"stale_crewai_pin",
"Your lockfile pins a crewai version missing `_load_response_format`",
detail=err_msg,
hint=(
"Run `uv lock --upgrade-package crewai` (or `poetry update crewai`) "
"to pin a newer release."
),
)
return
if "pydantic" in tb.lower() or "validation error" in err_msg.lower():
self._add(
Severity.ERROR,
"pydantic_validation_error",
"Pydantic validation failed while loading your crew",
detail=err_msg[:800],
hint=(
"Check agent/task configuration fields. `crewai run` locally "
"will show the full traceback."
),
)
return
self._add(
Severity.ERROR,
"import_failed",
f"Importing your crew failed: {err_type}",
detail=err_msg[:800],
hint="Run `crewai run` locally to see the full traceback.",
)
@staticmethod
def _extract_missing_api_key(err_msg: str) -> str | None:
"""Pull 'FOO_API_KEY' out of '... FOO_API_KEY is required ...'."""
m = re.search(r"([A-Z][A-Z0-9_]*_API_KEY)\s+is required", err_msg)
if m:
return m.group(1)
m = re.search(r"['\"]([A-Z][A-Z0-9_]*_API_KEY)['\"]", err_msg)
if m:
return m.group(1)
return None
def _check_env_vars(self) -> None:
"""Warn about env vars referenced in user code but missing locally.
Best-effort only — the platform sets vars server-side, so we never error.
"""
if not self._package_dir:
return
referenced: set[str] = set()
pattern = re.compile(
r"""(?x)
(?:os\.environ\s*(?:\[\s*|\.get\s*\(\s*)
|os\.getenv\s*\(\s*
|getenv\s*\(\s*)
['"]([A-Z][A-Z0-9_]*)['"]
"""
)
for path in self._package_dir.rglob("*.py"):
try:
text = path.read_text(encoding="utf-8", errors="ignore")
except OSError:
continue
referenced.update(pattern.findall(text))
for path in self._package_dir.rglob("*.yaml"):
try:
text = path.read_text(encoding="utf-8", errors="ignore")
except OSError:
continue
referenced.update(re.findall(r"\$\{?([A-Z][A-Z0-9_]+)\}?", text))
env_file = self.project_root / ".env"
env_keys: set[str] = set()
if env_file.exists():
for line in env_file.read_text(errors="ignore").splitlines():
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
env_keys.add(line.split("=", 1)[0].strip())
missing_known: list[str] = sorted(
var
for var in referenced
if var in _KNOWN_API_KEY_HINTS
and var not in env_keys
and var not in os.environ
)
if missing_known:
self._add(
Severity.WARNING,
"env_vars_not_in_dotenv",
f"{len(missing_known)} referenced API key(s) not in .env",
detail=(
"These env vars are referenced in your source but not set "
f"locally: {', '.join(missing_known)}. Deploys will fail "
"unless they are added to the deployment's Environment "
"Variables in the CrewAI dashboard."
),
)
def _check_version_vs_lockfile(self) -> None:
"""Warn when the lockfile pins a crewai release older than 1.13.0,
which is where ``_load_response_format`` was introduced.
"""
uv_lock = self.project_root / "uv.lock"
poetry_lock = self.project_root / "poetry.lock"
lockfile = (
uv_lock
if uv_lock.exists()
else poetry_lock
if poetry_lock.exists()
else None
)
if lockfile is None:
return
try:
text = lockfile.read_text(errors="ignore")
except OSError:
return
m = re.search(
r'name\s*=\s*"crewai"\s*\nversion\s*=\s*"([^"]+)"',
text,
)
if not m:
return
locked = m.group(1)
try:
from packaging.version import Version
if Version(locked) < Version("1.13.0"):
self._add(
Severity.WARNING,
"old_crewai_pin",
f"Lockfile pins crewai=={locked} (older than 1.13.0)",
detail=(
"Older pinned versions are missing API surface the "
"platform builder expects (e.g. `_load_response_format`)."
),
hint="Run `uv lock --upgrade-package crewai` and redeploy.",
)
except Exception as e:
logger.debug("Could not parse crewai pin from lockfile: %s", e)
def render_report(results: list[ValidationResult]) -> None:
"""Pretty-print results to the shared rich console."""
if not results:
console.print("[bold green]Pre-deploy validation passed.[/bold green]")
return
errors = [r for r in results if r.severity is Severity.ERROR]
warnings = [r for r in results if r.severity is Severity.WARNING]
for result in errors:
console.print(f"[bold red]ERROR[/bold red] [{result.code}] {result.title}")
if result.detail:
console.print(f" {result.detail}")
if result.hint:
console.print(f" [dim]hint:[/dim] {result.hint}")
for result in warnings:
console.print(
f"[bold yellow]WARNING[/bold yellow] [{result.code}] {result.title}"
)
if result.detail:
console.print(f" {result.detail}")
if result.hint:
console.print(f" [dim]hint:[/dim] {result.hint}")
summary_parts: list[str] = []
if errors:
summary_parts.append(f"[bold red]{len(errors)} error(s)[/bold red]")
if warnings:
summary_parts.append(f"[bold yellow]{len(warnings)} warning(s)[/bold yellow]")
console.print(f"\n{' / '.join(summary_parts)}")
def validate_project(project_root: Path | None = None) -> DeployValidator:
"""Entrypoint: run validation, render results, return the validator.
The caller inspects ``validator.ok`` to decide whether to proceed with a
deploy.
"""
validator = DeployValidator(project_root=project_root)
validator.run()
render_report(validator.results)
return validator
def run_validate_command() -> None:
"""Implementation of `crewai deploy validate`."""
validator = validate_project()
if not validator.ok:
sys.exit(1)

View File

@@ -1,23 +1,34 @@
import subprocess
import click
from crewai.utilities.constants import CREWAI_TRAINED_AGENTS_FILE_ENV
from crewai_cli.utils import build_env_with_all_tool_credentials
def evaluate_crew(n_iterations: int, model: str) -> None:
"""
Test and Evaluate the crew by running a command in the UV environment.
def evaluate_crew(
n_iterations: int, model: str, trained_agents_file: str | None = None
) -> None:
"""Test and Evaluate the crew by running a command in the UV environment.
Args:
n_iterations (int): The number of iterations to test the crew.
model (str): The model to test the crew with.
n_iterations: The number of iterations to test the crew.
model: The model to test the crew with.
trained_agents_file: Optional trained-agents pickle path forwarded to
the subprocess via the ``CREWAI_TRAINED_AGENTS_FILE`` env var.
"""
command = ["uv", "run", "test", str(n_iterations), model]
env = build_env_with_all_tool_credentials()
if trained_agents_file:
env[CREWAI_TRAINED_AGENTS_FILE_ENV] = trained_agents_file
try:
if n_iterations <= 0:
raise ValueError("The number of iterations must be a positive integer.")
result = subprocess.run(command, capture_output=False, text=True, check=True) # noqa: S603
result = subprocess.run( # noqa: S603
command, capture_output=False, text=True, check=True, env=env
)
if result.stderr:
click.echo(result.stderr, err=True)

View File

@@ -2,6 +2,8 @@ import subprocess
import click
from crewai_cli.utils import build_env_with_all_tool_credentials
# Be mindful about changing this.
# on some environments we don't use this command but instead uv sync directly
@@ -13,7 +15,14 @@ def install_crew(proxy_options: list[str]) -> None:
"""
try:
command = ["uv", "sync", *proxy_options]
subprocess.run(command, check=True, capture_output=False, text=True) # noqa: S603
# Inject tool repository credentials so uv can authenticate
# against private package indexes (e.g. crewai tool repository).
# Without this, `uv sync` fails with 401 Unauthorized when the
# project depends on tools from a private index.
env = build_env_with_all_tool_credentials()
subprocess.run(command, check=True, capture_output=False, text=True, env=env) # noqa: S603
except subprocess.CalledProcessError as e:
click.echo(f"An error occurred while running the crew: {e}", err=True)

View File

@@ -173,13 +173,13 @@ class MemoryTUI(App[None]):
info = self._memory.info("/")
tree.root.label = f"/ ({info.record_count} records)"
tree.root.data = "/"
self._add_children(tree.root, "/", depth=0, max_depth=3)
self._add_scope_children(tree.root, "/", depth=0, max_depth=3)
tree.root.expand()
return tree
def _add_children(
def _add_scope_children(
self,
parent_node: Tree.Node[str],
parent_node: Any,
path: str,
depth: int,
max_depth: int,
@@ -191,7 +191,7 @@ class MemoryTUI(App[None]):
child_info = self._memory.info(child)
label = f"{child} ({child_info.record_count})"
node = parent_node.add(label, data=child)
self._add_children(node, child, depth + 1, max_depth)
self._add_scope_children(node, child, depth + 1, max_depth)
# -- Populating the OptionList -------------------------------------------

View File

@@ -67,6 +67,7 @@ class PlusAPI:
description: str | None,
encoded_file: str,
available_exports: list[dict[str, Any]] | None = None,
tools_metadata: list[dict[str, Any]] | None = None,
) -> httpx.Response:
params = {
"handle": handle,
@@ -75,6 +76,9 @@ class PlusAPI:
"file": encoded_file,
"description": description,
"available_exports": available_exports,
"tools_metadata": {"package": handle, "tools": tools_metadata}
if tools_metadata is not None
else None,
}
return self._make_request("POST", f"{self.TOOLS_RESOURCE}", json=params)
@@ -190,6 +194,16 @@ class PlusAPI:
timeout=30,
)
def mark_ephemeral_trace_batch_as_failed(
self, trace_batch_id: str, error_message: str
) -> httpx.Response:
return self._make_request(
"PATCH",
f"{self.EPHEMERAL_TRACING_RESOURCE}/batches/{trace_batch_id}",
json={"status": "failed", "failure_reason": error_message},
timeout=30,
)
def get_mcp_configs(self, slugs: list[str]) -> httpx.Response:
"""Get MCP server configurations for the given slugs."""
return self._make_request(

View File

@@ -0,0 +1,250 @@
import io
import logging
import os
import shutil
from typing import Any
import zipfile
import click
import httpx
from rich.console import Console
from rich.panel import Panel
from rich.text import Text
from crewai_cli.command import BaseCommand
logger = logging.getLogger(__name__)
console = Console()
GITHUB_ORG = "crewAIInc"
TEMPLATE_PREFIX = "template_"
GITHUB_API_BASE = "https://api.github.com"
BANNER = """\
[bold white] ██████╗██████╗ ███████╗██╗ ██╗[/bold white] [bold red] █████╗ ██╗[/bold red]
[bold white]██╔════╝██╔══██╗██╔════╝██║ ██║[/bold white] [bold red]██╔══██╗██║[/bold red]
[bold white]██║ ██████╔╝█████╗ ██║ █╗ ██║[/bold white] [bold red]███████║██║[/bold red]
[bold white]██║ ██╔══██╗██╔══╝ ██║███╗██║[/bold white] [bold red]██╔══██║██║[/bold red]
[bold white]╚██████╗██║ ██║███████╗╚███╔███╔╝[/bold white] [bold red]██║ ██║██║[/bold red]
[bold white] ╚═════╝╚═╝ ╚═╝╚══════╝ ╚══╝╚══╝[/bold white] [bold red]╚═╝ ╚═╝╚═╝[/bold red]
[dim white]████████╗███████╗███╗ ███╗██████╗ ██╗ █████╗ ████████╗███████╗███████╗[/dim white]
[dim white]╚══██╔══╝██╔════╝████╗ ████║██╔══██╗██║ ██╔══██╗╚══██╔══╝██╔════╝██╔════╝[/dim white]
[dim white] ██║ █████╗ ██╔████╔██║██████╔╝██║ ███████║ ██║ █████╗ ███████╗[/dim white]
[dim white] ██║ ██╔══╝ ██║╚██╔╝██║██╔═══╝ ██║ ██╔══██║ ██║ ██╔══╝ ╚════██║[/dim white]
[dim white] ██║ ███████╗██║ ╚═╝ ██║██║ ███████╗██║ ██║ ██║ ███████╗███████║[/dim white]
[dim white] ╚═╝ ╚══════╝╚═╝ ╚═╝╚═╝ ╚══════╝╚═╝ ╚═╝ ╚═╝ ╚══════╝╚══════╝[/dim white]"""
class TemplateCommand(BaseCommand):
"""Handle template-related operations for CrewAI projects."""
def __init__(self) -> None:
super().__init__()
def list_templates(self) -> None:
"""List available templates with an interactive selector to install."""
templates = self._fetch_templates()
if not templates:
click.echo("No templates found.")
return
console.print(f"\n{BANNER}\n")
console.print(" [on cyan] templates [/on cyan]\n")
console.print(f" [green]o[/green] Source: https://github.com/{GITHUB_ORG}")
console.print(
f" [green]o[/green] Found [bold]{len(templates)}[/bold] templates\n"
)
console.print(" [green]o[/green] Select a template to install")
for idx, repo in enumerate(templates, start=1):
name = repo["name"].removeprefix(TEMPLATE_PREFIX)
description = repo.get("description") or ""
if description:
console.print(
f" [bold cyan]{idx}.[/bold cyan] [bold white]{name}[/bold white] [dim]({description})[/dim]"
)
else:
console.print(
f" [bold cyan]{idx}.[/bold cyan] [bold white]{name}[/bold white]"
)
console.print(" [bold cyan]q.[/bold cyan] [dim]Quit[/dim]\n")
while True:
choice = click.prompt("Enter your choice", type=str)
if choice.lower() == "q":
return
if choice.isdigit() and 1 <= int(choice) <= len(templates):
selected_index = int(choice) - 1
break
click.secho(
f"Please enter a number between 1 and {len(templates)}, or 'q' to quit.",
fg="yellow",
)
selected = templates[selected_index]
repo_name = selected["name"]
self._install_repo(repo_name)
def add_template(self, name: str, output_dir: str | None = None) -> None:
"""Download a template and copy it into the current working directory.
Args:
name: Template name (with or without the template_ prefix).
output_dir: Optional directory name. Defaults to the template name.
"""
repo_name = self._resolve_repo_name(name)
if repo_name is None:
click.secho(f"Template '{name}' not found.", fg="red")
click.echo("Run 'crewai template list' to see available templates.")
raise SystemExit(1)
self._install_repo(repo_name, output_dir)
def _install_repo(self, repo_name: str, output_dir: str | None = None) -> None:
"""Download and extract a template repo into the current directory.
Args:
repo_name: Full GitHub repo name (e.g. template_deep_research).
output_dir: Optional directory name. Defaults to the template name.
"""
folder_name = output_dir or repo_name.removeprefix(TEMPLATE_PREFIX)
dest = os.path.join(os.getcwd(), folder_name)
while os.path.exists(dest):
click.secho(f"Directory '{folder_name}' already exists.", fg="yellow")
folder_name = click.prompt(
"Enter a different directory name (or 'q' to quit)", type=str
)
if folder_name.lower() == "q":
return
dest = os.path.join(os.getcwd(), folder_name)
click.echo(
f"Downloading template '{repo_name.removeprefix(TEMPLATE_PREFIX)}'..."
)
zip_bytes = self._download_zip(repo_name)
self._extract_zip(zip_bytes, dest)
self._telemetry.template_installed_span(repo_name.removeprefix(TEMPLATE_PREFIX))
console.print(
f"\n [green]\u2713[/green] Installed template [bold white]{folder_name}[/bold white]"
f" [dim](source: github.com/{GITHUB_ORG}/{repo_name})[/dim]\n"
)
next_steps = Text()
next_steps.append(f" cd {folder_name}\n", style="bold white")
next_steps.append(" crewai install", style="bold white")
panel = Panel(
next_steps,
title="[green]\u25c7 Next steps[/green]",
title_align="left",
border_style="dim",
padding=(1, 2),
)
console.print(panel)
def _fetch_templates(self) -> list[dict[str, Any]]:
"""Fetch all template repos from the GitHub org."""
templates: list[dict[str, Any]] = []
page = 1
while True:
url = f"{GITHUB_API_BASE}/orgs/{GITHUB_ORG}/repos"
params: dict[str, str | int] = {
"per_page": 100,
"page": page,
"type": "public",
}
try:
response = httpx.get(url, params=params, timeout=15)
response.raise_for_status()
except httpx.HTTPError as e:
click.secho(f"Failed to fetch templates from GitHub: {e}", fg="red")
raise SystemExit(1) from e
repos = response.json()
if not repos:
break
templates.extend(
repo
for repo in repos
if repo["name"].startswith(TEMPLATE_PREFIX) and not repo.get("private")
)
page += 1
templates.sort(key=lambda r: r["name"])
return templates
def _resolve_repo_name(self, name: str) -> str | None:
    """Map user input to an existing full template repo name.

    Both the short form ('deep_research') and the fully prefixed form
    ('template_deep_research') are accepted.

    Args:
        name: User-supplied template name, with or without the prefix.

    Returns:
        The matching full repo name, or None when no template matches.
    """
    if name.startswith(TEMPLATE_PREFIX):
        candidates = [name]
    else:
        # Try the prefixed form first, then the raw input as typed.
        candidates = [f"{TEMPLATE_PREFIX}{name}", name]
    known_names = {template["name"] for template in self._fetch_templates()}
    return next((c for c in candidates if c in known_names), None)
def _download_zip(self, repo_name: str) -> bytes:
    """Download the default-branch zipball of a repo.

    Args:
        repo_name: Full repository name within the GitHub org.

    Returns:
        Raw zip archive bytes. Exits the process on any HTTP failure.
    """
    zipball_url = f"{GITHUB_API_BASE}/repos/{GITHUB_ORG}/{repo_name}/zipball"
    try:
        # GitHub answers with a redirect to codeload, so redirects must be followed.
        resp = httpx.get(zipball_url, follow_redirects=True, timeout=60)
        resp.raise_for_status()
    except httpx.HTTPError as e:
        click.secho(f"Failed to download template: {e}", fg="red")
        raise SystemExit(1) from e
    return resp.content
def _extract_zip(self, zip_bytes: bytes, dest: str) -> None:
"""Extract a GitHub zipball into dest, stripping the top-level directory."""
with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
# GitHub zipballs have a single top-level dir like 'crewAIInc-template_xxx-<sha>/'
members = zf.namelist()
if not members:
click.secho("Downloaded archive is empty.", fg="red")
raise SystemExit(1)
top_dir = members[0].split("/")[0] + "/"
os.makedirs(dest, exist_ok=True)
for member in members:
if member == top_dir or not member.startswith(top_dir):
continue
relative_path = member[len(top_dir) :]
if not relative_path:
continue
target = os.path.realpath(os.path.join(dest, relative_path))
if not target.startswith(
os.path.realpath(dest) + os.sep
) and target != os.path.realpath(dest):
continue
if member.endswith("/"):
os.makedirs(target, exist_ok=True)
else:
os.makedirs(os.path.dirname(target), exist_ok=True)
with zf.open(member) as src, open(target, "wb") as dst:
shutil.copyfileobj(src, dst)

View File

@@ -1,19 +1,28 @@
import subprocess
import click
from crewai.utilities.constants import CREWAI_TRAINED_AGENTS_FILE_ENV
from crewai_cli.utils import build_env_with_all_tool_credentials
def replay_task_command(task_id: str) -> None:
"""
Replay the crew execution from a specific task.
def replay_task_command(task_id: str, trained_agents_file: str | None = None) -> None:
"""Replay the crew execution from a specific task.
Args:
task_id (str): The ID of the task to replay from.
task_id: The ID of the task to replay from.
trained_agents_file: Optional trained-agents pickle path forwarded to
the subprocess via the ``CREWAI_TRAINED_AGENTS_FILE`` env var.
"""
command = ["uv", "run", "replay", task_id]
env = build_env_with_all_tool_credentials()
if trained_agents_file:
env[CREWAI_TRAINED_AGENTS_FILE_ENV] = trained_agents_file
try:
result = subprocess.run(command, capture_output=False, text=True, check=True) # noqa: S603
result = subprocess.run( # noqa: S603
command, capture_output=False, text=True, check=True, env=env
)
if result.stderr:
click.echo(result.stderr, err=True)

View File

@@ -1,11 +1,11 @@
from enum import Enum
import os
import subprocess
import click
from crewai.utilities.constants import CREWAI_TRAINED_AGENTS_FILE_ENV
from packaging import version
from crewai_cli.utils import build_env_with_tool_repository_credentials, read_toml
from crewai_cli.utils import build_env_with_all_tool_credentials, read_toml
from crewai_cli.version import get_crewai_version
@@ -14,13 +14,18 @@ class CrewType(Enum):
FLOW = "flow"
def run_crew() -> None:
"""
Run the crew or flow by running a command in the UV environment.
def run_crew(trained_agents_file: str | None = None) -> None:
"""Run the crew or flow by running a command in the UV environment.
Starting from version 0.103.0, this command can be used to run both
standard crews and flows. For flows, it detects the type from pyproject.toml
and automatically runs the appropriate command.
Args:
trained_agents_file: Optional path to a trained-agents pickle produced
by ``crewai train -f``. When set, exported as
``CREWAI_TRAINED_AGENTS_FILE`` so agents load suggestions from this
file instead of the default ``trained_agents_data.pkl``.
"""
crewai_version = get_crewai_version()
min_required_version = "0.71.0"
@@ -44,31 +49,24 @@ def run_crew() -> None:
click.echo(f"Running the {'Flow' if is_flow else 'Crew'}")
# Execute the appropriate command
execute_command(crew_type)
execute_command(crew_type, trained_agents_file=trained_agents_file)
def execute_command(crew_type: CrewType) -> None:
"""
Execute the appropriate command based on crew type.
def execute_command(
crew_type: CrewType, trained_agents_file: str | None = None
) -> None:
"""Execute the appropriate command based on crew type.
Args:
crew_type: The type of crew to run
crew_type: The type of crew to run.
trained_agents_file: Optional trained-agents pickle path forwarded to
the subprocess via the ``CREWAI_TRAINED_AGENTS_FILE`` env var.
"""
command = ["uv", "run", "kickoff" if crew_type == CrewType.FLOW else "run_crew"]
env = os.environ.copy()
try:
pyproject_data = read_toml()
sources = pyproject_data.get("tool", {}).get("uv", {}).get("sources", {})
for source_config in sources.values():
if isinstance(source_config, dict):
index = source_config.get("index")
if index:
index_env = build_env_with_tool_repository_credentials(index)
env.update(index_env)
except Exception: # noqa: S110
pass
env = build_env_with_all_tool_credentials()
if trained_agents_file:
env[CREWAI_TRAINED_AGENTS_FILE_ENV] = trained_agents_file
try:
subprocess.run(command, capture_output=False, text=True, check=True, env=env) # noqa: S603

View File

@@ -91,7 +91,7 @@ class SettingsCommand(BaseCommand):
style="bold red",
)
console.print("Available keys:", style="yellow")
for field_name in Settings.model_fields.keys():
for field_name in Settings.model_fields:
if field_name not in readonly_settings:
console.print(f" - {field_name}", style="yellow")
raise SystemExit(1)

View File

@@ -120,11 +120,11 @@ my_crew/
my_flow/
├── src/my_flow/
│ ├── crews/ # Multiple crew definitions
│ │ └── poem_crew/
│ │ └── content_crew/
│ │ ├── config/
│ │ │ ├── agents.yaml
│ │ │ └── tasks.yaml
│ │ └── poem_crew.py
│ │ └── content_crew.py
│ ├── tools/ # Custom tools
│ ├── main.py # Flow orchestration
│ └── ...
@@ -774,7 +774,7 @@ def calculator(expression: str) -> str:
```
### Built-in Tools (install with `uv add crewai-tools`)
Web/Search: SerperDevTool, ScrapeWebsiteTool, WebsiteSearchTool, EXASearchTool, FirecrawlSearchTool
Web/Search: SerperDevTool, ScrapeWebsiteTool, WebsiteSearchTool, ExaSearchTool, FirecrawlSearchTool
Documents: FileReadTool, DirectoryReadTool, PDFSearchTool, DOCXSearchTool, CSVSearchTool, JSONSearchTool, XMLSearchTool, MDXSearchTool
Code: CodeInterpreterTool, CodeDocsSearchTool, GithubSearchTool
Media: DALL-E Tool, YoutubeChannelSearchTool, YoutubeVideoSearchTool

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.10.2rc2"
"crewai[tools]==1.14.5a2"
]
[project.scripts]

View File

@@ -38,7 +38,7 @@ crewai run
This command initializes the {{name}} Flow as defined in your configuration.
This example, unmodified, will run the create a `report.md` file with the output of a research on LLMs in the root folder.
This example, unmodified, will run a content creation flow on AI Agents and save the output to `output/post.md`.
## Understanding Your Crew

View File

@@ -0,0 +1,33 @@
planner:
role: >
Content Planner
goal: >
Plan a detailed and engaging blog post outline on {topic}
backstory: >
You're an experienced content strategist who excels at creating
structured outlines for blog posts. You know how to organize ideas
into a logical flow that keeps readers engaged from start to finish.
writer:
role: >
Content Writer
goal: >
Write a compelling and well-structured blog post on {topic}
based on the provided outline
backstory: >
You're a skilled writer with a talent for turning outlines into
engaging, informative blog posts. Your writing is clear, conversational,
and backed by solid reasoning. You adapt your tone to the subject matter
while keeping things accessible to a broad audience.
editor:
role: >
Content Editor
goal: >
Review and polish the blog post on {topic} to ensure it is
publication-ready
backstory: >
You're a meticulous editor with years of experience refining written
content. You have an eye for clarity, flow, grammar, and consistency.
You improve prose without changing the author's voice and ensure every
piece you touch is polished and professional.

View File

@@ -0,0 +1,50 @@
planning_task:
description: >
Create a detailed outline for a blog post about {topic}.
The outline should include:
- A compelling title
- An introduction hook
- 3-5 main sections with key points to cover in each
- A conclusion with a call to action
Make the outline detailed enough that a writer can produce
a full blog post from it without additional research.
expected_output: >
A structured blog post outline with a title, introduction notes,
detailed section breakdowns, and conclusion notes.
agent: planner
writing_task:
description: >
Using the outline provided, write a full blog post about {topic}.
Requirements:
- Follow the outline structure closely
- Write in a clear, engaging, and conversational tone
- Each section should be 2-3 paragraphs
- Include a strong introduction and conclusion
- Target around 800-1200 words
expected_output: >
A complete blog post in markdown format, ready for editing.
The post should follow the outline and be well-written with
clear transitions between sections.
agent: writer
editing_task:
description: >
Review and edit the blog post about {topic}.
Focus on:
- Fixing any grammar or spelling errors
- Improving sentence clarity and flow
- Ensuring consistent tone throughout
- Strengthening the introduction and conclusion
- Removing any redundancy
Do not rewrite the post — refine and polish it.
expected_output: >
The final, polished blog post in markdown format without '```'.
Publication-ready with clean formatting and professional prose.
agent: editor
output_file: output/post.md

View File

@@ -8,8 +8,8 @@ from crewai.project import CrewBase, agent, crew, task
@CrewBase
class PoemCrew:
"""Poem Crew"""
class ContentCrew:
"""Content Crew"""
agents: list[BaseAgent]
tasks: list[Task]
@@ -20,26 +20,50 @@ class PoemCrew:
agents_config = "config/agents.yaml"
tasks_config = "config/tasks.yaml"
# If you would lik to add tools to your crew, you can learn more about it here:
# If you would like to add tools to your crew, you can learn more about it here:
# https://docs.crewai.com/concepts/agents#agent-tools
@agent
def poem_writer(self) -> Agent:
def planner(self) -> Agent:
return Agent(
config=self.agents_config["poem_writer"], # type: ignore[index]
config=self.agents_config["planner"], # type: ignore[index]
)
@agent
def writer(self) -> Agent:
return Agent(
config=self.agents_config["writer"], # type: ignore[index]
)
@agent
def editor(self) -> Agent:
return Agent(
config=self.agents_config["editor"], # type: ignore[index]
)
# To learn more about structured task outputs,
# task dependencies, and task callbacks, check out the documentation:
# https://docs.crewai.com/concepts/tasks#overview-of-a-task
@task
def write_poem(self) -> Task:
def planning_task(self) -> Task:
return Task(
config=self.tasks_config["write_poem"], # type: ignore[index]
config=self.tasks_config["planning_task"], # type: ignore[index]
)
@task
def writing_task(self) -> Task:
return Task(
config=self.tasks_config["writing_task"], # type: ignore[index]
)
@task
def editing_task(self) -> Task:
return Task(
config=self.tasks_config["editing_task"], # type: ignore[index]
)
@crew
def crew(self) -> Crew:
"""Creates the Research Crew"""
"""Creates the Content Crew"""
# To learn how to add knowledge sources to your crew, check out the documentation:
# https://docs.crewai.com/concepts/knowledge#what-is-knowledge

View File

@@ -1 +0,0 @@
"""Poem crew template."""

View File

@@ -1,11 +0,0 @@
poem_writer:
role: >
CrewAI Poem Writer
goal: >
Generate a funny, light-hearted poem about how CrewAI
is awesome with a sentence count of {sentence_count}
backstory: >
You're a creative poet with a talent for capturing the essence of any topic
in a beautiful and engaging way. Known for your ability to craft poems that
resonate with readers, you bring a unique perspective and artistic flair to
every piece you write.

View File

@@ -1,7 +0,0 @@
write_poem:
description: >
Write a poem about how CrewAI is awesome.
Ensure the poem is engaging and adheres to the specified sentence count of {sentence_count}.
expected_output: >
A beautifully crafted poem about CrewAI, with exactly {sentence_count} sentences.
agent: poem_writer

View File

@@ -1,59 +1,64 @@
#!/usr/bin/env python
from random import randint
from pathlib import Path
from pydantic import BaseModel
from crewai.flow import Flow, listen, start
from {{folder_name}}.crews.poem_crew.poem_crew import PoemCrew
from {{folder_name}}.crews.content_crew.content_crew import ContentCrew
class PoemState(BaseModel):
sentence_count: int = 1
poem: str = ""
class ContentState(BaseModel):
topic: str = ""
outline: str = ""
draft: str = ""
final_post: str = ""
class PoemFlow(Flow[PoemState]):
class ContentFlow(Flow[ContentState]):
@start()
def generate_sentence_count(self, crewai_trigger_payload: dict = None):
print("Generating sentence count")
def plan_content(self, crewai_trigger_payload: dict = None):
print("Planning content")
# Use trigger payload if available
if crewai_trigger_payload:
# Example: use trigger data to influence sentence count
self.state.sentence_count = crewai_trigger_payload.get('sentence_count', randint(1, 5))
self.state.topic = crewai_trigger_payload.get("topic", "AI Agents")
print(f"Using trigger payload: {crewai_trigger_payload}")
else:
self.state.sentence_count = randint(1, 5)
self.state.topic = "AI Agents"
@listen(generate_sentence_count)
def generate_poem(self):
print("Generating poem")
print(f"Topic: {self.state.topic}")
@listen(plan_content)
def generate_content(self):
print(f"Generating content on: {self.state.topic}")
result = (
PoemCrew()
ContentCrew()
.crew()
.kickoff(inputs={"sentence_count": self.state.sentence_count})
.kickoff(inputs={"topic": self.state.topic})
)
print("Poem generated", result.raw)
self.state.poem = result.raw
print("Content generated")
self.state.final_post = result.raw
@listen(generate_poem)
def save_poem(self):
print("Saving poem")
with open("poem.txt", "w") as f:
f.write(self.state.poem)
@listen(generate_content)
def save_content(self):
print("Saving content")
output_dir = Path("output")
output_dir.mkdir(exist_ok=True)
with open(output_dir / "post.md", "w") as f:
f.write(self.state.final_post)
print("Post saved to output/post.md")
def kickoff():
poem_flow = PoemFlow()
poem_flow.kickoff()
content_flow = ContentFlow()
content_flow.kickoff()
def plot():
poem_flow = PoemFlow()
poem_flow.plot()
content_flow = ContentFlow()
content_flow.plot()
def run_with_trigger():
@@ -74,10 +79,10 @@ def run_with_trigger():
# Create flow and kickoff with trigger payload
# The @start() methods will automatically receive crewai_trigger_payload parameter
poem_flow = PoemFlow()
content_flow = ContentFlow()
try:
result = poem_flow.kickoff({"crewai_trigger_payload": trigger_payload})
result = content_flow.kickoff({"crewai_trigger_payload": trigger_payload})
return result
except Exception as e:
raise Exception(f"An error occurred while running the flow with trigger: {e}")

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.10.2rc2"
"crewai[tools]==1.14.5a2"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "Power up your crews with {{folder_name}}"
readme = "README.md"
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.10.2rc2"
"crewai[tools]==1.14.5a2"
]
[tool.crewai]

View File

@@ -9,6 +9,10 @@ from typing import Any
import click
from crewai.events.listeners.tracing.utils import get_user_id
from crewai.utilities.project_utils import (
extract_available_exports,
extract_tools_metadata,
)
from rich.console import Console
from crewai_cli import git
@@ -17,10 +21,10 @@ from crewai_cli.config import Settings
from crewai_cli.constants import DEFAULT_CREWAI_ENTERPRISE_URL
from crewai_cli.utils import (
build_env_with_tool_repository_credentials,
extract_available_exports,
get_project_description,
get_project_name,
get_project_version,
read_toml,
tree_copy,
tree_find_and_replace,
)
@@ -101,13 +105,40 @@ class ToolCommand(BaseCommand, PlusAPIMixin):
console.print(
f"[green]Found these tools to publish: {', '.join([e['name'] for e in available_exports])}[/green]"
)
console.print("[bold blue]Extracting tool metadata...[/bold blue]")
try:
tools_metadata = extract_tools_metadata()
except Exception as e:
console.print(
f"[yellow]Warning: Could not extract tool metadata: {e}[/yellow]\n"
f"Publishing will continue without detailed metadata."
)
tools_metadata = []
self._print_tools_preview(tools_metadata)
self._print_current_organization()
build_env = os.environ.copy()
try:
pyproject_data = read_toml()
sources = pyproject_data.get("tool", {}).get("uv", {}).get("sources", {})
for source_config in sources.values():
if isinstance(source_config, dict):
index = source_config.get("index")
if index:
index_env = build_env_with_tool_repository_credentials(index)
build_env.update(index_env)
except Exception: # noqa: S110
pass
with tempfile.TemporaryDirectory() as temp_build_dir:
subprocess.run( # noqa: S603
["uv", "build", "--sdist", "--out-dir", temp_build_dir], # noqa: S607
check=True,
capture_output=False,
env=build_env,
)
tarball_filename = next(
@@ -118,7 +149,7 @@ class ToolCommand(BaseCommand, PlusAPIMixin):
"Project build failed. Please ensure that the command `uv build --sdist` completes successfully.",
style="bold red",
)
raise SystemExit
raise SystemExit(1)
tarball_path = os.path.join(temp_build_dir, tarball_filename)
with open(tarball_path, "rb") as file:
@@ -134,6 +165,7 @@ class ToolCommand(BaseCommand, PlusAPIMixin):
description=project_description,
encoded_file=f"data:application/x-gzip;base64,{encoded_tarball}",
available_exports=available_exports,
tools_metadata=tools_metadata,
)
self._validate_response(publish_response)
@@ -246,6 +278,55 @@ class ToolCommand(BaseCommand, PlusAPIMixin):
)
raise SystemExit
def _print_tools_preview(self, tools_metadata: list[dict[str, Any]]) -> None:
if not tools_metadata:
console.print("[yellow]No tool metadata extracted.[/yellow]")
return
console.print(
f"\n[bold]Tools to be published ({len(tools_metadata)}):[/bold]\n"
)
for tool in tools_metadata:
console.print(f" [bold cyan]{tool.get('name', 'Unknown')}[/bold cyan]")
if tool.get("module"):
console.print(f" Module: {tool.get('module')}")
console.print(f" Name: {tool.get('humanized_name', 'N/A')}")
console.print(
f" Description: {tool.get('description', 'N/A')[:80]}{'...' if len(tool.get('description', '')) > 80 else ''}"
)
init_params = tool.get("init_params_schema", {}).get("properties", {})
if init_params:
required = tool.get("init_params_schema", {}).get("required", [])
console.print(" Init parameters:")
for param_name, param_info in init_params.items():
param_type = param_info.get("type", "any")
is_required = param_name in required
req_marker = "[red]*[/red]" if is_required else ""
default = (
f" = {param_info['default']}" if "default" in param_info else ""
)
console.print(
f" - {param_name}: {param_type}{default} {req_marker}"
)
env_vars = tool.get("env_vars", [])
if env_vars:
console.print(" Environment variables:")
for env_var in env_vars:
req_marker = "[red]*[/red]" if env_var.get("required") else ""
default = (
f" (default: {env_var['default']})"
if env_var.get("default")
else ""
)
console.print(
f" - {env_var['name']}: {env_var.get('description', 'N/A')}{default} {req_marker}"
)
console.print()
def _print_current_organization(self) -> None:
settings = Settings()
if settings.org_uuid:

View File

@@ -16,7 +16,7 @@ class TriggersCommand(BaseCommand, PlusAPIMixin):
A class to handle trigger-related operations for CrewAI projects.
"""
def __init__(self):
def __init__(self) -> None:
BaseCommand.__init__(self)
PlusAPIMixin.__init__(self)

View File

@@ -1,5 +1,6 @@
import os
import shutil
from typing import Any
import tomli_w
@@ -11,7 +12,7 @@ def update_crew() -> None:
migrate_pyproject("pyproject.toml", "pyproject.toml")
def migrate_pyproject(input_file, output_file):
def migrate_pyproject(input_file: str, output_file: str) -> None:
"""
Migrate the pyproject.toml to the new format.
@@ -23,8 +24,7 @@ def migrate_pyproject(input_file, output_file):
# Read the input pyproject.toml
pyproject_data = read_toml()
# Initialize the new project structure
new_pyproject = {
new_pyproject: dict[str, Any] = {
"project": {},
"build-system": {"requires": ["hatchling"], "build-backend": "hatchling.build"},
}

View File

@@ -367,3 +367,29 @@ def build_env_with_tool_repository_credentials(
)
return env
def build_env_with_all_tool_credentials() -> dict[str, Any]:
"""Build environment dict with credentials for all tool repository indexes.
Reads ``[tool.uv.sources]`` from ``pyproject.toml`` and merges credentials
for each private index into a copy of the current environment.
Returns:
Environment variables with credentials for all private indexes.
"""
env = os.environ.copy()
try:
pyproject_data = read_toml()
sources = pyproject_data.get("tool", {}).get("uv", {}).get("sources", {})
for source_config in sources.values():
if isinstance(source_config, dict):
index = source_config.get("index")
if index:
index_env = build_env_with_tool_repository_credentials(index)
env.update(index_env)
except Exception: # noqa: S110
pass
return env

View File

@@ -3,7 +3,6 @@
from collections.abc import Mapping
from datetime import datetime, timedelta
from functools import lru_cache
import importlib.metadata
import json
from pathlib import Path
from typing import Any
@@ -11,6 +10,7 @@ from urllib import request
from urllib.error import URLError
import appdirs
from crewai.utilities.version import get_crewai_version
from packaging.version import InvalidVersion, Version, parse
@@ -25,11 +25,6 @@ def _get_cache_file() -> Path:
return cache_dir / "version_cache.json"
def get_crewai_version() -> str:
"""Get the version number of CrewAI running the CLI."""
return importlib.metadata.version("crewai")
def _is_cache_valid(cache_data: Mapping[str, Any]) -> bool:
"""Check if the cache is still valid, less than 24 hours old."""
if "timestamp" not in cache_data: