mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-07-01 13:18:10 +00:00
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Vulnerability Scan / pip-audit (push) Has been cancelled
Build uv cache / build-cache (3.10) (push) Has been cancelled
Build uv cache / build-cache (3.11) (push) Has been cancelled
Build uv cache / build-cache (3.12) (push) Has been cancelled
Build uv cache / build-cache (3.13) (push) Has been cancelled
Nightly Canary Release / Check for new commits (push) Has been cancelled
Nightly Canary Release / Build nightly packages (push) Has been cancelled
Nightly Canary Release / Publish nightly to PyPI (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
Let users run a Flow from a Flow Definition YAML file or inline string without writing Python, passing kickoff inputs as `--inputs` JSON. The flag is gated behind an experimental warning since the definition format may still change.
114 lines
3.3 KiB
Python
114 lines
3.3 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import click
|
|
|
|
|
|
def run_flow_definition(definition: str, inputs: str | None = None) -> None:
|
|
"""Run a flow from a Flow Definition YAML/JSON string or file path."""
|
|
try:
|
|
from crewai.flow.flow import Flow
|
|
from crewai.flow.flow_definition import FlowDefinition
|
|
except ImportError as exc:
|
|
click.echo(
|
|
"Running flows from definitions requires the full crewai package.",
|
|
err=True,
|
|
)
|
|
raise SystemExit(1) from exc
|
|
|
|
parsed_inputs = _parse_inputs(inputs)
|
|
definition_source = _read_definition_source(definition)
|
|
|
|
try:
|
|
flow_definition = _parse_flow_definition(FlowDefinition, definition_source)
|
|
flow = Flow.from_definition(flow_definition)
|
|
result = flow.kickoff(inputs=parsed_inputs)
|
|
except Exception as exc:
|
|
click.echo(
|
|
f"An error occurred while running the flow definition: {exc}", err=True
|
|
)
|
|
raise SystemExit(1) from exc
|
|
|
|
click.echo(_format_result(result))
|
|
|
|
|
|
def _parse_inputs(inputs: str | None) -> dict[str, Any] | None:
|
|
if inputs is None:
|
|
return None
|
|
|
|
try:
|
|
parsed = json.loads(inputs)
|
|
except json.JSONDecodeError as exc:
|
|
click.echo(f"Invalid --inputs JSON: {exc}", err=True)
|
|
raise SystemExit(1) from exc
|
|
|
|
if not isinstance(parsed, dict):
|
|
click.echo("Invalid --inputs JSON: expected an object.", err=True)
|
|
raise SystemExit(1)
|
|
|
|
return parsed
|
|
|
|
|
|
def _read_definition_source(definition: str) -> str:
|
|
path = Path(definition).expanduser()
|
|
try:
|
|
is_file = path.is_file()
|
|
except OSError as exc:
|
|
if _looks_like_inline_definition(definition):
|
|
return definition
|
|
click.echo(f"Invalid --definition path: {definition} ({exc})", err=True)
|
|
raise SystemExit(1) from exc
|
|
|
|
if is_file:
|
|
try:
|
|
return path.read_text(encoding="utf-8")
|
|
except (OSError, UnicodeError) as exc:
|
|
click.echo(
|
|
f"Unable to read --definition path {path}: {exc}",
|
|
err=True,
|
|
)
|
|
raise SystemExit(1) from exc
|
|
|
|
try:
|
|
if path.exists():
|
|
click.echo(
|
|
f"Invalid --definition path: {definition} is not a file.", err=True
|
|
)
|
|
raise SystemExit(1)
|
|
except OSError as exc:
|
|
click.echo(f"Invalid --definition path: {definition} ({exc})", err=True)
|
|
raise SystemExit(1) from exc
|
|
|
|
return definition
|
|
|
|
|
|
def _looks_like_inline_definition(definition: str) -> bool:
|
|
stripped = definition.lstrip()
|
|
return "\n" in definition or stripped.startswith(("{", "---")) or ":" in stripped
|
|
|
|
|
|
def _parse_flow_definition(flow_definition_cls: type[Any], source: str) -> Any:
|
|
if _looks_like_json(source):
|
|
return flow_definition_cls.from_json(source)
|
|
|
|
return flow_definition_cls.from_yaml(source)
|
|
|
|
|
|
def _looks_like_json(source: str) -> bool:
|
|
stripped = source.lstrip()
|
|
return stripped.startswith("{")
|
|
|
|
|
|
def _format_result(result: Any) -> str:
|
|
raw_result = getattr(result, "raw", result)
|
|
if isinstance(raw_result, str):
|
|
return raw_result
|
|
|
|
try:
|
|
return json.dumps(raw_result, default=str)
|
|
except TypeError:
|
|
return str(raw_result)
|