diff --git a/lib/cli/src/crewai_cli/run_declarative_flow.py b/lib/cli/src/crewai_cli/run_declarative_flow.py index c6ff668c4..b70492777 100644 --- a/lib/cli/src/crewai_cli/run_declarative_flow.py +++ b/lib/cli/src/crewai_cli/run_declarative_flow.py @@ -6,6 +6,7 @@ import subprocess from typing import Any import click +from pydantic import ValidationError from crewai_cli.utils import build_env_with_all_tool_credentials @@ -65,7 +66,6 @@ def load_declarative_flow(definition: str) -> Any: """Load a declarative Flow instance from a definition path.""" try: from crewai.flow.flow import Flow - from crewai.flow.flow_definition import FlowDefinition except ImportError as exc: click.echo( "Running declarative flows requires the full crewai package.", @@ -74,14 +74,30 @@ def load_declarative_flow(definition: str) -> Any: raise SystemExit(1) from exc definition_path = Path(definition).expanduser() - definition_source = _read_declarative_flow_source(definition_path, definition) + try: + if not definition_path.is_file(): + if definition_path.exists(): + click.echo( + f"Invalid --definition path: {definition} is not a file.", + err=True, + ) + raise SystemExit(1) + click.echo( + f"Invalid --definition path: {definition} does not exist.", err=True + ) + raise SystemExit(1) + except OSError as exc: + click.echo(f"Invalid --definition path: {definition} ({exc})", err=True) + raise SystemExit(1) from exc - flow_definition = _parse_declarative_flow( - FlowDefinition, - definition_source, - source_path=definition_path, - ) - return Flow.from_definition(flow_definition) + try: + return Flow.from_declaration(path=definition_path) + except (OSError, UnicodeError, ValueError, ValidationError) as exc: + click.echo( + f"Unable to read --definition path {definition_path}: {exc}", + err=True, + ) + raise SystemExit(1) from exc def configured_project_declarative_flow( @@ -154,53 +170,6 @@ def _parse_inputs(inputs: str | None) -> dict[str, Any] | None: return parsed -def _read_declarative_flow_source(path: Path, definition: str) -> str: - try: - if path.is_file(): - source = _read_declarative_flow_file(path) - elif path.exists(): - click.echo( - f"Invalid --definition path: {definition} is not a file.", err=True - ) - raise SystemExit(1) - else: - click.echo( - f"Invalid --definition path: {definition} does not exist.", err=True - ) - raise SystemExit(1) - except OSError as exc: - click.echo(f"Invalid --definition path: {definition} ({exc})", err=True) - raise SystemExit(1) from exc - - return source - - -def _read_declarative_flow_file(path: Path) -> str: - try: - source = path.read_text(encoding="utf-8") - except (OSError, UnicodeError) as exc: - click.echo( - f"Unable to read --definition path {path}: {exc}", - err=True, - ) - raise SystemExit(1) from exc - return source - - -def _parse_declarative_flow( - flow_definition_cls: type[Any], source: str, *, source_path: Path -) -> Any: - if _looks_like_json(source): - return flow_definition_cls.from_json(source, source_path=source_path) - - return flow_definition_cls.from_yaml(source, source_path=source_path) - - -def _looks_like_json(source: str) -> bool: - stripped = source.lstrip() - return stripped.startswith("{") - - def _format_result(result: Any) -> str: raw_result = getattr(result, "raw", result) if isinstance(raw_result, str): diff --git a/lib/cli/tests/test_run_declarative_flow.py b/lib/cli/tests/test_run_declarative_flow.py index db1286ee7..8d5435c8f 100644 --- a/lib/cli/tests/test_run_declarative_flow.py +++ b/lib/cli/tests/test_run_declarative_flow.py @@ -60,6 +60,43 @@ def test_run_declarative_flow_reports_missing_file( ) +def test_run_declarative_flow_reports_empty_file( + tmp_path: Path, capsys: pytest.CaptureFixture[str] +) -> None: + definition_path = tmp_path / "flow.yaml" + definition_path.write_text(" \n", encoding="utf-8") + + with pytest.raises(SystemExit): + run_declarative_flow_module.run_declarative_flow(str(definition_path)) + + assert "Flow declaration file is empty" in capsys.readouterr().err + + +@pytest.mark.parametrize( + "contents, expected_error", + [ + ("[]\n", "Flow declaration must contain a mapping"), + ("schema: crewai.flow/v1\nmethods: {}\n", "Field required"), + ], +) +def test_load_declarative_flow_reports_invalid_declarations( + tmp_path: Path, + capsys: pytest.CaptureFixture[str], + contents: str, + expected_error: str, +) -> None: + definition_path = tmp_path / "flow.yaml" + definition_path.write_text(contents, encoding="utf-8") + + with pytest.raises(SystemExit) as exc_info: + run_declarative_flow_module.load_declarative_flow(str(definition_path)) + + assert exc_info.value.code == 1 + stderr = capsys.readouterr().err + assert f"Unable to read --definition path {definition_path}:" in stderr + assert expected_error in stderr + + def test_run_declarative_flow_in_project_env_uses_uv( monkeypatch: pytest.MonkeyPatch, tmp_path: Path ) -> None: diff --git a/lib/crewai/src/crewai/flow/flow_definition.py b/lib/crewai/src/crewai/flow/flow_definition.py index 6f05853d0..fa9c89ea8 100644 --- a/lib/crewai/src/crewai/flow/flow_definition.py +++ b/lib/crewai/src/crewai/flow/flow_definition.py @@ -1,7 +1,7 @@ """Flow Definition: the serializable, declarative Flow contract. -Defines :class:`FlowDefinition` and its sub-models — a static, declarative -(JSON/YAML) representation of a Flow: its methods, trigger conditions, +Defines :class:`FlowDefinition` and its sub-models — a static declarative +representation of a Flow: its methods, trigger conditions, state, and configuration. It is independent of the Python authoring layer that may have produced it and of the engine that runs it (see ``runtime``). @@ -235,7 +235,7 @@ class FlowPersistenceDefinition(BaseModel): ``persistence`` may hold a live backend when the definition is built from a decorated class — the engine then persists through the exact instance - the user configured; the JSON/YAML projection degrades it to its + the user configured; the declarative projection degrades it to its serialized config. """ @@ -275,7 +275,7 @@ class FlowHumanFeedbackDefinition(BaseModel): """Static human feedback configuration. ``llm`` and ``provider`` may hold live Python objects when the definition - is built from a decorated class; the JSON/YAML projection degrades them to + is built from a decorated class; the declarative projection degrades them to a serialized config (``llm``) or a ``module:qualname`` ref (``provider``). """ @@ -777,7 +777,7 @@ class FlowDefinition(BaseModel): return self def to_dict(self, *, exclude_none: bool = True) -> dict[str, Any]: - """Serialize the definition to a JSON/YAML-ready dictionary.""" + """Serialize the definition to a declaration-ready dictionary.""" return self.model_dump(by_alias=True, exclude_none=exclude_none, mode="json") def to_json(self, *, indent: int | None = 2, exclude_none: bool = True) -> str: @@ -817,16 +817,37 @@ class FlowDefinition(BaseModel): return definition @classmethod - def from_json(cls, data: str, *, source_path: Path | None = None) -> FlowDefinition: - """Load a definition from JSON.""" - return cls.from_dict(json.loads(data), source_path=source_path) + def from_declaration( + cls, + *, + contents: FlowDefinition | str | dict[str, Any] | None = None, + path: Path | str | None = None, + ) -> FlowDefinition: + """Load a declarative flow from contents or a file path.""" + if isinstance(contents, cls): + return contents - @classmethod - def from_yaml(cls, data: str, *, source_path: Path | None = None) -> FlowDefinition: - """Load a definition from YAML.""" - loaded = yaml.safe_load(data) or {} + source_path: Path | None = None + if contents is None: + if path is None: + raise ValueError("Provide contents or path") + source_path = Path(path) + contents = source_path.expanduser().read_text(encoding="utf-8") + + if isinstance(contents, dict): + return cls.from_dict(contents) + + if not isinstance(contents, str): + raise TypeError("Flow declaration contents must be a string or dictionary") + + if not contents.strip(): + if source_path is not None: + raise ValueError(f"Flow declaration file is empty: {source_path}") + raise ValueError("Flow declaration contents are empty") + + loaded = yaml.safe_load(contents) if not isinstance(loaded, dict): - raise ValueError("Flow definition YAML must contain a mapping") + raise ValueError("Flow declaration must contain a mapping") return cls.from_dict(loaded, source_path=source_path) @classmethod diff --git a/lib/crewai/src/crewai/flow/runtime/__init__.py b/lib/crewai/src/crewai/flow/runtime/__init__.py index c47526a78..4b07f3533 100644 --- a/lib/crewai/src/crewai/flow/runtime/__init__.py +++ b/lib/crewai/src/crewai/flow/runtime/__init__.py @@ -25,6 +25,7 @@ from datetime import datetime import enum import inspect import logging +from pathlib import Path import threading from typing import ( TYPE_CHECKING, @@ -769,6 +770,21 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): @classmethod def from_definition(cls, definition: FlowDefinition, **kwargs: Any) -> Flow[Any]: """Build a runnable Flow directly from a definition; no subclass required.""" + return cls.from_declaration(contents=definition, **kwargs) + + @classmethod + def from_declaration( + cls, + *, + contents: FlowDefinition | str | dict[str, Any] | None = None, + path: Path | str | None = None, + **kwargs: Any, + ) -> Flow[Any]: + """Build a runnable declarative flow from contents or a file path.""" + definition = FlowDefinition.from_declaration( + contents=contents, + path=path, + ) return cls.model_validate( {**definition.config.model_dump(), **kwargs}, context={"flow_definition": definition}, diff --git a/lib/crewai/tests/test_flow_definition.py b/lib/crewai/tests/test_flow_definition.py index 2aa654151..89df6c902 100644 --- a/lib/crewai/tests/test_flow_definition.py +++ b/lib/crewai/tests/test_flow_definition.py @@ -613,7 +613,7 @@ def test_flow_definition_merges_stacked_listen_router(): assert methods["second_router"].emit == ["second_approval", "not_approved"] -def test_flow_definition_round_trips_json_and_yaml(): +def test_flow_definition_round_trips_declaration_serialization(): class RoundTripFlow(Flow): @start() def begin(self): @@ -629,16 +629,122 @@ def test_flow_definition_round_trips_json_and_yaml(): definition = RoundTripFlow.flow_definition() - json_round_trip = flow_definition.FlowDefinition.from_json(definition.to_json()) - yaml_round_trip = flow_definition.FlowDefinition.from_yaml(definition.to_yaml()) + round_trips = [ + flow_definition.FlowDefinition.from_declaration(contents=definition.to_json()), + flow_definition.FlowDefinition.from_declaration(contents=definition.to_yaml()), + ] - assert json_round_trip.to_dict() == definition.to_dict() - assert yaml_round_trip.to_dict() == definition.to_dict() - assert yaml_round_trip.methods["decide"].router is True - assert yaml_round_trip.methods["decide"].listen == "begin" + for round_trip in round_trips: + assert round_trip.to_dict() == definition.to_dict() + assert round_trip.methods["decide"].router is True + assert round_trip.methods["decide"].listen == "begin" -def test_each_action_round_trips_json_and_yaml(): +def test_flow_definition_from_declaration_accepts_contents(): + data = { + "schema": "crewai.flow/v1", + "name": "DeclarationFlow", + "methods": { + "begin": { + "start": True, + "do": { + "call": "expression", + "expr": "'started'", + }, + }, + }, + } + definition = flow_definition.FlowDefinition.from_dict(data) + contents = [ + definition, + data, + definition.to_json(), + definition.to_yaml(), + ] + expected = definition.to_dict() + + for content in contents: + loaded = flow_definition.FlowDefinition.from_declaration(contents=content) + + assert loaded.to_dict() == expected + + +def test_flow_definition_from_declaration_rejects_empty_file(tmp_path: Path): + declaration_path = tmp_path / "flow.crewai" + declaration_path.write_text(" \n", encoding="utf-8") + + with pytest.raises(ValueError, match="Flow declaration file is empty"): + flow_definition.FlowDefinition.from_declaration(path=declaration_path) + + +@pytest.mark.parametrize("contents", ["[]", "false", "0", "null", "~"]) +def test_flow_definition_from_declaration_rejects_falsey_non_mapping_contents( + contents: str, +): + with pytest.raises(ValueError, match="Flow declaration must contain a mapping"): + flow_definition.FlowDefinition.from_declaration(contents=contents) + + +def test_flow_definition_from_declaration_accepts_paths(tmp_path: Path): + definition = flow_definition.FlowDefinition.from_dict( + { + "schema": "crewai.flow/v1", + "name": "DeclarationFlow", + "methods": { + "begin": { + "start": True, + "do": { + "call": "expression", + "expr": "'started'", + }, + }, + }, + } + ) + declaration_path = tmp_path / "flow.crewai" + declaration_path.write_text(definition.to_yaml(), encoding="utf-8") + path_inputs = [ + declaration_path, + str(declaration_path), + ] + + for path_input in path_inputs: + loaded = flow_definition.FlowDefinition.from_declaration(path=path_input) + + assert loaded.to_dict() == definition.to_dict() + assert loaded.source_path == declaration_path.resolve() + + +def test_flow_definition_from_declaration_requires_input(): + with pytest.raises(ValueError, match="Provide contents or path"): + flow_definition.FlowDefinition.from_declaration() + + +def test_flow_definition_from_declaration_prefers_contents_over_path( + tmp_path: Path, +): + data = { + "schema": "crewai.flow/v1", + "name": "ContentsFlow", + "methods": { + "begin": { + "start": True, + "do": {"call": "expression", "expr": "'started'"}, + }, + }, + } + declaration_path = tmp_path / "missing.crewai" + + loaded = flow_definition.FlowDefinition.from_declaration( + contents=data, + path=declaration_path, + ) + + assert loaded.name == "ContentsFlow" + assert loaded.source_path is None + + +def test_each_action_round_trips_declaration_serialization(): definition = flow_definition.FlowDefinition.from_dict( { "schema": "crewai.flow/v1", @@ -677,15 +783,17 @@ def test_each_action_round_trips_json_and_yaml(): } ) - json_round_trip = flow_definition.FlowDefinition.from_json(definition.to_json()) - yaml_round_trip = flow_definition.FlowDefinition.from_yaml(definition.to_yaml()) + round_trips = [ + flow_definition.FlowDefinition.from_declaration(contents=definition.to_json()), + flow_definition.FlowDefinition.from_declaration(contents=definition.to_yaml()), + ] - assert json_round_trip.to_dict() == definition.to_dict() - assert yaml_round_trip.to_dict() == definition.to_dict() - assert yaml_round_trip.methods["process_rows"].description == ( - "Process every loaded row." - ) - assert yaml_round_trip.methods["process_rows"].do.call == "each" + for round_trip in round_trips: + assert round_trip.to_dict() == definition.to_dict() + assert round_trip.methods["process_rows"].description == ( + "Process every loaded row." + ) + assert round_trip.methods["process_rows"].do.call == "each" def test_flow_definition_rejects_invalid_method_names(): diff --git a/lib/crewai/tests/test_flow_from_definition.py b/lib/crewai/tests/test_flow_from_definition.py index 693d75ef5..909f56459 100644 --- a/lib/crewai/tests/test_flow_from_definition.py +++ b/lib/crewai/tests/test_flow_from_definition.py @@ -454,7 +454,7 @@ def assert_parity(flow_cls, yaml_str, inputs=None, ordered=True): class_flow = flow_cls() class_result, class_events = _run_with_events(class_flow, inputs) - definition = FlowDefinition.from_yaml(yaml_str) + definition = FlowDefinition.from_declaration(contents=yaml_str) definition_flow = Flow.from_definition(definition) definition_result, definition_events = _run_with_events(definition_flow, inputs) @@ -477,6 +477,21 @@ def test_simple_chain_parity(): assert flow.method_outputs == ["hello", "HELLO", "confirmed:True"] +def test_flow_from_declaration_builds_runnable_flow(): + flow = Flow.from_declaration(contents=CHAIN_YAML) + + assert flow.kickoff() == "confirmed:True" + assert flow.method_outputs == ["hello", "HELLO", "confirmed:True"] + + +def test_flow_from_declaration_accepts_flow_definition(): + definition = FlowDefinition.from_declaration(contents=CHAIN_YAML) + flow = Flow.from_declaration(contents=definition) + + assert flow.kickoff() == "confirmed:True" + assert flow.method_outputs == ["hello", "HELLO", "confirmed:True"] + + def test_and_or_merge_parity(): flow, _ = assert_parity(MergeFlow, MERGE_YAML, ordered=False) assert flow.state["joined"] is True @@ -499,7 +514,7 @@ def test_cyclic_flow_parity(): def test_definition_flow_events_use_definition_name(): - definition = FlowDefinition.from_yaml(CHAIN_YAML) + definition = FlowDefinition.from_declaration(contents=CHAIN_YAML) flow = Flow.from_definition(definition) _, events = _run_with_events(flow) assert events @@ -588,7 +603,7 @@ methods: start: true """ - flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str)) assert flow.kickoff() == "found:ai agents" @@ -639,7 +654,7 @@ methods: listen: begin """ - flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str)) assert flow.kickoff(inputs={"topic": "ai"}) == "found:ai agents" @@ -758,7 +773,7 @@ methods: listen: begin """ - flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str)) assert flow.kickoff() == "search:hello agents" @@ -783,7 +798,7 @@ methods: listen: build_query """ - flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str)) assert flow.kickoff() == "found:ai agents news" @@ -803,7 +818,7 @@ methods: start: true """ - flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str)) assert ( flow.kickoff(inputs={"limit": 2, "domains": ["crewai.com", "example.com"]}) @@ -836,7 +851,7 @@ methods: start: true """ - flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str)) assert flow.kickoff(inputs={"question": "What is CrewAI?"}) == { "agent": "Analyst", @@ -874,7 +889,7 @@ methods: start: true """ - flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str)) assert flow.kickoff(inputs={"questions": ["one", "two"]}) == [ "Analyst:one", @@ -905,7 +920,7 @@ def test_agent_action_round_trips_with_inline_definition(): } ) - round_trip = FlowDefinition.from_yaml(definition.to_yaml()) + round_trip = FlowDefinition.from_declaration(contents=definition.to_yaml()) action = round_trip.to_dict()["methods"]["answer"]["do"] assert action["call"] == "agent" @@ -968,7 +983,7 @@ methods: """ with pytest.raises(ValidationError, match="invalid CEL expression"): - FlowDefinition.from_yaml(yaml_str) + FlowDefinition.from_declaration(contents=yaml_str) def test_crew_action_runs_inline_yaml_definition(monkeypatch: pytest.MonkeyPatch): @@ -1010,7 +1025,7 @@ methods: start: true """ - flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str)) assert flow.kickoff(inputs={"topic": "AI"}) == { "crew": "inline_research", @@ -1086,7 +1101,7 @@ methods: start: true """ - flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str)) assert flow.kickoff(inputs={"topic": "AI"}) == { "crew": "referenced_research", @@ -1160,9 +1175,7 @@ methods: other_cwd.mkdir() monkeypatch.chdir(other_cwd) - flow = Flow.from_definition( - FlowDefinition.from_yaml(yaml_str, source_path=flow_path) - ) + flow = Flow.from_definition(FlowDefinition.from_declaration(path=flow_path)) assert flow.kickoff(inputs={"topic": "AI"}) == { "crew": "relative_research", @@ -1185,10 +1198,9 @@ methods: from_declaration: ../outside/crew.jsonc start: true """ + flow_path.write_text(yaml_str, encoding="utf-8") - flow = Flow.from_definition( - FlowDefinition.from_yaml(yaml_str, source_path=flow_path) - ) + flow = Flow.from_definition(FlowDefinition.from_declaration(path=flow_path)) with pytest.raises( ValueError, @@ -1411,7 +1423,7 @@ methods: """ with pytest.raises(ValidationError, match="invalid CEL expression"): - FlowDefinition.from_yaml(yaml_str) + FlowDefinition.from_declaration(contents=yaml_str) def test_code_action_renders_keyword_inputs(): @@ -1429,7 +1441,7 @@ methods: start: true """ - flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str)) assert flow.kickoff(inputs={"name": "hello"}) == "hello!" @@ -1448,7 +1460,7 @@ methods: start: true """ - flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str)) assert flow.kickoff(inputs={"value": "ok"}) == "callable:ok" @@ -1472,7 +1484,7 @@ methods: start: true """ - flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str)) assert flow.kickoff(inputs={"rows": ["a", "b"]}) == [ "normalized:a", @@ -1499,7 +1511,7 @@ methods: start: true """ - flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str)) caller_thread_id = threading.get_ident() assert flow.kickoff(inputs={"rows": ["a"]}) == ["process_rows:a"] @@ -1526,7 +1538,7 @@ methods: start: true """ - flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str)) assert flow.kickoff(inputs={"rows": ["a", "b"]}) == ["async:a", "async:b"] @@ -1548,7 +1560,7 @@ methods: FlowScriptExecutionDisabledError, match="CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION=1", ) as exc_info: - Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str)) assert "methods with unresolvable actions" not in str(exc_info.value) @@ -1572,7 +1584,7 @@ methods: start: true """ - flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str)) assert flow.kickoff(inputs={"raw_score": 3.2}) == "rounded:4" assert flow.state["rounded"] == 4 @@ -1601,7 +1613,7 @@ methods: listen: seed """ - flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str)) assert flow.kickoff() == "alpha:alpha" assert flow.state["input_matches_output"] is True @@ -1639,7 +1651,7 @@ methods: listen: seed """ - flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str)) assert flow.kickoff(inputs={"rows": [" a ", " b "]}) == ["global:a", "global:b"] @@ -1671,7 +1683,7 @@ methods: start: true """ - flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str)) assert flow.kickoff(inputs={"rows": ["a", "b"]}) == [ {"row": "a", "normalized": "saved:a"}, @@ -1700,7 +1712,7 @@ methods: start: true """ - flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str)) assert flow.kickoff(inputs={"rows": ["a", "b"]}) == ["a", "b"] assert flow._method_outputs == [ @@ -1738,7 +1750,7 @@ methods: listen: seed """ - flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str)) assert flow.kickoff(inputs={"rows": ["a", "b"]}) == [ "local:a", @@ -1777,7 +1789,7 @@ methods: start: true """ - flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str)) assert flow.kickoff( inputs={ @@ -1811,7 +1823,7 @@ methods: start: true """ - flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str)) assert flow.kickoff(inputs={"rows": [{"kind": "keep", "value": "a"}]}) == ["a"] @@ -1838,7 +1850,7 @@ methods: start: true """ - flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str)) assert flow.kickoff( inputs={ @@ -1868,7 +1880,7 @@ methods: start: true """ - flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str)) with pytest.raises(ValueError, match="if expression must evaluate to a boolean"): flow.kickoff(inputs={"rows": [{"value": "truthy"}]}) @@ -1898,7 +1910,7 @@ methods: listen: process_rows """ - flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str)) events = [] with crewai_event_bus.scoped_handlers(): @@ -2069,7 +2081,7 @@ methods: start: true """ - flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str)) with pytest.raises(RuntimeError, match="bad row"): flow.kickoff(inputs={"rows": ["ok", "bad"]}) @@ -2190,7 +2202,7 @@ methods: listen: right """ - definition = FlowDefinition.from_yaml(yaml_str) + definition = FlowDefinition.from_declaration(contents=yaml_str) assert Flow.from_definition(definition).kickoff( inputs={"direction": "left"} @@ -2213,7 +2225,7 @@ methods: """ with pytest.raises(ValidationError, match="invalid CEL expression"): - FlowDefinition.from_yaml(yaml_str) + FlowDefinition.from_declaration(contents=yaml_str) def test_expression_action_rejects_unknown_cel_root(): @@ -2229,7 +2241,7 @@ methods: """ with pytest.raises(ValidationError, match="unknown CEL root"): - FlowDefinition.from_yaml(yaml_str) + FlowDefinition.from_declaration(contents=yaml_str) def test_tool_action_requires_module_qualname_ref(): @@ -2263,14 +2275,16 @@ def test_pydantic_state_from_ref_parity(): def test_pydantic_state_default_overlay(): - flow = Flow.from_definition(FlowDefinition.from_yaml(PYDANTIC_STATE_OVERLAY_YAML)) + flow = Flow.from_definition( + FlowDefinition.from_declaration(contents=PYDANTIC_STATE_OVERLAY_YAML) + ) result = flow.kickoff() assert result == "count=6" assert flow.state.count == 6 def test_json_schema_state(): - flow = Flow.from_definition(FlowDefinition.from_yaml(JSON_SCHEMA_STATE_YAML)) + flow = Flow.from_definition(FlowDefinition.from_declaration(contents=JSON_SCHEMA_STATE_YAML)) result = flow.kickoff() assert result == "count=1" assert flow.state.count == 1 @@ -2279,14 +2293,14 @@ def test_json_schema_state(): def test_json_schema_state_validates_inputs(): - flow = Flow.from_definition(FlowDefinition.from_yaml(JSON_SCHEMA_STATE_YAML)) + flow = Flow.from_definition(FlowDefinition.from_declaration(contents=JSON_SCHEMA_STATE_YAML)) with pytest.raises(ValueError, match="Invalid inputs"): flow.kickoff(inputs={"count": "not-a-number"}) def test_pydantic_state_falls_back_to_json_schema_when_ref_unimportable(): flow = Flow.from_definition( - FlowDefinition.from_yaml(PYDANTIC_REF_WITH_SCHEMA_FALLBACK_YAML) + FlowDefinition.from_declaration(contents=PYDANTIC_REF_WITH_SCHEMA_FALLBACK_YAML) ) result = flow.kickoff() assert result == "count=1" @@ -2295,7 +2309,9 @@ def test_pydantic_state_falls_back_to_json_schema_when_ref_unimportable(): def test_pydantic_state_without_ref_or_schema_falls_back_to_dict(caplog): with caplog.at_level("ERROR"): - flow = Flow.from_definition(FlowDefinition.from_yaml(UNRESOLVABLE_STATE_YAML)) + flow = Flow.from_definition( + FlowDefinition.from_declaration(contents=UNRESOLVABLE_STATE_YAML) + ) assert "falling back to dict state" in caplog.text result = flow.kickoff() @@ -2305,7 +2321,7 @@ def test_pydantic_state_without_ref_or_schema_falls_back_to_dict(caplog): def test_dict_state_is_a_copy_of_default_plus_id(): - definition = FlowDefinition.from_yaml(DICT_STATE_YAML) + definition = FlowDefinition.from_declaration(contents=DICT_STATE_YAML) flow = Flow.from_definition(definition) assert flow.state["count"] == 5 @@ -2322,7 +2338,7 @@ def test_dict_state_is_a_copy_of_default_plus_id(): def test_unknown_state_type_falls_back_to_dict(caplog): with caplog.at_level("WARNING"): - flow = Flow.from_definition(FlowDefinition.from_yaml(UNKNOWN_STATE_YAML)) + flow = Flow.from_definition(FlowDefinition.from_declaration(contents=UNKNOWN_STATE_YAML)) assert "falling back to dict state" in caplog.text result = flow.kickoff() @@ -2395,7 +2411,7 @@ def _run_capturing_flow_lifecycle(yaml_str, event_types): def capture(source, event): events.append(event) - flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str)) result = flow.kickoff() return flow, result, events @@ -2409,7 +2425,7 @@ _LIFECYCLE_EVENTS = [ ] -def test_config_suppress_flow_events_from_yaml(): +def test_config_suppress_flow_events_from_declaration(): twin_events = [] with crewai_event_bus.scoped_handlers(): for event_type in _LIFECYCLE_EVENTS: @@ -2432,14 +2448,14 @@ def test_config_suppress_flow_events_from_yaml(): ) -def test_config_max_method_calls_from_yaml(): - flow = Flow.from_definition(FlowDefinition.from_yaml(CAPPED_LOOP_YAML)) +def test_config_max_method_calls_from_declaration(): + flow = Flow.from_definition(FlowDefinition.from_declaration(contents=CAPPED_LOOP_YAML)) with pytest.raises(RecursionError, match="has been called 2 times"): flow.kickoff() -def test_config_stream_from_yaml(): - flow = Flow.from_definition(FlowDefinition.from_yaml(STREAMING_CHAIN_YAML)) +def test_config_stream_from_declaration(): + flow = Flow.from_definition(FlowDefinition.from_declaration(contents=STREAMING_CHAIN_YAML)) streaming = flow.kickoff() assert isinstance(streaming, FlowStreamingOutput) for _ in streaming: @@ -2448,7 +2464,7 @@ def test_config_stream_from_yaml(): assert flow.stream is True -def test_config_defer_trace_finalization_from_yaml(): +def test_config_defer_trace_finalization_from_declaration(): _, _, baseline_events = _run_capturing_flow_lifecycle( CHAIN_YAML, [FlowFinishedEvent] ) @@ -2462,7 +2478,7 @@ def test_config_defer_trace_finalization_from_yaml(): assert deferred_events == [] -def test_config_checkpoint_from_yaml(tmp_path): +def test_config_checkpoint_from_declaration(tmp_path): yaml_str = ( CHAIN_YAML + f""" @@ -2471,19 +2487,23 @@ config: location: {tmp_path} """ ) - flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str)) assert isinstance(flow.checkpoint, CheckpointConfig) assert flow.checkpoint.location == str(tmp_path) -def test_config_input_provider_from_yaml(): - flow = Flow.from_definition(FlowDefinition.from_yaml(INPUT_PROVIDER_CHAIN_YAML)) +def test_config_input_provider_from_declaration(): + flow = Flow.from_definition( + FlowDefinition.from_declaration(contents=INPUT_PROVIDER_CHAIN_YAML) + ) assert isinstance(flow.input_provider, StubInputProvider) def test_round_trip_config_equivalence(): class_flow = ConfiguredFlow() - definition = FlowDefinition.from_yaml(ConfiguredFlow.flow_definition().to_yaml()) + definition = FlowDefinition.from_declaration( + contents=ConfiguredFlow.flow_definition().to_yaml() + ) definition_flow = Flow.from_definition(definition) assert definition.config.suppress_flow_events is True @@ -2653,9 +2673,9 @@ class MethodPersistedFlow(Flow): return "two" -def test_flow_level_persist_from_yaml_saves_once_per_method(): +def test_flow_level_persist_from_declaration_saves_once_per_method(): yaml_str = _flow_level_persist_yaml("yaml-flow-level") - flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str)) result = flow.kickoff() assert result == "two" @@ -2665,9 +2685,9 @@ def test_flow_level_persist_from_yaml_saves_once_per_method(): assert final_save["id"] == flow.state["id"] -def test_method_level_persist_from_yaml_saves_only_that_method(): +def test_method_level_persist_from_declaration_saves_only_that_method(): yaml_str = _method_level_persist_yaml("yaml-method-level") - flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str)) flow.kickoff() assert _saved_methods("yaml-method-level") == ["first"] @@ -2696,20 +2716,20 @@ methods: persist: enabled: false """ - flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str)) flow.kickoff() assert _saved_methods("yaml-opt-out") == ["first"] -def test_persist_restore_by_id_from_yaml(): +def test_persist_restore_by_id_from_declaration(): yaml_str = _flow_level_persist_yaml("yaml-restore") - flow1 = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + flow1 = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str)) flow1.kickoff() assert flow1.state["count"] == 2 - flow2 = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + flow2 = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str)) flow2.kickoff(inputs={"id": flow1.state["id"]}) assert flow2.state["count"] == 4 @@ -2729,7 +2749,9 @@ def test_method_level_persist_decorator_saves_only_that_method(): def test_round_trip_persist_equivalence(): - definition = FlowDefinition.from_yaml(ClassPersistedFlow.flow_definition().to_yaml()) + definition = FlowDefinition.from_declaration( + contents=ClassPersistedFlow.flow_definition().to_yaml() + ) before = len(DefinitionStoreBackend.saves["class-decorator"]) flow = Flow.from_definition(definition) @@ -2738,7 +2760,7 @@ def test_round_trip_persist_equivalence(): assert _saved_methods("class-decorator")[before:] == ["first", "second"] -def test_method_persist_backend_overrides_flow_level_backend_from_yaml(): +def test_method_persist_backend_overrides_flow_level_backend_from_declaration(): yaml_str = f""" schema: crewai.flow/v1 name: PersistedFlow @@ -2762,7 +2784,7 @@ methods: persistence_type: DefinitionStoreBackend store: yaml-mixed-method """ - flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str)) flow.kickoff() assert _saved_methods("yaml-mixed-flow") == ["first"] @@ -2910,8 +2932,8 @@ methods: """ -def test_human_feedback_from_yaml_default_outcome_routes(): - flow = Flow.from_definition(FlowDefinition.from_yaml(REVIEW_YAML)) +def test_human_feedback_from_declaration_default_outcome_routes(): + flow = Flow.from_definition(FlowDefinition.from_declaration(contents=REVIEW_YAML)) with patch.object(flow, "_request_human_feedback", return_value="") as request: result = flow.kickoff() @@ -2922,8 +2944,8 @@ def test_human_feedback_from_yaml_default_outcome_routes(): assert flow.last_human_feedback.output == "draft-content" -def test_human_feedback_from_yaml_collapses_and_routes(): - flow = Flow.from_definition(FlowDefinition.from_yaml(REVIEW_YAML)) +def test_human_feedback_from_declaration_collapses_and_routes(): + flow = Flow.from_definition(FlowDefinition.from_declaration(contents=REVIEW_YAML)) with ( patch.object(flow, "_request_human_feedback", return_value="ship it"), @@ -2940,7 +2962,7 @@ def test_round_trip_human_feedback_equivalence(): with patch.object(class_flow, "_request_human_feedback", return_value=""): class_result = class_flow.kickoff() - definition = FlowDefinition.from_yaml(ReviewFlow.flow_definition().to_yaml()) + definition = FlowDefinition.from_declaration(contents=ReviewFlow.flow_definition().to_yaml()) twin = Flow.from_definition(definition) with patch.object(twin, "_request_human_feedback", return_value=""): twin_result = twin.kickoff() @@ -2953,8 +2975,8 @@ def test_round_trip_human_feedback_equivalence(): ) -def test_human_feedback_pending_and_resume_from_yaml(): - definition = FlowDefinition.from_yaml(PENDING_REVIEW_YAML) +def test_human_feedback_pending_and_resume_from_declaration(): + definition = FlowDefinition.from_declaration(contents=PENDING_REVIEW_YAML) flow = Flow.from_definition(definition) pending = flow.kickoff() @@ -2975,7 +2997,7 @@ def test_human_feedback_pending_and_resume_from_yaml(): assert flow_id not in DefinitionStoreBackend.pending -def test_flow_config_provider_fallback_from_yaml(): +def test_flow_config_provider_fallback_from_declaration(): yaml_str = f""" schema: crewai.flow/v1 name: ConfigProviderFlow @@ -3001,7 +3023,7 @@ methods: return "from-config" provider = RecordingProvider() - flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str)) previous = flow_config.hitl_provider flow_config.hitl_provider = provider @@ -3104,7 +3126,7 @@ methods: message: "Review:" provider: {__name__}:_NeedsArgsProvider """ - flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str)) with pytest.raises( ValueError, match="cannot instantiate human_feedback.provider ref" @@ -3125,7 +3147,7 @@ methods: message: "Review:" provider: missing_module_xyz:Provider """ - flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str)) with pytest.raises( ValueError, match="unresolvable human_feedback.provider ref" @@ -3137,7 +3159,7 @@ def _checkpoint_chain_flow(tmp_path): from crewai.state.provider.json_provider import JsonProvider from crewai.state.runtime import RuntimeState - definition = FlowDefinition.from_yaml(CHAIN_YAML) + definition = FlowDefinition.from_declaration(contents=CHAIN_YAML) flow = Flow.from_definition(definition) result = flow.kickoff() assert result == "confirmed:True" @@ -3177,7 +3199,7 @@ state: methods: {} """ with pytest.raises(ValidationError, match="default"): - FlowDefinition.from_yaml(yaml_str) + FlowDefinition.from_declaration(contents=yaml_str) def test_definition_method_missing_from_class_fails_loudly():