diff --git a/lib/cli/src/crewai_cli/cli.py b/lib/cli/src/crewai_cli/cli.py index b153885f3..1a64a74f3 100644 --- a/lib/cli/src/crewai_cli/cli.py +++ b/lib/cli/src/crewai_cli/cli.py @@ -40,12 +40,12 @@ def replay_task_command(*args: Any, **kwargs: Any) -> Any: return _replay_task_command(*args, **kwargs) -def run_flow_definition(*args: Any, **kwargs: Any) -> Any: - from crewai_cli.run_flow_definition import ( - run_flow_definition as _run_flow_definition, +def run_declarative_flow(*args: Any, **kwargs: Any) -> Any: + from crewai_cli.run_declarative_flow import ( + run_declarative_flow as _run_declarative_flow, ) - return _run_flow_definition(*args, **kwargs) + return _run_declarative_flow(*args, **kwargs) def run_crew(*args: Any, **kwargs: Any) -> Any: @@ -155,12 +155,18 @@ def uv(uv_args: tuple[str, ...]) -> None: is_flag=True, help="Use classic Python/YAML project structure instead of JSON", ) +@click.option( + "--declarative", + is_flag=True, + help="Create a declarative Flow project instead of a Python Flow project", +) def create( type: str | None, name: str | None, provider: str | None, skip_provider: bool = False, classic: bool = False, + declarative: bool = False, ) -> None: """Create a new crew, or flow.""" dmn_mode = is_dmn_mode_enabled() @@ -194,6 +200,8 @@ def create( if dmn_mode: skip_provider = True if type == "crew": + if declarative: + raise click.UsageError("--declarative can only be used with flow projects") if classic: from crewai_cli.create_crew import create_crew @@ -205,7 +213,7 @@ def create( elif type == "flow": from crewai_cli.create_flow import create_flow - create_flow(name) + create_flow(name, declarative=declarative) else: click.secho("Error: Invalid type. Must be 'crew' or 'flow'.", fg="red") @@ -512,10 +520,7 @@ def install(context: click.Context) -> None: "--definition", type=str, default=None, - help=( - "Experimental: path to a Flow Definition YAML/JSON file, " - "or an inline YAML/JSON string." - ), + help="Experimental: path to a declarative Flow YAML/JSON file.", ) @click.option( "--inputs", @@ -537,7 +542,7 @@ def run( "Warning: `crewai run --definition` is experimental and may change without notice.", fg="yellow", ) - run_flow_definition(definition=definition, inputs=inputs) + run_declarative_flow(definition=definition, inputs=inputs) return run_crew(trained_agents_file=trained_agents_file) diff --git a/lib/cli/src/crewai_cli/create_flow.py b/lib/cli/src/crewai_cli/create_flow.py index 5042d7679..adaa3d3bf 100644 --- a/lib/cli/src/crewai_cli/create_flow.py +++ b/lib/cli/src/crewai_cli/create_flow.py @@ -5,7 +5,10 @@ import click from crewai_core.telemetry import Telemetry -def create_flow(name: str) -> None: +DECLARATIVE_FLOW_FOLDERS = ("crews", "tools", "knowledge", "skills") + + +def create_flow(name: str, *, declarative: bool = False) -> None: """Create a new flow.""" folder_name = name.replace(" ", "_").replace("-", "_").lower() class_name = name.replace("_", " ").replace("-", " ").title().replace(" ", "") @@ -20,6 +23,17 @@ def create_flow(name: str) -> None: telemetry = Telemetry() telemetry.flow_creation_span(class_name) + if declarative: + _create_declarative_flow(name, class_name, folder_name, project_root) + else: + _create_python_flow(name, class_name, folder_name, project_root) + + click.secho(f"Flow {name} created successfully!", fg="green", bold=True) + + +def _create_python_flow( + name: str, class_name: str, folder_name: str, project_root: Path +) -> None: (project_root / "src" / folder_name).mkdir(parents=True) (project_root / "src" / folder_name / "crews").mkdir(parents=True) (project_root / "src" / folder_name / "tools").mkdir(parents=True) @@ -92,4 +106,41 @@ def create_flow(name: str) -> None: fg="yellow", ) - click.secho(f"Flow {name} created successfully!", fg="green", bold=True) + +def _create_declarative_flow( + name: str, class_name: str, folder_name: str, project_root: Path +) -> None: + project_root.mkdir(parents=True) + package_root = project_root / "src" / folder_name + package_root.mkdir(parents=True) + for folder in DECLARATIVE_FLOW_FOLDERS: + (package_root / folder).mkdir() + + package_dir = Path(__file__).parent + templates_dir = package_dir / "templates" / "declarative_flow" + + agents_md_src = package_dir / "templates" / "AGENTS.md" + if agents_md_src.exists(): + shutil.copy2(agents_md_src, project_root / "AGENTS.md") + + for src_file in templates_dir.rglob("*"): + if not src_file.is_file(): + continue + + relative_path = src_file.relative_to(templates_dir) + dst_file = ( + project_root / relative_path + if relative_path.name in {".gitignore", "README.md", "pyproject.toml"} + else package_root / relative_path + ) + dst_file.parent.mkdir(parents=True, exist_ok=True) + content = src_file.read_text(encoding="utf-8") + content = content.replace("{{name}}", name) + content = content.replace("{{flow_name}}", class_name) + content = content.replace("{{folder_name}}", folder_name) + dst_file.write_text(content, encoding="utf-8") + + (project_root / ".env").write_text("OPENAI_API_KEY=YOUR_API_KEY", encoding="utf-8") + (package_root / "__init__.py").write_text("", encoding="utf-8") + for folder in DECLARATIVE_FLOW_FOLDERS: + (package_root / folder / ".gitkeep").write_text("", encoding="utf-8") diff --git a/lib/cli/src/crewai_cli/kickoff_flow.py b/lib/cli/src/crewai_cli/kickoff_flow.py index b5bc0d81e..ff5f317dd 100644 --- a/lib/cli/src/crewai_cli/kickoff_flow.py +++ b/lib/cli/src/crewai_cli/kickoff_flow.py @@ -5,19 +5,27 @@ import click def kickoff_flow() -> None: """ - Kickoff the flow by running a command in the UV environment. + Kickoff the flow from declarative config or the Python UV entrypoint. """ - command = ["uv", "run", "kickoff"] + from crewai_cli.run_declarative_flow import ( + configured_project_declarative_flow, + run_declarative_flow_in_project_env, + ) - try: - result = subprocess.run(command, capture_output=False, text=True, check=True) # noqa: S603 + if definition := configured_project_declarative_flow(): + run_declarative_flow_in_project_env(definition=definition) + else: + command = ["uv", "run", "kickoff"] - if result.stderr: - click.echo(result.stderr, err=True) + try: + subprocess.run( # noqa: S603 + command, capture_output=False, text=True, check=True + ) - except subprocess.CalledProcessError as e: - click.echo(f"An error occurred while running the flow: {e}", err=True) - click.echo(e.output, err=True) + except subprocess.CalledProcessError as e: + click.echo(f"An error occurred while running the flow: {e}", err=True) + raise SystemExit(1) from e - except Exception as e: - click.echo(f"An unexpected error occurred: {e}", err=True) + except Exception as e: + click.echo(f"An unexpected error occurred: {e}", err=True) + raise SystemExit(1) from e diff --git a/lib/cli/src/crewai_cli/plot_flow.py b/lib/cli/src/crewai_cli/plot_flow.py index d97ccba77..d79bdc58b 100644 --- a/lib/cli/src/crewai_cli/plot_flow.py +++ b/lib/cli/src/crewai_cli/plot_flow.py @@ -5,19 +5,27 @@ import click def plot_flow() -> None: """ - Plot the flow by running a command in the UV environment. + Plot the flow from declarative config or the Python UV entrypoint. """ - command = ["uv", "run", "plot"] + from crewai_cli.run_declarative_flow import ( + configured_project_declarative_flow, + plot_declarative_flow_in_project_env, + ) - try: - result = subprocess.run(command, capture_output=False, text=True, check=True) # noqa: S603 + if definition := configured_project_declarative_flow(): + plot_declarative_flow_in_project_env(definition) + else: + command = ["uv", "run", "plot"] - if result.stderr: - click.echo(result.stderr, err=True) + try: + subprocess.run( # noqa: S603 + command, capture_output=False, text=True, check=True + ) - except subprocess.CalledProcessError as e: - click.echo(f"An error occurred while plotting the flow: {e}", err=True) - click.echo(e.output, err=True) + except subprocess.CalledProcessError as e: + click.echo(f"An error occurred while plotting the flow: {e}", err=True) + raise SystemExit(1) from e - except Exception as e: - click.echo(f"An unexpected error occurred: {e}", err=True) + except Exception as e: + click.echo(f"An unexpected error occurred: {e}", err=True) + raise SystemExit(1) from e diff --git a/lib/cli/src/crewai_cli/run_declarative_flow.py b/lib/cli/src/crewai_cli/run_declarative_flow.py new file mode 100644 index 000000000..af7431b02 --- /dev/null +++ b/lib/cli/src/crewai_cli/run_declarative_flow.py @@ -0,0 +1,212 @@ +from __future__ import annotations + +import json +from pathlib import Path +import subprocess +from typing import Any + +import click + +from crewai_cli.utils import build_env_with_all_tool_credentials + + +def run_declarative_flow_in_project_env( + definition: str, inputs: str | None = None +) -> None: + """Run a declarative flow inside the project's Python environment.""" + if is_declarative_flow_project_env() or not _has_project_file(): + run_declarative_flow(definition=definition, inputs=inputs) + return + + if inputs is not None: + raise click.UsageError("--inputs is only supported with --definition") + + _execute_declarative_flow_command(["uv", "run", "crewai", "flow", "kickoff"]) + + +def plot_declarative_flow_in_project_env(definition: str) -> None: + """Plot a declarative flow inside the project's Python environment.""" + if is_declarative_flow_project_env() or not _has_project_file(): + plot_declarative_flow(definition=definition) + return + + _execute_declarative_flow_command(["uv", "run", "crewai", "flow", "plot"]) + + +def run_declarative_flow(definition: str, inputs: str | None = None) -> None: + """Run a declarative flow from a YAML/JSON file path.""" + parsed_inputs = _parse_inputs(inputs) + + try: + flow = load_declarative_flow(definition) + result = flow.kickoff(inputs=parsed_inputs) + except Exception as exc: + click.echo( + f"An error occurred while running the declarative flow: {exc}", err=True + ) + raise SystemExit(1) from exc + + click.echo(_format_result(result)) + + +def plot_declarative_flow(definition: str) -> None: + """Plot a declarative flow from a YAML/JSON file path.""" + try: + flow = load_declarative_flow(definition) + flow.plot() + except Exception as exc: + click.echo( + f"An error occurred while plotting the declarative flow: {exc}", err=True + ) + raise SystemExit(1) from exc + + +def load_declarative_flow(definition: str) -> Any: + """Load a declarative Flow instance from a YAML/JSON file path.""" + try: + from crewai.flow.flow import Flow + from crewai.flow.flow_definition import FlowDefinition + except ImportError as exc: + click.echo( + "Running declarative flows requires the full crewai package.", + err=True, + ) + raise SystemExit(1) from exc + + definition_path = Path(definition).expanduser() + definition_source = _read_declarative_flow_source(definition_path, definition) + + flow_definition = _parse_declarative_flow( + FlowDefinition, + definition_source, + source_path=definition_path, + ) + return Flow.from_definition(flow_definition) + + +def configured_project_declarative_flow( + pyproject_data: dict[str, Any] | None = None, +) -> str | None: + """Return the configured declarative flow source for flow projects.""" + if pyproject_data is None: + try: + from crewai_cli.utils import read_toml + + pyproject_data = read_toml() + except Exception: + return None + + crewai_config = pyproject_data.get("tool", {}).get("crewai", {}) + if crewai_config.get("type") != "flow": + return None + definition = crewai_config.get("definition") + if not isinstance(definition, str): + return None + return definition.strip() or None + + +def _execute_declarative_flow_command(command: list[str]) -> None: + env = build_env_with_all_tool_credentials() + + try: + subprocess.run( # noqa: S603 + command, + capture_output=False, + text=True, + check=True, + env=env, + ) + except subprocess.CalledProcessError as e: + raise SystemExit(e.returncode) from e + except Exception as e: + click.echo( + f"An unexpected error occurred while running the declarative flow: {e}", + err=True, + ) + raise SystemExit(1) from e + + +def is_declarative_flow_project_env() -> bool: + import os + + return os.environ.get("UV_RUN_RECURSION_DEPTH") is not None + + +def _has_project_file(project_root: Path | None = None) -> bool: + root = project_root or Path.cwd() + return (root / "pyproject.toml").is_file() + + +def _parse_inputs(inputs: str | None) -> dict[str, Any] | None: + if inputs is None: + return None + + try: + parsed = json.loads(inputs) + except json.JSONDecodeError as exc: + click.echo(f"Invalid --inputs JSON: {exc}", err=True) + raise SystemExit(1) from exc + + if not isinstance(parsed, dict): + click.echo("Invalid --inputs JSON: expected an object.", err=True) + raise SystemExit(1) + + return parsed + + +def _read_declarative_flow_source(path: Path, definition: str) -> str: + try: + if path.is_file(): + source = _read_declarative_flow_file(path) + elif path.exists(): + click.echo( + f"Invalid --definition path: {definition} is not a file.", err=True + ) + raise SystemExit(1) + else: + click.echo( + f"Invalid --definition path: {definition} does not exist.", err=True + ) + raise SystemExit(1) + except OSError as exc: + click.echo(f"Invalid --definition path: {definition} ({exc})", err=True) + raise SystemExit(1) from exc + + return source + + +def _read_declarative_flow_file(path: Path) -> str: + try: + source = path.read_text(encoding="utf-8") + except (OSError, UnicodeError) as exc: + click.echo( + f"Unable to read --definition path {path}: {exc}", + err=True, + ) + raise SystemExit(1) from exc + return source + + +def _parse_declarative_flow( + flow_definition_cls: type[Any], source: str, *, source_path: Path +) -> Any: + if _looks_like_json(source): + return flow_definition_cls.from_json(source, source_path=source_path) + + return flow_definition_cls.from_yaml(source, source_path=source_path) + + +def _looks_like_json(source: str) -> bool: + stripped = source.lstrip() + return stripped.startswith("{") + + +def _format_result(result: Any) -> str: + raw_result = getattr(result, "raw", result) + if isinstance(raw_result, str): + return raw_result + + try: + return json.dumps(raw_result, default=str) + except TypeError: + return str(raw_result) diff --git a/lib/cli/src/crewai_cli/run_flow_definition.py b/lib/cli/src/crewai_cli/run_flow_definition.py deleted file mode 100644 index 7acb6d9fe..000000000 --- a/lib/cli/src/crewai_cli/run_flow_definition.py +++ /dev/null @@ -1,113 +0,0 @@ -from __future__ import annotations - -import json -from pathlib import Path -from typing import Any - -import click - - -def run_flow_definition(definition: str, inputs: str | None = None) -> None: - """Run a flow from a Flow Definition YAML/JSON string or file path.""" - try: - from crewai.flow.flow import Flow - from crewai.flow.flow_definition import FlowDefinition - except ImportError as exc: - click.echo( - "Running flows from definitions requires the full crewai package.", - err=True, - ) - raise SystemExit(1) from exc - - parsed_inputs = _parse_inputs(inputs) - definition_source = _read_definition_source(definition) - - try: - flow_definition = _parse_flow_definition(FlowDefinition, definition_source) - flow = Flow.from_definition(flow_definition) - result = flow.kickoff(inputs=parsed_inputs) - except Exception as exc: - click.echo( - f"An error occurred while running the flow definition: {exc}", err=True - ) - raise SystemExit(1) from exc - - click.echo(_format_result(result)) - - -def _parse_inputs(inputs: str | None) -> dict[str, Any] | None: - if inputs is None: - return None - - try: - parsed = json.loads(inputs) - except json.JSONDecodeError as exc: - click.echo(f"Invalid --inputs JSON: {exc}", err=True) - raise SystemExit(1) from exc - - if not isinstance(parsed, dict): - click.echo("Invalid --inputs JSON: expected an object.", err=True) - raise SystemExit(1) - - return parsed - - -def _read_definition_source(definition: str) -> str: - path = Path(definition).expanduser() - try: - is_file = path.is_file() - except OSError as exc: - if _looks_like_inline_definition(definition): - return definition - click.echo(f"Invalid --definition path: {definition} ({exc})", err=True) - raise SystemExit(1) from exc - - if is_file: - try: - return path.read_text(encoding="utf-8") - except (OSError, UnicodeError) as exc: - click.echo( - f"Unable to read --definition path {path}: {exc}", - err=True, - ) - raise SystemExit(1) from exc - - try: - if path.exists(): - click.echo( - f"Invalid --definition path: {definition} is not a file.", err=True - ) - raise SystemExit(1) - except OSError as exc: - click.echo(f"Invalid --definition path: {definition} ({exc})", err=True) - raise SystemExit(1) from exc - - return definition - - -def _looks_like_inline_definition(definition: str) -> bool: - stripped = definition.lstrip() - return "\n" in definition or stripped.startswith(("{", "---")) or ":" in stripped - - -def _parse_flow_definition(flow_definition_cls: type[Any], source: str) -> Any: - if _looks_like_json(source): - return flow_definition_cls.from_json(source) - - return flow_definition_cls.from_yaml(source) - - -def _looks_like_json(source: str) -> bool: - stripped = source.lstrip() - return stripped.startswith("{") - - -def _format_result(result: Any) -> str: - raw_result = getattr(result, "raw", result) - if isinstance(raw_result, str): - return raw_result - - try: - return json.dumps(raw_result, default=str) - except TypeError: - return str(raw_result) diff --git a/lib/cli/src/crewai_cli/templates/declarative_flow/.gitignore b/lib/cli/src/crewai_cli/templates/declarative_flow/.gitignore new file mode 100644 index 000000000..9b826004b --- /dev/null +++ b/lib/cli/src/crewai_cli/templates/declarative_flow/.gitignore @@ -0,0 +1,5 @@ +.env +.venv/ +__pycache__/ +.crewai/ +output/ diff --git a/lib/cli/src/crewai_cli/templates/declarative_flow/README.md b/lib/cli/src/crewai_cli/templates/declarative_flow/README.md new file mode 100644 index 000000000..2de72c4df --- /dev/null +++ b/lib/cli/src/crewai_cli/templates/declarative_flow/README.md @@ -0,0 +1,17 @@ +# {{name}} Flow + +This project defines a CrewAI Flow in `src/{{folder_name}}/flow.yaml`. + +## Install + +```bash +crewai install +``` + +## Run + +```bash +crewai flow kickoff +``` + +Edit `src/{{folder_name}}/flow.yaml` to change the flow. Add reusable crews under `src/{{folder_name}}/crews/`, custom Python tools under `src/{{folder_name}}/tools/`, and shared knowledge files under `src/{{folder_name}}/knowledge/`. diff --git a/lib/cli/src/crewai_cli/templates/declarative_flow/flow.yaml b/lib/cli/src/crewai_cli/templates/declarative_flow/flow.yaml new file mode 100644 index 000000000..3b07891fe --- /dev/null +++ b/lib/cli/src/crewai_cli/templates/declarative_flow/flow.yaml @@ -0,0 +1,15 @@ +schema: crewai.flow/v1 +name: {{flow_name}} +description: A declarative CrewAI Flow. + +state: + type: dict + default: + topic: AI agents + +methods: + start: + start: true + do: + call: expression + expr: state.topic diff --git a/lib/cli/src/crewai_cli/templates/declarative_flow/pyproject.toml b/lib/cli/src/crewai_cli/templates/declarative_flow/pyproject.toml new file mode 100644 index 000000000..e4a9a8693 --- /dev/null +++ b/lib/cli/src/crewai_cli/templates/declarative_flow/pyproject.toml @@ -0,0 +1,20 @@ +[project] +name = "{{folder_name}}" +version = "0.1.0" +description = "{{name}} using crewAI" +authors = [{ name = "Your Name", email = "you@example.com" }] +requires-python = ">=3.10,<3.14" +dependencies = [ + "crewai[tools]==1.14.8a2" +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/{{folder_name}}"] + +[tool.crewai] +type = "flow" +definition = "src/{{folder_name}}/flow.yaml" diff --git a/lib/cli/tests/test_cli.py b/lib/cli/tests/test_cli.py index 3b5ce277f..9d8802f27 100644 --- a/lib/cli/tests/test_cli.py +++ b/lib/cli/tests/test_cli.py @@ -130,8 +130,8 @@ def test_run_uses_project_runner_by_default(run_crew, runner): assert "experimental" not in result.output.lower() -@mock.patch("crewai_cli.cli.run_flow_definition") -def test_run_with_definition_uses_definition_runner(run_flow_definition, runner): +@mock.patch("crewai_cli.cli.run_declarative_flow") +def test_run_with_definition_uses_definition_runner(run_declarative_flow, runner): result = runner.invoke( run, ["--definition", "flow.yaml", "--inputs", '{"topic":"AI"}'], @@ -142,19 +142,21 @@ def test_run_with_definition_uses_definition_runner(run_flow_definition, runner) "Warning: `crewai run --definition` is experimental and may change without notice." in result.output ) - run_flow_definition.assert_called_once_with( + run_declarative_flow.assert_called_once_with( definition="flow.yaml", inputs='{"topic":"AI"}' ) @mock.patch("crewai_cli.cli.run_crew") -@mock.patch("crewai_cli.cli.run_flow_definition") -def test_run_rejects_inputs_without_definition(run_flow_definition, run_crew, runner): +@mock.patch("crewai_cli.cli.run_declarative_flow") +def test_run_rejects_inputs_without_definition( + run_declarative_flow, run_crew, runner +): result = runner.invoke(run, ["--inputs", '{"topic":"AI"}']) assert result.exit_code == 2 assert "Error: --inputs requires --definition" in result.output - run_flow_definition.assert_not_called() + run_declarative_flow.assert_not_called() run_crew.assert_not_called() @@ -166,6 +168,23 @@ def test_create_crew_in_dmn_mode_skips_provider_prompts(create_json_crew, runner create_json_crew.assert_called_once_with("DMN Crew", None, True) +@mock.patch("crewai_cli.create_flow.create_flow") +def test_create_flow_declarative_uses_declarative_scaffold(create_flow, runner): + result = runner.invoke(create, ["flow", "My Flow", "--declarative"]) + + assert result.exit_code == 0 + create_flow.assert_called_once_with("My Flow", declarative=True) + + +@mock.patch("crewai_cli.create_json_crew.create_json_crew") +def test_create_crew_rejects_declarative_flag(create_json_crew, runner): + result = runner.invoke(create, ["crew", "My Crew", "--declarative"]) + + assert result.exit_code == 2 + assert "--declarative can only be used with flow projects" in result.output + create_json_crew.assert_not_called() + + def test_create_requires_type_in_dmn_mode(runner): result = runner.invoke(create, env={"CREWAI_DMN": "True"}) diff --git a/lib/cli/tests/test_create_flow.py b/lib/cli/tests/test_create_flow.py new file mode 100644 index 000000000..2fa941e58 --- /dev/null +++ b/lib/cli/tests/test_create_flow.py @@ -0,0 +1,37 @@ +from __future__ import annotations + +from pathlib import Path + +from click.testing import CliRunner +from pytest import MonkeyPatch +import tomli + +from crewai_cli.cli import crewai +from crewai_cli.create_flow import create_flow + + +def test_create_flow_declarative_project_can_run( + tmp_path: Path, monkeypatch: MonkeyPatch +): + monkeypatch.chdir(tmp_path) + create_flow("Research Flow", declarative=True) + + project_root = tmp_path / "research_flow" + assert project_root.is_dir() + + pyproject = tomli.loads( + (project_root / "pyproject.toml").read_text(encoding="utf-8") + ) + assert pyproject["project"]["name"] == "research_flow" + assert pyproject["project"]["requires-python"] + assert pyproject["project"]["dependencies"] + assert (project_root / pyproject["tool"]["crewai"]["definition"]).is_file() + + monkeypatch.chdir(project_root) + result = CliRunner().invoke( + crewai, ["flow", "kickoff"], env={"UV_RUN_RECURSION_DEPTH": "1"} + ) + + assert result.exit_code == 0 + assert "Running the Flow" in result.output + assert "AI agents" in result.output diff --git a/lib/cli/tests/test_flow_commands.py b/lib/cli/tests/test_flow_commands.py new file mode 100644 index 000000000..6154ff642 --- /dev/null +++ b/lib/cli/tests/test_flow_commands.py @@ -0,0 +1,103 @@ +from __future__ import annotations + +from collections.abc import Callable +from pathlib import Path +import subprocess + +import pytest + +import crewai_cli.kickoff_flow as kickoff_flow_module +import crewai_cli.plot_flow as plot_flow_module + + +FLOW_YAML = """\ +schema: crewai.flow/v1 +name: TestFlow +config: + suppress_flow_events: true +methods: + begin: + start: true + do: + call: expression + expr: "'AI'" +""" + + +def _write_flow_project(project_root: Path) -> None: + (project_root / "flow.yaml").write_text(FLOW_YAML, encoding="utf-8") + (project_root / "pyproject.toml").write_text( + '[project]\nname = "demo"\n\n' + '[tool.crewai]\ntype = "flow"\ndefinition = "flow.yaml"\n', + encoding="utf-8", + ) + + +def test_kickoff_flow_runs_configured_declarative_definition( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, + capsys: pytest.CaptureFixture[str], +) -> None: + _write_flow_project(tmp_path) + monkeypatch.chdir(tmp_path) + monkeypatch.setenv("UV_RUN_RECURSION_DEPTH", "1") + + kickoff_flow_module.kickoff_flow() + + assert capsys.readouterr().out == "AI\n" + + +def test_plot_flow_runs_configured_declarative_definition( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + _write_flow_project(tmp_path) + monkeypatch.chdir(tmp_path) + monkeypatch.setenv("UV_RUN_RECURSION_DEPTH", "1") + + plot_flow_module.plot_flow() + + +@pytest.mark.parametrize( + ("command", "expected"), + [ + pytest.param(kickoff_flow_module.kickoff_flow, ["uv", "run", "kickoff"]), + pytest.param(plot_flow_module.plot_flow, ["uv", "run", "plot"]), + ], +) +def test_flow_commands_keep_python_entrypoint_without_definition( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, + command: Callable[[], None], + expected: list[str], +) -> None: + subprocess_calls = [] + + monkeypatch.chdir(tmp_path) + monkeypatch.setattr( + subprocess, + "run", + lambda command, **kwargs: subprocess_calls.append((command, kwargs)), + ) + + command() + + assert subprocess_calls == [ + ( + expected, + {"capture_output": False, "text": True, "check": True}, + ) + ] + + +def test_configured_project_declarative_flow( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + monkeypatch.chdir(tmp_path) + (tmp_path / "pyproject.toml").write_text( + '[tool.crewai]\ntype = "flow"\ndefinition = " flow.yaml "\n', + encoding="utf-8", + ) + + from crewai_cli.run_declarative_flow import configured_project_declarative_flow + + assert configured_project_declarative_flow() == "flow.yaml" diff --git a/lib/cli/tests/test_run_declarative_flow.py b/lib/cli/tests/test_run_declarative_flow.py new file mode 100644 index 000000000..9808d6b17 --- /dev/null +++ b/lib/cli/tests/test_run_declarative_flow.py @@ -0,0 +1,111 @@ +from __future__ import annotations + +from pathlib import Path + +import pytest + +import crewai_cli.run_declarative_flow as run_declarative_flow_module + + +FLOW_YAML = """\ +schema: crewai.flow/v1 +name: TestFlow +config: + suppress_flow_events: true +methods: + begin: + start: true + do: + call: expression + expr: state.topic +""" + + +def test_run_declarative_flow_reads_definition_file( + tmp_path: Path, capsys: pytest.CaptureFixture[str] +) -> None: + definition_path = tmp_path / "flow.yaml" + definition_path.write_text(FLOW_YAML, encoding="utf-8") + + run_declarative_flow_module.run_declarative_flow( + str(definition_path), '{"topic":"AI"}' + ) + + assert capsys.readouterr().out == "AI\n" + + +def test_run_declarative_flow_rejects_non_object_inputs( + tmp_path: Path, capsys: pytest.CaptureFixture[str] +) -> None: + definition_path = tmp_path / "flow.yaml" + definition_path.write_text(FLOW_YAML, encoding="utf-8") + + with pytest.raises(SystemExit): + run_declarative_flow_module.run_declarative_flow( + str(definition_path), '["not", "an", "object"]' + ) + + assert "Invalid --inputs JSON: expected an object." in capsys.readouterr().err + + +def test_run_declarative_flow_reports_missing_file( + capsys: pytest.CaptureFixture[str], +) -> None: + with pytest.raises(SystemExit): + run_declarative_flow_module.run_declarative_flow("missing-flow.yaml") + + assert ( + "Invalid --definition path: missing-flow.yaml does not exist." + in capsys.readouterr().err + ) + + +def test_run_declarative_flow_in_project_env_uses_uv( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + subprocess_calls = [] + + monkeypatch.chdir(tmp_path) + monkeypatch.delenv("UV_RUN_RECURSION_DEPTH", raising=False) + (tmp_path / "pyproject.toml").write_text("[project]\nname = 'demo'\n") + monkeypatch.setattr( + run_declarative_flow_module, + "build_env_with_all_tool_credentials", + lambda: {"EXISTING": "value"}, + ) + monkeypatch.setattr( + run_declarative_flow_module.subprocess, + "run", + lambda command, **kwargs: subprocess_calls.append((command, kwargs)), + ) + + run_declarative_flow_module.run_declarative_flow_in_project_env("flow.yaml") + + assert subprocess_calls == [ + ( + ["uv", "run", "crewai", "flow", "kickoff"], + { + "capture_output": False, + "text": True, + "check": True, + "env": {"EXISTING": "value"}, + }, + ) + ] + + +def test_run_declarative_flow_in_process_inside_uv( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, + capsys: pytest.CaptureFixture[str], +) -> None: + monkeypatch.chdir(tmp_path) + monkeypatch.setenv("UV_RUN_RECURSION_DEPTH", "1") + (tmp_path / "pyproject.toml").write_text("[project]\nname = 'demo'\n") + (tmp_path / "flow.yaml").write_text(FLOW_YAML, encoding="utf-8") + + run_declarative_flow_module.run_declarative_flow_in_project_env( + "flow.yaml", '{"topic":"AI"}' + ) + + assert capsys.readouterr().out == "AI\n" diff --git a/lib/cli/tests/test_run_flow_definition.py b/lib/cli/tests/test_run_flow_definition.py deleted file mode 100644 index 532f810be..000000000 --- a/lib/cli/tests/test_run_flow_definition.py +++ /dev/null @@ -1,156 +0,0 @@ -from __future__ import annotations - -import json -import sys -import types - -import pytest -import yaml - -from crewai_cli.run_flow_definition import run_flow_definition - - -class _FakeFlow: - def __init__(self, definition): - self.definition = definition - - def kickoff(self, inputs=None): - return { - "flow": self.definition["name"], - "inputs": inputs or {}, - } - - -class _FakeFlowFactory: - @classmethod - def from_definition(cls, definition): - return _FakeFlow(definition) - - -class _FakeFlowDefinition: - @classmethod - def from_yaml(cls, source): - return yaml.safe_load(source) - - @classmethod - def from_json(cls, source): - return json.loads(source) - - -@pytest.fixture -def fake_flow_runtime(monkeypatch): - crewai_module = types.ModuleType("crewai") - flow_package = types.ModuleType("crewai.flow") - flow_module = types.ModuleType("crewai.flow.flow") - flow_definition_module = types.ModuleType("crewai.flow.flow_definition") - - flow_module.Flow = _FakeFlowFactory - flow_definition_module.FlowDefinition = _FakeFlowDefinition - - monkeypatch.setitem(sys.modules, "crewai", crewai_module) - monkeypatch.setitem(sys.modules, "crewai.flow", flow_package) - monkeypatch.setitem(sys.modules, "crewai.flow.flow", flow_module) - monkeypatch.setitem( - sys.modules, "crewai.flow.flow_definition", flow_definition_module - ) - - -def _captured_json(capsys): - return json.loads(capsys.readouterr().out) - - -def test_run_flow_definition_reads_definition_file( - tmp_path, capsys, fake_flow_runtime -): - definition_path = tmp_path / "flow.yaml" - definition_path.write_text("schema: crewai.flow/v1\nname: TestFlow\n") - - run_flow_definition(str(definition_path), '{"topic":"AI"}') - - assert _captured_json(capsys) == { - "flow": "TestFlow", - "inputs": {"topic": "AI"}, - } - - -@pytest.mark.parametrize( - ("definition_source", "expected_flow_name"), - [ - pytest.param( - "schema: crewai.flow/v1\nname: InlineFlow\n", - "InlineFlow", - id="inline-yaml", - ), - pytest.param( - '{"schema":"crewai.flow/v1","name":"InlineJsonFlow"}', - "InlineJsonFlow", - id="inline-json", - ), - pytest.param( - '{"schema":"crewai.flow/v1","name":"' + ("JsonFlow" * 500) + '"}', - "JsonFlow" * 500, - id="large-inline-json", - ), - ], -) -def test_run_flow_definition_accepts_inline_definitions( - definition_source, expected_flow_name, capsys, fake_flow_runtime -): - run_flow_definition(definition_source) - - assert _captured_json(capsys) == {"flow": expected_flow_name, "inputs": {}} - - -@pytest.mark.parametrize( - ("filename", "definition_source", "expected_flow_name"), - [ - pytest.param( - "flow.yaml", - "schema: crewai.flow/v1\nname: YamlFileFlow\n", - "YamlFileFlow", - id="yaml-file", - ), - pytest.param( - "flow.json", - '{"schema":"crewai.flow/v1","name":"JsonFlow"}', - "JsonFlow", - id="json-file", - ), - ], -) -def test_run_flow_definition_accepts_definition_files( - filename, definition_source, expected_flow_name, tmp_path, capsys, fake_flow_runtime -): - definition_path = tmp_path / filename - definition_path.write_text(definition_source) - - run_flow_definition(str(definition_path)) - - assert _captured_json(capsys) == {"flow": expected_flow_name, "inputs": {}} - - -def test_run_flow_definition_rejects_non_object_inputs(fake_flow_runtime, capsys): - with pytest.raises(SystemExit): - run_flow_definition("name: TestFlow", '["not", "an", "object"]') - - assert "Invalid --inputs JSON: expected an object." in capsys.readouterr().err - - -def test_run_flow_definition_reports_unreadable_file( - monkeypatch, tmp_path, capsys, fake_flow_runtime -): - definition_path = tmp_path / "flow.yaml" - definition_path.write_text("schema: crewai.flow/v1\nname: TestFlow\n") - - def raise_permission_error(self, *args, **kwargs): - raise PermissionError("no access") - - monkeypatch.setattr("pathlib.Path.read_text", raise_permission_error) - - with pytest.raises(SystemExit): - run_flow_definition(str(definition_path)) - - err = capsys.readouterr().err - assert "Unable to read --definition path" in err - assert str(definition_path) in err - assert "no access" in err diff --git a/lib/crewai/src/crewai/flow/flow_definition.py b/lib/crewai/src/crewai/flow/flow_definition.py index 5c277d3ce..6f05853d0 100644 --- a/lib/crewai/src/crewai/flow/flow_definition.py +++ b/lib/crewai/src/crewai/flow/flow_definition.py @@ -1,6 +1,6 @@ -"""Flow Structure: the serializable, language-agnostic Flow contract. +"""Flow Definition: the serializable, declarative Flow contract. -Defines :class:`FlowDefinition` and its sub-models — a static, textual +Defines :class:`FlowDefinition` and its sub-models — a static, declarative (JSON/YAML) representation of a Flow: its methods, trigger conditions, state, and configuration. It is independent of the Python authoring layer that may have produced it and of the engine that runs it (see @@ -11,6 +11,7 @@ from __future__ import annotations import json import logging +from pathlib import Path import re from typing import Annotated, Any, Literal, TypeAlias, cast @@ -18,6 +19,7 @@ from pydantic import ( BaseModel, ConfigDict, Field, + PrivateAttr, field_serializer, model_validator, ) @@ -406,10 +408,19 @@ class FlowCrewActionDefinition(BaseModel): ) call: Literal["crew"] = Field( - description="Action discriminator. Use crew to run an inline Crew definition.", + description=( + "Action discriminator. Use crew to run an inline or referenced Crew " + "definition." + ), examples=["crew"], ) - with_: CrewDefinition = Field( + from_declaration: str | None = Field( + default=None, + description="Path to a JSON/JSONC Crew declaration file or folder.", + examples=["crews/research_crew"], + ) + with_: CrewDefinition | None = Field( + default=None, alias="with", description="Inline Crew definition to load and execute for this action.", examples=[ @@ -430,10 +441,26 @@ class FlowCrewActionDefinition(BaseModel): "agent": "researcher", } ], - "inputs": {"topic": "${state.topic}"}, } ], ) + inputs: dict[str, ExpressionData] | None = Field( + default=None, + description=( + "Input overrides passed to the Crew. String values are evaluated as CEL " + "only when the trimmed value starts with ${ and ends with }; all other " + "values are literal." + ), + examples=[{"topic": "${state.topic}"}], + ) + + @model_validator(mode="after") + def _validate_crew_source(self) -> FlowCrewActionDefinition: + if bool(self.from_declaration) == (self.with_ is not None): + raise ValueError( + "crew action requires exactly one of from_declaration or with" + ) + return self class FlowAgentActionDefinition(BaseModel): @@ -684,10 +711,12 @@ class FlowDefinition(BaseModel): arbitrary_types_allowed=True, ) + _source_path: Path | None = PrivateAttr(default=None) + schema_: Literal["crewai.flow/v1"] = Field( default="crewai.flow/v1", alias="schema", - description="Flow Definition schema identifier and version.", + description="Declarative Flow schema identifier and version.", examples=["crewai.flow/v1"], ) name: str = Field( @@ -764,29 +793,45 @@ class FlowDefinition(BaseModel): allow_unicode=True, ) + @property + def source_path(self) -> Path | None: + """Original definition file path, when loaded from a file.""" + return self._source_path + + @property + def source_dir(self) -> Path | None: + """Directory used to resolve relative paths in the definition.""" + if self._source_path is None: + return None + return self._source_path.parent + @classmethod - def from_dict(cls, data: dict[str, Any]) -> FlowDefinition: + def from_dict( + cls, data: dict[str, Any], *, source_path: Path | None = None + ) -> FlowDefinition: """Load a definition from a dictionary.""" definition = cls.model_validate(data) + if source_path is not None: + definition._source_path = source_path.expanduser().resolve() log_flow_definition_issues(definition) return definition @classmethod - def from_json(cls, data: str) -> FlowDefinition: + def from_json(cls, data: str, *, source_path: Path | None = None) -> FlowDefinition: """Load a definition from JSON.""" - return cls.from_dict(json.loads(data)) + return cls.from_dict(json.loads(data), source_path=source_path) @classmethod - def from_yaml(cls, data: str) -> FlowDefinition: + def from_yaml(cls, data: str, *, source_path: Path | None = None) -> FlowDefinition: """Load a definition from YAML.""" loaded = yaml.safe_load(data) or {} if not isinstance(loaded, dict): raise ValueError("Flow definition YAML must contain a mapping") - return cls.from_dict(loaded) + return cls.from_dict(loaded, source_path=source_path) @classmethod def json_schema(cls) -> dict[str, Any]: - """Return the JSON Schema for the Flow Definition contract.""" + """Return the JSON Schema for the declarative Flow contract.""" return cls.model_json_schema(by_alias=True) @@ -826,10 +871,16 @@ def _validate_action_cel( return if isinstance(action, FlowCrewActionDefinition): - Expression(cast(ExpressionData, action.with_.inputs)).validate_template( - allowed_roots=allowed_roots, - source=f"{path}.with.inputs", - ) + if action.with_ is not None: + Expression(cast(ExpressionData, action.with_.inputs)).validate_template( + allowed_roots=allowed_roots, + source=f"{path}.with.inputs", + ) + if action.inputs is not None: + Expression(cast(ExpressionData, action.inputs)).validate_template( + allowed_roots=allowed_roots, + source=f"{path}.inputs", + ) return if isinstance(action, FlowAgentActionDefinition): diff --git a/lib/crewai/src/crewai/flow/runtime/_actions.py b/lib/crewai/src/crewai/flow/runtime/_actions.py index c437e274b..c8f118775 100644 --- a/lib/crewai/src/crewai/flow/runtime/_actions.py +++ b/lib/crewai/src/crewai/flow/runtime/_actions.py @@ -8,6 +8,7 @@ from collections.abc import Awaitable, Callable import contextvars import inspect import os +from pathlib import Path from typing import TYPE_CHECKING, Any, Protocol, cast from crewai.flow.expressions import Expression, ExpressionData @@ -128,16 +129,34 @@ class CrewAction: self.definition = definition async def run(self, *_args: Any, **kwargs: Any) -> Any: - from crewai.project.crew_loader import load_crew_from_definition + from crewai.project.crew_loader import load_crew, load_crew_from_definition local_context = _pop_local_context(kwargs) - crew_definition = self.definition.with_ + if self.definition.from_declaration is not None: + crew, default_inputs = load_crew( + _resolve_crew_declaration( + self.definition.from_declaration, + base_dir=self.flow._definition.source_dir, + ) + ) + input_template = {**default_inputs, **(self.definition.inputs or {})} + else: + crew_definition = self.definition.with_ + if crew_definition is None: + raise ValueError( + "crew action requires exactly one of from_declaration or with" + ) + input_template = { + **crew_definition.inputs, + **(self.definition.inputs or {}), + } + crew, _ = load_crew_from_definition(crew_definition, source="crew action") + inputs = Expression.from_flow( - cast(ExpressionData, crew_definition.inputs), + cast(ExpressionData, input_template), self.flow, local_context=local_context, ).render_template() - crew, _ = load_crew_from_definition(crew_definition, source="crew action") return await crew.kickoff_async(inputs=inputs) @@ -359,3 +378,29 @@ def _pop_local_context(kwargs: dict[str, Any]) -> LocalContext | None: if not isinstance(local_context, dict): raise TypeError("flow definition local context must be a mapping") return cast(LocalContext, local_context) + + +def _resolve_crew_declaration( + from_declaration: str, *, base_dir: Path | None = None +) -> Path: + path = Path(from_declaration).expanduser() + if base_dir is not None: + resolved_base_dir = base_dir.expanduser().resolve() + if not path.is_absolute(): + path = resolved_base_dir / path + resolved_path = path.resolve() + if not resolved_path.is_relative_to(resolved_base_dir): + raise ValueError( + "crew declaration path must be within the flow definition directory" + ) + path = resolved_path + + if not path.is_dir(): + return path + + for name in ("crew.jsonc", "crew.json"): + candidate = path / name + if candidate.is_file(): + return candidate + + return path / "crew.jsonc" diff --git a/lib/crewai/tests/test_flow_from_definition.py b/lib/crewai/tests/test_flow_from_definition.py index 1ed8dbcf9..693d75ef5 100644 --- a/lib/crewai/tests/test_flow_from_definition.py +++ b/lib/crewai/tests/test_flow_from_definition.py @@ -1005,8 +1005,8 @@ methods: description: Research {topic} expected_output: Findings about {topic} agent: researcher - inputs: - topic: "${state.topic}" + inputs: + topic: "${state.topic}" start: true """ @@ -1020,6 +1020,183 @@ methods: } +def test_crew_action_runs_crew_from_declaration( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +): + from crewai import Crew + + project_root = tmp_path / "project" + crew_root = project_root / "crews" / "research_crew" + agents_root = crew_root / "agents" + agents_root.mkdir(parents=True) + (agents_root / "researcher.jsonc").write_text( + """ +{ + "role": "Researcher", + "goal": "Research {topic}", + "backstory": "Knows things." +} +""", + encoding="utf-8", + ) + (crew_root / "crew.jsonc").write_text( + """ +{ + "name": "referenced_research", + "agents": ["researcher"], + "tasks": [ + { + "name": "research_task", + "description": "Research {topic}", + "expected_output": "Findings about {topic}", + "agent": "researcher" + } + ], + "inputs": { + "topic": "Default topic", + "audience": "developers" + } +} +""", + encoding="utf-8", + ) + + async def fake_kickoff_async( + self: Crew, inputs: dict[str, Any] | None = None, **_kwargs: Any + ) -> dict[str, Any]: + return { + "crew": self.name, + "tasks": [task.description for task in self.tasks], + "inputs": inputs, + } + + monkeypatch.setattr(Crew, "kickoff_async", fake_kickoff_async) + monkeypatch.chdir(project_root) + + yaml_str = """ +schema: crewai.flow/v1 +name: CrewFlow +methods: + research: + do: + call: crew + from_declaration: crews/research_crew + inputs: + topic: "${state.topic}" + start: true +""" + + flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + + assert flow.kickoff(inputs={"topic": "AI"}) == { + "crew": "referenced_research", + "tasks": ["Research {topic}"], + "inputs": {"topic": "AI", "audience": "developers"}, + } + + +def test_crew_action_from_declaration_resolves_relative_to_flow_file( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +): + from crewai import Crew + + project_root = tmp_path / "project" + crew_root = project_root / "crews" / "research_crew" + agents_root = crew_root / "agents" + agents_root.mkdir(parents=True) + (agents_root / "researcher.jsonc").write_text( + """ +{ + "role": "Researcher", + "goal": "Research {topic}", + "backstory": "Knows things." +} +""", + encoding="utf-8", + ) + (crew_root / "crew.jsonc").write_text( + """ +{ + "name": "relative_research", + "agents": ["researcher"], + "tasks": [ + { + "description": "Research {topic}", + "expected_output": "Findings about {topic}", + "agent": "researcher" + } + ], + "inputs": { + "topic": "Default topic" + } +} +""", + encoding="utf-8", + ) + + async def fake_kickoff_async( + self: Crew, inputs: dict[str, Any] | None = None, **_kwargs: Any + ) -> dict[str, Any]: + return {"crew": self.name, "inputs": inputs} + + monkeypatch.setattr(Crew, "kickoff_async", fake_kickoff_async) + + flow_path = project_root / "flow.yaml" + yaml_str = """ +schema: crewai.flow/v1 +name: CrewFlow +methods: + research: + do: + call: crew + from_declaration: crews/research_crew + inputs: + topic: "${state.topic}" + start: true +""" + flow_path.write_text(yaml_str, encoding="utf-8") + + other_cwd = tmp_path / "other" + other_cwd.mkdir() + monkeypatch.chdir(other_cwd) + + flow = Flow.from_definition( + FlowDefinition.from_yaml(yaml_str, source_path=flow_path) + ) + + assert flow.kickoff(inputs={"topic": "AI"}) == { + "crew": "relative_research", + "inputs": {"topic": "AI"}, + } + + +def test_crew_action_from_declaration_rejects_paths_outside_flow_file( + tmp_path: Path, +): + flow_path = tmp_path / "project" / "flow.yaml" + flow_path.parent.mkdir() + yaml_str = """ +schema: crewai.flow/v1 +name: CrewFlow +methods: + research: + do: + call: crew + from_declaration: ../outside/crew.jsonc + start: true +""" + + flow = Flow.from_definition( + FlowDefinition.from_yaml(yaml_str, source_path=flow_path) + ) + + with pytest.raises( + ValueError, + match="crew declaration path must be within the flow definition directory", + ): + flow.kickoff() + + def test_crew_action_round_trips_with_inline_definition(): definition = FlowDefinition.from_dict( { @@ -1047,8 +1224,8 @@ def test_crew_action_round_trips_with_inline_definition(): "agent": "researcher", } ], - "inputs": {"topic": "${state.topic}"}, }, + "inputs": {"topic": "${state.topic}"}, }, } }, @@ -1062,6 +1239,9 @@ def test_crew_action_round_trips_with_inline_definition(): ]["role"] == "Researcher" ) + assert definition.to_dict()["methods"]["research"]["do"]["inputs"] == { + "topic": "${state.topic}" + } def test_crew_action_normalizes_named_agent_list_definition(): @@ -1162,7 +1342,7 @@ def test_crew_action_rejects_incomplete_inline_agent_definition(): ) -def test_crew_action_rejects_ref(): +def test_crew_action_rejects_python_ref_field(): with pytest.raises(ValidationError, match="ref"): FlowDefinition.from_dict( { @@ -1174,7 +1354,6 @@ def test_crew_action_rejects_ref(): "do": { "call": "crew", "ref": "project.crew:build_crew", - "with": {"inputs": {"topic": "AI"}}, }, } },