Merge branch 'main' into fix-skill-archive-symlink-traversal

This commit is contained in:
Rip&Tear
2026-06-24 13:37:52 +08:00
committed by GitHub
15900 changed files with 3246894 additions and 18755 deletions

View File

@@ -8,7 +8,7 @@ authors = [
]
requires-python = ">=3.10, <3.14"
dependencies = [
"crewai-core==1.14.7",
"crewai-core==1.14.8a3",
"click>=8.1.7,<9",
"pydantic>=2.11.9,<2.13",
"pydantic-settings~=2.10.1",

View File

@@ -1 +1 @@
__version__ = "1.14.7"
__version__ = "1.14.8a3"

View File

@@ -40,14 +40,6 @@ def replay_task_command(*args: Any, **kwargs: Any) -> Any:
return _replay_task_command(*args, **kwargs)
def run_flow_definition(*args: Any, **kwargs: Any) -> Any:
from crewai_cli.run_flow_definition import (
run_flow_definition as _run_flow_definition,
)
return _run_flow_definition(*args, **kwargs)
def run_crew(*args: Any, **kwargs: Any) -> Any:
from crewai_cli.run_crew import run_crew as _run_crew
@@ -155,12 +147,18 @@ def uv(uv_args: tuple[str, ...]) -> None:
is_flag=True,
help="Use classic Python/YAML project structure instead of JSON",
)
@click.option(
"--declarative",
is_flag=True,
help="Create a declarative Flow project instead of a Python Flow project",
)
def create(
type: str | None,
name: str | None,
provider: str | None,
skip_provider: bool = False,
classic: bool = False,
declarative: bool = False,
) -> None:
"""Create a new crew, or flow."""
dmn_mode = is_dmn_mode_enabled()
@@ -194,6 +192,8 @@ def create(
if dmn_mode:
skip_provider = True
if type == "crew":
if declarative:
raise click.UsageError("--declarative can only be used with flow projects")
if classic:
from crewai_cli.create_crew import create_crew
@@ -205,7 +205,7 @@ def create(
elif type == "flow":
from crewai_cli.create_flow import create_flow
create_flow(name)
create_flow(name, declarative=declarative)
else:
click.secho("Error: Invalid type. Must be 'crew' or 'flow'.", fg="red")
@@ -468,7 +468,7 @@ def memory(
type=str,
default=None,
help=(
"Path to a trained-agents pickle (produced by `crewai train -f`). "
"Crew-only: path to a trained-agents pickle (produced by `crewai train -f`). "
"When set, agents load suggestions from this file instead of the "
"default trained_agents_data.pkl. Equivalent to setting "
"CREWAI_TRAINED_AGENTS_FILE."
@@ -512,16 +512,13 @@ def install(context: click.Context) -> None:
"--definition",
type=str,
default=None,
help=(
"Experimental: path to a Flow Definition YAML/JSON file, "
"or an inline YAML/JSON string."
),
help="Flow-only: path to a declarative flow definition.",
)
@click.option(
"--inputs",
type=str,
default=None,
help='Experimental: JSON object passed to flow.kickoff(), e.g. \'{"topic":"AI"}\'.',
help='Flow-only: JSON object passed to the declarative flow, e.g. \'{"topic":"AI"}\'.',
)
def run(
trained_agents_file: str | None,
@@ -531,16 +528,14 @@ def run(
"""Run the Crew or Flow."""
if inputs is not None and definition is None:
raise click.UsageError("--inputs requires --definition")
if trained_agents_file is not None and definition is not None:
raise click.UsageError("--filename can only be used when running crews")
if definition is not None:
click.secho(
"Warning: `crewai run --definition` is experimental and may change without notice.",
fg="yellow",
)
run_flow_definition(definition=definition, inputs=inputs)
return
run_crew(trained_agents_file=trained_agents_file)
run_crew(
trained_agents_file=trained_agents_file,
definition=definition,
inputs=inputs,
)
@crewai.command()
@@ -795,10 +790,11 @@ def flow() -> None:
@flow.command(name="kickoff")
def flow_run() -> None:
"""Kickoff the Flow."""
from crewai_cli.kickoff_flow import kickoff_flow
click.echo("Running the Flow")
kickoff_flow()
click.secho(
"The command 'crewai flow kickoff' is deprecated. Use 'crewai run' instead.",
fg="yellow",
)
run_crew(trained_agents_file=None, definition=None, inputs=None)
@flow.command(name="plot")

View File

@@ -5,7 +5,10 @@ import click
from crewai_core.telemetry import Telemetry
def create_flow(name: str) -> None:
DECLARATIVE_FLOW_FOLDERS = ("crews", "tools", "knowledge", "skills")
def create_flow(name: str, *, declarative: bool = False) -> None:
"""Create a new flow."""
folder_name = name.replace(" ", "_").replace("-", "_").lower()
class_name = name.replace("_", " ").replace("-", " ").title().replace(" ", "")
@@ -20,6 +23,17 @@ def create_flow(name: str) -> None:
telemetry = Telemetry()
telemetry.flow_creation_span(class_name)
if declarative:
_create_declarative_flow(name, class_name, folder_name, project_root)
else:
_create_python_flow(name, class_name, folder_name, project_root)
click.secho(f"Flow {name} created successfully!", fg="green", bold=True)
def _create_python_flow(
name: str, class_name: str, folder_name: str, project_root: Path
) -> None:
(project_root / "src" / folder_name).mkdir(parents=True)
(project_root / "src" / folder_name / "crews").mkdir(parents=True)
(project_root / "src" / folder_name / "tools").mkdir(parents=True)
@@ -92,4 +106,41 @@ def create_flow(name: str) -> None:
fg="yellow",
)
click.secho(f"Flow {name} created successfully!", fg="green", bold=True)
def _create_declarative_flow(
name: str, class_name: str, folder_name: str, project_root: Path
) -> None:
project_root.mkdir(parents=True)
package_root = project_root / "src" / folder_name
package_root.mkdir(parents=True)
for folder in DECLARATIVE_FLOW_FOLDERS:
(package_root / folder).mkdir()
package_dir = Path(__file__).parent
templates_dir = package_dir / "templates" / "declarative_flow"
agents_md_src = package_dir / "templates" / "AGENTS.md"
if agents_md_src.exists():
shutil.copy2(agents_md_src, project_root / "AGENTS.md")
for src_file in templates_dir.rglob("*"):
if not src_file.is_file():
continue
relative_path = src_file.relative_to(templates_dir)
dst_file = (
project_root / relative_path
if relative_path.name in {".gitignore", "README.md", "pyproject.toml"}
else package_root / relative_path
)
dst_file.parent.mkdir(parents=True, exist_ok=True)
content = src_file.read_text(encoding="utf-8")
content = content.replace("{{name}}", name)
content = content.replace("{{flow_name}}", class_name)
content = content.replace("{{folder_name}}", folder_name)
dst_file.write_text(content, encoding="utf-8")
(project_root / ".env").write_text("OPENAI_API_KEY=YOUR_API_KEY", encoding="utf-8")
(package_root / "__init__.py").write_text("", encoding="utf-8")
for folder in DECLARATIVE_FLOW_FOLDERS:
(package_root / folder / ".gitkeep").write_text("", encoding="utf-8")

View File

@@ -89,13 +89,16 @@ description = "{name} using crewAI"
authors = [{{ name = "Your Name", email = "you@example.com" }}]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]>=1.14.7"
"crewai[tools]==1.14.8a1"
]
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
only-include = ["agents", "crew.jsonc", "tools", "knowledge", "skills"]
[tool.crewai]
type = "crew"
"""
@@ -677,7 +680,7 @@ def _default_agents_and_tasks(
]
crew_settings = {
"process": "sequential",
"memory": False,
"memory": True,
"inputs": {},
}
return agents, tasks, crew_settings

View File

@@ -17,7 +17,7 @@ from textual.binding import Binding, BindingType
from textual.containers import Horizontal, Vertical, VerticalScroll
from textual.css.query import NoMatches
from textual.screen import ModalScreen
from textual.widgets import Button, Footer, Header, Static
from textual.widgets import Button, Footer, Header, Input, Static
_SPINNER = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏"
@@ -34,6 +34,25 @@ _C_MUTED = "#666666" # dimmer than _C_DIM for past timeline
_STEP_NUMBER_RE = re.compile(r"\bstep\s+(\d+)\b", re.IGNORECASE)
_REFINEMENT_RE = re.compile(r"^\s*step\s+(\d+)\s*:\s*(.+)\s*$", re.IGNORECASE)
_INTERNAL_TOOL_NAMES = {"create_reasoning_plan"}
_LOG_ARGS_TEXT_LIMIT = 3_000
_LOG_RESULT_TEXT_LIMIT = 5_000
_LOG_TRUNCATION_SUFFIX = "... [truncated]"
# Background memory saves can emit their start event just after kickoff returns.
_MEMORY_SAVE_DRAIN_GRACE_SECONDS = 2.0
def _is_save_to_memory_tool(tool_name: str | None) -> bool:
return (tool_name or "").replace(" ", "_").lower() == "save_to_memory"
def _truncate_log_text(value: Any, limit: int) -> str | None:
if value is None:
return None
text = str(value)
if len(text) <= limit:
return text
suffix = _LOG_TRUNCATION_SUFFIX
return f"{text[: max(0, limit - len(suffix))]}{suffix}"
def _enable_tracing_in_dotenv() -> None:
@@ -363,6 +382,18 @@ Screen {
height: auto;
}
#conversation-input {
display: none;
height: 3;
border-top: hkey #333333;
background: #1c1c1c;
color: #e0e0e0;
}
#conversation-input:focus {
border-top: hkey #1F7982;
}
Header {
background: #1c1c1c;
color: #FF5A50;
@@ -464,6 +495,7 @@ FooterKey .footer-key--key {
total_tasks: int = 0,
agent_names: list[str] | None = None,
task_names: list[str] | None = None,
conversational: bool = False,
):
super().__init__()
self.title = f"CrewAI — {crew_name}"
@@ -519,10 +551,19 @@ FooterKey .footer-key--key {
self._log_expanded: set[int] = set()
self._log_scroll_needed: bool = False
self._log_line_map: list[tuple[int, int, int]] = []
self._suppressed_memory_save_event_ids: set[str] = set()
self._memory_save_drain_timer: Any = None
self._event_handlers: list[tuple[type, Any]] = []
self._crew: Any = None
self._flow: Any = None
self._is_conversational = conversational
self._conversation_messages: list[tuple[str, str]] = []
self._conversation_turns = 0
self._conversation_turn_in_progress = False
self._conversation_previous_defer_trace_finalization: bool | None = None
self._conversation_exit_commands = {"exit", "quit"}
self._default_inputs: dict[str, Any] | None = None
self._crew_result: Any = None
self._crew_json_path: Any = None
@@ -545,6 +586,10 @@ FooterKey .footer-key--key {
yield Static(id="task-header")
with VerticalScroll(id="scroll-area"):
yield Static(id="main-content")
yield Input(
placeholder="Message the flow...",
id="conversation-input",
)
with VerticalScroll(id="log-panel"):
yield Static(id="log-content")
yield Footer()
@@ -553,7 +598,9 @@ FooterKey .footer-key--key {
self._start_time = time.time()
self._subscribe()
self._tick_timer = self.set_interval(1 / 8, self._tick)
if self._crew:
if self._is_conversational and self._flow:
self._start_conversational_session()
elif self._crew:
self._run_crew_worker()
elif self._crew_json_path:
self._load_and_run_worker()
@@ -633,7 +680,6 @@ FooterKey .footer-key--key {
self.call_from_thread(self._on_crew_failed, str(e))
def _on_crew_done(self, output: str | None) -> None:
self._unsubscribe()
with self._lock:
self._status = "completed"
self._final_output = output
@@ -649,6 +695,8 @@ FooterKey .footer-key--key {
now = time.time()
for entry in self._log_entries:
if entry["status"] == "running":
if entry["tool_name"] == "memory_save":
continue
entry["status"] = "timeout"
entry["error"] = "No result received before crew completed"
entry["duration"] = now - entry["start_time"]
@@ -680,9 +728,9 @@ FooterKey .footer-key--key {
self.call_later(self._focus_activity_log)
self._tick_timer.stop()
self._tick_timer = self.set_interval(1 / 2, self._tick)
self._unsubscribe_if_no_running_memory_save(wait_for_queued=True)
def _on_crew_failed(self, error: str) -> None:
self._unsubscribe()
with self._lock:
self._status = "failed"
self._error = error
@@ -692,12 +740,150 @@ FooterKey .footer-key--key {
now = time.time()
for entry in self._log_entries:
if entry["status"] == "running":
if entry["tool_name"] == "memory_save":
continue
entry["status"] = "error"
entry["error"] = "No result received before crew failed"
entry["duration"] = now - entry["start_time"]
self._tick()
self.call_later(self._focus_activity_log)
self._tick_timer.stop()
self._tick_timer = self.set_interval(1 / 2, self._tick)
self._unsubscribe_if_no_running_memory_save(wait_for_queued=True)
# ── Conversational flow execution ───────────────────────
def _start_conversational_session(self) -> None:
from crewai.events.listeners.tracing.utils import (
set_suppress_tracing_messages,
set_tui_mode,
)
set_tui_mode(True)
set_suppress_tracing_messages(True)
with self._lock:
self._status = "chatting"
self._current_step = None
self._elapsed_frozen = None
self._conversation_previous_defer_trace_finalization = getattr(
self._flow, "defer_trace_finalization", False
)
self._flow.defer_trace_finalization = True
try:
input_widget = self.query_one("#conversation-input", Input)
input_widget.display = True
input_widget.focus()
except Exception: # noqa: S110
pass
def _finalize_conversational_session(self) -> None:
if not (self._is_conversational and self._flow):
return
try:
self._flow.finalize_session_traces()
except Exception: # noqa: S110
pass
previous = self._conversation_previous_defer_trace_finalization
if previous is not None:
try:
self._flow.defer_trace_finalization = previous
except Exception: # noqa: S110
pass
def on_input_submitted(self, event: Input.Submitted) -> None:
if event.input.id != "conversation-input":
return
if not self._is_conversational:
return
message = event.value.strip()
event.input.value = ""
if not message:
return
if message.lower() in self._conversation_exit_commands:
self._finalize_conversational_session()
self._unsubscribe()
self.exit(self._crew_result)
return
if self._conversation_turn_in_progress:
return
with self._lock:
self._conversation_messages.append(("user", message))
self._conversation_turn_in_progress = True
self._conversation_turns += 1
self._status = "working"
self._current_step = ("yellow", "Thinking…", "")
self._is_streaming = False
self._streaming_text = ""
self._task_full_output = ""
self._current_llm_text = ""
event.input.disabled = True
self._run_conversation_turn_worker(message)
@work(thread=True, exclusive=True, group="conversation")
def _run_conversation_turn_worker(self, message: str) -> None:
from crewai.events.listeners.tracing.utils import (
set_suppress_tracing_messages,
set_tui_mode,
)
set_tui_mode(True)
set_suppress_tracing_messages(True)
try:
result = self._flow.handle_turn(message)
if hasattr(result, "get_full_text") and hasattr(result, "result"):
for _chunk in result:
pass
result = result.result
self.call_from_thread(self._on_conversation_turn_done, result)
except Exception as e:
self.call_from_thread(self._on_conversation_turn_failed, str(e))
def _on_conversation_turn_done(self, result: Any) -> None:
with self._lock:
output = self._stringify_output(result)
self._conversation_messages.append(("assistant", output))
self._crew_result = result
self._conversation_turn_in_progress = False
self._status = "chatting"
self._is_streaming = False
self._streaming_text = ""
self._current_step = None
self._enable_conversation_input()
self._tick()
self._scroll_to_result()
def _on_conversation_turn_failed(self, error: str) -> None:
with self._lock:
self._status = "failed"
self._error = error
self._conversation_turn_in_progress = False
self._is_streaming = False
self._current_step = None
self._enable_conversation_input()
self._tick()
def _enable_conversation_input(self) -> None:
try:
input_widget = self.query_one("#conversation-input", Input)
input_widget.disabled = False
input_widget.focus()
except Exception: # noqa: S110
pass
def _stringify_output(self, result: Any) -> str:
raw_result = getattr(result, "raw", result)
if raw_result is None:
return ""
if isinstance(raw_result, str):
return raw_result
try:
return _json.dumps(raw_result, default=str, ensure_ascii=False)
except TypeError:
return str(raw_result)
# ── Actions ─────────────────────────────────────────────
@@ -757,6 +943,7 @@ FooterKey .footer-key--key {
self._refresh_log_panel()
async def action_quit(self) -> None:
self._finalize_conversational_session()
self._unsubscribe()
self.exit(self._crew_result)
@@ -932,6 +1119,30 @@ FooterKey .footer-key--key {
t = Text()
sidebar_width = 30
if self._is_conversational:
t.append(" CONVERSATION\n", style=f"bold {_C_PRIMARY}")
t.append("\n")
if self._conversation_turn_in_progress:
t.append(f" {self._spinner()} ", style=_C_PRIMARY)
t.append("Working\n", style=f"bold {_C_TEXT}")
elif self._status == "failed":
t.append(" ✘ Failed\n", style=_C_RED)
else:
t.append(" ● Ready\n", style=_C_GREEN)
t.append(f" Turns {self._conversation_turns}\n", style=_C_DIM)
t.append("\n")
t.append(" TOKENS\n", style=f"bold {_C_PRIMARY}")
t.append("\n")
out = self._output_tokens + self._live_out_tokens
t.append(f"{self._input_tokens:,}\n", style=_C_DIM)
t.append(f"{out:,}\n", style=_C_DIM)
t.append("\n")
t.append(" COMMANDS\n", style=f"bold {_C_PRIMARY}")
t.append("\n")
t.append(" quit / exit\n", style=_C_DIM)
widget.update(t)
return
t.append(" TASKS\n", style=f"bold {_C_PRIMARY}")
t.append("\n")
@@ -985,6 +1196,22 @@ FooterKey .footer-key--key {
widget = self.query_one("#task-header", Static)
t = Text()
if self._is_conversational:
if self._status == "failed":
t.append("", style=f"bold {_C_RED}")
t.append("Failed", style=f"bold {_C_RED}")
if self._error:
t.append(f"\n{self._error[:120]}", style=_C_RED)
elif self._conversation_turn_in_progress:
t.append(f"{self._spinner()} ", style=_C_PRIMARY)
t.append("Flow is responding", style=f"bold {_C_PRIMARY}")
else:
t.append("", style=f"bold {_C_GREEN}")
t.append("Conversational flow ready", style=f"bold {_C_GREEN}")
t.append(" Type a message below", style=_C_DIM)
widget.update(t)
return
if self._status == "completed":
elapsed = self._elapsed_frozen or (time.time() - self._start_time)
t.append("", style=f"bold {_C_GREEN}")
@@ -1036,6 +1263,41 @@ FooterKey .footer-key--key {
t = Text()
should_scroll = False
if self._is_conversational:
if not self._conversation_messages and not self._is_streaming:
t.append(" Start the conversation below.\n", style=_C_MUTED)
for role, content in self._conversation_messages:
if role == "user":
t.append("\n You\n", style=f"bold {_C_TEAL}")
else:
t.append("\n Assistant\n", style=f"bold {_C_PRIMARY}")
rendered = _format_json_in_text(_unescape_text(content))
for line in rendered.split("\n"):
style = _C_TEXT if role == "assistant" else _C_DIM
t.append(f" {line}\n", style=style)
if self._is_streaming and self._streaming_text:
text = _unescape_text(self._filtered_streaming_text())
if text.strip():
t.append("\n Assistant\n", style=f"bold {_C_PRIMARY}")
for line in text.rstrip().split("\n")[-40:]:
t.append(f" {line}\n", style=_C_TEXT)
should_scroll = True
if self._status == "failed" and self._error:
t.append("\n Error\n", style=f"bold {_C_RED}")
t.append(f" {self._error}\n", style=_C_RED)
widget.update(t)
if should_scroll:
try:
self.query_one("#scroll-area", VerticalScroll).scroll_end(
animate=False
)
except Exception: # noqa: S110
pass
return
# Plan section
if self._plan and self._plan.get("steps"):
plan_title = self._plan.get("plan", "Plan")
@@ -1514,6 +1776,53 @@ FooterKey .footer-key--key {
pass
self._event_handlers.clear()
def _has_running_memory_save_locked(self) -> bool:
return any(
entry["tool_name"] == "memory_save" and entry["status"] == "running"
for entry in self._log_entries
)
def _on_memory_save_drain_elapsed(self) -> None:
self._memory_save_drain_timer = None
self._unsubscribe_if_no_running_memory_save()
def _schedule_memory_save_drain_unsubscribe(self) -> bool:
loop = getattr(self, "_loop", None)
if loop is None:
return False
if getattr(self, "_thread_id", None) != threading.get_ident():
try:
loop.call_soon_threadsafe(self._schedule_memory_save_drain_unsubscribe)
except RuntimeError:
return False
return True
if self._memory_save_drain_timer is not None:
self._memory_save_drain_timer.stop()
self._memory_save_drain_timer = self.set_timer(
_MEMORY_SAVE_DRAIN_GRACE_SECONDS,
self._on_memory_save_drain_elapsed,
name="memory-save-drain",
)
return True
def _unsubscribe_if_no_running_memory_save(
self, *, wait_for_queued: bool = False
) -> None:
with self._lock:
should_unsubscribe = (
self._status
in {
"completed",
"failed",
}
and not self._has_running_memory_save_locked()
)
if should_unsubscribe:
if wait_for_queued and self._schedule_memory_save_drain_unsubscribe():
return
self._unsubscribe()
def _subscribe(self) -> None:
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.crew_events import CrewKickoffStartedEvent
@@ -1802,6 +2111,8 @@ FooterKey .footer-key--key {
entry["status"] == "running"
and entry["tool_name"] != event.tool_name
):
if entry["tool_name"] == "memory_save":
continue
entry["status"] = "timeout"
entry["error"] = (
"No result received before the next tool started"
@@ -1830,6 +2141,7 @@ FooterKey .footer-key--key {
"duration": None,
"task_idx": self._current_task_idx,
"plan_step_number": plan_step_number,
"event_id": event.event_id,
}
)
self._complete_step("teal", f"{event.tool_name}")
@@ -1923,8 +2235,178 @@ FooterKey .footer-key--key {
MemoryRetrievalCompletedEvent,
MemoryRetrievalFailedEvent,
MemoryRetrievalStartedEvent,
MemorySaveCompletedEvent,
MemorySaveFailedEvent,
MemorySaveStartedEvent,
)
def is_nested_save_to_memory_event(event: Any) -> bool:
if event.parent_event_id is None:
return False
state = crewai_event_bus.runtime_state
if state is None:
return False
parent_node = state.event_record.nodes.get(event.parent_event_id)
parent_event = getattr(parent_node, "event", None)
return getattr(
parent_event, "type", None
) == "tool_usage_started" and _is_save_to_memory_tool(
getattr(parent_event, "tool_name", None)
)
@crewai_event_bus.on(MemorySaveStartedEvent)
def on_memory_save_started(source: Any, event: MemorySaveStartedEvent) -> None:
with self._lock:
if is_nested_save_to_memory_event(event):
self._suppressed_memory_save_event_ids.add(event.event_id)
return
for entry in reversed(self._log_entries):
if (
_is_save_to_memory_tool(entry["tool_name"])
and entry.get("event_id") == event.parent_event_id
):
self._suppressed_memory_save_event_ids.add(event.event_id)
return
for entry in reversed(self._log_entries):
if (
entry["tool_name"] == "memory_save"
and entry.get("started_event_id") == event.event_id
):
entry["args"] = _truncate_log_text(
event.value, _LOG_ARGS_TEXT_LIMIT
)
return
self._log_entries.append(
{
"tool_name": "memory_save",
"status": "running",
"args": _truncate_log_text(event.value, _LOG_ARGS_TEXT_LIMIT),
"result": None,
"error": None,
"start_time": time.time(),
"duration": None,
"task_idx": self._current_task_idx,
"event_id": event.event_id,
}
)
self._register_handler(MemorySaveStartedEvent, on_memory_save_started)
@crewai_event_bus.on(MemorySaveCompletedEvent)
def on_memory_save_completed(
source: Any, event: MemorySaveCompletedEvent
) -> None:
with self._lock:
if (
event.started_event_id in self._suppressed_memory_save_event_ids
or is_nested_save_to_memory_event(event)
):
if event.started_event_id is not None:
self._suppressed_memory_save_event_ids.discard(
event.started_event_id
)
else:
for entry in reversed(self._log_entries):
has_started_event_match = (
event.started_event_id is not None
and (
entry.get("event_id") == event.started_event_id
or entry.get("started_event_id")
== event.started_event_id
)
)
has_running_event_without_id = (
event.started_event_id is None
and entry["status"] == "running"
)
if entry["tool_name"] == "memory_save" and (
has_running_event_without_id or has_started_event_match
):
entry["status"] = "success"
entry["duration"] = event.save_time_ms / 1000
entry["result"] = _truncate_log_text(
event.value, _LOG_RESULT_TEXT_LIMIT
)
entry["error"] = None
entry["started_event_id"] = event.started_event_id
break
else:
self._log_entries.append(
{
"tool_name": "memory_save",
"status": "success",
"args": None,
"result": _truncate_log_text(
event.value, _LOG_RESULT_TEXT_LIMIT
),
"error": None,
"start_time": time.time(),
"duration": event.save_time_ms / 1000,
"task_idx": self._current_task_idx,
"started_event_id": event.started_event_id,
}
)
self._unsubscribe_if_no_running_memory_save(wait_for_queued=True)
self._register_handler(MemorySaveCompletedEvent, on_memory_save_completed)
@crewai_event_bus.on(MemorySaveFailedEvent)
def on_memory_save_failed(source: Any, event: MemorySaveFailedEvent) -> None:
with self._lock:
if (
event.started_event_id in self._suppressed_memory_save_event_ids
or is_nested_save_to_memory_event(event)
):
if event.started_event_id is not None:
self._suppressed_memory_save_event_ids.discard(
event.started_event_id
)
else:
for idx, entry in reversed(list(enumerate(self._log_entries))):
has_started_event_match = (
event.started_event_id is not None
and (
entry.get("event_id") == event.started_event_id
or entry.get("started_event_id")
== event.started_event_id
)
)
has_running_event_without_id = (
event.started_event_id is None
and entry["status"] == "running"
)
if entry["tool_name"] == "memory_save" and (
has_running_event_without_id or has_started_event_match
):
entry["status"] = "error"
entry["error"] = event.error
entry["duration"] = time.time() - entry["start_time"]
entry["started_event_id"] = event.started_event_id
self._log_expanded.add(idx)
break
else:
self._log_entries.append(
{
"tool_name": "memory_save",
"status": "error",
"args": _truncate_log_text(
event.value, _LOG_ARGS_TEXT_LIMIT
),
"result": None,
"error": event.error,
"start_time": time.time(),
"duration": 0,
"task_idx": self._current_task_idx,
"started_event_id": event.started_event_id,
}
)
self._log_expanded.add(len(self._log_entries) - 1)
self._unsubscribe_if_no_running_memory_save(wait_for_queued=True)
self._register_handler(MemorySaveFailedEvent, on_memory_save_failed)
@crewai_event_bus.on(MemoryRetrievalStartedEvent)
def on_memory_retrieval_started(
source: Any, event: MemoryRetrievalStartedEvent

View File

@@ -1,15 +1,11 @@
from __future__ import annotations
from pathlib import Path
import re
import shutil
import tempfile
from typing import Any
import zipfile
from crewai_cli import git
from crewai_cli.deploy.validate import normalize_package_name
from crewai_cli.utils import parse_toml
_EXCLUDED_DIRS = {
@@ -38,8 +34,6 @@ _EXCLUDED_SUFFIXES = {
".pyc",
".pyo",
}
_SCRIPT_KEY_PATTERN = re.compile(r"^\s*(?P<key>[A-Za-z0-9_.-]+|\"[^\"]+\"|'[^']+')\s*=")
_SECTION_PATTERN = re.compile(r"^\s*\[[^\]]+\]\s*(?:#.*)?$")
def create_project_zip(
@@ -143,267 +137,7 @@ def _stage_project(root: Path, files: list[Path]) -> Path:
destination = staging_root / relative_path
destination.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(source, destination)
if _is_json_crew_project(staging_root):
_add_json_crew_deploy_wrapper(staging_root)
except Exception:
shutil.rmtree(staging_root, ignore_errors=True)
raise
return staging_root
def _is_json_crew_project(root: Path) -> bool:
"""Return True for JSON crew projects that need a Python deploy wrapper."""
if not ((root / "crew.jsonc").is_file() or (root / "crew.json").is_file()):
return False
project = _read_pyproject(root)
tool_config = project.get("tool") or {}
crewai_config = tool_config.get("crewai") if isinstance(tool_config, dict) else None
declared_type = (
crewai_config.get("type") if isinstance(crewai_config, dict) else None
)
if declared_type == "flow":
return False
package_name = _package_name(root)
if package_name is None:
raise ValueError(
"Could not derive a valid Python package name from [project].name."
)
return not (root / "src" / package_name / "crew.py").is_file()
def _read_pyproject(root: Path) -> dict[str, Any]:
"""Read pyproject.toml, returning an empty mapping on missing or invalid data."""
pyproject_path = root / "pyproject.toml"
if not pyproject_path.is_file():
return {}
try:
pyproject = parse_toml(pyproject_path.read_text())
except Exception:
return {}
return pyproject if isinstance(pyproject, dict) else {}
def _package_name(root: Path) -> str | None:
"""Return the normalized Python package name for the project."""
project = _read_pyproject(root).get("project")
if not isinstance(project, dict):
return None
name = project.get("name")
if not isinstance(name, str) or not name.strip():
return None
package_name = normalize_package_name(name)
return package_name or None
def _class_name(package_name: str) -> str:
"""Return the generated wrapper class name for a package."""
parts = [part for part in re.split(r"[^a-zA-Z0-9]+", package_name) if part]
class_name = "".join(part[:1].upper() + part[1:] for part in parts)
if not class_name:
return "JsonCrew"
if class_name[0].isdigit():
return f"Crew{class_name}"
return class_name
def _add_json_crew_deploy_wrapper(root: Path) -> None:
"""Add Python wrapper files required to deploy a JSON crew project."""
package_name = _package_name(root)
if package_name is None:
raise ValueError(
"Could not derive a valid Python package name from [project].name."
)
package_dir = root / "src" / package_name
config_dir = package_dir / "config"
config_dir.mkdir(parents=True, exist_ok=True)
class_name = _class_name(package_name)
crew_filename = "crew.jsonc" if (root / "crew.jsonc").is_file() else "crew.json"
(package_dir / "__init__.py").write_text("", encoding="utf-8")
(config_dir / "agents.yaml").write_text("{}\n", encoding="utf-8")
(config_dir / "tasks.yaml").write_text("{}\n", encoding="utf-8")
(package_dir / "crew.py").write_text(
_json_crew_py(class_name, crew_filename),
encoding="utf-8",
)
(package_dir / "main.py").write_text(
_json_main_py(package_name, class_name),
encoding="utf-8",
)
_ensure_project_scripts(root, package_name)
def _json_crew_py(class_name: str, crew_filename: str) -> str:
"""Render the generated crew.py module for a JSON crew."""
return f'''from pathlib import Path
from crewai import Crew
from crewai.project import CrewBase, crew
from crewai.project.crew_loader import load_crew
def _crew_path() -> Path:
return Path(__file__).resolve().parents[2] / "{crew_filename}"
@CrewBase
class {class_name}:
"""Compatibility wrapper for a JSON-defined CrewAI project."""
@crew
def crew(self) -> Crew:
crew_instance, default_inputs = load_crew(_crew_path())
self.default_inputs = default_inputs
return crew_instance
'''
def _json_main_py(package_name: str, class_name: str) -> str:
"""Render the generated main.py entrypoints for a JSON crew."""
return f"""#!/usr/bin/env python
import json
import sys
from {package_name}.crew import {class_name}
def _load():
wrapper = {class_name}()
crew = wrapper.crew()
return crew, getattr(wrapper, "default_inputs", {{}})
def run():
crew, inputs = _load()
return crew.kickoff(inputs=inputs)
def train():
crew, inputs = _load()
return crew.train(
n_iterations=int(sys.argv[1]),
filename=sys.argv[2],
inputs=inputs,
)
def replay():
crew, _ = _load()
return crew.replay(task_id=sys.argv[1])
def test():
crew, inputs = _load()
return crew.test(
n_iterations=int(sys.argv[1]),
eval_llm=sys.argv[2],
inputs=inputs,
)
def run_with_trigger():
if len(sys.argv) < 2:
raise ValueError("No trigger payload provided.")
crew, inputs = _load()
trigger_payload = json.loads(sys.argv[1])
return crew.kickoff(
inputs={{**inputs, "crewai_trigger_payload": trigger_payload}}
)
"""
def _ensure_project_scripts(root: Path, package_name: str) -> None:
"""Ensure generated wrappers have project script entrypoints."""
pyproject_path = root / "pyproject.toml"
if not pyproject_path.is_file():
return
content = pyproject_path.read_text(encoding="utf-8")
entries = _project_script_entries(package_name)
pyproject_path.write_text(
_update_project_scripts(content, entries),
encoding="utf-8",
)
def _project_script_entries(package_name: str) -> dict[str, str]:
"""Return script entrypoints required by the generated JSON wrapper."""
return {
package_name: f"{package_name}.main:run",
"run_crew": f"{package_name}.main:run",
"train": f"{package_name}.main:train",
"replay": f"{package_name}.main:replay",
"test": f"{package_name}.main:test",
"run_with_trigger": f"{package_name}.main:run_with_trigger",
}
def _update_project_scripts(content: str, entries: dict[str, str]) -> str:
"""Add or replace generated script entries in pyproject.toml content."""
lines = content.rstrip().splitlines()
header_index = _project_scripts_header_index(lines)
if header_index is None:
return content.rstrip() + _project_scripts_block(entries)
end_index = _section_end_index(lines, header_index + 1)
seen: set[str] = set()
for index in range(header_index + 1, end_index):
key = _script_key(lines[index])
if key in entries:
lines[index] = _script_line(key, entries[key])
seen.add(key)
missing_lines = [
_script_line(key, value) for key, value in entries.items() if key not in seen
]
lines[end_index:end_index] = missing_lines
return "\n".join(lines).rstrip() + "\n"
def _project_scripts_header_index(lines: list[str]) -> int | None:
"""Return the line index of the project scripts table, if present."""
for index, line in enumerate(lines):
if line.strip() == "[project.scripts]":
return index
return None
def _section_end_index(lines: list[str], start_index: int) -> int:
"""Return the exclusive end index for a TOML table section."""
for index in range(start_index, len(lines)):
if _SECTION_PATTERN.match(lines[index]):
return index
return len(lines)
def _script_key(line: str) -> str | None:
"""Return the script key for a pyproject script line."""
match = _SCRIPT_KEY_PATTERN.match(line)
if not match:
return None
key = match.group("key")
if key.startswith(("'", '"')) and key.endswith(("'", '"')):
return key[1:-1]
return key
def _script_line(key: str, value: str) -> str:
"""Render a project script TOML entry."""
return f'{key} = "{value}"'
def _project_scripts_block(entries: dict[str, str]) -> str:
"""Render a project scripts TOML table."""
lines = ["", "", "[project.scripts]"]
lines.extend(_script_line(key, value) for key, value in entries.items())
return "\n".join(lines) + "\n"

View File

@@ -212,8 +212,16 @@ class DeployValidator:
if crew_path is None:
return self.results
agents_dir = self.project_root / "agents"
self._check_pyproject()
self._check_lockfile()
agents_dir_ok = self._check_json_agents_dir(agents_dir)
project = None
try:
project = validate_crew_project(crew_path, self.project_root / "agents")
if agents_dir_ok:
project = validate_crew_project(crew_path, agents_dir)
except JSONProjectValidationError as e:
self._add(
Severity.ERROR,
@@ -232,15 +240,27 @@ class DeployValidator:
)
return self.results
agents_dir = self.project_root / "agents"
self._check_pyproject()
self._check_lockfile()
self._check_env_vars_json(crew_path, agents_dir, project.agent_names)
if project is not None:
self._check_env_vars_json(crew_path, agents_dir, project.agent_names)
self._check_version_vs_lockfile()
return self.results
def _check_json_agents_dir(self, agents_dir: Path) -> bool:
if agents_dir.is_dir():
return True
self._add(
Severity.ERROR,
"missing_agents_dir",
"Cannot find agents/ directory",
detail=(
"JSON crew projects load agent definitions from "
f"{agents_dir.relative_to(self.project_root)}/*.jsonc or *.json."
),
hint="Create agents/ and add one JSON or JSONC file per agent.",
)
return False
def _check_env_vars_json(
self, crew_path: Path, agents_dir: Path, agent_names: list[str]
) -> None:

View File

@@ -1,12 +1,94 @@
from __future__ import annotations
import importlib
import inspect
from pathlib import Path
import subprocess
import sys
from typing import Any
import click
def _project_script_target(script_name: str) -> str | None:
try:
from crewai_cli.utils import read_toml
pyproject = read_toml()
except Exception:
return None
target = pyproject.get("project", {}).get("scripts", {}).get(script_name)
return target if isinstance(target, str) else None
def _prepare_project_import_path() -> None:
cwd = Path.cwd()
for path in (cwd / "src", cwd):
path_str = str(path)
if path.exists() and path_str not in sys.path:
sys.path.insert(0, path_str)
def _load_conversational_flow_from_kickoff_script() -> Any | None:
target = _project_script_target("kickoff")
if not target or ":" not in target:
return None
module_name, _callable_name = target.split(":", 1)
_prepare_project_import_path()
try:
module = importlib.import_module(module_name)
from crewai.flow.flow import Flow
except Exception:
return None
for value in vars(module).values():
if (
inspect.isclass(value)
and value is not Flow
and issubclass(value, Flow)
and getattr(value, "conversational", False)
):
return value()
for value in vars(module).values():
if (
isinstance(value, Flow)
and getattr(value, "conversational", False)
and callable(getattr(value, "handle_turn", None))
):
return value
return None
def _run_conversational_flow_tui(flow: Any) -> Any:
from crewai_cli.crew_run_tui import CrewRunApp
app = CrewRunApp(
crew_name=getattr(flow, "name", None) or type(flow).__name__,
conversational=True,
)
app._flow = flow
app.run()
if app._status == "failed":
raise SystemExit(1)
return app._crew_result
def kickoff_flow() -> None:
"""
Kickoff the flow by running a command in the UV environment.
"""
flow = _load_conversational_flow_from_kickoff_script()
if flow is not None:
_run_conversational_flow_tui(flow)
return
command = ["uv", "run", "kickoff"]
try:

View File

@@ -5,19 +5,27 @@ import click
def plot_flow() -> None:
"""
Plot the flow by running a command in the UV environment.
Plot the flow from declarative config or the Python UV entrypoint.
"""
command = ["uv", "run", "plot"]
from crewai_cli.run_declarative_flow import (
configured_project_declarative_flow,
plot_declarative_flow_in_project_env,
)
try:
result = subprocess.run(command, capture_output=False, text=True, check=True) # noqa: S603
if definition := configured_project_declarative_flow():
plot_declarative_flow_in_project_env(definition)
else:
command = ["uv", "run", "plot"]
if result.stderr:
click.echo(result.stderr, err=True)
try:
subprocess.run( # noqa: S603
command, capture_output=False, text=True, check=True
)
except subprocess.CalledProcessError as e:
click.echo(f"An error occurred while plotting the flow: {e}", err=True)
click.echo(e.output, err=True)
except subprocess.CalledProcessError as e:
click.echo(f"An error occurred while plotting the flow: {e}", err=True)
raise SystemExit(1) from e
except Exception as e:
click.echo(f"An unexpected error occurred: {e}", err=True)
except Exception as e:
click.echo(f"An unexpected error occurred: {e}", err=True)
raise SystemExit(1) from e

View File

@@ -1,16 +1,15 @@
from __future__ import annotations
from collections.abc import Callable
from contextlib import AbstractContextManager, nullcontext
from enum import Enum
import os
from pathlib import Path
import re
import subprocess
import sys
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, cast
import click
from crewai.project.json_loader import find_crew_json_file
from crewai_core.constants import CREWAI_TRAINED_AGENTS_FILE_ENV
from packaging import version
@@ -27,17 +26,21 @@ if TYPE_CHECKING:
from crewai_cli.crew_run_tui import CrewRunApp
class CrewType(Enum):
STANDARD = "standard"
FLOW = "flow"
# Must accept the same names as the kickoff interpolation pattern in
# crewai.utilities.string_utils (_VARIABLE_PATTERN), including hyphens —
# otherwise placeholders are interpolated at runtime but never prompted for.
_INPUT_PLACEHOLDER_RE = re.compile(r"(?<!{){([A-Za-z_][A-Za-z0-9_\-]*)}(?!})")
_CREWAI_CLI_RUNNER_PACKAGE_DIR_ENV = "CREWAI_CLI_RUNNER_PACKAGE_DIR"
_CREWAI_RUNNER_SOURCE_DIR_ENV = "CREWAI_RUNNER_SOURCE_DIR"
_FULL_CREWAI_INSTALL_MESSAGE = """\
CrewAI CLI is installed without the `crewai` package required to run crews.
Install the full CrewAI prerelease package:
uv tool install --force --prerelease=allow 'crewai[tools]==1.14.8a1'
The quotes are required in zsh so `crewai[tools]` is not treated as a glob.
"""
_JSON_CREW_RUNNER_CODE = """
import importlib.util
import os
@@ -72,12 +75,39 @@ module_spec.loader.exec_module(module)
from crewai_core.constants import CREWAI_TRAINED_AGENTS_FILE_ENV
module._run_json_crew(
trained_agents_file=os.getenv(CREWAI_TRAINED_AGENTS_FILE_ENV)
)
try:
module._run_json_crew(
trained_agents_file=os.getenv(CREWAI_TRAINED_AGENTS_FILE_ENV)
)
except module.click.ClickException as exc:
exc.show()
raise SystemExit(exc.exit_code)
""".strip()
def _import_find_crew_json_file() -> Callable[[], Path | None]:
from crewai.project.json_loader import find_crew_json_file as _find_crew_json_file
return cast("Callable[[], Path | None]", _find_crew_json_file)
def _is_missing_crewai_package(exc: ModuleNotFoundError) -> bool:
return bool(exc.name and exc.name.startswith("crewai"))
def _full_crewai_install_error() -> click.ClickException:
return click.ClickException(_FULL_CREWAI_INSTALL_MESSAGE)
def find_crew_json_file() -> Path | None:
try:
return _import_find_crew_json_file()()
except ModuleNotFoundError as exc:
if _is_missing_crewai_package(exc):
raise _full_crewai_install_error() from exc
raise
def _has_json_crew() -> bool:
"""Check if this is a JSON-defined crew project.
@@ -501,7 +531,11 @@ def _print_post_tui_summary(app: CrewRunApp) -> None:
)
def run_crew(trained_agents_file: str | None = None) -> None:
def run_crew(
trained_agents_file: str | None = None,
definition: str | None = None,
inputs: str | None = None,
) -> None:
"""Run the crew or flow.
Args:
@@ -509,15 +543,98 @@ def run_crew(trained_agents_file: str | None = None) -> None:
by ``crewai train -f``. When set, exported as
``CREWAI_TRAINED_AGENTS_FILE`` so agents load suggestions from this
file instead of the default ``trained_agents_data.pkl``.
definition: Optional path to a declarative Flow definition.
inputs: Optional JSON object passed to a declarative Flow.
"""
# JSON crew projects take precedence
if inputs is not None and definition is None:
raise click.UsageError("--inputs requires --definition")
if definition is not None:
_run_explicit_declarative_flow(
definition=definition,
inputs=inputs,
trained_agents_file=trained_agents_file,
)
return
if _has_json_crew():
_run_json_crew_in_project_env(trained_agents_file=trained_agents_file)
return
pyproject_data = read_toml()
_warn_if_old_poetry_project(pyproject_data)
project_type = _get_project_type(pyproject_data)
if project_type == "flow":
_run_flow_project(
pyproject_data=pyproject_data,
trained_agents_file=trained_agents_file,
)
return
_run_classic_crew_project(
pyproject_data=pyproject_data,
trained_agents_file=trained_agents_file,
)
def _run_explicit_declarative_flow(
definition: str, inputs: str | None, trained_agents_file: str | None
) -> None:
if trained_agents_file is not None:
raise click.UsageError("--filename can only be used when running crews")
from crewai_cli.run_declarative_flow import run_declarative_flow
run_declarative_flow(definition=definition, inputs=inputs)
def _run_flow_project(
pyproject_data: dict[str, Any], trained_agents_file: str | None
) -> None:
if trained_agents_file is not None:
raise click.UsageError("--filename can only be used when running crews")
from crewai_cli.run_declarative_flow import (
configured_project_declarative_flow,
run_declarative_flow_in_project_env,
)
if definition := configured_project_declarative_flow(pyproject_data):
run_declarative_flow_in_project_env(definition=definition)
return
from crewai_cli.kickoff_flow import (
_load_conversational_flow_from_kickoff_script,
_run_conversational_flow_tui,
)
flow = _load_conversational_flow_from_kickoff_script()
if flow is not None:
_run_conversational_flow_tui(flow)
return
_execute_uv_script("kickoff", entity_type="flow")
def _run_classic_crew_project(
pyproject_data: dict[str, Any], trained_agents_file: str | None
) -> None:
_execute_uv_script(
"run_crew",
entity_type="crew",
trained_agents_file=trained_agents_file,
)
def _get_project_type(pyproject_data: dict[str, Any]) -> str | None:
project_type = pyproject_data.get("tool", {}).get("crewai", {}).get("type")
return project_type if isinstance(project_type, str) else None
def _warn_if_old_poetry_project(pyproject_data: dict[str, Any]) -> None:
crewai_version = get_crewai_version()
min_required_version = "0.71.0"
pyproject_data = read_toml()
if pyproject_data.get("tool", {}).get("poetry") and (
version.parse(crewai_version) < version.parse(min_required_version)
@@ -528,25 +645,22 @@ def run_crew(trained_agents_file: str | None = None) -> None:
fg="red",
)
is_flow = pyproject_data.get("tool", {}).get("crewai", {}).get("type") == "flow"
crew_type = CrewType.FLOW if is_flow else CrewType.STANDARD
click.echo(f"Running the {'Flow' if is_flow else 'Crew'}")
execute_command(crew_type, trained_agents_file=trained_agents_file)
def execute_command(
crew_type: CrewType, trained_agents_file: str | None = None
def _execute_uv_script(
script_name: str,
*,
entity_type: str,
trained_agents_file: str | None = None,
) -> None:
"""Execute the appropriate command based on crew type.
"""Execute a project script through uv.
Args:
crew_type: The type of crew to run.
script_name: The project script to run.
entity_type: The user-facing entity being run.
trained_agents_file: Optional trained-agents pickle path forwarded to
the subprocess via the ``CREWAI_TRAINED_AGENTS_FILE`` env var.
"""
command = ["uv", "run", "kickoff" if crew_type == CrewType.FLOW else "run_crew"]
command = ["uv", "run", script_name]
env = build_env_with_all_tool_credentials()
if trained_agents_file:
@@ -556,21 +670,20 @@ def execute_command(
subprocess.run(command, capture_output=False, text=True, check=True, env=env) # noqa: S603
except subprocess.CalledProcessError as e:
handle_error(e, crew_type)
_handle_run_error(e, entity_type)
except Exception as e:
click.echo(f"An unexpected error occurred: {e}", err=True)
def handle_error(error: subprocess.CalledProcessError, crew_type: CrewType) -> None:
def _handle_run_error(error: subprocess.CalledProcessError, entity_type: str) -> None:
"""
Handle subprocess errors with appropriate messaging.
Args:
error: The subprocess error that occurred
crew_type: The type of crew that was being run
entity_type: The type of entity that was being run
"""
entity_type = "flow" if crew_type == CrewType.FLOW else "crew"
click.echo(f"An error occurred while running the {entity_type}: {error}", err=True)
if error.output:

View File

@@ -0,0 +1,241 @@
from __future__ import annotations
import json
from pathlib import Path, PureWindowsPath
import subprocess
from typing import Any
import click
from pydantic import ValidationError
from crewai_cli.utils import build_env_with_all_tool_credentials
def run_declarative_flow_in_project_env(
definition: str | Path, inputs: str | None = None
) -> None:
"""Run a declarative flow inside the project's Python environment."""
if is_declarative_flow_project_env() or not _has_project_file():
run_declarative_flow(definition=definition, inputs=inputs)
return
if inputs is not None:
raise click.UsageError("--inputs is only supported with --definition")
_execute_declarative_flow_command(["uv", "run", "crewai", "run"])
def plot_declarative_flow_in_project_env(definition: str | Path) -> None:
"""Plot a declarative flow inside the project's Python environment."""
if is_declarative_flow_project_env() or not _has_project_file():
plot_declarative_flow(definition=definition)
return
_execute_declarative_flow_command(["uv", "run", "crewai", "flow", "plot"])
def run_declarative_flow(definition: str | Path, inputs: str | None = None) -> None:
"""Run a declarative flow from a definition path."""
parsed_inputs = _parse_inputs(inputs)
try:
flow = load_declarative_flow(definition)
result = flow.kickoff(inputs=parsed_inputs)
except Exception as exc:
click.echo(
f"An error occurred while running the declarative flow: {exc}", err=True
)
raise SystemExit(1) from exc
click.echo(_format_result(result))
def plot_declarative_flow(definition: str | Path) -> None:
"""Plot a declarative flow from a definition path."""
try:
flow = load_declarative_flow(definition)
flow.plot()
except Exception as exc:
click.echo(
f"An error occurred while plotting the declarative flow: {exc}", err=True
)
raise SystemExit(1) from exc
def load_declarative_flow(definition: str | Path) -> Any:
"""Load a declarative Flow instance from a definition path."""
try:
from crewai.flow.flow import Flow
except ImportError as exc:
click.echo(
"Running declarative flows requires the full crewai package.",
err=True,
)
raise SystemExit(1) from exc
definition_path = Path(definition).expanduser()
try:
if not definition_path.is_file():
if definition_path.exists():
click.echo(
f"Invalid --definition path: {definition} is not a file.",
err=True,
)
raise SystemExit(1)
click.echo(
f"Invalid --definition path: {definition} does not exist.", err=True
)
raise SystemExit(1)
except OSError as exc:
click.echo(f"Invalid --definition path: {definition} ({exc})", err=True)
raise SystemExit(1) from exc
try:
return Flow.from_declaration(path=definition_path)
except (OSError, UnicodeError, ValueError, ValidationError) as exc:
click.echo(
f"Unable to read --definition path {definition_path}: {exc}",
err=True,
)
raise SystemExit(1) from exc
def configured_project_declarative_flow(
pyproject_data: dict[str, Any] | None = None,
project_root: Path | None = None,
) -> Path | None:
"""Return the configured declarative flow source for flow projects."""
if pyproject_data is None:
try:
from crewai_cli.utils import read_toml
pyproject_data = read_toml()
except Exception:
return None
crewai_config = pyproject_data.get("tool", {}).get("crewai", {})
if crewai_config.get("type") != "flow":
return None
definition = crewai_config.get("definition")
if not isinstance(definition, str):
return None
definition = definition.strip()
if not definition:
return None
return _resolve_project_definition_path(
definition=definition,
project_root=project_root or Path.cwd(),
)
def _resolve_project_definition_path(definition: str, project_root: Path) -> Path:
definition_path = Path(definition)
windows_definition_path = PureWindowsPath(definition)
if definition.startswith("~"):
raise click.UsageError(
"[tool.crewai] definition must be a project-local path; "
f"got {definition!r}."
)
if definition_path.is_absolute() or windows_definition_path.is_absolute():
raise click.UsageError(
"[tool.crewai] definition must be relative to the project root; "
f"got {definition!r}."
)
try:
root = project_root.resolve(strict=True)
except OSError as exc:
raise click.UsageError(
f"Invalid project root for [tool.crewai] definition: {exc}"
) from exc
candidate = root / definition_path
try:
resolved_candidate = candidate.resolve(strict=False)
except OSError as exc:
raise click.UsageError(
f"Invalid [tool.crewai] definition path {definition!r}: {exc}"
) from exc
if not resolved_candidate.is_relative_to(root):
raise click.UsageError(
"[tool.crewai] definition must resolve inside the project root; "
f"got {definition!r}."
)
if not resolved_candidate.exists():
raise click.UsageError(
"[tool.crewai] definition must point to an existing file; "
f"got {definition!r}."
)
if not resolved_candidate.is_file():
raise click.UsageError(
"[tool.crewai] definition must point to a regular file; "
f"got {definition!r}."
)
return resolved_candidate
def _execute_declarative_flow_command(command: list[str]) -> None:
env = build_env_with_all_tool_credentials()
try:
subprocess.run( # noqa: S603
command,
capture_output=False,
text=True,
check=True,
env=env,
)
except subprocess.CalledProcessError as e:
raise SystemExit(e.returncode) from e
except Exception as e:
click.echo(
f"An unexpected error occurred while running the declarative flow: {e}",
err=True,
)
raise SystemExit(1) from e
def is_declarative_flow_project_env() -> bool:
import os
return os.environ.get("UV_RUN_RECURSION_DEPTH") is not None
def _has_project_file(project_root: Path | None = None) -> bool:
root = project_root or Path.cwd()
return (root / "pyproject.toml").is_file()
def _parse_inputs(inputs: str | None) -> dict[str, Any] | None:
if inputs is None:
return None
try:
parsed = json.loads(inputs)
except json.JSONDecodeError as exc:
click.echo(f"Invalid --inputs JSON: {exc}", err=True)
raise SystemExit(1) from exc
if not isinstance(parsed, dict):
click.echo("Invalid --inputs JSON: expected an object.", err=True)
raise SystemExit(1)
return parsed
def _format_result(result: Any) -> str:
raw_result = getattr(result, "raw", result)
if isinstance(raw_result, str):
return raw_result
try:
return json.dumps(raw_result, default=str)
except TypeError:
return str(raw_result)

View File

@@ -1,113 +0,0 @@
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
import click
def run_flow_definition(definition: str, inputs: str | None = None) -> None:
"""Run a flow from a Flow Definition YAML/JSON string or file path."""
try:
from crewai.flow.flow import Flow
from crewai.flow.flow_definition import FlowDefinition
except ImportError as exc:
click.echo(
"Running flows from definitions requires the full crewai package.",
err=True,
)
raise SystemExit(1) from exc
parsed_inputs = _parse_inputs(inputs)
definition_source = _read_definition_source(definition)
try:
flow_definition = _parse_flow_definition(FlowDefinition, definition_source)
flow = Flow.from_definition(flow_definition)
result = flow.kickoff(inputs=parsed_inputs)
except Exception as exc:
click.echo(
f"An error occurred while running the flow definition: {exc}", err=True
)
raise SystemExit(1) from exc
click.echo(_format_result(result))
def _parse_inputs(inputs: str | None) -> dict[str, Any] | None:
if inputs is None:
return None
try:
parsed = json.loads(inputs)
except json.JSONDecodeError as exc:
click.echo(f"Invalid --inputs JSON: {exc}", err=True)
raise SystemExit(1) from exc
if not isinstance(parsed, dict):
click.echo("Invalid --inputs JSON: expected an object.", err=True)
raise SystemExit(1)
return parsed
def _read_definition_source(definition: str) -> str:
path = Path(definition).expanduser()
try:
is_file = path.is_file()
except OSError as exc:
if _looks_like_inline_definition(definition):
return definition
click.echo(f"Invalid --definition path: {definition} ({exc})", err=True)
raise SystemExit(1) from exc
if is_file:
try:
return path.read_text(encoding="utf-8")
except (OSError, UnicodeError) as exc:
click.echo(
f"Unable to read --definition path {path}: {exc}",
err=True,
)
raise SystemExit(1) from exc
try:
if path.exists():
click.echo(
f"Invalid --definition path: {definition} is not a file.", err=True
)
raise SystemExit(1)
except OSError as exc:
click.echo(f"Invalid --definition path: {definition} ({exc})", err=True)
raise SystemExit(1) from exc
return definition
def _looks_like_inline_definition(definition: str) -> bool:
stripped = definition.lstrip()
return "\n" in definition or stripped.startswith(("{", "---")) or ":" in stripped
def _parse_flow_definition(flow_definition_cls: type[Any], source: str) -> Any:
if _looks_like_json(source):
return flow_definition_cls.from_json(source)
return flow_definition_cls.from_yaml(source)
def _looks_like_json(source: str) -> bool:
stripped = source.lstrip()
return stripped.startswith("{")
def _format_result(result: Any) -> str:
raw_result = getattr(result, "raw", result)
if isinstance(raw_result, str):
return raw_result
try:
return json.dumps(raw_result, default=str)
except TypeError:
return str(raw_result)

View File

@@ -62,7 +62,7 @@ crewai create flow <name> --skip_provider # New flow project
# Running
crewai run # Run crew or flow (auto-detects from pyproject.toml)
crewai flow kickoff # Legacy flow execution
crewai flow kickoff # Deprecated compatibility alias for crewai run
# Testing & training
crewai test # Test crew (default: 2 iterations, gpt-4o-mini)
@@ -767,10 +767,11 @@ class CustomSearchTool(BaseTool):
```python
from crewai.tools import tool
@tool("Calculator")
def calculator(expression: str) -> str:
"""Evaluates a mathematical expression and returns the result."""
return str(eval(expression))
@tool("WordCount")
def word_count(text: str) -> str:
"""Counts the number of words in the given text."""
count = len(text.split())
return f"Word count: {count}"
```
### Built-in Tools (install with `uv add crewai-tools`)

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.14.7"
"crewai[tools]==1.14.8a3"
]
[project.scripts]

View File

@@ -0,0 +1,5 @@
.env
.venv/
__pycache__/
.crewai/
output/

View File

@@ -0,0 +1,17 @@
# {{name}} Flow
This project defines a declarative CrewAI Flow in `src/{{folder_name}}/flow.yaml`.
## Install
```bash
crewai install
```
## Run
```bash
crewai run
```
Edit the declarative flow definition at `src/{{folder_name}}/flow.yaml` to change the flow. Add reusable crews under `src/{{folder_name}}/crews/`, custom Python tools under `src/{{folder_name}}/tools/`, and shared knowledge files under `src/{{folder_name}}/knowledge/`.

View File

@@ -0,0 +1,15 @@
schema: crewai.flow/v1
name: {{flow_name}}
description: A declarative CrewAI Flow.
state:
type: dict
default:
topic: AI agents
methods:
start:
start: true
do:
call: expression
expr: state.topic

View File

@@ -0,0 +1,20 @@
[project]
name = "{{folder_name}}"
version = "0.1.0"
description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.14.8a3"
]
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src/{{folder_name}}"]
[tool.crewai]
type = "flow"
definition = "src/{{folder_name}}/flow.yaml"

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.14.7"
"crewai[tools]==1.14.8a3"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "Power up your crews with {{folder_name}}"
readme = "README.md"
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.14.7"
"crewai[tools]==1.14.8a3"
]
[tool.crewai]

View File

@@ -132,7 +132,7 @@ def test_create_project_zip_excludes_symlinked_files(tmp_path: Path):
assert names == {"pyproject.toml"}
def test_create_project_zip_adds_json_project_wrapper(tmp_path: Path):
def test_create_project_zip_preserves_json_project_shape(tmp_path: Path):
(tmp_path / "pyproject.toml").write_text(
"""
[project]
@@ -157,8 +157,6 @@ type = "crew"
try:
with zipfile.ZipFile(archive_path) as archive:
names = set(archive.namelist())
crew_py = archive.read("src/json_crew/crew.py").decode()
main_py = archive.read("src/json_crew/main.py").decode()
pyproject = archive.read("pyproject.toml").decode()
finally:
archive_path.unlink(missing_ok=True)
@@ -166,18 +164,50 @@ type = "crew"
assert "uv.lock" not in names
assert "crew.jsonc" in names
assert "agents/researcher.jsonc" in names
assert "src/json_crew/__init__.py" in names
assert "src/json_crew/crew.py" in names
assert "src/json_crew/main.py" in names
assert "src/json_crew/config/agents.yaml" in names
assert "src/json_crew/config/tasks.yaml" in names
assert "load_crew(_crew_path())" in crew_py
assert "JsonCrew" in crew_py
assert "from json_crew.crew import JsonCrew" in main_py
assert "run_crew = \"json_crew.main:run\"" in pyproject
assert all(not name.startswith("src/") for name in names)
assert "run_crew" not in pyproject
assert "json_crew =" not in pyproject
assert "[project.scripts]" not in pyproject
def test_create_project_zip_updates_existing_json_project_scripts(tmp_path: Path):
def test_create_project_zip_keeps_json_project_root_shape(tmp_path: Path):
(tmp_path / "pyproject.toml").write_text(
"""
[project]
name = "json_crew"
version = "0.1.0"
dependencies = ["crewai[tools]==1.14.8a1"]
[tool.crewai]
type = "crew"
""".strip()
+ "\n"
)
(tmp_path / "uv.lock").write_text("# lock\n")
(tmp_path / "agents").mkdir()
(tmp_path / "agents" / "foo.jsonc").write_text("{}\n")
(tmp_path / "crew.jsonc").write_text("{}\n")
archive_path = create_project_zip("json_crew", project_dir=tmp_path)
try:
with zipfile.ZipFile(archive_path) as archive:
names = set(archive.namelist())
pyproject = archive.read("pyproject.toml").decode()
finally:
archive_path.unlink(missing_ok=True)
assert names == {
"agents/foo.jsonc",
"crew.jsonc",
"pyproject.toml",
"uv.lock",
}
assert "run_crew" not in pyproject
assert "json_crew =" not in pyproject
assert "[project.scripts]" not in pyproject
def test_create_project_zip_does_not_rewrite_json_project_scripts(tmp_path: Path):
(tmp_path / "pyproject.toml").write_text(
"""
[project]
@@ -203,14 +233,10 @@ type = "crew"
finally:
archive_path.unlink(missing_ok=True)
assert 'json_crew = "json_crew.main:run"' in pyproject
assert 'run_crew = "json_crew.main:run"' in pyproject
assert 'train = "json_crew.main:train"' in pyproject
assert 'replay = "json_crew.main:replay"' in pyproject
assert 'test = "json_crew.main:test"' in pyproject
assert 'run_with_trigger = "json_crew.main:run_with_trigger"' in pyproject
assert 'json_crew = "old.module:run"' in pyproject
assert 'run_crew = "old.module:run"' in pyproject
assert 'custom = "custom.module:main"' in pyproject
assert "old.module:run" not in pyproject
assert pyproject.count("[project.scripts]") == 1
assert "[tool.crewai]" in pyproject
@@ -221,7 +247,7 @@ type = "crew"
'[tool]\ncrewai = "invalid"\n',
],
)
def test_create_project_zip_adds_json_wrapper_for_malformed_tool_config(
def test_create_project_zip_preserves_json_project_with_malformed_tool_config(
tmp_path: Path, tool_config: str
):
(tmp_path / "pyproject.toml").write_text(
@@ -244,12 +270,13 @@ version = "0.1.0"
finally:
archive_path.unlink(missing_ok=True)
assert "src/json_crew/crew.py" in names
assert "src/json_crew/main.py" in names
assert "run_crew = \"json_crew.main:run\"" in pyproject
assert names == {"crew.jsonc", "pyproject.toml"}
assert "run_crew" not in pyproject
assert "json_crew =" not in pyproject
assert "[project.scripts]" not in pyproject
def test_create_project_zip_rejects_empty_normalized_package_name(tmp_path: Path):
def test_create_project_zip_accepts_json_project_without_package_name(tmp_path: Path):
(tmp_path / "pyproject.toml").write_text(
"""
[project]
@@ -263,8 +290,15 @@ type = "crew"
)
(tmp_path / "crew.jsonc").write_text("{}\n")
with pytest.raises(
ValueError,
match=r"Could not derive a valid Python package name",
):
create_project_zip("invalid", project_dir=tmp_path)
archive_path = create_project_zip("invalid", project_dir=tmp_path)
try:
with zipfile.ZipFile(archive_path) as archive:
names = set(archive.namelist())
pyproject = archive.read("pyproject.toml").decode()
finally:
archive_path.unlink(missing_ok=True)
assert names == {"crew.jsonc", "pyproject.toml"}
assert "run_crew" not in pyproject
assert "json_crew =" not in pyproject
assert "[project.scripts]" not in pyproject

View File

@@ -200,6 +200,41 @@ def test_json_runtime_fields_are_deploy_errors(tmp_path: Path) -> None:
assert "runtime-only" in finding.detail
def test_json_crew_requires_agents_dir_without_classic_errors(tmp_path: Path) -> None:
_scaffold_json_crew(tmp_path)
for path in (tmp_path / "agents").iterdir():
path.unlink()
(tmp_path / "agents").rmdir()
v = DeployValidator(project_root=tmp_path)
v.run()
codes = _codes(v)
assert "missing_agents_dir" in codes
assert "missing_src_dir" not in codes
assert "missing_crew_py" not in codes
assert "missing_agents_yaml" not in codes
assert "missing_tasks_yaml" not in codes
def test_json_crew_reports_project_metadata_before_invalid_json(
tmp_path: Path,
) -> None:
_scaffold_json_crew(tmp_path)
(tmp_path / "pyproject.toml").unlink()
(tmp_path / "uv.lock").unlink()
(tmp_path / "crew.jsonc").write_text('{"agents": ["researcher"], "tasks": []}\n')
v = DeployValidator(project_root=tmp_path)
v.run()
codes = _codes(v)
assert "missing_pyproject" in codes
assert "missing_lockfile" in codes
assert "invalid_crew_json" in codes
assert "missing_src_dir" not in codes
def test_missing_pyproject_errors(tmp_path: Path) -> None:
v = _run_without_import_check(tmp_path)
assert "missing_pyproject" in _codes(v)

View File

@@ -12,6 +12,7 @@ from crewai_cli.cli import (
deploy_remove,
deply_status,
flow_add_crew,
flow_run,
login,
reset_memories,
run,
@@ -126,38 +127,75 @@ def test_run_uses_project_runner_by_default(run_crew, runner):
result = runner.invoke(run)
assert result.exit_code == 0
run_crew.assert_called_once_with(trained_agents_file=None)
run_crew.assert_called_once_with(
trained_agents_file=None,
definition=None,
inputs=None,
)
assert "experimental" not in result.output.lower()
@mock.patch("crewai_cli.cli.run_flow_definition")
def test_run_with_definition_uses_definition_runner(run_flow_definition, runner):
@mock.patch("crewai_cli.cli.run_crew")
def test_run_with_definition_uses_project_runner(run_crew, runner):
result = runner.invoke(
run,
["--definition", "flow.yaml", "--inputs", '{"topic":"AI"}'],
)
assert result.exit_code == 0
assert (
"Warning: `crewai run --definition` is experimental and may change without notice."
in result.output
)
run_flow_definition.assert_called_once_with(
definition="flow.yaml", inputs='{"topic":"AI"}'
run_crew.assert_called_once_with(
trained_agents_file=None,
definition="flow.yaml",
inputs='{"topic":"AI"}',
)
@mock.patch("crewai_cli.cli.run_crew")
@mock.patch("crewai_cli.cli.run_flow_definition")
def test_run_rejects_inputs_without_definition(run_flow_definition, run_crew, runner):
def test_run_rejects_inputs_without_definition(run_crew, runner):
result = runner.invoke(run, ["--inputs", '{"topic":"AI"}'])
assert result.exit_code == 2
assert "Error: --inputs requires --definition" in result.output
run_flow_definition.assert_not_called()
run_crew.assert_not_called()
@mock.patch("crewai_cli.cli.run_crew")
def test_run_rejects_filename_with_definition(run_crew, runner):
result = runner.invoke(run, ["--definition", "flow.yaml", "--filename", "x.pkl"])
assert result.exit_code == 2
assert "Error: --filename can only be used when running crews" in result.output
run_crew.assert_not_called()
@mock.patch("crewai_cli.cli.run_crew")
def test_run_passes_filename_to_project_runner(run_crew, runner):
result = runner.invoke(run, ["--filename", "trained.pkl"])
assert result.exit_code == 0
run_crew.assert_called_once_with(
trained_agents_file="trained.pkl",
definition=None,
inputs=None,
)
@mock.patch("crewai_cli.cli.run_crew")
def test_flow_kickoff_is_deprecated_and_uses_run_path(run_crew, runner):
result = runner.invoke(flow_run)
assert result.exit_code == 0
run_crew.assert_called_once_with(
trained_agents_file=None,
definition=None,
inputs=None,
)
assert (
"The command 'crewai flow kickoff' is deprecated. Use 'crewai run' instead."
in result.output
)
@mock.patch("crewai_cli.create_json_crew.create_json_crew")
def test_create_crew_in_dmn_mode_skips_provider_prompts(create_json_crew, runner):
result = runner.invoke(create, ["crew", "DMN Crew"], env={"CREWAI_DMN": "True"})
@@ -166,6 +204,23 @@ def test_create_crew_in_dmn_mode_skips_provider_prompts(create_json_crew, runner
create_json_crew.assert_called_once_with("DMN Crew", None, True)
@mock.patch("crewai_cli.create_flow.create_flow")
def test_create_flow_declarative_uses_declarative_scaffold(create_flow, runner):
result = runner.invoke(create, ["flow", "My Flow", "--declarative"])
assert result.exit_code == 0
create_flow.assert_called_once_with("My Flow", declarative=True)
@mock.patch("crewai_cli.create_json_crew.create_json_crew")
def test_create_crew_rejects_declarative_flag(create_json_crew, runner):
result = runner.invoke(create, ["crew", "My Crew", "--declarative"])
assert result.exit_code == 2
assert "--declarative can only be used with flow projects" in result.output
create_json_crew.assert_not_called()
def test_create_requires_type_in_dmn_mode(runner):
result = runner.invoke(create, env={"CREWAI_DMN": "True"})

View File

@@ -5,7 +5,10 @@ from pathlib import Path
from unittest import mock
import pytest
import tomli
from click.testing import CliRunner
from packaging.requirements import Requirement
from packaging.version import Version
import crewai_cli.create_json_crew as json_crew
import crewai_cli.tui_picker as tui_picker
from crewai_cli.create_crew import create_crew, create_folder_structure
@@ -709,8 +712,34 @@ def test_json_create_provider_preselects_default_model(tmp_path, monkeypatch):
default_llm="openai/gpt-5.5",
)
assert (tmp_path / "json_crew" / "crew.jsonc").exists()
assert not (tmp_path / "json_crew" / "src").exists()
assert not (tmp_path / "json_crew" / "tests").exists()
assert not (tmp_path / "json_crew" / "config.jsonc").exists()
generated_paths = {
path.relative_to(tmp_path / "json_crew").as_posix()
for path in (tmp_path / "json_crew").rglob("*")
if path.is_file()
}
assert not any(
path.endswith("/crew.py") or path == "crew.py" for path in generated_paths
)
assert not any(
path.endswith("/agents.yaml") or path == "agents.yaml"
for path in generated_paths
)
assert not any(
path.endswith("/tasks.yaml") or path == "tasks.yaml"
for path in generated_paths
)
assert not any(path.startswith("src/") for path in generated_paths)
pyproject = tomli.loads((tmp_path / "json_crew" / "pyproject.toml").read_text())
dependency = pyproject["project"]["dependencies"][0]
assert dependency == "crewai[tools]==1.14.8a1"
assert Version("1.14.8a1") in Requirement(dependency).specifier
assert pyproject["tool"]["hatch"]["build"]["targets"]["wheel"][
"only-include"
] == ["agents", "crew.jsonc", "tools", "knowledge", "skills"]
crew_template = (tmp_path / "json_crew" / "crew.jsonc").read_text()
assert (
@@ -838,7 +867,7 @@ def test_json_create_dmn_mode_uses_non_interactive_defaults(tmp_path, monkeypatc
crew_template = (project_root / "crew.jsonc").read_text()
agent_template = (project_root / "agents" / "researcher.jsonc").read_text()
assert '"memory": false' in crew_template
assert '"memory": true' in crew_template
assert '"description": "Research current AI trends and write a concise summary."' in (
crew_template
)

View File

@@ -0,0 +1,35 @@
from __future__ import annotations
from pathlib import Path
from click.testing import CliRunner
from pytest import MonkeyPatch
import tomli
from crewai_cli.cli import crewai
from crewai_cli.create_flow import create_flow
def test_create_flow_declarative_project_can_run(
tmp_path: Path, monkeypatch: MonkeyPatch
):
monkeypatch.chdir(tmp_path)
create_flow("Research Flow", declarative=True)
project_root = tmp_path / "research_flow"
assert project_root.is_dir()
pyproject = tomli.loads(
(project_root / "pyproject.toml").read_text(encoding="utf-8")
)
assert pyproject["project"]["name"] == "research_flow"
assert pyproject["project"]["requires-python"]
assert pyproject["project"]["dependencies"]
assert (project_root / pyproject["tool"]["crewai"]["definition"]).is_file()
monkeypatch.chdir(project_root)
result = CliRunner().invoke(crewai, ["run"], env={"UV_RUN_RECURSION_DEPTH": "1"})
assert result.exit_code == 0
assert "Running the Flow" not in result.output
assert "AI agents" in result.output

View File

@@ -4,6 +4,11 @@ import time
import pytest
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.memory_events import (
MemorySaveCompletedEvent,
MemorySaveFailedEvent,
MemorySaveStartedEvent,
)
from crewai.events.types.observation_events import (
GoalAchievedEarlyEvent,
PlanRefinementEvent,
@@ -21,7 +26,12 @@ from crewai.events.types.tool_usage_events import (
)
from crewai_cli.command import AuthenticationRequiredError
from crewai_cli import run_crew
from crewai_cli.crew_run_tui import CrewRunApp
from crewai_cli.crew_run_tui import (
CrewRunApp,
_LOG_ARGS_TEXT_LIMIT,
_LOG_RESULT_TEXT_LIMIT,
_LOG_TRUNCATION_SUFFIX,
)
def _app_with_plan() -> CrewRunApp:
@@ -116,6 +126,52 @@ def test_chain_deploy_does_not_login_for_deploy_exit(monkeypatch, capsys) -> Non
assert "Deploy failed with exit code 42" in capsys.readouterr().out
def test_conversation_turn_done_records_assistant_message() -> None:
class RawResult:
raw = "hello from the flow"
app = CrewRunApp(conversational=True)
app._conversation_turn_in_progress = True
app._enable_conversation_input = lambda: None # type: ignore[method-assign]
app._tick = lambda: None # type: ignore[method-assign]
app._scroll_to_result = lambda: None # type: ignore[method-assign]
app._on_conversation_turn_done(RawResult())
assert app._conversation_messages == [("assistant", "hello from the flow")]
assert app._conversation_turn_in_progress is False
assert app._status == "chatting"
assert isinstance(app._crew_result, RawResult)
@pytest.mark.asyncio
async def test_conversation_input_submits_turn() -> None:
class FakeFlow:
defer_trace_finalization = False
def handle_turn(self, message: str) -> str:
return f"reply: {message}"
def finalize_session_traces(self) -> None:
pass
app = CrewRunApp(crew_name="Demo", conversational=True)
app._flow = FakeFlow()
async with app.run_test() as pilot:
await pilot.click("#conversation-input")
await pilot.press("h", "i", "enter")
for _ in range(50):
await pilot.pause(0.05)
if app._conversation_messages[-1:] == [("assistant", "reply: hi")]:
break
assert app._conversation_messages == [
("user", "hi"),
("assistant", "reply: hi"),
]
def test_plan_step_status_updates_only_the_explicit_step() -> None:
app = _app_with_plan()
@@ -335,6 +391,396 @@ def test_internal_reasoning_function_call_is_hidden_from_activity_log() -> None:
assert app._current_task_steps == []
def test_memory_save_events_are_shown_in_activity_log() -> None:
app = _app_with_plan()
app._current_task_idx = 1
app._subscribe()
try:
_emit_event(
MemorySaveStartedEvent(
value="2 memories (background)",
metadata={},
source_type="unified_memory",
)
)
_emit_event(
MemorySaveCompletedEvent(
value="2 memories saved",
metadata={},
save_time_ms=123,
source_type="unified_memory",
)
)
finally:
app._unsubscribe()
assert len(app._log_entries) == 1
assert app._log_entries[0]["tool_name"] == "memory_save"
assert app._log_entries[0]["status"] == "success"
assert app._log_entries[0]["args"] == "2 memories (background)"
assert app._log_entries[0]["result"] == "2 memories saved"
assert app._log_entries[0]["error"] is None
assert app._log_entries[0]["duration"] == 0.123
assert app._log_entries[0]["task_idx"] == 1
def test_nested_memory_save_event_is_hidden_for_save_to_memory_tool() -> None:
app = _app_with_plan()
app._subscribe()
try:
tool_args = {"contents": ["Fact to remember."]}
_emit_event(
ToolUsageStartedEvent(
tool_name="save_to_memory",
tool_args=tool_args,
)
)
_emit_event(
MemorySaveStartedEvent(
value="Fact to remember.",
metadata={},
source_type="unified_memory",
)
)
_emit_event(
MemorySaveCompletedEvent(
value="Fact to remember.",
metadata={},
save_time_ms=123,
source_type="unified_memory",
)
)
now = datetime.now()
_emit_event(
ToolUsageFinishedEvent(
tool_name="save_to_memory",
tool_args=tool_args,
started_at=now,
finished_at=now,
output="Saved to memory.",
)
)
finally:
app._unsubscribe()
assert len(app._log_entries) == 1
assert app._log_entries[0]["tool_name"] == "save_to_memory"
assert app._log_entries[0]["status"] == "success"
assert app._log_entries[0]["result"] == "Saved to memory."
def test_memory_save_failure_is_shown_in_activity_log() -> None:
app = _app_with_plan()
app._subscribe()
try:
_emit_event(
MemorySaveStartedEvent(
value="background save",
metadata={},
source_type="unified_memory",
)
)
_emit_event(
MemorySaveFailedEvent(
value="background save",
metadata={},
error="embedding connection failed",
source_type="unified_memory",
)
)
finally:
app._unsubscribe()
assert app._log_entries[0]["tool_name"] == "memory_save"
assert app._log_entries[0]["status"] == "error"
assert app._log_entries[0]["error"] == "embedding connection failed"
assert app._log_expanded == {0}
def test_memory_save_completion_updates_timed_out_row() -> None:
app = _app_with_plan()
app._subscribe()
try:
_emit_event(
MemorySaveStartedEvent(
value="9 memories (background)",
metadata={},
source_type="unified_memory",
)
)
app._log_entries[0]["status"] = "timeout"
app._log_entries[0]["error"] = "No result received before crew completed"
app._log_entries[0]["duration"] = 8.3
_emit_event(
MemorySaveCompletedEvent(
value="9 memories saved",
metadata={},
save_time_ms=8300,
source_type="unified_memory",
)
)
finally:
app._unsubscribe()
assert len(app._log_entries) == 1
assert app._log_entries[0]["tool_name"] == "memory_save"
assert app._log_entries[0]["status"] == "success"
assert app._log_entries[0]["result"] == "9 memories saved"
assert app._log_entries[0]["error"] is None
assert app._log_entries[0]["duration"] == 8.3
def test_memory_save_completion_with_unmatched_id_does_not_update_running_row() -> None:
app = _app_with_plan()
app._subscribe()
try:
_emit_event(
MemorySaveStartedEvent(
value="first background save",
metadata={},
source_type="unified_memory",
parent_event_id="manual-parent",
)
)
_emit_event(
MemorySaveStartedEvent(
value="second background save",
metadata={},
source_type="unified_memory",
parent_event_id="manual-parent",
)
)
_emit_event(
MemorySaveCompletedEvent(
value="orphan save completed",
metadata={},
save_time_ms=2800,
source_type="unified_memory",
parent_event_id="manual-parent",
started_event_id="missing-memory-save-start",
)
)
finally:
app._unsubscribe()
assert [entry["status"] for entry in app._log_entries] == [
"running",
"running",
"success",
]
assert app._log_entries[0]["args"] == "first background save"
assert app._log_entries[1]["args"] == "second background save"
assert app._log_entries[2]["result"] == "orphan save completed"
assert app._log_entries[2]["started_event_id"] == "missing-memory-save-start"
def test_memory_save_failure_with_unmatched_id_does_not_update_running_row() -> None:
app = _app_with_plan()
app._subscribe()
try:
_emit_event(
MemorySaveStartedEvent(
value="first background save",
metadata={},
source_type="unified_memory",
parent_event_id="manual-parent",
)
)
_emit_event(
MemorySaveStartedEvent(
value="second background save",
metadata={},
source_type="unified_memory",
parent_event_id="manual-parent",
)
)
_emit_event(
MemorySaveFailedEvent(
value="orphan save failed",
metadata={},
error="embedding connection failed",
source_type="unified_memory",
parent_event_id="manual-parent",
started_event_id="missing-memory-save-start",
)
)
finally:
app._unsubscribe()
assert [entry["status"] for entry in app._log_entries] == [
"running",
"running",
"error",
]
assert app._log_entries[0]["args"] == "first background save"
assert app._log_entries[1]["args"] == "second background save"
assert app._log_entries[2]["args"] == "orphan save failed"
assert app._log_entries[2]["error"] == "embedding connection failed"
assert app._log_entries[2]["started_event_id"] == "missing-memory-save-start"
assert app._log_expanded == {2}
def test_memory_save_completion_without_id_does_not_update_stale_row() -> None:
app = _app_with_plan()
now = time.time()
app._log_entries = [
{
"tool_name": "memory_save",
"status": "running",
"args": "current background save",
"result": None,
"error": None,
"start_time": now,
"duration": None,
"task_idx": 1,
},
{
"tool_name": "memory_save",
"status": "success",
"args": "stale background save",
"result": "stale save completed",
"error": None,
"start_time": now - 10,
"duration": 1.0,
"task_idx": 1,
},
]
app._subscribe()
try:
_emit_event(
MemorySaveCompletedEvent(
value="current save completed",
metadata={},
save_time_ms=2800,
source_type="unified_memory",
parent_event_id="manual-parent",
)
)
finally:
app._unsubscribe()
assert [entry["status"] for entry in app._log_entries] == [
"success",
"success",
]
assert app._log_entries[0]["args"] == "current background save"
assert app._log_entries[0]["result"] == "current save completed"
assert app._log_entries[1]["args"] == "stale background save"
assert app._log_entries[1]["result"] == "stale save completed"
def test_memory_save_failure_without_id_does_not_update_stale_row() -> None:
app = _app_with_plan()
now = time.time()
app._log_entries = [
{
"tool_name": "memory_save",
"status": "running",
"args": "current background save",
"result": None,
"error": None,
"start_time": now,
"duration": None,
"task_idx": 1,
},
{
"tool_name": "memory_save",
"status": "success",
"args": "stale background save",
"result": "stale save completed",
"error": None,
"start_time": now - 10,
"duration": 1.0,
"task_idx": 1,
},
]
app._subscribe()
try:
_emit_event(
MemorySaveFailedEvent(
value="current save failed",
metadata={},
error="embedding connection failed",
source_type="unified_memory",
parent_event_id="manual-parent",
)
)
finally:
app._unsubscribe()
assert [entry["status"] for entry in app._log_entries] == ["error", "success"]
assert app._log_entries[0]["args"] == "current background save"
assert app._log_entries[0]["error"] == "embedding connection failed"
assert app._log_entries[1]["args"] == "stale background save"
assert app._log_entries[1]["result"] == "stale save completed"
assert app._log_entries[1]["error"] is None
assert app._log_expanded == {0}
def test_memory_save_payloads_are_truncated_in_activity_log() -> None:
app = _app_with_plan()
long_args = "a" * (_LOG_ARGS_TEXT_LIMIT + 10)
long_result = "r" * (_LOG_RESULT_TEXT_LIMIT + 10)
app._subscribe()
try:
_emit_event(
MemorySaveStartedEvent(
value=long_args,
metadata={},
source_type="unified_memory",
)
)
_emit_event(
MemorySaveCompletedEvent(
value=long_result,
metadata={},
save_time_ms=8300,
source_type="unified_memory",
)
)
finally:
app._unsubscribe()
assert len(app._log_entries[0]["args"]) == _LOG_ARGS_TEXT_LIMIT
assert app._log_entries[0]["args"].endswith(_LOG_TRUNCATION_SUFFIX)
assert len(app._log_entries[0]["result"]) == _LOG_RESULT_TEXT_LIMIT
assert app._log_entries[0]["result"].endswith(_LOG_TRUNCATION_SUFFIX)
def test_starting_next_tool_does_not_timeout_memory_save() -> None:
app = _app_with_plan()
app._subscribe()
try:
_emit_event(
MemorySaveStartedEvent(
value="9 memories (background)",
metadata={},
source_type="unified_memory",
)
)
_emit_event(
ToolUsageStartedEvent(
tool_name="read_website_content",
tool_args={"url": "https://example.com"},
)
)
finally:
app._unsubscribe()
assert app._log_entries[0]["tool_name"] == "memory_save"
assert app._log_entries[0]["status"] == "running"
assert app._log_entries[0]["error"] is None
assert app._log_entries[1]["tool_name"] == "read_website_content"
assert app._log_entries[1]["status"] == "running"
def test_tool_failure_does_not_override_successful_plan_step_completion() -> None:
app = _app_with_plan()
app._subscribe()
@@ -480,6 +926,187 @@ async def test_crew_done_does_not_mark_unfinished_tool_successful() -> None:
assert app._plan_step_status == {1: "failed", 2: "done", 3: "done"}
@pytest.mark.asyncio
async def test_crew_done_does_not_timeout_memory_save() -> None:
app = _app_with_plan()
async with app.run_test(size=(100, 40)) as pilot:
app._log_entries = [
{
"tool_name": "memory_save",
"status": "running",
"args": "9 memories (background)",
"result": None,
"error": None,
"start_time": time.time() - 8,
"duration": None,
"task_idx": 1,
},
{
"tool_name": "search",
"status": "running",
"args": '{"query": "CrewAI"}',
"result": None,
"error": None,
"start_time": time.time() - 2,
"duration": None,
"task_idx": 1,
},
]
app._on_crew_done("final output")
await pilot.pause()
assert app._log_entries[0]["status"] == "running"
assert app._log_entries[0]["error"] is None
assert app._log_entries[1]["status"] == "timeout"
assert app._log_entries[1]["error"] == "No result received before crew completed"
@pytest.mark.asyncio
async def test_crew_done_keeps_memory_save_subscription_until_completion(
monkeypatch: pytest.MonkeyPatch,
) -> None:
monkeypatch.setattr(
"crewai_cli.crew_run_tui._MEMORY_SAVE_DRAIN_GRACE_SECONDS", 0.05
)
app = _app_with_plan()
auto_unsubscribed = False
async with app.run_test(size=(100, 40)) as pilot:
try:
assert app._event_handlers
started_event = MemorySaveStartedEvent(
value="9 memories (background)",
metadata={},
source_type="unified_memory",
)
_emit_event(started_event)
app._on_crew_done("final output")
await pilot.pause()
assert app._log_entries[0]["status"] == "running"
assert app._event_handlers
_emit_event(
MemorySaveCompletedEvent(
value="9 memories saved",
metadata={},
save_time_ms=8300,
source_type="unified_memory",
started_event_id=started_event.event_id,
)
)
await pilot.pause()
assert app._event_handlers
await pilot.pause(0.08)
auto_unsubscribed = not app._event_handlers
finally:
app._unsubscribe()
assert app._log_entries[0]["tool_name"] == "memory_save"
assert app._log_entries[0]["status"] == "success"
assert app._log_entries[0]["result"] == "9 memories saved"
assert app._log_entries[0]["error"] is None
assert app._log_entries[0]["duration"] == 8.3
assert auto_unsubscribed is True
@pytest.mark.asyncio
async def test_crew_done_waits_for_queued_memory_save_events(
monkeypatch: pytest.MonkeyPatch,
) -> None:
monkeypatch.setattr(
"crewai_cli.crew_run_tui._MEMORY_SAVE_DRAIN_GRACE_SECONDS", 0.05
)
app = _app_with_plan()
auto_unsubscribed = False
async with app.run_test(size=(100, 40)) as pilot:
try:
assert app._event_handlers
app._on_crew_done("final output")
assert app._event_handlers
started_event = MemorySaveStartedEvent(
value="9 memories (background)",
metadata={},
source_type="unified_memory",
parent_event_id="manual-parent",
)
_emit_event(started_event)
await pilot.pause()
assert app._log_entries[0]["tool_name"] == "memory_save"
assert app._log_entries[0]["status"] == "running"
_emit_event(
MemorySaveCompletedEvent(
value="9 memories saved",
metadata={},
save_time_ms=8300,
source_type="unified_memory",
parent_event_id="manual-parent",
started_event_id=started_event.event_id,
)
)
await pilot.pause()
assert app._event_handlers
await pilot.pause(0.08)
auto_unsubscribed = not app._event_handlers
finally:
app._unsubscribe()
assert app._log_entries[0]["tool_name"] == "memory_save"
assert app._log_entries[0]["status"] == "success"
assert app._log_entries[0]["args"] == "9 memories (background)"
assert app._log_entries[0]["result"] == "9 memories saved"
assert app._log_entries[0]["error"] is None
assert app._log_entries[0]["duration"] == 8.3
assert auto_unsubscribed is True
@pytest.mark.asyncio
async def test_crew_failed_does_not_timeout_memory_save() -> None:
app = _app_with_plan()
async with app.run_test(size=(100, 40)) as pilot:
app._log_entries = [
{
"tool_name": "memory_save",
"status": "running",
"args": "9 memories (background)",
"result": None,
"error": None,
"start_time": time.time() - 8,
"duration": None,
"task_idx": 1,
},
{
"tool_name": "search",
"status": "running",
"args": '{"query": "CrewAI"}',
"result": None,
"error": None,
"start_time": time.time() - 2,
"duration": None,
"task_idx": 1,
},
]
app._on_crew_failed("boom")
await pilot.pause()
assert app._log_entries[0]["status"] == "running"
assert app._log_entries[0]["error"] is None
assert app._log_entries[1]["status"] == "error"
assert app._log_entries[1]["error"] == "No result received before crew failed"
def test_streamed_step_observation_updates_named_step_only() -> None:
app = _app_with_plan()

View File

@@ -0,0 +1,248 @@
from __future__ import annotations
from pathlib import Path
import subprocess
import click
import pytest
from click.testing import CliRunner
from crewai_cli.cli import flow_run
import crewai_cli.plot_flow as plot_flow_module
FLOW_YAML = """\
schema: crewai.flow/v1
name: TestFlow
config:
suppress_flow_events: true
methods:
begin:
start: true
do:
call: expression
expr: "'AI'"
"""
def _write_flow_project(project_root: Path) -> None:
(project_root / "flow.yaml").write_text(FLOW_YAML, encoding="utf-8")
(project_root / "pyproject.toml").write_text(
'[project]\nname = "demo"\n\n'
'[tool.crewai]\ntype = "flow"\ndefinition = "flow.yaml"\n',
encoding="utf-8",
)
def test_flow_kickoff_runs_configured_declarative_definition(
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
) -> None:
_write_flow_project(tmp_path)
monkeypatch.chdir(tmp_path)
monkeypatch.setenv("UV_RUN_RECURSION_DEPTH", "1")
result = CliRunner().invoke(flow_run)
assert result.exit_code == 0
assert (
"The command 'crewai flow kickoff' is deprecated. Use 'crewai run' instead."
in result.output
)
assert "AI\n" in result.output
assert "Running the Flow" not in result.output
def test_plot_flow_runs_configured_declarative_definition(
monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
_write_flow_project(tmp_path)
monkeypatch.chdir(tmp_path)
monkeypatch.setenv("UV_RUN_RECURSION_DEPTH", "1")
plot_flow_module.plot_flow()
def test_flow_kickoff_delegates_to_run_crew(
monkeypatch: pytest.MonkeyPatch,
) -> None:
calls = []
monkeypatch.setattr(
"crewai_cli.cli.run_crew",
lambda **kwargs: calls.append(kwargs),
)
result = CliRunner().invoke(flow_run)
assert result.exit_code == 0
assert calls == [
{"trained_agents_file": None, "definition": None, "inputs": None},
]
def test_plot_flow_keeps_python_entrypoint_without_definition(
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
) -> None:
subprocess_calls = []
monkeypatch.chdir(tmp_path)
monkeypatch.setattr(
subprocess,
"run",
lambda command, **kwargs: subprocess_calls.append((command, kwargs)),
)
plot_flow_module.plot_flow()
assert subprocess_calls == [
(
["uv", "run", "plot"],
{"capture_output": False, "text": True, "check": True},
)
]
def test_configured_project_declarative_flow(
monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
monkeypatch.chdir(tmp_path)
definition_path = tmp_path / "flow.yaml"
definition_path.write_text(FLOW_YAML, encoding="utf-8")
(tmp_path / "pyproject.toml").write_text(
'[tool.crewai]\ntype = "flow"\ndefinition = " flow.yaml "\n',
encoding="utf-8",
)
from crewai_cli.run_declarative_flow import configured_project_declarative_flow
assert configured_project_declarative_flow() == definition_path.resolve()
@pytest.mark.parametrize(
("definition", "expected_error"),
[
("C:/tmp/flow.yaml", "must be relative to the project root"),
("~/flow.yaml", "must be a project-local path"),
("../flow.yaml", "must resolve inside the project root"),
],
)
def test_configured_project_declarative_flow_rejects_unsafe_paths(
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
definition: str,
expected_error: str,
) -> None:
monkeypatch.chdir(tmp_path)
(tmp_path / "pyproject.toml").write_text(
f'[tool.crewai]\ntype = "flow"\ndefinition = "{definition}"\n',
encoding="utf-8",
)
from crewai_cli.run_declarative_flow import configured_project_declarative_flow
with pytest.raises(click.UsageError) as exc_info:
configured_project_declarative_flow()
assert expected_error in exc_info.value.message
def test_configured_project_declarative_flow_allows_normalized_project_path(
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
) -> None:
monkeypatch.chdir(tmp_path)
definition_path = tmp_path / "flow.yaml"
definition_path.write_text(FLOW_YAML, encoding="utf-8")
(tmp_path / "src").mkdir()
(tmp_path / "pyproject.toml").write_text(
'[tool.crewai]\ntype = "flow"\ndefinition = "src/../flow.yaml"\n',
encoding="utf-8",
)
from crewai_cli.run_declarative_flow import configured_project_declarative_flow
assert configured_project_declarative_flow() == definition_path.resolve()
def test_configured_project_declarative_flow_rejects_absolute_path(
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
) -> None:
monkeypatch.chdir(tmp_path)
definition = tmp_path / "flow.yaml"
(tmp_path / "pyproject.toml").write_text(
f'[tool.crewai]\ntype = "flow"\ndefinition = "{definition.as_posix()}"\n',
encoding="utf-8",
)
from crewai_cli.run_declarative_flow import configured_project_declarative_flow
with pytest.raises(click.UsageError) as exc_info:
configured_project_declarative_flow()
assert "must be relative to the project root" in exc_info.value.message
def test_configured_project_declarative_flow_rejects_symlink_escape(
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
) -> None:
monkeypatch.chdir(tmp_path)
outside_definition = tmp_path.parent / "outside-flow.yaml"
outside_definition.write_text(FLOW_YAML, encoding="utf-8")
link = tmp_path / "flow.yaml"
try:
link.symlink_to(outside_definition)
except (NotImplementedError, OSError) as exc:
pytest.skip(f"symlinks unavailable: {exc}")
(tmp_path / "pyproject.toml").write_text(
'[tool.crewai]\ntype = "flow"\ndefinition = "flow.yaml"\n',
encoding="utf-8",
)
from crewai_cli.run_declarative_flow import configured_project_declarative_flow
with pytest.raises(click.UsageError) as exc_info:
configured_project_declarative_flow()
assert "must resolve inside the project root" in exc_info.value.message
def test_configured_project_declarative_flow_rejects_missing_file(
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
) -> None:
monkeypatch.chdir(tmp_path)
(tmp_path / "pyproject.toml").write_text(
'[tool.crewai]\ntype = "flow"\ndefinition = "missing-flow.yaml"\n',
encoding="utf-8",
)
from crewai_cli.run_declarative_flow import configured_project_declarative_flow
with pytest.raises(click.UsageError) as exc_info:
configured_project_declarative_flow()
assert "must point to an existing file" in exc_info.value.message
def test_configured_project_declarative_flow_rejects_directory(
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
) -> None:
monkeypatch.chdir(tmp_path)
(tmp_path / "flow.yaml").mkdir()
(tmp_path / "pyproject.toml").write_text(
'[tool.crewai]\ntype = "flow"\ndefinition = "flow.yaml"\n',
encoding="utf-8",
)
from crewai_cli.run_declarative_flow import configured_project_declarative_flow
with pytest.raises(click.UsageError) as exc_info:
configured_project_declarative_flow()
assert "must point to a regular file" in exc_info.value.message

View File

@@ -0,0 +1,63 @@
from __future__ import annotations
import sys
from crewai_cli import kickoff_flow
def test_loads_conversational_flow_from_kickoff_script(tmp_path, monkeypatch) -> None:
package_dir = tmp_path / "src" / "demo_chat"
package_dir.mkdir(parents=True)
(package_dir / "__init__.py").write_text("")
(package_dir / "main.py").write_text(
"\n".join(
[
"from crewai.flow import Flow",
"",
"class DemoChatFlow(Flow):",
" conversational = True",
]
)
)
(tmp_path / "pyproject.toml").write_text(
"\n".join(
[
"[project]",
'name = "demo-chat"',
"[project.scripts]",
'kickoff = "demo_chat.main:kickoff"',
]
)
)
monkeypatch.chdir(tmp_path)
sys.modules.pop("demo_chat.main", None)
sys.modules.pop("demo_chat", None)
flow = kickoff_flow._load_conversational_flow_from_kickoff_script()
assert flow is not None
assert type(flow).__name__ == "DemoChatFlow"
assert flow.conversational is True
def test_kickoff_flow_falls_back_to_uv_when_no_conversational_flow(
monkeypatch,
) -> None:
calls: list[list[str]] = []
def fake_run(command, capture_output, text, check):
calls.append(command)
class Result:
stderr = ""
return Result()
monkeypatch.setattr(
kickoff_flow, "_load_conversational_flow_from_kickoff_script", lambda: None
)
monkeypatch.setattr(kickoff_flow.subprocess, "run", fake_run)
kickoff_flow.kickoff_flow()
assert calls == [["uv", "run", "kickoff"]]

View File

@@ -5,12 +5,33 @@ from pathlib import Path
import subprocess
import sys
import click
import pytest
from crewai_core.constants import CREWAI_TRAINED_AGENTS_FILE_ENV
import crewai_cli.run_crew as run_crew_module
def test_missing_crewai_package_shows_full_install_hint(monkeypatch):
def missing_crewai_package():
raise ModuleNotFoundError("No module named 'crewai'", name="crewai")
monkeypatch.setattr(
run_crew_module, "_import_find_crew_json_file", missing_crewai_package
)
with pytest.raises(click.ClickException) as exc_info:
run_crew_module.find_crew_json_file()
message = exc_info.value.message
assert "CrewAI CLI is installed without the `crewai` package" in message
assert (
"uv tool install --force --prerelease=allow 'crewai[tools]==1.14.8a1'"
in message
)
assert "quotes are required in zsh" in message
def test_run_crew_forwards_trained_agents_file_to_json_crews(monkeypatch):
"""crewai run -f must reach JSON crews, not only classic subprocess crews."""
monkeypatch.setattr(run_crew_module, "_has_json_crew", lambda: True)
@@ -547,3 +568,175 @@ def test_has_json_crew_true_without_pyproject(monkeypatch, tmp_path: Path):
(tmp_path / "crew.jsonc").write_text("{}")
assert run_crew_module._has_json_crew() is True
def test_run_crew_rejects_inputs_without_definition():
with pytest.raises(click.UsageError) as exc_info:
run_crew_module.run_crew(inputs='{"topic":"AI"}')
assert "--inputs requires --definition" in exc_info.value.message
def test_run_crew_rejects_filename_with_explicit_definition():
with pytest.raises(click.UsageError) as exc_info:
run_crew_module.run_crew(
trained_agents_file="trained.pkl",
definition="flow.yaml",
)
assert "--filename can only be used when running crews" in exc_info.value.message
def test_run_crew_runs_explicit_declarative_definition(monkeypatch, capsys):
calls = []
def fake_run_declarative_flow(definition: str, inputs: str | None = None):
calls.append((definition, inputs))
monkeypatch.setattr(
"crewai_cli.run_declarative_flow.run_declarative_flow",
fake_run_declarative_flow,
)
run_crew_module.run_crew(definition="flow.yaml", inputs='{"topic":"AI"}')
captured = capsys.readouterr()
assert "experimental" not in captured.out.lower()
assert calls == [("flow.yaml", '{"topic":"AI"}')]
def test_run_crew_runs_classic_crew_project(monkeypatch, capsys):
calls = []
monkeypatch.setattr(run_crew_module, "_has_json_crew", lambda: False)
monkeypatch.setattr(
run_crew_module,
"read_toml",
lambda: {"tool": {"crewai": {"type": "crew"}}},
)
monkeypatch.setattr(
run_crew_module,
"_execute_uv_script",
lambda script_name, **kwargs: calls.append((script_name, kwargs)),
)
run_crew_module.run_crew(trained_agents_file="trained.pkl")
assert capsys.readouterr().out == ""
assert calls == [
(
"run_crew",
{"entity_type": "crew", "trained_agents_file": "trained.pkl"},
)
]
def test_run_crew_runs_python_flow_project(monkeypatch, capsys):
calls = []
monkeypatch.setattr(run_crew_module, "_has_json_crew", lambda: False)
monkeypatch.setattr(
run_crew_module,
"read_toml",
lambda: {"tool": {"crewai": {"type": "flow"}}},
)
monkeypatch.setattr(
run_crew_module,
"_execute_uv_script",
lambda script_name, **kwargs: calls.append((script_name, kwargs)),
)
monkeypatch.setattr(
"crewai_cli.kickoff_flow._load_conversational_flow_from_kickoff_script",
lambda: None,
)
run_crew_module.run_crew()
assert capsys.readouterr().out == ""
assert calls == [("kickoff", {"entity_type": "flow"})]
def test_run_crew_runs_conversational_flow_tui(monkeypatch, capsys):
class Flow:
pass
flow = Flow()
calls = []
monkeypatch.setattr(run_crew_module, "_has_json_crew", lambda: False)
monkeypatch.setattr(
run_crew_module,
"read_toml",
lambda: {"tool": {"crewai": {"type": "flow"}}},
)
monkeypatch.setattr(
"crewai_cli.kickoff_flow._load_conversational_flow_from_kickoff_script",
lambda: flow,
)
monkeypatch.setattr(
"crewai_cli.kickoff_flow._run_conversational_flow_tui",
lambda loaded_flow: calls.append(loaded_flow),
)
monkeypatch.setattr(
run_crew_module,
"_execute_uv_script",
lambda *_args, **_kwargs: pytest.fail(
"conversational flows must use the TUI"
),
)
run_crew_module.run_crew()
assert capsys.readouterr().out == ""
assert calls == [flow]
def test_run_crew_rejects_filename_for_flow_project(monkeypatch):
monkeypatch.setattr(run_crew_module, "_has_json_crew", lambda: False)
monkeypatch.setattr(
run_crew_module,
"read_toml",
lambda: {"tool": {"crewai": {"type": "flow"}}},
)
with pytest.raises(click.UsageError) as exc_info:
run_crew_module.run_crew(trained_agents_file="trained.pkl")
assert "--filename can only be used when running crews" in exc_info.value.message
def test_run_crew_runs_configured_declarative_flow_project(
monkeypatch, tmp_path: Path, capsys
):
calls = []
monkeypatch.chdir(tmp_path)
definition_path = tmp_path / "flow.yaml"
definition_path.write_text("schema: crewai.flow/v1\n", encoding="utf-8")
monkeypatch.setattr(run_crew_module, "_has_json_crew", lambda: False)
monkeypatch.setattr(
run_crew_module,
"read_toml",
lambda: {
"tool": {
"crewai": {
"type": "flow",
"definition": "flow.yaml",
}
}
},
)
monkeypatch.setattr(
"crewai_cli.run_declarative_flow.run_declarative_flow_in_project_env",
lambda definition, inputs=None: calls.append((definition, inputs)),
)
monkeypatch.setattr(
run_crew_module,
"_execute_uv_script",
lambda *_args, **_kwargs: pytest.fail("declarative flows must not run kickoff"),
)
run_crew_module.run_crew()
assert capsys.readouterr().out == ""
assert calls == [(definition_path.resolve(), None)]

View File

@@ -0,0 +1,148 @@
from __future__ import annotations
from pathlib import Path
import pytest
import crewai_cli.run_declarative_flow as run_declarative_flow_module
FLOW_YAML = """\
schema: crewai.flow/v1
name: TestFlow
config:
suppress_flow_events: true
methods:
begin:
start: true
do:
call: expression
expr: state.topic
"""
def test_run_declarative_flow_reads_definition_file(
tmp_path: Path, capsys: pytest.CaptureFixture[str]
) -> None:
definition_path = tmp_path / "flow.yaml"
definition_path.write_text(FLOW_YAML, encoding="utf-8")
run_declarative_flow_module.run_declarative_flow(
str(definition_path), '{"topic":"AI"}'
)
assert capsys.readouterr().out == "AI\n"
def test_run_declarative_flow_rejects_non_object_inputs(
tmp_path: Path, capsys: pytest.CaptureFixture[str]
) -> None:
definition_path = tmp_path / "flow.yaml"
definition_path.write_text(FLOW_YAML, encoding="utf-8")
with pytest.raises(SystemExit):
run_declarative_flow_module.run_declarative_flow(
str(definition_path), '["not", "an", "object"]'
)
assert "Invalid --inputs JSON: expected an object." in capsys.readouterr().err
def test_run_declarative_flow_reports_missing_file(
capsys: pytest.CaptureFixture[str],
) -> None:
with pytest.raises(SystemExit):
run_declarative_flow_module.run_declarative_flow("missing-flow.yaml")
assert (
"Invalid --definition path: missing-flow.yaml does not exist."
in capsys.readouterr().err
)
def test_run_declarative_flow_reports_empty_file(
tmp_path: Path, capsys: pytest.CaptureFixture[str]
) -> None:
definition_path = tmp_path / "flow.yaml"
definition_path.write_text(" \n", encoding="utf-8")
with pytest.raises(SystemExit):
run_declarative_flow_module.run_declarative_flow(str(definition_path))
assert "Flow declaration file is empty" in capsys.readouterr().err
@pytest.mark.parametrize(
"contents, expected_error",
[
("[]\n", "Flow declaration must contain a mapping"),
("schema: crewai.flow/v1\nmethods: {}\n", "Field required"),
],
)
def test_load_declarative_flow_reports_invalid_declarations(
tmp_path: Path,
capsys: pytest.CaptureFixture[str],
contents: str,
expected_error: str,
) -> None:
definition_path = tmp_path / "flow.yaml"
definition_path.write_text(contents, encoding="utf-8")
with pytest.raises(SystemExit) as exc_info:
run_declarative_flow_module.load_declarative_flow(str(definition_path))
assert exc_info.value.code == 1
stderr = capsys.readouterr().err
assert f"Unable to read --definition path {definition_path}:" in stderr
assert expected_error in stderr
def test_run_declarative_flow_in_project_env_uses_uv(
monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
subprocess_calls = []
monkeypatch.chdir(tmp_path)
monkeypatch.delenv("UV_RUN_RECURSION_DEPTH", raising=False)
(tmp_path / "pyproject.toml").write_text("[project]\nname = 'demo'\n")
monkeypatch.setattr(
run_declarative_flow_module,
"build_env_with_all_tool_credentials",
lambda: {"EXISTING": "value"},
)
monkeypatch.setattr(
run_declarative_flow_module.subprocess,
"run",
lambda command, **kwargs: subprocess_calls.append((command, kwargs)),
)
run_declarative_flow_module.run_declarative_flow_in_project_env("flow.yaml")
assert subprocess_calls == [
(
["uv", "run", "crewai", "run"],
{
"capture_output": False,
"text": True,
"check": True,
"env": {"EXISTING": "value"},
},
)
]
def test_run_declarative_flow_in_process_inside_uv(
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
capsys: pytest.CaptureFixture[str],
) -> None:
monkeypatch.chdir(tmp_path)
monkeypatch.setenv("UV_RUN_RECURSION_DEPTH", "1")
(tmp_path / "pyproject.toml").write_text("[project]\nname = 'demo'\n")
(tmp_path / "flow.yaml").write_text(FLOW_YAML, encoding="utf-8")
run_declarative_flow_module.run_declarative_flow_in_project_env(
"flow.yaml", '{"topic":"AI"}'
)
assert capsys.readouterr().out == "AI\n"

View File

@@ -1,156 +0,0 @@
from __future__ import annotations
import json
import sys
import types
import pytest
import yaml
from crewai_cli.run_flow_definition import run_flow_definition
class _FakeFlow:
def __init__(self, definition):
self.definition = definition
def kickoff(self, inputs=None):
return {
"flow": self.definition["name"],
"inputs": inputs or {},
}
class _FakeFlowFactory:
@classmethod
def from_definition(cls, definition):
return _FakeFlow(definition)
class _FakeFlowDefinition:
@classmethod
def from_yaml(cls, source):
return yaml.safe_load(source)
@classmethod
def from_json(cls, source):
return json.loads(source)
@pytest.fixture
def fake_flow_runtime(monkeypatch):
crewai_module = types.ModuleType("crewai")
flow_package = types.ModuleType("crewai.flow")
flow_module = types.ModuleType("crewai.flow.flow")
flow_definition_module = types.ModuleType("crewai.flow.flow_definition")
flow_module.Flow = _FakeFlowFactory
flow_definition_module.FlowDefinition = _FakeFlowDefinition
monkeypatch.setitem(sys.modules, "crewai", crewai_module)
monkeypatch.setitem(sys.modules, "crewai.flow", flow_package)
monkeypatch.setitem(sys.modules, "crewai.flow.flow", flow_module)
monkeypatch.setitem(
sys.modules, "crewai.flow.flow_definition", flow_definition_module
)
def _captured_json(capsys):
return json.loads(capsys.readouterr().out)
def test_run_flow_definition_reads_definition_file(
tmp_path, capsys, fake_flow_runtime
):
definition_path = tmp_path / "flow.yaml"
definition_path.write_text("schema: crewai.flow/v1\nname: TestFlow\n")
run_flow_definition(str(definition_path), '{"topic":"AI"}')
assert _captured_json(capsys) == {
"flow": "TestFlow",
"inputs": {"topic": "AI"},
}
@pytest.mark.parametrize(
("definition_source", "expected_flow_name"),
[
pytest.param(
"schema: crewai.flow/v1\nname: InlineFlow\n",
"InlineFlow",
id="inline-yaml",
),
pytest.param(
'{"schema":"crewai.flow/v1","name":"InlineJsonFlow"}',
"InlineJsonFlow",
id="inline-json",
),
pytest.param(
'{"schema":"crewai.flow/v1","name":"' + ("JsonFlow" * 500) + '"}',
"JsonFlow" * 500,
id="large-inline-json",
),
],
)
def test_run_flow_definition_accepts_inline_definitions(
definition_source, expected_flow_name, capsys, fake_flow_runtime
):
run_flow_definition(definition_source)
assert _captured_json(capsys) == {"flow": expected_flow_name, "inputs": {}}
@pytest.mark.parametrize(
("filename", "definition_source", "expected_flow_name"),
[
pytest.param(
"flow.yaml",
"schema: crewai.flow/v1\nname: YamlFileFlow\n",
"YamlFileFlow",
id="yaml-file",
),
pytest.param(
"flow.json",
'{"schema":"crewai.flow/v1","name":"JsonFlow"}',
"JsonFlow",
id="json-file",
),
],
)
def test_run_flow_definition_accepts_definition_files(
filename, definition_source, expected_flow_name, tmp_path, capsys, fake_flow_runtime
):
definition_path = tmp_path / filename
definition_path.write_text(definition_source)
run_flow_definition(str(definition_path))
assert _captured_json(capsys) == {"flow": expected_flow_name, "inputs": {}}
def test_run_flow_definition_rejects_non_object_inputs(fake_flow_runtime, capsys):
with pytest.raises(SystemExit):
run_flow_definition("name: TestFlow", '["not", "an", "object"]')
assert "Invalid --inputs JSON: expected an object." in capsys.readouterr().err
def test_run_flow_definition_reports_unreadable_file(
monkeypatch, tmp_path, capsys, fake_flow_runtime
):
definition_path = tmp_path / "flow.yaml"
definition_path.write_text("schema: crewai.flow/v1\nname: TestFlow\n")
def raise_permission_error(self, *args, **kwargs):
raise PermissionError("no access")
monkeypatch.setattr("pathlib.Path.read_text", raise_permission_error)
with pytest.raises(SystemExit):
run_flow_definition(str(definition_path))
err = capsys.readouterr().err
assert "Unable to read --definition path" in err
assert str(definition_path) in err
assert "no access" in err