Add experimental crewai run --definition for flows (#6147)
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Vulnerability Scan / pip-audit (push) Has been cancelled
Build uv cache / build-cache (3.10) (push) Has been cancelled
Build uv cache / build-cache (3.11) (push) Has been cancelled
Build uv cache / build-cache (3.12) (push) Has been cancelled
Build uv cache / build-cache (3.13) (push) Has been cancelled
Nightly Canary Release / Check for new commits (push) Has been cancelled
Nightly Canary Release / Build nightly packages (push) Has been cancelled
Nightly Canary Release / Publish nightly to PyPI (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled

Let users run a Flow from a Flow Definition YAML file or inline string
without writing Python, passing kickoff inputs as `--inputs` JSON. The
flag is gated behind an experimental warning since the definition format
may still change.
This commit is contained in:
Vini Brasil
2026-06-12 22:31:05 -07:00
committed by GitHub
parent 6ad821b157
commit d80719df81
4 changed files with 338 additions and 2 deletions

View File

@@ -26,6 +26,7 @@ from crewai_cli.remote_template.main import TemplateCommand
from crewai_cli.replay_from_task import replay_task_command
from crewai_cli.reset_memories_command import reset_memories_command
from crewai_cli.run_crew import run_crew
from crewai_cli.run_flow_definition import run_flow_definition
from crewai_cli.settings.main import SettingsCommand
from crewai_cli.task_outputs import load_task_outputs
from crewai_cli.tools.main import ToolCommand
@@ -398,8 +399,36 @@ def install(context: click.Context) -> None:
"CREWAI_TRAINED_AGENTS_FILE."
),
)
def run(trained_agents_file: str | None) -> None:
"""Run the Crew."""
@click.option(
"--definition",
type=str,
default=None,
help=(
"Experimental: path to a Flow Definition YAML/JSON file, "
"or an inline YAML/JSON string."
),
)
@click.option(
"--inputs",
type=str,
default=None,
help='Experimental: JSON object passed to flow.kickoff(), e.g. \'{"topic":"AI"}\'.',
)
def run(
trained_agents_file: str | None, definition: str | None, inputs: str | None
) -> None:
"""Run the Crew or Flow."""
if inputs is not None and definition is None:
raise click.UsageError("--inputs requires --definition")
if definition is not None:
click.secho(
"Warning: `crewai run --definition` is experimental and may change without notice.",
fg="yellow",
)
run_flow_definition(definition=definition, inputs=inputs)
return
run_crew(trained_agents_file=trained_agents_file)

View File

@@ -0,0 +1,113 @@
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
import click
def run_flow_definition(definition: str, inputs: str | None = None) -> None:
"""Run a flow from a Flow Definition YAML/JSON string or file path."""
try:
from crewai.flow.flow import Flow
from crewai.flow.flow_definition import FlowDefinition
except ImportError as exc:
click.echo(
"Running flows from definitions requires the full crewai package.",
err=True,
)
raise SystemExit(1) from exc
parsed_inputs = _parse_inputs(inputs)
definition_source = _read_definition_source(definition)
try:
flow_definition = _parse_flow_definition(FlowDefinition, definition_source)
flow = Flow.from_definition(flow_definition)
result = flow.kickoff(inputs=parsed_inputs)
except Exception as exc:
click.echo(
f"An error occurred while running the flow definition: {exc}", err=True
)
raise SystemExit(1) from exc
click.echo(_format_result(result))
def _parse_inputs(inputs: str | None) -> dict[str, Any] | None:
if inputs is None:
return None
try:
parsed = json.loads(inputs)
except json.JSONDecodeError as exc:
click.echo(f"Invalid --inputs JSON: {exc}", err=True)
raise SystemExit(1) from exc
if not isinstance(parsed, dict):
click.echo("Invalid --inputs JSON: expected an object.", err=True)
raise SystemExit(1)
return parsed
def _read_definition_source(definition: str) -> str:
path = Path(definition).expanduser()
try:
is_file = path.is_file()
except OSError as exc:
if _looks_like_inline_definition(definition):
return definition
click.echo(f"Invalid --definition path: {definition} ({exc})", err=True)
raise SystemExit(1) from exc
if is_file:
try:
return path.read_text(encoding="utf-8")
except (OSError, UnicodeError) as exc:
click.echo(
f"Unable to read --definition path {path}: {exc}",
err=True,
)
raise SystemExit(1) from exc
try:
if path.exists():
click.echo(
f"Invalid --definition path: {definition} is not a file.", err=True
)
raise SystemExit(1)
except OSError as exc:
click.echo(f"Invalid --definition path: {definition} ({exc})", err=True)
raise SystemExit(1) from exc
return definition
def _looks_like_inline_definition(definition: str) -> bool:
stripped = definition.lstrip()
return "\n" in definition or stripped.startswith(("{", "---")) or ":" in stripped
def _parse_flow_definition(flow_definition_cls: type[Any], source: str) -> Any:
if _looks_like_json(source):
return flow_definition_cls.from_json(source)
return flow_definition_cls.from_yaml(source)
def _looks_like_json(source: str) -> bool:
stripped = source.lstrip()
return stripped.startswith("{")
def _format_result(result: Any) -> str:
raw_result = getattr(result, "raw", result)
if isinstance(raw_result, str):
return raw_result
try:
return json.dumps(raw_result, default=str)
except TypeError:
return str(raw_result)