diff --git a/lib/cli/src/crewai_cli/cli.py b/lib/cli/src/crewai_cli/cli.py index f1a8b20ba..f2ebd9a84 100644 --- a/lib/cli/src/crewai_cli/cli.py +++ b/lib/cli/src/crewai_cli/cli.py @@ -26,6 +26,7 @@ from crewai_cli.remote_template.main import TemplateCommand from crewai_cli.replay_from_task import replay_task_command from crewai_cli.reset_memories_command import reset_memories_command from crewai_cli.run_crew import run_crew +from crewai_cli.run_flow_definition import run_flow_definition from crewai_cli.settings.main import SettingsCommand from crewai_cli.task_outputs import load_task_outputs from crewai_cli.tools.main import ToolCommand @@ -398,8 +399,36 @@ def install(context: click.Context) -> None: "CREWAI_TRAINED_AGENTS_FILE." ), ) -def run(trained_agents_file: str | None) -> None: - """Run the Crew.""" +@click.option( + "--definition", + type=str, + default=None, + help=( + "Experimental: path to a Flow Definition YAML/JSON file, " + "or an inline YAML/JSON string." + ), +) +@click.option( + "--inputs", + type=str, + default=None, + help='Experimental: JSON object passed to flow.kickoff(), e.g. \'{"topic":"AI"}\'.', +) +def run( + trained_agents_file: str | None, definition: str | None, inputs: str | None +) -> None: + """Run the Crew or Flow.""" + if inputs is not None and definition is None: + raise click.UsageError("--inputs requires --definition") + + if definition is not None: + click.secho( + "Warning: `crewai run --definition` is experimental and may change without notice.", + fg="yellow", + ) + run_flow_definition(definition=definition, inputs=inputs) + return + run_crew(trained_agents_file=trained_agents_file) diff --git a/lib/cli/src/crewai_cli/run_flow_definition.py b/lib/cli/src/crewai_cli/run_flow_definition.py new file mode 100644 index 000000000..7acb6d9fe --- /dev/null +++ b/lib/cli/src/crewai_cli/run_flow_definition.py @@ -0,0 +1,113 @@ +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any + +import click + + +def run_flow_definition(definition: str, inputs: str | None = None) -> None: + """Run a flow from a Flow Definition YAML/JSON string or file path.""" + try: + from crewai.flow.flow import Flow + from crewai.flow.flow_definition import FlowDefinition + except ImportError as exc: + click.echo( + "Running flows from definitions requires the full crewai package.", + err=True, + ) + raise SystemExit(1) from exc + + parsed_inputs = _parse_inputs(inputs) + definition_source = _read_definition_source(definition) + + try: + flow_definition = _parse_flow_definition(FlowDefinition, definition_source) + flow = Flow.from_definition(flow_definition) + result = flow.kickoff(inputs=parsed_inputs) + except Exception as exc: + click.echo( + f"An error occurred while running the flow definition: {exc}", err=True + ) + raise SystemExit(1) from exc + + click.echo(_format_result(result)) + + +def _parse_inputs(inputs: str | None) -> dict[str, Any] | None: + if inputs is None: + return None + + try: + parsed = json.loads(inputs) + except json.JSONDecodeError as exc: + click.echo(f"Invalid --inputs JSON: {exc}", err=True) + raise SystemExit(1) from exc + + if not isinstance(parsed, dict): + click.echo("Invalid --inputs JSON: expected an object.", err=True) + raise SystemExit(1) + + return parsed + + +def _read_definition_source(definition: str) -> str: + path = Path(definition).expanduser() + try: + is_file = path.is_file() + except OSError as exc: + if _looks_like_inline_definition(definition): + return definition + click.echo(f"Invalid --definition path: {definition} ({exc})", err=True) + raise SystemExit(1) from exc + + if is_file: + try: + return path.read_text(encoding="utf-8") + except (OSError, UnicodeError) as exc: + click.echo( + f"Unable to read --definition path {path}: {exc}", + err=True, + ) + raise SystemExit(1) from exc + + try: + if path.exists(): + click.echo( + f"Invalid --definition path: {definition} is not a file.", err=True + ) + raise SystemExit(1) + except OSError as exc: + click.echo(f"Invalid --definition path: {definition} ({exc})", err=True) + raise SystemExit(1) from exc + + return definition + + +def _looks_like_inline_definition(definition: str) -> bool: + stripped = definition.lstrip() + return "\n" in definition or stripped.startswith(("{", "---")) or ":" in stripped + + +def _parse_flow_definition(flow_definition_cls: type[Any], source: str) -> Any: + if _looks_like_json(source): + return flow_definition_cls.from_json(source) + + return flow_definition_cls.from_yaml(source) + + +def _looks_like_json(source: str) -> bool: + stripped = source.lstrip() + return stripped.startswith("{") + + +def _format_result(result: Any) -> str: + raw_result = getattr(result, "raw", result) + if isinstance(raw_result, str): + return raw_result + + try: + return json.dumps(raw_result, default=str) + except TypeError: + return str(raw_result) diff --git a/lib/cli/tests/test_cli.py b/lib/cli/tests/test_cli.py index b8e88f333..15abe42ab 100644 --- a/lib/cli/tests/test_cli.py +++ b/lib/cli/tests/test_cli.py @@ -13,6 +13,7 @@ from crewai_cli.cli import ( flow_add_crew, login, reset_memories, + run, test, train, version, @@ -119,6 +120,43 @@ def test_test_invalid_string_iterations(evaluate_crew, runner): ) +@mock.patch("crewai_cli.cli.run_crew") +def test_run_uses_project_runner_by_default(run_crew, runner): + result = runner.invoke(run) + + assert result.exit_code == 0 + run_crew.assert_called_once_with(trained_agents_file=None) + assert "experimental" not in result.output.lower() + + +@mock.patch("crewai_cli.cli.run_flow_definition") +def test_run_with_definition_uses_definition_runner(run_flow_definition, runner): + result = runner.invoke( + run, + ["--definition", "flow.yaml", "--inputs", '{"topic":"AI"}'], + ) + + assert result.exit_code == 0 + assert ( + "Warning: `crewai run --definition` is experimental and may change without notice." + in result.output + ) + run_flow_definition.assert_called_once_with( + definition="flow.yaml", inputs='{"topic":"AI"}' + ) + + +@mock.patch("crewai_cli.cli.run_crew") +@mock.patch("crewai_cli.cli.run_flow_definition") +def test_run_rejects_inputs_without_definition(run_flow_definition, run_crew, runner): + result = runner.invoke(run, ["--inputs", '{"topic":"AI"}']) + + assert result.exit_code == 2 + assert "Error: --inputs requires --definition" in result.output + run_flow_definition.assert_not_called() + run_crew.assert_not_called() + + @mock.patch("crewai_cli.cli.AuthenticationCommand") def test_login(command, runner): mock_auth = command.return_value diff --git a/lib/cli/tests/test_run_flow_definition.py b/lib/cli/tests/test_run_flow_definition.py new file mode 100644 index 000000000..532f810be --- /dev/null +++ b/lib/cli/tests/test_run_flow_definition.py @@ -0,0 +1,156 @@ +from __future__ import annotations + +import json +import sys +import types + +import pytest +import yaml + +from crewai_cli.run_flow_definition import run_flow_definition + + +class _FakeFlow: + def __init__(self, definition): + self.definition = definition + + def kickoff(self, inputs=None): + return { + "flow": self.definition["name"], + "inputs": inputs or {}, + } + + +class _FakeFlowFactory: + @classmethod + def from_definition(cls, definition): + return _FakeFlow(definition) + + +class _FakeFlowDefinition: + @classmethod + def from_yaml(cls, source): + return yaml.safe_load(source) + + @classmethod + def from_json(cls, source): + return json.loads(source) + + +@pytest.fixture +def fake_flow_runtime(monkeypatch): + crewai_module = types.ModuleType("crewai") + flow_package = types.ModuleType("crewai.flow") + flow_module = types.ModuleType("crewai.flow.flow") + flow_definition_module = types.ModuleType("crewai.flow.flow_definition") + + flow_module.Flow = _FakeFlowFactory + flow_definition_module.FlowDefinition = _FakeFlowDefinition + + monkeypatch.setitem(sys.modules, "crewai", crewai_module) + monkeypatch.setitem(sys.modules, "crewai.flow", flow_package) + monkeypatch.setitem(sys.modules, "crewai.flow.flow", flow_module) + monkeypatch.setitem( + sys.modules, "crewai.flow.flow_definition", flow_definition_module + ) + + +def _captured_json(capsys): + return json.loads(capsys.readouterr().out) + + +def test_run_flow_definition_reads_definition_file( + tmp_path, capsys, fake_flow_runtime +): + definition_path = tmp_path / "flow.yaml" + definition_path.write_text("schema: crewai.flow/v1\nname: TestFlow\n") + + run_flow_definition(str(definition_path), '{"topic":"AI"}') + + assert _captured_json(capsys) == { + "flow": "TestFlow", + "inputs": {"topic": "AI"}, + } + + +@pytest.mark.parametrize( + ("definition_source", "expected_flow_name"), + [ + pytest.param( + "schema: crewai.flow/v1\nname: InlineFlow\n", + "InlineFlow", + id="inline-yaml", + ), + pytest.param( + '{"schema":"crewai.flow/v1","name":"InlineJsonFlow"}', + "InlineJsonFlow", + id="inline-json", + ), + pytest.param( + '{"schema":"crewai.flow/v1","name":"' + ("JsonFlow" * 500) + '"}', + "JsonFlow" * 500, + id="large-inline-json", + ), + ], +) +def test_run_flow_definition_accepts_inline_definitions( + definition_source, expected_flow_name, capsys, fake_flow_runtime +): + run_flow_definition(definition_source) + + assert _captured_json(capsys) == {"flow": expected_flow_name, "inputs": {}} + + +@pytest.mark.parametrize( + ("filename", "definition_source", "expected_flow_name"), + [ + pytest.param( + "flow.yaml", + "schema: crewai.flow/v1\nname: YamlFileFlow\n", + "YamlFileFlow", + id="yaml-file", + ), + pytest.param( + "flow.json", + '{"schema":"crewai.flow/v1","name":"JsonFlow"}', + "JsonFlow", + id="json-file", + ), + ], +) +def test_run_flow_definition_accepts_definition_files( + filename, definition_source, expected_flow_name, tmp_path, capsys, fake_flow_runtime +): + definition_path = tmp_path / filename + definition_path.write_text(definition_source) + + run_flow_definition(str(definition_path)) + + assert _captured_json(capsys) == {"flow": expected_flow_name, "inputs": {}} + + +def test_run_flow_definition_rejects_non_object_inputs(fake_flow_runtime, capsys): + with pytest.raises(SystemExit): + run_flow_definition("name: TestFlow", '["not", "an", "object"]') + + assert "Invalid --inputs JSON: expected an object." in capsys.readouterr().err + + +def test_run_flow_definition_reports_unreadable_file( + monkeypatch, tmp_path, capsys, fake_flow_runtime +): + definition_path = tmp_path / "flow.yaml" + definition_path.write_text("schema: crewai.flow/v1\nname: TestFlow\n") + + def raise_permission_error(self, *args, **kwargs): + raise PermissionError("no access") + + monkeypatch.setattr("pathlib.Path.read_text", raise_permission_error) + + with pytest.raises(SystemExit): + run_flow_definition(str(definition_path)) + + err = capsys.readouterr().err + assert "Unable to read --definition path" in err + assert str(definition_path) in err + assert "no access" in err