Add unified declarative flow loading (#6308)

Add a single declaration loader shared by API and CLI callers.

- Add FlowDefinition.from_declaration for FlowDefinition instances, dictionaries, YAML/JSON strings, and file paths
- Add Flow.from_declaration to build runnable flows directly from the same inputs
- Route declarative flow CLI loading through Flow.from_declaration so path handling and validation stay centralized

```
# Load just the serializable definition when you do not need to run it yet.
definition = FlowDefinition.from_declaration(path="flows/research.crewai")
definition = FlowDefinition.from_declaration(contents=flow_yaml)
definition = FlowDefinition.from_declaration(contents=flow_dict)

# Build a runnable flow directly from the same declaration inputs.
flow = Flow.from_declaration(path="flows/research.crewai")
flow = Flow.from_declaration(contents=flow_yaml)
flow = Flow.from_declaration(contents=flow_dict)
flow = Flow.from_declaration(contents=definition)

# Run it like any other flow.
result = flow.kickoff(inputs={"topic": "AI agents"})

# The CLI now goes through the same path-based loader.
# crewai run --definition flows/research.crewai
```
This commit is contained in:
Vinicius Brasil
2026-06-23 12:02:18 -07:00
committed by GitHub
parent 793539173d
commit 3452e5c187
6 changed files with 341 additions and 168 deletions

View File

@@ -6,6 +6,7 @@ import subprocess
from typing import Any
import click
from pydantic import ValidationError
from crewai_cli.utils import build_env_with_all_tool_credentials
@@ -65,7 +66,6 @@ def load_declarative_flow(definition: str) -> Any:
"""Load a declarative Flow instance from a definition path."""
try:
from crewai.flow.flow import Flow
from crewai.flow.flow_definition import FlowDefinition
except ImportError as exc:
click.echo(
"Running declarative flows requires the full crewai package.",
@@ -74,14 +74,30 @@ def load_declarative_flow(definition: str) -> Any:
raise SystemExit(1) from exc
definition_path = Path(definition).expanduser()
definition_source = _read_declarative_flow_source(definition_path, definition)
try:
if not definition_path.is_file():
if definition_path.exists():
click.echo(
f"Invalid --definition path: {definition} is not a file.",
err=True,
)
raise SystemExit(1)
click.echo(
f"Invalid --definition path: {definition} does not exist.", err=True
)
raise SystemExit(1)
except OSError as exc:
click.echo(f"Invalid --definition path: {definition} ({exc})", err=True)
raise SystemExit(1) from exc
flow_definition = _parse_declarative_flow(
FlowDefinition,
definition_source,
source_path=definition_path,
)
return Flow.from_definition(flow_definition)
try:
return Flow.from_declaration(path=definition_path)
except (OSError, UnicodeError, ValueError, ValidationError) as exc:
click.echo(
f"Unable to read --definition path {definition_path}: {exc}",
err=True,
)
raise SystemExit(1) from exc
def configured_project_declarative_flow(
@@ -154,53 +170,6 @@ def _parse_inputs(inputs: str | None) -> dict[str, Any] | None:
return parsed
def _read_declarative_flow_source(path: Path, definition: str) -> str:
try:
if path.is_file():
source = _read_declarative_flow_file(path)
elif path.exists():
click.echo(
f"Invalid --definition path: {definition} is not a file.", err=True
)
raise SystemExit(1)
else:
click.echo(
f"Invalid --definition path: {definition} does not exist.", err=True
)
raise SystemExit(1)
except OSError as exc:
click.echo(f"Invalid --definition path: {definition} ({exc})", err=True)
raise SystemExit(1) from exc
return source
def _read_declarative_flow_file(path: Path) -> str:
try:
source = path.read_text(encoding="utf-8")
except (OSError, UnicodeError) as exc:
click.echo(
f"Unable to read --definition path {path}: {exc}",
err=True,
)
raise SystemExit(1) from exc
return source
def _parse_declarative_flow(
flow_definition_cls: type[Any], source: str, *, source_path: Path
) -> Any:
if _looks_like_json(source):
return flow_definition_cls.from_json(source, source_path=source_path)
return flow_definition_cls.from_yaml(source, source_path=source_path)
def _looks_like_json(source: str) -> bool:
stripped = source.lstrip()
return stripped.startswith("{")
def _format_result(result: Any) -> str:
raw_result = getattr(result, "raw", result)
if isinstance(raw_result, str):

View File

@@ -60,6 +60,43 @@ def test_run_declarative_flow_reports_missing_file(
)
def test_run_declarative_flow_reports_empty_file(
tmp_path: Path, capsys: pytest.CaptureFixture[str]
) -> None:
definition_path = tmp_path / "flow.yaml"
definition_path.write_text(" \n", encoding="utf-8")
with pytest.raises(SystemExit):
run_declarative_flow_module.run_declarative_flow(str(definition_path))
assert "Flow declaration file is empty" in capsys.readouterr().err
@pytest.mark.parametrize(
"contents, expected_error",
[
("[]\n", "Flow declaration must contain a mapping"),
("schema: crewai.flow/v1\nmethods: {}\n", "Field required"),
],
)
def test_load_declarative_flow_reports_invalid_declarations(
tmp_path: Path,
capsys: pytest.CaptureFixture[str],
contents: str,
expected_error: str,
) -> None:
definition_path = tmp_path / "flow.yaml"
definition_path.write_text(contents, encoding="utf-8")
with pytest.raises(SystemExit) as exc_info:
run_declarative_flow_module.load_declarative_flow(str(definition_path))
assert exc_info.value.code == 1
stderr = capsys.readouterr().err
assert f"Unable to read --definition path {definition_path}:" in stderr
assert expected_error in stderr
def test_run_declarative_flow_in_project_env_uses_uv(
monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:

View File

@@ -1,7 +1,7 @@
"""Flow Definition: the serializable, declarative Flow contract.
Defines :class:`FlowDefinition` and its sub-models — a static, declarative
(JSON/YAML) representation of a Flow: its methods, trigger conditions,
Defines :class:`FlowDefinition` and its sub-models — a static declarative
representation of a Flow: its methods, trigger conditions,
state, and configuration. It is independent of the Python authoring
layer that may have produced it and of the engine that runs it (see
``runtime``).
@@ -235,7 +235,7 @@ class FlowPersistenceDefinition(BaseModel):
``persistence`` may hold a live backend when the definition is built from
a decorated class — the engine then persists through the exact instance
the user configured; the JSON/YAML projection degrades it to its
the user configured; the declarative projection degrades it to its
serialized config.
"""
@@ -275,7 +275,7 @@ class FlowHumanFeedbackDefinition(BaseModel):
"""Static human feedback configuration.
``llm`` and ``provider`` may hold live Python objects when the definition
is built from a decorated class; the JSON/YAML projection degrades them to
is built from a decorated class; the declarative projection degrades them to
a serialized config (``llm``) or a ``module:qualname`` ref (``provider``).
"""
@@ -777,7 +777,7 @@ class FlowDefinition(BaseModel):
return self
def to_dict(self, *, exclude_none: bool = True) -> dict[str, Any]:
"""Serialize the definition to a JSON/YAML-ready dictionary."""
"""Serialize the definition to a declaration-ready dictionary."""
return self.model_dump(by_alias=True, exclude_none=exclude_none, mode="json")
def to_json(self, *, indent: int | None = 2, exclude_none: bool = True) -> str:
@@ -817,16 +817,37 @@ class FlowDefinition(BaseModel):
return definition
@classmethod
def from_json(cls, data: str, *, source_path: Path | None = None) -> FlowDefinition:
"""Load a definition from JSON."""
return cls.from_dict(json.loads(data), source_path=source_path)
def from_declaration(
cls,
*,
contents: FlowDefinition | str | dict[str, Any] | None = None,
path: Path | str | None = None,
) -> FlowDefinition:
"""Load a declarative flow from contents or a file path."""
if isinstance(contents, cls):
return contents
@classmethod
def from_yaml(cls, data: str, *, source_path: Path | None = None) -> FlowDefinition:
"""Load a definition from YAML."""
loaded = yaml.safe_load(data) or {}
source_path: Path | None = None
if contents is None:
if path is None:
raise ValueError("Provide contents or path")
source_path = Path(path)
contents = source_path.expanduser().read_text(encoding="utf-8")
if isinstance(contents, dict):
return cls.from_dict(contents)
if not isinstance(contents, str):
raise TypeError("Flow declaration contents must be a string or dictionary")
if not contents.strip():
if source_path is not None:
raise ValueError(f"Flow declaration file is empty: {source_path}")
raise ValueError("Flow declaration contents are empty")
loaded = yaml.safe_load(contents)
if not isinstance(loaded, dict):
raise ValueError("Flow definition YAML must contain a mapping")
raise ValueError("Flow declaration must contain a mapping")
return cls.from_dict(loaded, source_path=source_path)
@classmethod

View File

@@ -25,6 +25,7 @@ from datetime import datetime
import enum
import inspect
import logging
from pathlib import Path
import threading
from typing import (
TYPE_CHECKING,
@@ -769,6 +770,21 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
@classmethod
def from_definition(cls, definition: FlowDefinition, **kwargs: Any) -> Flow[Any]:
"""Build a runnable Flow directly from a definition; no subclass required."""
return cls.from_declaration(contents=definition, **kwargs)
@classmethod
def from_declaration(
cls,
*,
contents: FlowDefinition | str | dict[str, Any] | None = None,
path: Path | str | None = None,
**kwargs: Any,
) -> Flow[Any]:
"""Build a runnable declarative flow from contents or a file path."""
definition = FlowDefinition.from_declaration(
contents=contents,
path=path,
)
return cls.model_validate(
{**definition.config.model_dump(), **kwargs},
context={"flow_definition": definition},

View File

@@ -613,7 +613,7 @@ def test_flow_definition_merges_stacked_listen_router():
assert methods["second_router"].emit == ["second_approval", "not_approved"]
def test_flow_definition_round_trips_json_and_yaml():
def test_flow_definition_round_trips_declaration_serialization():
class RoundTripFlow(Flow):
@start()
def begin(self):
@@ -629,16 +629,122 @@ def test_flow_definition_round_trips_json_and_yaml():
definition = RoundTripFlow.flow_definition()
json_round_trip = flow_definition.FlowDefinition.from_json(definition.to_json())
yaml_round_trip = flow_definition.FlowDefinition.from_yaml(definition.to_yaml())
round_trips = [
flow_definition.FlowDefinition.from_declaration(contents=definition.to_json()),
flow_definition.FlowDefinition.from_declaration(contents=definition.to_yaml()),
]
assert json_round_trip.to_dict() == definition.to_dict()
assert yaml_round_trip.to_dict() == definition.to_dict()
assert yaml_round_trip.methods["decide"].router is True
assert yaml_round_trip.methods["decide"].listen == "begin"
for round_trip in round_trips:
assert round_trip.to_dict() == definition.to_dict()
assert round_trip.methods["decide"].router is True
assert round_trip.methods["decide"].listen == "begin"
def test_each_action_round_trips_json_and_yaml():
def test_flow_definition_from_declaration_accepts_contents():
data = {
"schema": "crewai.flow/v1",
"name": "DeclarationFlow",
"methods": {
"begin": {
"start": True,
"do": {
"call": "expression",
"expr": "'started'",
},
},
},
}
definition = flow_definition.FlowDefinition.from_dict(data)
contents = [
definition,
data,
definition.to_json(),
definition.to_yaml(),
]
expected = definition.to_dict()
for content in contents:
loaded = flow_definition.FlowDefinition.from_declaration(contents=content)
assert loaded.to_dict() == expected
def test_flow_definition_from_declaration_rejects_empty_file(tmp_path: Path):
declaration_path = tmp_path / "flow.crewai"
declaration_path.write_text(" \n", encoding="utf-8")
with pytest.raises(ValueError, match="Flow declaration file is empty"):
flow_definition.FlowDefinition.from_declaration(path=declaration_path)
@pytest.mark.parametrize("contents", ["[]", "false", "0", "null", "~"])
def test_flow_definition_from_declaration_rejects_falsey_non_mapping_contents(
contents: str,
):
with pytest.raises(ValueError, match="Flow declaration must contain a mapping"):
flow_definition.FlowDefinition.from_declaration(contents=contents)
def test_flow_definition_from_declaration_accepts_paths(tmp_path: Path):
definition = flow_definition.FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "DeclarationFlow",
"methods": {
"begin": {
"start": True,
"do": {
"call": "expression",
"expr": "'started'",
},
},
},
}
)
declaration_path = tmp_path / "flow.crewai"
declaration_path.write_text(definition.to_yaml(), encoding="utf-8")
path_inputs = [
declaration_path,
str(declaration_path),
]
for path_input in path_inputs:
loaded = flow_definition.FlowDefinition.from_declaration(path=path_input)
assert loaded.to_dict() == definition.to_dict()
assert loaded.source_path == declaration_path.resolve()
def test_flow_definition_from_declaration_requires_input():
with pytest.raises(ValueError, match="Provide contents or path"):
flow_definition.FlowDefinition.from_declaration()
def test_flow_definition_from_declaration_prefers_contents_over_path(
tmp_path: Path,
):
data = {
"schema": "crewai.flow/v1",
"name": "ContentsFlow",
"methods": {
"begin": {
"start": True,
"do": {"call": "expression", "expr": "'started'"},
},
},
}
declaration_path = tmp_path / "missing.crewai"
loaded = flow_definition.FlowDefinition.from_declaration(
contents=data,
path=declaration_path,
)
assert loaded.name == "ContentsFlow"
assert loaded.source_path is None
def test_each_action_round_trips_declaration_serialization():
definition = flow_definition.FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
@@ -677,15 +783,17 @@ def test_each_action_round_trips_json_and_yaml():
}
)
json_round_trip = flow_definition.FlowDefinition.from_json(definition.to_json())
yaml_round_trip = flow_definition.FlowDefinition.from_yaml(definition.to_yaml())
round_trips = [
flow_definition.FlowDefinition.from_declaration(contents=definition.to_json()),
flow_definition.FlowDefinition.from_declaration(contents=definition.to_yaml()),
]
assert json_round_trip.to_dict() == definition.to_dict()
assert yaml_round_trip.to_dict() == definition.to_dict()
assert yaml_round_trip.methods["process_rows"].description == (
"Process every loaded row."
)
assert yaml_round_trip.methods["process_rows"].do.call == "each"
for round_trip in round_trips:
assert round_trip.to_dict() == definition.to_dict()
assert round_trip.methods["process_rows"].description == (
"Process every loaded row."
)
assert round_trip.methods["process_rows"].do.call == "each"
def test_flow_definition_rejects_invalid_method_names():

View File

@@ -454,7 +454,7 @@ def assert_parity(flow_cls, yaml_str, inputs=None, ordered=True):
class_flow = flow_cls()
class_result, class_events = _run_with_events(class_flow, inputs)
definition = FlowDefinition.from_yaml(yaml_str)
definition = FlowDefinition.from_declaration(contents=yaml_str)
definition_flow = Flow.from_definition(definition)
definition_result, definition_events = _run_with_events(definition_flow, inputs)
@@ -477,6 +477,21 @@ def test_simple_chain_parity():
assert flow.method_outputs == ["hello", "HELLO", "confirmed:True"]
def test_flow_from_declaration_builds_runnable_flow():
flow = Flow.from_declaration(contents=CHAIN_YAML)
assert flow.kickoff() == "confirmed:True"
assert flow.method_outputs == ["hello", "HELLO", "confirmed:True"]
def test_flow_from_declaration_accepts_flow_definition():
definition = FlowDefinition.from_declaration(contents=CHAIN_YAML)
flow = Flow.from_declaration(contents=definition)
assert flow.kickoff() == "confirmed:True"
assert flow.method_outputs == ["hello", "HELLO", "confirmed:True"]
def test_and_or_merge_parity():
flow, _ = assert_parity(MergeFlow, MERGE_YAML, ordered=False)
assert flow.state["joined"] is True
@@ -499,7 +514,7 @@ def test_cyclic_flow_parity():
def test_definition_flow_events_use_definition_name():
definition = FlowDefinition.from_yaml(CHAIN_YAML)
definition = FlowDefinition.from_declaration(contents=CHAIN_YAML)
flow = Flow.from_definition(definition)
_, events = _run_with_events(flow)
assert events
@@ -588,7 +603,7 @@ methods:
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
assert flow.kickoff() == "found:ai agents"
@@ -639,7 +654,7 @@ methods:
listen: begin
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
assert flow.kickoff(inputs={"topic": "ai"}) == "found:ai agents"
@@ -758,7 +773,7 @@ methods:
listen: begin
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
assert flow.kickoff() == "search:hello agents"
@@ -783,7 +798,7 @@ methods:
listen: build_query
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
assert flow.kickoff() == "found:ai agents news"
@@ -803,7 +818,7 @@ methods:
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
assert (
flow.kickoff(inputs={"limit": 2, "domains": ["crewai.com", "example.com"]})
@@ -836,7 +851,7 @@ methods:
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
assert flow.kickoff(inputs={"question": "What is CrewAI?"}) == {
"agent": "Analyst",
@@ -874,7 +889,7 @@ methods:
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
assert flow.kickoff(inputs={"questions": ["one", "two"]}) == [
"Analyst:one",
@@ -905,7 +920,7 @@ def test_agent_action_round_trips_with_inline_definition():
}
)
round_trip = FlowDefinition.from_yaml(definition.to_yaml())
round_trip = FlowDefinition.from_declaration(contents=definition.to_yaml())
action = round_trip.to_dict()["methods"]["answer"]["do"]
assert action["call"] == "agent"
@@ -968,7 +983,7 @@ methods:
"""
with pytest.raises(ValidationError, match="invalid CEL expression"):
FlowDefinition.from_yaml(yaml_str)
FlowDefinition.from_declaration(contents=yaml_str)
def test_crew_action_runs_inline_yaml_definition(monkeypatch: pytest.MonkeyPatch):
@@ -1010,7 +1025,7 @@ methods:
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
assert flow.kickoff(inputs={"topic": "AI"}) == {
"crew": "inline_research",
@@ -1086,7 +1101,7 @@ methods:
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
assert flow.kickoff(inputs={"topic": "AI"}) == {
"crew": "referenced_research",
@@ -1160,9 +1175,7 @@ methods:
other_cwd.mkdir()
monkeypatch.chdir(other_cwd)
flow = Flow.from_definition(
FlowDefinition.from_yaml(yaml_str, source_path=flow_path)
)
flow = Flow.from_definition(FlowDefinition.from_declaration(path=flow_path))
assert flow.kickoff(inputs={"topic": "AI"}) == {
"crew": "relative_research",
@@ -1185,10 +1198,9 @@ methods:
from_declaration: ../outside/crew.jsonc
start: true
"""
flow_path.write_text(yaml_str, encoding="utf-8")
flow = Flow.from_definition(
FlowDefinition.from_yaml(yaml_str, source_path=flow_path)
)
flow = Flow.from_definition(FlowDefinition.from_declaration(path=flow_path))
with pytest.raises(
ValueError,
@@ -1411,7 +1423,7 @@ methods:
"""
with pytest.raises(ValidationError, match="invalid CEL expression"):
FlowDefinition.from_yaml(yaml_str)
FlowDefinition.from_declaration(contents=yaml_str)
def test_code_action_renders_keyword_inputs():
@@ -1429,7 +1441,7 @@ methods:
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
assert flow.kickoff(inputs={"name": "hello"}) == "hello!"
@@ -1448,7 +1460,7 @@ methods:
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
assert flow.kickoff(inputs={"value": "ok"}) == "callable:ok"
@@ -1472,7 +1484,7 @@ methods:
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
assert flow.kickoff(inputs={"rows": ["a", "b"]}) == [
"normalized:a",
@@ -1499,7 +1511,7 @@ methods:
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
caller_thread_id = threading.get_ident()
assert flow.kickoff(inputs={"rows": ["a"]}) == ["process_rows:a"]
@@ -1526,7 +1538,7 @@ methods:
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
assert flow.kickoff(inputs={"rows": ["a", "b"]}) == ["async:a", "async:b"]
@@ -1548,7 +1560,7 @@ methods:
FlowScriptExecutionDisabledError,
match="CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION=1",
) as exc_info:
Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
assert "methods with unresolvable actions" not in str(exc_info.value)
@@ -1572,7 +1584,7 @@ methods:
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
assert flow.kickoff(inputs={"raw_score": 3.2}) == "rounded:4"
assert flow.state["rounded"] == 4
@@ -1601,7 +1613,7 @@ methods:
listen: seed
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
assert flow.kickoff() == "alpha:alpha"
assert flow.state["input_matches_output"] is True
@@ -1639,7 +1651,7 @@ methods:
listen: seed
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
assert flow.kickoff(inputs={"rows": [" a ", " b "]}) == ["global:a", "global:b"]
@@ -1671,7 +1683,7 @@ methods:
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
assert flow.kickoff(inputs={"rows": ["a", "b"]}) == [
{"row": "a", "normalized": "saved:a"},
@@ -1700,7 +1712,7 @@ methods:
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
assert flow.kickoff(inputs={"rows": ["a", "b"]}) == ["a", "b"]
assert flow._method_outputs == [
@@ -1738,7 +1750,7 @@ methods:
listen: seed
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
assert flow.kickoff(inputs={"rows": ["a", "b"]}) == [
"local:a",
@@ -1777,7 +1789,7 @@ methods:
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
assert flow.kickoff(
inputs={
@@ -1811,7 +1823,7 @@ methods:
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
assert flow.kickoff(inputs={"rows": [{"kind": "keep", "value": "a"}]}) == ["a"]
@@ -1838,7 +1850,7 @@ methods:
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
assert flow.kickoff(
inputs={
@@ -1868,7 +1880,7 @@ methods:
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
with pytest.raises(ValueError, match="if expression must evaluate to a boolean"):
flow.kickoff(inputs={"rows": [{"value": "truthy"}]})
@@ -1898,7 +1910,7 @@ methods:
listen: process_rows
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
events = []
with crewai_event_bus.scoped_handlers():
@@ -2069,7 +2081,7 @@ methods:
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
with pytest.raises(RuntimeError, match="bad row"):
flow.kickoff(inputs={"rows": ["ok", "bad"]})
@@ -2190,7 +2202,7 @@ methods:
listen: right
"""
definition = FlowDefinition.from_yaml(yaml_str)
definition = FlowDefinition.from_declaration(contents=yaml_str)
assert Flow.from_definition(definition).kickoff(
inputs={"direction": "left"}
@@ -2213,7 +2225,7 @@ methods:
"""
with pytest.raises(ValidationError, match="invalid CEL expression"):
FlowDefinition.from_yaml(yaml_str)
FlowDefinition.from_declaration(contents=yaml_str)
def test_expression_action_rejects_unknown_cel_root():
@@ -2229,7 +2241,7 @@ methods:
"""
with pytest.raises(ValidationError, match="unknown CEL root"):
FlowDefinition.from_yaml(yaml_str)
FlowDefinition.from_declaration(contents=yaml_str)
def test_tool_action_requires_module_qualname_ref():
@@ -2263,14 +2275,16 @@ def test_pydantic_state_from_ref_parity():
def test_pydantic_state_default_overlay():
flow = Flow.from_definition(FlowDefinition.from_yaml(PYDANTIC_STATE_OVERLAY_YAML))
flow = Flow.from_definition(
FlowDefinition.from_declaration(contents=PYDANTIC_STATE_OVERLAY_YAML)
)
result = flow.kickoff()
assert result == "count=6"
assert flow.state.count == 6
def test_json_schema_state():
flow = Flow.from_definition(FlowDefinition.from_yaml(JSON_SCHEMA_STATE_YAML))
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=JSON_SCHEMA_STATE_YAML))
result = flow.kickoff()
assert result == "count=1"
assert flow.state.count == 1
@@ -2279,14 +2293,14 @@ def test_json_schema_state():
def test_json_schema_state_validates_inputs():
flow = Flow.from_definition(FlowDefinition.from_yaml(JSON_SCHEMA_STATE_YAML))
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=JSON_SCHEMA_STATE_YAML))
with pytest.raises(ValueError, match="Invalid inputs"):
flow.kickoff(inputs={"count": "not-a-number"})
def test_pydantic_state_falls_back_to_json_schema_when_ref_unimportable():
flow = Flow.from_definition(
FlowDefinition.from_yaml(PYDANTIC_REF_WITH_SCHEMA_FALLBACK_YAML)
FlowDefinition.from_declaration(contents=PYDANTIC_REF_WITH_SCHEMA_FALLBACK_YAML)
)
result = flow.kickoff()
assert result == "count=1"
@@ -2295,7 +2309,9 @@ def test_pydantic_state_falls_back_to_json_schema_when_ref_unimportable():
def test_pydantic_state_without_ref_or_schema_falls_back_to_dict(caplog):
with caplog.at_level("ERROR"):
flow = Flow.from_definition(FlowDefinition.from_yaml(UNRESOLVABLE_STATE_YAML))
flow = Flow.from_definition(
FlowDefinition.from_declaration(contents=UNRESOLVABLE_STATE_YAML)
)
assert "falling back to dict state" in caplog.text
result = flow.kickoff()
@@ -2305,7 +2321,7 @@ def test_pydantic_state_without_ref_or_schema_falls_back_to_dict(caplog):
def test_dict_state_is_a_copy_of_default_plus_id():
definition = FlowDefinition.from_yaml(DICT_STATE_YAML)
definition = FlowDefinition.from_declaration(contents=DICT_STATE_YAML)
flow = Flow.from_definition(definition)
assert flow.state["count"] == 5
@@ -2322,7 +2338,7 @@ def test_dict_state_is_a_copy_of_default_plus_id():
def test_unknown_state_type_falls_back_to_dict(caplog):
with caplog.at_level("WARNING"):
flow = Flow.from_definition(FlowDefinition.from_yaml(UNKNOWN_STATE_YAML))
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=UNKNOWN_STATE_YAML))
assert "falling back to dict state" in caplog.text
result = flow.kickoff()
@@ -2395,7 +2411,7 @@ def _run_capturing_flow_lifecycle(yaml_str, event_types):
def capture(source, event):
events.append(event)
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
result = flow.kickoff()
return flow, result, events
@@ -2409,7 +2425,7 @@ _LIFECYCLE_EVENTS = [
]
def test_config_suppress_flow_events_from_yaml():
def test_config_suppress_flow_events_from_declaration():
twin_events = []
with crewai_event_bus.scoped_handlers():
for event_type in _LIFECYCLE_EVENTS:
@@ -2432,14 +2448,14 @@ def test_config_suppress_flow_events_from_yaml():
)
def test_config_max_method_calls_from_yaml():
flow = Flow.from_definition(FlowDefinition.from_yaml(CAPPED_LOOP_YAML))
def test_config_max_method_calls_from_declaration():
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=CAPPED_LOOP_YAML))
with pytest.raises(RecursionError, match="has been called 2 times"):
flow.kickoff()
def test_config_stream_from_yaml():
flow = Flow.from_definition(FlowDefinition.from_yaml(STREAMING_CHAIN_YAML))
def test_config_stream_from_declaration():
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=STREAMING_CHAIN_YAML))
streaming = flow.kickoff()
assert isinstance(streaming, FlowStreamingOutput)
for _ in streaming:
@@ -2448,7 +2464,7 @@ def test_config_stream_from_yaml():
assert flow.stream is True
def test_config_defer_trace_finalization_from_yaml():
def test_config_defer_trace_finalization_from_declaration():
_, _, baseline_events = _run_capturing_flow_lifecycle(
CHAIN_YAML, [FlowFinishedEvent]
)
@@ -2462,7 +2478,7 @@ def test_config_defer_trace_finalization_from_yaml():
assert deferred_events == []
def test_config_checkpoint_from_yaml(tmp_path):
def test_config_checkpoint_from_declaration(tmp_path):
yaml_str = (
CHAIN_YAML
+ f"""
@@ -2471,19 +2487,23 @@ config:
location: {tmp_path}
"""
)
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
assert isinstance(flow.checkpoint, CheckpointConfig)
assert flow.checkpoint.location == str(tmp_path)
def test_config_input_provider_from_yaml():
flow = Flow.from_definition(FlowDefinition.from_yaml(INPUT_PROVIDER_CHAIN_YAML))
def test_config_input_provider_from_declaration():
flow = Flow.from_definition(
FlowDefinition.from_declaration(contents=INPUT_PROVIDER_CHAIN_YAML)
)
assert isinstance(flow.input_provider, StubInputProvider)
def test_round_trip_config_equivalence():
class_flow = ConfiguredFlow()
definition = FlowDefinition.from_yaml(ConfiguredFlow.flow_definition().to_yaml())
definition = FlowDefinition.from_declaration(
contents=ConfiguredFlow.flow_definition().to_yaml()
)
definition_flow = Flow.from_definition(definition)
assert definition.config.suppress_flow_events is True
@@ -2653,9 +2673,9 @@ class MethodPersistedFlow(Flow):
return "two"
def test_flow_level_persist_from_yaml_saves_once_per_method():
def test_flow_level_persist_from_declaration_saves_once_per_method():
yaml_str = _flow_level_persist_yaml("yaml-flow-level")
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
result = flow.kickoff()
assert result == "two"
@@ -2665,9 +2685,9 @@ def test_flow_level_persist_from_yaml_saves_once_per_method():
assert final_save["id"] == flow.state["id"]
def test_method_level_persist_from_yaml_saves_only_that_method():
def test_method_level_persist_from_declaration_saves_only_that_method():
yaml_str = _method_level_persist_yaml("yaml-method-level")
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
flow.kickoff()
assert _saved_methods("yaml-method-level") == ["first"]
@@ -2696,20 +2716,20 @@ methods:
persist:
enabled: false
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
flow.kickoff()
assert _saved_methods("yaml-opt-out") == ["first"]
def test_persist_restore_by_id_from_yaml():
def test_persist_restore_by_id_from_declaration():
yaml_str = _flow_level_persist_yaml("yaml-restore")
flow1 = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
flow1 = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
flow1.kickoff()
assert flow1.state["count"] == 2
flow2 = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
flow2 = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
flow2.kickoff(inputs={"id": flow1.state["id"]})
assert flow2.state["count"] == 4
@@ -2729,7 +2749,9 @@ def test_method_level_persist_decorator_saves_only_that_method():
def test_round_trip_persist_equivalence():
definition = FlowDefinition.from_yaml(ClassPersistedFlow.flow_definition().to_yaml())
definition = FlowDefinition.from_declaration(
contents=ClassPersistedFlow.flow_definition().to_yaml()
)
before = len(DefinitionStoreBackend.saves["class-decorator"])
flow = Flow.from_definition(definition)
@@ -2738,7 +2760,7 @@ def test_round_trip_persist_equivalence():
assert _saved_methods("class-decorator")[before:] == ["first", "second"]
def test_method_persist_backend_overrides_flow_level_backend_from_yaml():
def test_method_persist_backend_overrides_flow_level_backend_from_declaration():
yaml_str = f"""
schema: crewai.flow/v1
name: PersistedFlow
@@ -2762,7 +2784,7 @@ methods:
persistence_type: DefinitionStoreBackend
store: yaml-mixed-method
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
flow.kickoff()
assert _saved_methods("yaml-mixed-flow") == ["first"]
@@ -2910,8 +2932,8 @@ methods:
"""
def test_human_feedback_from_yaml_default_outcome_routes():
flow = Flow.from_definition(FlowDefinition.from_yaml(REVIEW_YAML))
def test_human_feedback_from_declaration_default_outcome_routes():
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=REVIEW_YAML))
with patch.object(flow, "_request_human_feedback", return_value="") as request:
result = flow.kickoff()
@@ -2922,8 +2944,8 @@ def test_human_feedback_from_yaml_default_outcome_routes():
assert flow.last_human_feedback.output == "draft-content"
def test_human_feedback_from_yaml_collapses_and_routes():
flow = Flow.from_definition(FlowDefinition.from_yaml(REVIEW_YAML))
def test_human_feedback_from_declaration_collapses_and_routes():
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=REVIEW_YAML))
with (
patch.object(flow, "_request_human_feedback", return_value="ship it"),
@@ -2940,7 +2962,7 @@ def test_round_trip_human_feedback_equivalence():
with patch.object(class_flow, "_request_human_feedback", return_value=""):
class_result = class_flow.kickoff()
definition = FlowDefinition.from_yaml(ReviewFlow.flow_definition().to_yaml())
definition = FlowDefinition.from_declaration(contents=ReviewFlow.flow_definition().to_yaml())
twin = Flow.from_definition(definition)
with patch.object(twin, "_request_human_feedback", return_value=""):
twin_result = twin.kickoff()
@@ -2953,8 +2975,8 @@ def test_round_trip_human_feedback_equivalence():
)
def test_human_feedback_pending_and_resume_from_yaml():
definition = FlowDefinition.from_yaml(PENDING_REVIEW_YAML)
def test_human_feedback_pending_and_resume_from_declaration():
definition = FlowDefinition.from_declaration(contents=PENDING_REVIEW_YAML)
flow = Flow.from_definition(definition)
pending = flow.kickoff()
@@ -2975,7 +2997,7 @@ def test_human_feedback_pending_and_resume_from_yaml():
assert flow_id not in DefinitionStoreBackend.pending
def test_flow_config_provider_fallback_from_yaml():
def test_flow_config_provider_fallback_from_declaration():
yaml_str = f"""
schema: crewai.flow/v1
name: ConfigProviderFlow
@@ -3001,7 +3023,7 @@ methods:
return "from-config"
provider = RecordingProvider()
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
previous = flow_config.hitl_provider
flow_config.hitl_provider = provider
@@ -3104,7 +3126,7 @@ methods:
message: "Review:"
provider: {__name__}:_NeedsArgsProvider
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
with pytest.raises(
ValueError, match="cannot instantiate human_feedback.provider ref"
@@ -3125,7 +3147,7 @@ methods:
message: "Review:"
provider: missing_module_xyz:Provider
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
with pytest.raises(
ValueError, match="unresolvable human_feedback.provider ref"
@@ -3137,7 +3159,7 @@ def _checkpoint_chain_flow(tmp_path):
from crewai.state.provider.json_provider import JsonProvider
from crewai.state.runtime import RuntimeState
definition = FlowDefinition.from_yaml(CHAIN_YAML)
definition = FlowDefinition.from_declaration(contents=CHAIN_YAML)
flow = Flow.from_definition(definition)
result = flow.kickoff()
assert result == "confirmed:True"
@@ -3177,7 +3199,7 @@ state:
methods: {}
"""
with pytest.raises(ValidationError, match="default"):
FlowDefinition.from_yaml(yaml_str)
FlowDefinition.from_declaration(contents=yaml_str)
def test_definition_method_missing_from_class_fails_loudly():