Require explicit CrewAI project definitions (#6358)

* Require explicit CrewAI project definitions

JSON crews and declarative flows now resolve from `[tool.crewai]`
metadata instead of implicit filename discovery. This makes project type
selection deterministic, prevents stray `crew.json(c)` files from changing
CLI behavior, and centralizes definition path validation for run, install,
deploy validation, plotting, and memory reset paths.

`[tool.crewai].definition` must be a project-local file path. Absolute
paths, `~`, missing files, directories, and paths escaping the project root
are rejected so deploy and runtime commands use the same contract.

Breaking changes and migration paths:

* JSON crew projects are no longer discovered from `crew.json` or
  `crew.jsonc` alone. Add explicit metadata:

  ```toml
  [tool.crewai]
  type = "crew"
  definition = "crew.jsonc"
  ```

* Declarative flow projects must use a valid project-local definition path:

  ```toml
  [tool.crewai]
  type = "flow"
  definition = "flows/research.yaml"
  ```

* `Flow.from_definition(definition)` is removed. Use:

  ```python
  Flow.from_declaration(contents=definition)
  ```

* `FlowDefinition.to_json()` and `FlowDefinition.to_yaml()` are removed.
  Use `FlowDefinition.to_dict()` and serialize with the caller's JSON or
  YAML library.

* `FlowDefinition.from_dict()` is removed. Use:

  ```python
  FlowDefinition.from_declaration(contents=data)
  ```

* `FlowDefinition.json_schema()` is removed. Use Pydantic's schema API only
  where schema generation is intentionally needed:

  ```python
  FlowDefinition.model_json_schema(by_alias=True)
  ```

* `crewai_cli.run_crew.find_crew_json_file()` and `_has_json_crew()` are
  removed. Use `configured_project_json_crew()` or the shared
  `crewai_core.project.configured_project_definition("crew")` helper.

* `crewai reset-memories` now only loads JSON crews declared through
  `[tool.crewai].definition`, and invalid declared JSON crew definitions
  fail instead of silently falling back to classic crew discovery.

* Address code review comments
This commit is contained in:
Vinicius Brasil
2026-06-26 12:07:03 -07:00
committed by GitHub
parent e10c17fcf6
commit 596150188b
19 changed files with 723 additions and 471 deletions

View File

@@ -102,6 +102,7 @@ only-include = ["agents", "crew.jsonc", "tools", "knowledge", "skills"]
[tool.crewai]
type = "crew"
definition = "crew.jsonc"
"""
_GITIGNORE = """\

View File

@@ -40,14 +40,18 @@ from typing import Any
from crewai.project.json_loader import (
JSONProjectValidationError,
find_crew_json_file,
find_json_project_file,
validate_crew_project,
)
from crewai_core.project import (
ProjectDefinitionError,
configured_project_definition,
get_crewai_project_config,
get_crewai_project_type,
read_toml,
)
from rich.console import Console
from crewai_cli.utils import parse_toml
console = Console()
logger = logging.getLogger(__name__)
@@ -159,24 +163,16 @@ class DeployValidator:
@property
def _is_json_crew(self) -> bool:
"""True for JSON crew projects, deferring to the declared type.
A flow project that also contains a crew.json(c) file validates as
the flow it declares in pyproject.toml, not as a JSON crew.
"""
if find_crew_json_file(self.project_root) is None:
return False
"""True for JSON crew projects with configured crew definitions."""
pyproject_path = self.project_root / "pyproject.toml"
if not pyproject_path.exists():
return True
return False
try:
data = parse_toml(pyproject_path.read_text())
data = read_toml(pyproject_path)
except Exception:
return True
declared_type: str | None = (
(data.get("tool") or {}).get("crewai", {}).get("type")
)
return declared_type != "flow"
return False
crewai_config = get_crewai_project_config(data)
return crewai_config.get("type") == "crew" and "definition" in crewai_config
def run(self) -> list[ValidationResult]:
"""Run all checks. Later checks are skipped when earlier ones make
@@ -208,14 +204,32 @@ class DeployValidator:
def _run_json_checks(self) -> list[ValidationResult]:
"""Validation suite for JSON-defined crew projects."""
crew_path = find_crew_json_file(self.project_root)
self._check_pyproject()
self._check_lockfile()
try:
crew_path = configured_project_definition(
"crew",
pyproject_data=self._pyproject,
project_root=self.project_root,
)
except ProjectDefinitionError as exc:
self._add(
Severity.ERROR,
"invalid_crew_definition",
"[tool.crewai] definition is invalid",
detail=str(exc),
hint=(
"Set `[tool.crewai] definition` to a project-local JSON "
"or JSONC crew file."
),
)
return self.results
if crew_path is None:
return self.results
agents_dir = self.project_root / "agents"
self._check_pyproject()
self._check_lockfile()
agents_dir = crew_path.parent / "agents"
agents_dir_ok = self._check_json_agents_dir(agents_dir)
project = None
@@ -346,7 +360,7 @@ class DeployValidator:
return False
try:
self._pyproject = parse_toml(pyproject_path.read_text())
self._pyproject = read_toml(pyproject_path)
except Exception as e:
self._add(
Severity.ERROR,
@@ -374,9 +388,7 @@ class DeployValidator:
self._project_name = name
self._package_name = normalize_package_name(name)
self._is_flow = (self._pyproject.get("tool") or {}).get("crewai", {}).get(
"type"
) == "flow"
self._is_flow = get_crewai_project_type(self._pyproject) == "flow"
return True
def _check_lockfile(self) -> None:

View File

@@ -2,53 +2,35 @@ from pathlib import Path
import subprocess
import click
from crewai_core.project import configured_project_definition, read_toml
from crewai_cli.deploy.validate import normalize_package_name
from crewai_cli.utils import build_env_with_all_tool_credentials, parse_toml
def _find_json_crew_file(project_root: Path | None = None) -> Path | None:
"""Return the JSON crew definition path when present."""
root = project_root or Path.cwd()
for filename in ("crew.jsonc", "crew.json"):
crew_path = root / filename
if crew_path.is_file():
return crew_path
return None
from crewai_cli.utils import build_env_with_all_tool_credentials
def _is_json_crew_project(project_root: Path | None = None) -> bool:
"""Return True for JSON crew projects that do not need package install."""
root = project_root or Path.cwd()
if _find_json_crew_file(root) is None:
return False
pyproject_path = root / "pyproject.toml"
if not pyproject_path.is_file():
return True
return False
try:
pyproject = parse_toml(pyproject_path.read_text())
except Exception:
return True
if not isinstance(pyproject, dict):
return True
pyproject = read_toml(pyproject_path)
tool_config = pyproject.get("tool") or {}
crewai_config = tool_config.get("crewai") if isinstance(tool_config, dict) else None
declared_type = (
crewai_config.get("type") if isinstance(crewai_config, dict) else None
)
project_config = pyproject.get("project") or {}
project_name = (
project_config.get("name") if isinstance(project_config, dict) else None
)
if isinstance(project_name, str):
package_name = normalize_package_name(project_name)
if package_name and (root / "src" / package_name / "crew.py").is_file():
return False
if (
configured_project_definition(
"crew", pyproject_data=pyproject, project_root=root
)
is None
):
return False
return declared_type != "flow"
project_name = pyproject.get("project", {}).get("name", "")
package_name = normalize_package_name(project_name)
if package_name and (root / "src" / package_name / "crew.py").is_file():
return False
return True
# Be mindful about changing this.

View File

@@ -1,23 +1,27 @@
from __future__ import annotations
from collections.abc import Callable
from contextlib import AbstractContextManager, nullcontext
import os
from pathlib import Path
import re
import subprocess
import sys
from typing import TYPE_CHECKING, Any, cast
from typing import TYPE_CHECKING, Any
import click
from crewai_core.constants import CREWAI_TRAINED_AGENTS_FILE_ENV
from crewai_core.project import (
ProjectDefinitionError,
configured_project_definition,
get_crewai_project_type,
read_toml,
)
from packaging import version
from crewai_cli.utils import (
build_env_with_all_tool_credentials,
enable_prompt_line_editing,
is_dmn_mode_enabled,
read_toml,
)
from crewai_cli.version import get_crewai_tools_dependency, get_crewai_version
@@ -32,6 +36,7 @@ if TYPE_CHECKING:
_INPUT_PLACEHOLDER_RE = re.compile(r"(?<!{){([A-Za-z_][A-Za-z0-9_\-]*)}(?!})")
_CREWAI_CLI_RUNNER_PACKAGE_DIR_ENV = "CREWAI_CLI_RUNNER_PACKAGE_DIR"
_CREWAI_RUNNER_SOURCE_DIR_ENV = "CREWAI_RUNNER_SOURCE_DIR"
_CREWAI_JSON_CREW_DEFINITION_ENV = "CREWAI_JSON_CREW_DEFINITION"
_FULL_CREWAI_INSTALL_MESSAGE = f"""\
CrewAI CLI is installed without the `crewai` package required to run crews.
@@ -75,22 +80,20 @@ module_spec.loader.exec_module(module)
from crewai_core.constants import CREWAI_TRAINED_AGENTS_FILE_ENV
kwargs = {
"trained_agents_file": os.getenv(CREWAI_TRAINED_AGENTS_FILE_ENV),
}
if crew_definition := os.getenv("CREWAI_JSON_CREW_DEFINITION"):
kwargs["crew_path"] = crew_definition
try:
module._run_json_crew(
trained_agents_file=os.getenv(CREWAI_TRAINED_AGENTS_FILE_ENV)
)
module._run_json_crew(**kwargs)
except module.click.ClickException as exc:
exc.show()
raise SystemExit(exc.exit_code)
""".strip()
def _import_find_crew_json_file() -> Callable[[], Path | None]:
from crewai.project.json_loader import find_crew_json_file as _find_crew_json_file
return cast("Callable[[], Path | None]", _find_crew_json_file)
def _is_missing_crewai_package(exc: ModuleNotFoundError) -> bool:
return bool(exc.name and exc.name.startswith("crewai"))
@@ -99,32 +102,23 @@ def _full_crewai_install_error() -> click.ClickException:
return click.ClickException(_FULL_CREWAI_INSTALL_MESSAGE)
def find_crew_json_file() -> Path | None:
def configured_project_json_crew(
pyproject_data: dict[str, Any] | None = None,
project_root: Path | None = None,
) -> Path | None:
"""Return the configured JSON crew definition for crew projects."""
root = project_root or Path.cwd()
if pyproject_data is None and not (root / "pyproject.toml").is_file():
return None
try:
return _import_find_crew_json_file()()
except ModuleNotFoundError as exc:
if _is_missing_crewai_package(exc):
raise _full_crewai_install_error() from exc
raise
def _has_json_crew() -> bool:
"""Check if this is a JSON-defined crew project.
The project type declared in pyproject.toml wins: a flow project that
happens to contain a crew.json(c) file still runs as a flow. A missing
or unreadable pyproject means a bare JSON crew project.
"""
if find_crew_json_file() is None:
return False
try:
pyproject_data = read_toml()
except Exception:
return True
declared_type: str | None = (
pyproject_data.get("tool", {}).get("crewai", {}).get("type")
)
return declared_type != "flow"
return configured_project_definition(
"crew",
pyproject_data=pyproject_data,
project_root=root,
)
except ProjectDefinitionError as exc:
raise click.UsageError(str(exc)) from exc
def _extract_input_placeholders(text: str | None) -> set[str]:
@@ -199,7 +193,12 @@ def _json_loading_status(message: str) -> AbstractContextManager[Any]:
def _load_json_crew(crew_path: Path) -> tuple[Any, dict[str, Any]]:
from crewai.project.crew_loader import load_crew
try:
from crewai.project.crew_loader import load_crew
except ModuleNotFoundError as exc:
if _is_missing_crewai_package(exc):
raise _full_crewai_install_error() from exc
raise
return load_crew(crew_path)
@@ -262,7 +261,10 @@ def _run_json_crew_without_tui(crew_path: Path) -> Any:
return result
def _run_json_crew(trained_agents_file: str | None = None) -> Any:
def _run_json_crew(
trained_agents_file: str | None = None,
crew_path: str | Path | None = None,
) -> Any:
"""Load and run a JSON-defined crew."""
from dotenv import load_dotenv
@@ -275,9 +277,13 @@ def _run_json_crew(trained_agents_file: str | None = None) -> Any:
if trained_agents_file:
os.environ[CREWAI_TRAINED_AGENTS_FILE_ENV] = trained_agents_file
crew_path = find_crew_json_file()
if crew_path is None:
raise FileNotFoundError("No crew.jsonc or crew.json found")
crew_path = configured_project_json_crew()
if crew_path is None:
raise FileNotFoundError(
"No JSON crew definition configured in [tool.crewai].definition"
)
crew_path = Path(crew_path)
if is_dmn_mode_enabled():
return _run_json_crew_without_tui(crew_path)
@@ -391,10 +397,16 @@ def _json_crew_run_command(project_root: Path | None = None) -> list[str]:
return ["uv", "run", "--no-sync", "python", "-c", _JSON_CREW_RUNNER_CODE]
def _run_json_crew_in_project_env(trained_agents_file: str | None = None) -> Any:
def _run_json_crew_in_project_env(
trained_agents_file: str | None = None,
crew_path: str | Path | None = None,
) -> Any:
"""Run JSON crews from the project's uv-managed environment."""
if not (Path.cwd() / "pyproject.toml").is_file():
return _run_json_crew(trained_agents_file=trained_agents_file)
return _run_json_crew(
trained_agents_file=trained_agents_file,
crew_path=crew_path,
)
_install_json_crew_dependencies_if_needed()
@@ -405,6 +417,8 @@ def _run_json_crew_in_project_env(trained_agents_file: str | None = None) -> Any
env[_CREWAI_RUNNER_SOURCE_DIR_ENV] = str(local_crewai_source_dir)
if trained_agents_file:
env[CREWAI_TRAINED_AGENTS_FILE_ENV] = trained_agents_file
if crew_path is not None:
env[_CREWAI_JSON_CREW_DEFINITION_ENV] = str(crew_path)
try:
subprocess.run( # noqa: S603
@@ -557,13 +571,16 @@ def run_crew(
)
return
if _has_json_crew():
_run_json_crew_in_project_env(trained_agents_file=trained_agents_file)
pyproject_data = read_toml()
if json_crew_definition := configured_project_json_crew(pyproject_data):
_run_json_crew_in_project_env(
trained_agents_file=trained_agents_file,
crew_path=json_crew_definition,
)
return
pyproject_data = read_toml()
_warn_if_old_poetry_project(pyproject_data)
project_type = _get_project_type(pyproject_data)
project_type = get_crewai_project_type(pyproject_data)
if project_type == "flow":
_run_flow_project(
@@ -627,11 +644,6 @@ def _run_classic_crew_project(
)
def _get_project_type(pyproject_data: dict[str, Any]) -> str | None:
project_type = pyproject_data.get("tool", {}).get("crewai", {}).get("type")
return project_type if isinstance(project_type, str) else None
def _warn_if_old_poetry_project(pyproject_data: dict[str, Any]) -> None:
crewai_version = get_crewai_version()
min_required_version = "0.71.0"

View File

@@ -1,11 +1,12 @@
from __future__ import annotations
import json
from pathlib import Path, PureWindowsPath
from pathlib import Path
import subprocess
from typing import Any
import click
from crewai_core.project import ProjectDefinitionError, configured_project_definition
from pydantic import ValidationError
from crewai_cli.utils import build_env_with_all_tool_credentials
@@ -105,80 +106,18 @@ def configured_project_declarative_flow(
project_root: Path | None = None,
) -> Path | None:
"""Return the configured declarative flow source for flow projects."""
if pyproject_data is None:
try:
from crewai_cli.utils import read_toml
pyproject_data = read_toml()
except Exception:
return None
crewai_config = pyproject_data.get("tool", {}).get("crewai", {})
if crewai_config.get("type") != "flow":
root = project_root or Path.cwd()
if pyproject_data is None and not (root / "pyproject.toml").is_file():
return None
definition = crewai_config.get("definition")
if not isinstance(definition, str):
return None
definition = definition.strip()
if not definition:
return None
return _resolve_project_definition_path(
definition=definition,
project_root=project_root or Path.cwd(),
)
def _resolve_project_definition_path(definition: str, project_root: Path) -> Path:
definition_path = Path(definition)
windows_definition_path = PureWindowsPath(definition)
if definition.startswith("~"):
raise click.UsageError(
"[tool.crewai] definition must be a project-local path; "
f"got {definition!r}."
)
if definition_path.is_absolute() or windows_definition_path.is_absolute():
raise click.UsageError(
"[tool.crewai] definition must be relative to the project root; "
f"got {definition!r}."
)
try:
root = project_root.resolve(strict=True)
except OSError as exc:
raise click.UsageError(
f"Invalid project root for [tool.crewai] definition: {exc}"
) from exc
candidate = root / definition_path
try:
resolved_candidate = candidate.resolve(strict=False)
except OSError as exc:
raise click.UsageError(
f"Invalid [tool.crewai] definition path {definition!r}: {exc}"
) from exc
if not resolved_candidate.is_relative_to(root):
raise click.UsageError(
"[tool.crewai] definition must resolve inside the project root; "
f"got {definition!r}."
return configured_project_definition(
"flow",
pyproject_data=pyproject_data,
project_root=root,
)
if not resolved_candidate.exists():
raise click.UsageError(
"[tool.crewai] definition must point to an existing file; "
f"got {definition!r}."
)
if not resolved_candidate.is_file():
raise click.UsageError(
"[tool.crewai] definition must point to a regular file; "
f"got {definition!r}."
)
return resolved_candidate
except ProjectDefinitionError as exc:
raise click.UsageError(str(exc)) from exc
def _execute_declarative_flow_command(command: list[str]) -> None:

View File

@@ -146,6 +146,7 @@ build-backend = "hatchling.build"
[tool.crewai]
type = "crew"
definition = "crew.jsonc"
""".strip()
+ "\n"
)
@@ -180,6 +181,7 @@ dependencies = ["crewai[tools]>=1.15.0,<2.0.0"]
[tool.crewai]
type = "crew"
definition = "crew.jsonc"
""".strip()
+ "\n"
)
@@ -221,6 +223,7 @@ custom = "custom.module:main"
[tool.crewai]
type = "crew"
definition = "crew.jsonc"
""".strip()
+ "\n"
)

View File

@@ -111,7 +111,12 @@ def _run_without_import_check(root: Path) -> DeployValidator:
def _scaffold_json_crew(root: Path, *, task_agent: str = "researcher") -> None:
(root / "pyproject.toml").write_text(_make_pyproject(name="json_crew"))
(root / "pyproject.toml").write_text(
_make_pyproject(
name="json_crew",
extra='[tool.crewai]\ntype = "crew"\ndefinition = "crew.jsonc"',
)
)
(root / "uv.lock").write_text("# dummy uv lockfile\n")
agents_dir = root / "agents"
agents_dir.mkdir()
@@ -221,7 +226,6 @@ def test_json_crew_reports_project_metadata_before_invalid_json(
tmp_path: Path,
) -> None:
_scaffold_json_crew(tmp_path)
(tmp_path / "pyproject.toml").unlink()
(tmp_path / "uv.lock").unlink()
(tmp_path / "crew.jsonc").write_text('{"agents": ["researcher"], "tasks": []}\n')
@@ -229,7 +233,6 @@ def test_json_crew_reports_project_metadata_before_invalid_json(
v.run()
codes = _codes(v)
assert "missing_pyproject" in codes
assert "missing_lockfile" in codes
assert "invalid_crew_json" in codes
assert "missing_src_dir" not in codes
@@ -546,17 +549,43 @@ def test_is_json_crew_defers_to_declared_flow_type(tmp_path):
assert DeployValidator(project_root=tmp_path)._is_json_crew is False
def test_is_json_crew_true_for_declared_crew_type(tmp_path):
def test_is_json_crew_true_for_declared_crew_definition(tmp_path):
(tmp_path / "crew.jsonc").write_text("{}")
(tmp_path / "pyproject.toml").write_text(
'[project]\nname = "demo"\nversion = "0.1.0"\n\n'
'[tool.crewai]\ntype = "crew"\ndefinition = "crew.jsonc"\n'
)
assert DeployValidator(project_root=tmp_path)._is_json_crew is True
def test_is_json_crew_false_for_declared_crew_without_definition(tmp_path):
(tmp_path / "crew.jsonc").write_text("{}")
(tmp_path / "pyproject.toml").write_text(
'[project]\nname = "demo"\nversion = "0.1.0"\n\n'
'[tool.crewai]\ntype = "crew"\n'
)
assert DeployValidator(project_root=tmp_path)._is_json_crew is True
assert DeployValidator(project_root=tmp_path)._is_json_crew is False
def test_is_json_crew_true_without_pyproject(tmp_path):
def test_json_crew_non_string_definition_reports_invalid_definition(
tmp_path: Path,
) -> None:
(tmp_path / "pyproject.toml").write_text(
'[project]\nname = "demo"\nversion = "0.1.0"\n\n'
'[tool.crewai]\ntype = "crew"\ndefinition = ["crew.jsonc"]\n'
)
v = DeployValidator(project_root=tmp_path)
v.run()
finding = next(r for r in v.results if r.code == "invalid_crew_definition")
assert finding.severity is Severity.ERROR
assert "must be a string" in finding.detail
def test_is_json_crew_false_without_pyproject(tmp_path):
(tmp_path / "crew.jsonc").write_text("{}")
assert DeployValidator(project_root=tmp_path)._is_json_crew is True
assert DeployValidator(project_root=tmp_path)._is_json_crew is False

View File

@@ -741,6 +741,10 @@ def test_json_create_provider_preselects_default_model(tmp_path, monkeypatch):
assert pyproject["tool"]["hatch"]["build"]["targets"]["wheel"][
"only-include"
] == ["agents", "crew.jsonc", "tools", "knowledge", "skills"]
assert pyproject["tool"]["crewai"] == {
"type": "crew",
"definition": "crew.jsonc",
}
crew_template = (tmp_path / "json_crew" / "crew.jsonc").read_text()
assert (

View File

@@ -26,6 +26,7 @@ name = "json_crew"
[tool.crewai]
type = "crew"
definition = "crew.jsonc"
""".strip()
)
(tmp_path / "crew.jsonc").write_text("{}\n")
@@ -45,6 +46,7 @@ name = "hybrid-crew"
[tool.crewai]
type = "crew"
definition = "crew.jsonc"
""".strip()
)
(tmp_path / "crew.jsonc").write_text("{}\n")

View File

@@ -16,12 +16,17 @@ def test_missing_crewai_package_shows_full_install_hint(monkeypatch):
def missing_crewai_package():
raise ModuleNotFoundError("No module named 'crewai'", name="crewai")
monkeypatch.setattr(
run_crew_module, "_import_find_crew_json_file", missing_crewai_package
)
real_import = __import__
def fake_import(name, *args, **kwargs):
if name == "crewai.project.crew_loader":
missing_crewai_package()
return real_import(name, *args, **kwargs)
monkeypatch.setattr("builtins.__import__", fake_import)
with pytest.raises(click.ClickException) as exc_info:
run_crew_module.find_crew_json_file()
run_crew_module._load_json_crew(Path("crew.jsonc"))
message = exc_info.value.message
assert "CrewAI CLI is installed without the `crewai` package" in message
@@ -31,11 +36,17 @@ def test_missing_crewai_package_shows_full_install_hint(monkeypatch):
def test_run_crew_forwards_trained_agents_file_to_json_crews(monkeypatch):
"""crewai run -f must reach JSON crews, not only classic subprocess crews."""
monkeypatch.setattr(run_crew_module, "_has_json_crew", lambda: True)
monkeypatch.setattr(run_crew_module, "read_toml", lambda: {})
monkeypatch.setattr(
run_crew_module,
"configured_project_json_crew",
lambda pyproject_data=None, project_root=None: Path("crew.jsonc"),
)
called: dict = {}
def fake_run_json_crew_in_project_env(trained_agents_file=None):
def fake_run_json_crew_in_project_env(trained_agents_file=None, crew_path=None):
called["trained_agents_file"] = trained_agents_file
called["crew_path"] = crew_path
monkeypatch.setattr(
run_crew_module,
@@ -45,7 +56,10 @@ def test_run_crew_forwards_trained_agents_file_to_json_crews(monkeypatch):
run_crew_module.run_crew(trained_agents_file="some.pkl")
assert called == {"trained_agents_file": "some.pkl"}
assert called == {
"trained_agents_file": "some.pkl",
"crew_path": Path("crew.jsonc"),
}
def test_json_run_uses_project_env_when_pyproject_exists(monkeypatch, tmp_path: Path):
@@ -71,8 +85,10 @@ def test_json_run_uses_project_env_when_pyproject_exists(monkeypatch, tmp_path:
monkeypatch.setattr(run_crew_module.subprocess, "run", fake_subprocess_run)
crew_path = tmp_path / "crew.jsonc"
run_crew_module._run_json_crew_in_project_env(
trained_agents_file="trained.pkl"
trained_agents_file="trained.pkl",
crew_path=crew_path,
)
expected_env = {
@@ -81,6 +97,7 @@ def test_json_run_uses_project_env_when_pyproject_exists(monkeypatch, tmp_path:
Path(run_crew_module.__file__).resolve().parent
),
CREWAI_TRAINED_AGENTS_FILE_ENV: "trained.pkl",
run_crew_module._CREWAI_JSON_CREW_DEFINITION_ENV: str(crew_path),
}
if local_crewai_source_dir := run_crew_module._find_local_crewai_source_dir():
expected_env[run_crew_module._CREWAI_RUNNER_SOURCE_DIR_ENV] = str(
@@ -214,8 +231,9 @@ def test_json_run_without_pyproject_runs_in_process(monkeypatch, tmp_path: Path)
monkeypatch.chdir(tmp_path)
called: dict = {}
def fake_run_json_crew(trained_agents_file=None):
def fake_run_json_crew(trained_agents_file=None, crew_path=None):
called["trained_agents_file"] = trained_agents_file
called["crew_path"] = crew_path
return "result"
monkeypatch.setattr(run_crew_module, "_run_json_crew", fake_run_json_crew)
@@ -226,7 +244,7 @@ def test_json_run_without_pyproject_runs_in_process(monkeypatch, tmp_path: Path)
)
== "result"
)
assert called == {"trained_agents_file": "trained.pkl"}
assert called == {"trained_agents_file": "trained.pkl", "crew_path": None}
def test_json_project_env_run_failure_exits_nonzero(monkeypatch, tmp_path: Path):
@@ -435,7 +453,7 @@ def _patch_tui_run(monkeypatch, status: str):
crew = SimpleNamespace(name="Demo", tasks=[], agents=[])
monkeypatch.setattr(
run_crew_module, "find_crew_json_file", lambda: Path("crew.jsonc")
run_crew_module, "configured_project_json_crew", lambda: Path("crew.jsonc")
)
monkeypatch.setattr(
run_crew_module,
@@ -489,7 +507,9 @@ def test_run_json_crew_dmn_mode_bypasses_tui(monkeypatch, tmp_path: Path, capsys
kickoff_calls.append(inputs)
return "plain result"
monkeypatch.setattr(run_crew_module, "find_crew_json_file", lambda: crew_path)
monkeypatch.setattr(
run_crew_module, "configured_project_json_crew", lambda: crew_path
)
monkeypatch.setattr(
run_crew_module,
"_load_json_crew",
@@ -528,7 +548,9 @@ def test_run_json_crew_dmn_mode_exits_on_missing_inputs(
tasks=[],
)
monkeypatch.setattr(run_crew_module, "find_crew_json_file", lambda: crew_path)
monkeypatch.setattr(
run_crew_module, "configured_project_json_crew", lambda: crew_path
)
monkeypatch.setattr(
run_crew_module,
"_load_json_crew",
@@ -543,28 +565,47 @@ def test_run_json_crew_dmn_mode_exits_on_missing_inputs(
assert "Missing runtime inputs for CREWAI_DMN mode: topic" in captured.err
def test_has_json_crew_defers_to_declared_flow_type(monkeypatch, tmp_path: Path):
def test_configured_project_json_crew_defers_to_declared_flow_type(
monkeypatch, tmp_path: Path
):
"""A flow project containing a stray crew.jsonc must still run as a flow."""
monkeypatch.chdir(tmp_path)
(tmp_path / "crew.jsonc").write_text("{}")
(tmp_path / "pyproject.toml").write_text('[tool.crewai]\ntype = "flow"\n')
assert run_crew_module._has_json_crew() is False
assert run_crew_module.configured_project_json_crew() is None
def test_has_json_crew_true_for_declared_crew_type(monkeypatch, tmp_path: Path):
def test_configured_project_json_crew_returns_declared_crew_definition(
monkeypatch, tmp_path: Path
):
monkeypatch.chdir(tmp_path)
crew_path = tmp_path / "crew.jsonc"
crew_path.write_text("{}")
(tmp_path / "pyproject.toml").write_text(
'[tool.crewai]\ntype = "crew"\ndefinition = "crew.jsonc"\n'
)
assert run_crew_module.configured_project_json_crew() == crew_path.resolve()
def test_configured_project_json_crew_ignores_declared_crew_without_definition(
monkeypatch, tmp_path: Path
):
monkeypatch.chdir(tmp_path)
(tmp_path / "crew.jsonc").write_text("{}")
(tmp_path / "pyproject.toml").write_text('[tool.crewai]\ntype = "crew"\n')
assert run_crew_module._has_json_crew() is True
assert run_crew_module.configured_project_json_crew() is None
def test_has_json_crew_true_without_pyproject(monkeypatch, tmp_path: Path):
def test_configured_project_json_crew_ignores_missing_pyproject(
monkeypatch, tmp_path: Path
):
monkeypatch.chdir(tmp_path)
(tmp_path / "crew.jsonc").write_text("{}")
assert run_crew_module._has_json_crew() is True
assert run_crew_module.configured_project_json_crew() is None
def test_run_crew_rejects_inputs_without_definition():
@@ -605,7 +646,6 @@ def test_run_crew_runs_explicit_declarative_definition(monkeypatch, capsys):
def test_run_crew_runs_classic_crew_project(monkeypatch, capsys):
calls = []
monkeypatch.setattr(run_crew_module, "_has_json_crew", lambda: False)
monkeypatch.setattr(
run_crew_module,
"read_toml",
@@ -631,7 +671,6 @@ def test_run_crew_runs_classic_crew_project(monkeypatch, capsys):
def test_run_crew_runs_python_flow_project(monkeypatch, capsys):
calls = []
monkeypatch.setattr(run_crew_module, "_has_json_crew", lambda: False)
monkeypatch.setattr(
run_crew_module,
"read_toml",
@@ -660,7 +699,6 @@ def test_run_crew_runs_conversational_flow_tui(monkeypatch, capsys):
flow = Flow()
calls = []
monkeypatch.setattr(run_crew_module, "_has_json_crew", lambda: False)
monkeypatch.setattr(
run_crew_module,
"read_toml",
@@ -689,7 +727,6 @@ def test_run_crew_runs_conversational_flow_tui(monkeypatch, capsys):
def test_run_crew_rejects_filename_for_flow_project(monkeypatch):
monkeypatch.setattr(run_crew_module, "_has_json_crew", lambda: False)
monkeypatch.setattr(
run_crew_module,
"read_toml",
@@ -710,7 +747,6 @@ def test_run_crew_runs_configured_declarative_flow_project(
monkeypatch.chdir(tmp_path)
definition_path = tmp_path / "flow.yaml"
definition_path.write_text("schema: crewai.flow/v1\n", encoding="utf-8")
monkeypatch.setattr(run_crew_module, "_has_json_crew", lambda: False)
monkeypatch.setattr(
run_crew_module,
"read_toml",

View File

@@ -3,6 +3,7 @@
from __future__ import annotations
from functools import reduce
from pathlib import Path, PureWindowsPath
import sys
from typing import Any
@@ -16,7 +17,11 @@ if sys.version_info >= (3, 11):
console = Console()
def read_toml(file_path: str = "pyproject.toml") -> dict[str, Any]:
class ProjectDefinitionError(ValueError):
"""Invalid ``[tool.crewai].definition`` project configuration."""
def read_toml(file_path: str | Path = "pyproject.toml") -> dict[str, Any]:
"""Read a TOML file from disk and return its parsed contents."""
with open(file_path, "rb") as f:
return tomli.load(f)
@@ -29,6 +34,115 @@ def parse_toml(content: str) -> dict[str, Any]:
return tomli.loads(content)
def get_crewai_project_config(pyproject_data: dict[str, Any]) -> dict[str, Any]:
"""Return the normalized ``[tool.crewai]`` table from pyproject data."""
tool_config = pyproject_data.get("tool")
if not isinstance(tool_config, dict):
return {}
crewai_config = tool_config.get("crewai")
if not isinstance(crewai_config, dict):
return {}
return crewai_config
def get_crewai_project_type(pyproject_data: dict[str, Any]) -> str | None:
"""Return ``[tool.crewai].type`` when configured."""
project_type = get_crewai_project_config(pyproject_data).get("type")
return project_type if isinstance(project_type, str) else None
def configured_project_definition(
project_type: str,
*,
pyproject_data: dict[str, Any] | None = None,
project_root: Path | str | None = None,
) -> Path | None:
"""Return a configured CrewAI definition path for a project type.
``[tool.crewai].type`` must match ``project_type`` and ``definition`` must
be a non-empty project-local file path. Missing definitions return ``None``
so callers can fall back to legacy entrypoints for that project type.
"""
root = Path(project_root) if project_root is not None else Path.cwd()
if pyproject_data is None:
pyproject_data = read_toml(root / "pyproject.toml")
crewai_config = get_crewai_project_config(pyproject_data)
if crewai_config.get("type") != project_type:
return None
if "definition" not in crewai_config:
return None
raw_definition = crewai_config["definition"]
if not isinstance(raw_definition, str):
raise ProjectDefinitionError(
"[tool.crewai] definition must be a string project-local path; "
f"got {raw_definition!r}."
)
definition = raw_definition.strip()
if not definition:
raise ProjectDefinitionError(
"[tool.crewai] definition must be a non-empty project-local path."
)
return resolve_project_definition_path(definition=definition, project_root=root)
def resolve_project_definition_path(definition: str, project_root: Path | str) -> Path:
"""Resolve a ``[tool.crewai].definition`` path inside ``project_root``."""
root_path = Path(project_root)
definition_path = Path(definition)
windows_definition_path = PureWindowsPath(definition)
if definition.startswith("~"):
raise ProjectDefinitionError(
"[tool.crewai] definition must be a project-local path; "
f"got {definition!r}."
)
if definition_path.is_absolute() or windows_definition_path.is_absolute():
raise ProjectDefinitionError(
"[tool.crewai] definition must be relative to the project root; "
f"got {definition!r}."
)
try:
root = root_path.resolve(strict=True)
except OSError as exc:
raise ProjectDefinitionError(
f"Invalid project root for [tool.crewai] definition: {exc}"
) from exc
candidate = root / definition_path
try:
resolved_candidate = candidate.resolve(strict=False)
except OSError as exc:
raise ProjectDefinitionError(
f"Invalid [tool.crewai] definition path {definition!r}: {exc}"
) from exc
if not resolved_candidate.is_relative_to(root):
raise ProjectDefinitionError(
"[tool.crewai] definition must resolve inside the project root; "
f"got {definition!r}."
)
if not resolved_candidate.exists():
raise ProjectDefinitionError(
"[tool.crewai] definition must point to an existing file; "
f"got {definition!r}."
)
if not resolved_candidate.is_file():
raise ProjectDefinitionError(
"[tool.crewai] definition must point to a regular file; "
f"got {definition!r}."
)
return resolved_candidate
def _get_nested_value(data: dict[str, Any], keys: list[str]) -> Any:
return reduce(dict.__getitem__, keys, data)

View File

@@ -10,6 +10,7 @@ from crewai_core import (
lock_store,
paths,
printer,
project,
user_data,
version,
)
@@ -97,6 +98,83 @@ def test_unused_var_warning_silenced() -> None:
assert os.environ is not None
def test_configured_project_definition_resolves_project_local_file(
tmp_path: Path,
) -> None:
definition = tmp_path / "crew.jsonc"
definition.write_text("{}\n")
resolved = project.configured_project_definition(
"crew",
pyproject_data={
"tool": {
"crewai": {
"type": "crew",
"definition": " crew.jsonc ",
}
}
},
project_root=tmp_path,
)
assert resolved == definition.resolve()
def test_configured_project_definition_rejects_project_escape(tmp_path: Path) -> None:
outside = tmp_path.parent / f"{tmp_path.name}-outside-crew.jsonc"
outside.write_text("{}\n")
with pytest.raises(project.ProjectDefinitionError):
project.configured_project_definition(
"crew",
pyproject_data={
"tool": {
"crewai": {
"type": "crew",
"definition": "../outside-crew.jsonc",
}
}
},
project_root=tmp_path,
)
def test_configured_project_definition_rejects_non_string_definition(
tmp_path: Path,
) -> None:
with pytest.raises(project.ProjectDefinitionError, match="must be a string"):
project.configured_project_definition(
"crew",
pyproject_data={
"tool": {
"crewai": {
"type": "crew",
"definition": ["crew.jsonc"],
}
}
},
project_root=tmp_path,
)
def test_configured_project_definition_rejects_empty_definition(
tmp_path: Path,
) -> None:
with pytest.raises(project.ProjectDefinitionError, match="non-empty"):
project.configured_project_definition(
"crew",
pyproject_data={
"tool": {
"crewai": {
"type": "crew",
"definition": " ",
}
}
},
project_root=tmp_path,
)
def test_core_telemetry_skips_duplicate_tracer_provider(
monkeypatch: pytest.MonkeyPatch,
) -> None:

View File

@@ -9,7 +9,6 @@ layer that may have produced it and of the engine that runs it (see
from __future__ import annotations
import json
import logging
from pathlib import Path
import re
@@ -780,19 +779,6 @@ class FlowDefinition(BaseModel):
"""Serialize the definition to a declaration-ready dictionary."""
return self.model_dump(by_alias=True, exclude_none=exclude_none, mode="json")
def to_json(self, *, indent: int | None = 2, exclude_none: bool = True) -> str:
"""Serialize the definition to JSON."""
data = self.to_dict(exclude_none=exclude_none)
return json.dumps(data, indent=indent)
def to_yaml(self, *, exclude_none: bool = True) -> str:
"""Serialize the definition to YAML."""
return yaml.safe_dump(
self.to_dict(exclude_none=exclude_none),
sort_keys=False,
allow_unicode=True,
)
@property
def source_path(self) -> Path | None:
"""Original definition file path, when loaded from a file."""
@@ -805,17 +791,6 @@ class FlowDefinition(BaseModel):
return None
return self._source_path.parent
@classmethod
def from_dict(
cls, data: dict[str, Any], *, source_path: Path | None = None
) -> FlowDefinition:
"""Load a definition from a dictionary."""
definition = cls.model_validate(data)
if source_path is not None:
definition._source_path = source_path.expanduser().resolve()
log_flow_definition_issues(definition)
return definition
@classmethod
def from_declaration(
cls,
@@ -835,7 +810,7 @@ class FlowDefinition(BaseModel):
contents = source_path.expanduser().read_text(encoding="utf-8")
if isinstance(contents, dict):
return cls.from_dict(contents)
return cls._load_mapping(contents)
if not isinstance(contents, str):
raise TypeError("Flow declaration contents must be a string or dictionary")
@@ -848,12 +823,17 @@ class FlowDefinition(BaseModel):
loaded = yaml.safe_load(contents)
if not isinstance(loaded, dict):
raise ValueError("Flow declaration must contain a mapping")
return cls.from_dict(loaded, source_path=source_path)
return cls._load_mapping(loaded, source_path=source_path)
@classmethod
def json_schema(cls) -> dict[str, Any]:
"""Return the JSON Schema for the declarative Flow contract."""
return cls.model_json_schema(by_alias=True)
def _load_mapping(
cls, data: dict[str, Any], *, source_path: Path | None = None
) -> FlowDefinition:
definition = cls.model_validate(data)
if source_path is not None:
definition._source_path = source_path.expanduser().resolve()
log_flow_definition_issues(definition)
return definition
def _validate_step_name(name: str, *, field: str) -> None:

View File

@@ -480,11 +480,6 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
cls._flow_definition = flow_definition
return flow_definition
@classmethod
def from_definition(cls, definition: FlowDefinition, **kwargs: Any) -> Flow[Any]:
"""Build a runnable Flow directly from a definition; no subclass required."""
return cls.from_declaration(contents=definition, **kwargs)
@classmethod
def from_declaration(
cls,
@@ -604,7 +599,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
config: Checkpoint configuration with ``restore_from`` set to
the path of the checkpoint to load.
definition: The FlowDefinition to restore a definition-built flow
(one created via ``Flow.from_definition``) from; its actions
(one created via ``Flow.from_declaration``) from; its actions
are re-resolved since checkpoints carry no callables.
Subclasses carry their own definition and don't need this.
@@ -629,7 +624,9 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
entity._restore_from_checkpoint()
return entity
instance = (
cls.from_definition(definition) if definition is not None else cls()
cls.from_declaration(contents=definition)
if definition is not None
else cls()
)
instance.checkpoint_completed_methods = entity.checkpoint_completed_methods
instance.checkpoint_method_outputs = entity.checkpoint_method_outputs
@@ -1178,7 +1175,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
registered factory when present, else the built-in SQLite
fallback).
definition: The FlowDefinition to restore a definition-built flow
(one created via ``Flow.from_definition``) from. Subclasses
(one created via ``Flow.from_declaration``) from. Subclasses
carry their own definition and don't need this.
**kwargs: Additional keyword arguments passed to the Flow constructor
@@ -1212,7 +1209,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
state_data, pending_context = loaded
instance = (
cls.from_definition(definition, persistence=persistence, **kwargs)
cls.from_declaration(contents=definition, persistence=persistence, **kwargs)
if definition is not None
else cls(persistence=persistence, **kwargs)
)

View File

@@ -1,15 +1,16 @@
"""Memory reset utilities for CrewAI crews and flows."""
from pathlib import Path
import subprocess
from typing import Any
import click
from crewai_core.project import configured_project_definition, read_toml
from crewai.flow import Flow
from crewai.memory.unified_memory import Memory
from crewai.project.crew_loader import load_crew
from crewai.project.json_loader import find_crew_json_file
from crewai.utilities.project_utils import get_crews, get_flows, read_toml
from crewai.utilities.project_utils import get_crews, get_flows
def _reset_flow_memory(flow: Flow[Any]) -> None:
@@ -42,35 +43,20 @@ def _reset_flow_memory(flow: Flow[Any]) -> None:
click.echo(f"Memory reset skipped: {exc}", err=True)
def _current_project_declares_flow() -> bool:
try:
pyproject_data = read_toml()
except Exception:
return False
declared_type: str | None = (
pyproject_data.get("tool", {}).get("crewai", {}).get("type")
)
return declared_type == "flow"
def _configured_json_crew_path() -> Path | None:
if not Path("pyproject.toml").is_file():
return None
pyproject_data = read_toml()
return configured_project_definition("crew", pyproject_data=pyproject_data)
def _get_json_crew() -> Any | None:
"""Load a JSON-first crew from the current project, if present."""
if _current_project_declares_flow():
return None
crew_path = find_crew_json_file()
crew_path = _configured_json_crew_path()
if crew_path is None:
return None
try:
crew, _ = load_crew(crew_path)
except Exception as exc:
click.echo(
f"Skipping JSON crew at {crew_path}: failed to load ({exc}).",
err=True,
)
return None
crew, _ = load_crew(crew_path)
return crew
@@ -151,3 +137,4 @@ def reset_memories_command(
except Exception as e:
click.echo(f"An unexpected error occurred: {e}", err=True)
raise SystemExit(1) from e

View File

@@ -240,6 +240,9 @@ def test_reset_no_crew_or_flow_found(runner):
def test_reset_json_crew_memory(mock_crew, runner, monkeypatch, tmp_path):
monkeypatch.chdir(tmp_path)
(tmp_path / "crew.jsonc").write_text("{}")
(tmp_path / "pyproject.toml").write_text(
'[tool.crewai]\ntype = "crew"\ndefinition = "crew.jsonc"\n'
)
with mock.patch(
"crewai.utilities.reset_memories.get_crews", return_value=[]
@@ -251,16 +254,19 @@ def test_reset_json_crew_memory(mock_crew, runner, monkeypatch, tmp_path):
) as mock_load_crew:
result = runner.invoke(reset_memories, ["-m"])
mock_load_crew.assert_called_once_with(Path("crew.jsonc"))
mock_load_crew.assert_called_once_with((tmp_path / "crew.jsonc").resolve())
mock_crew.reset_memories.assert_called_once_with(command_type="memory")
assert f"[Crew ({mock_crew.name})] Memory has been reset." in result.output
def test_reset_invalid_json_crew_does_not_block_classic_crew(
def test_reset_invalid_json_crew_blocks_reset(
mock_crew, runner, monkeypatch, tmp_path
):
monkeypatch.chdir(tmp_path)
(tmp_path / "crew.jsonc").write_text("{invalid")
(tmp_path / "pyproject.toml").write_text(
'[tool.crewai]\ntype = "crew"\ndefinition = "crew.jsonc"\n'
)
with mock.patch(
"crewai.utilities.reset_memories.get_crews", return_value=[mock_crew]
@@ -272,10 +278,10 @@ def test_reset_invalid_json_crew_does_not_block_classic_crew(
) as mock_load_crew:
result = runner.invoke(reset_memories, ["-m"])
mock_load_crew.assert_called_once_with(Path("crew.jsonc"))
mock_crew.reset_memories.assert_called_once_with(command_type="memory")
assert "Skipping JSON crew at crew.jsonc: failed to load (invalid JSON)." in result.output
assert f"[Crew ({mock_crew.name})] Memory has been reset." in result.output
mock_load_crew.assert_called_once_with((tmp_path / "crew.jsonc").resolve())
mock_crew.reset_memories.assert_not_called()
assert result.exit_code != 0
assert "An unexpected error occurred: invalid JSON" in result.output
def test_reset_json_crew_skipped_for_declared_flow_project(

View File

@@ -65,7 +65,7 @@ def test_flow_public_exports_are_explicit():
def test_flow_definition_json_schema_carries_reference_descriptions():
schema = flow_definition.FlowDefinition.json_schema()
schema = flow_definition.FlowDefinition.model_json_schema(by_alias=True)
defs = schema["$defs"]
assert schema["properties"]["schema"]["description"]
@@ -120,7 +120,7 @@ def test_flow_definition_json_schema_carries_reference_descriptions():
def test_flow_definition_json_schema_carries_field_examples_only():
schema = flow_definition.FlowDefinition.json_schema()
schema = flow_definition.FlowDefinition.model_json_schema(by_alias=True)
defs = schema["$defs"]
for model_name in [
@@ -437,7 +437,7 @@ def test_flow_definition_uses_collapsed_conversational_router_start():
assert methods["route_conversation"].router is True
def test_flow_definition_serializes_human_feedback_metadata(caplog):
def test_flow_definition_degrades_human_feedback_metadata(caplog):
caplog.set_level(logging.WARNING, logger="crewai.flow.dsl._utils")
marker = object()
@@ -461,7 +461,7 @@ def test_flow_definition_serializes_human_feedback_metadata(caplog):
and "not fully serializable" in record.message
for record in caplog.records
)
definition.to_json()
definition.to_dict()
def test_flow_definition_fragments_cover_start_listen_and_condition_sugar():
@@ -613,7 +613,7 @@ def test_flow_definition_merges_stacked_listen_router():
assert methods["second_router"].emit == ["second_approval", "not_approved"]
def test_flow_definition_round_trips_declaration_serialization():
def test_flow_definition_from_declaration_accepts_json_and_yaml_strings():
class RoundTripFlow(Flow):
@start()
def begin(self):
@@ -627,17 +627,67 @@ def test_flow_definition_round_trips_declaration_serialization():
def left(self):
return "left"
definition = RoundTripFlow.flow_definition()
round_trips = [
flow_definition.FlowDefinition.from_declaration(contents=definition.to_json()),
flow_definition.FlowDefinition.from_declaration(contents=definition.to_yaml()),
expected = RoundTripFlow.flow_definition()
declarations = [
"""
{
"schema": "crewai.flow/v1",
"name": "RoundTripFlow",
"methods": {
"begin": {
"start": true,
"do": {
"call": "code",
"ref": "test_flow_definition:RoundTripFlow.begin"
}
},
"decide": {
"listen": "begin",
"router": true,
"do": {
"call": "code",
"ref": "test_flow_definition:RoundTripFlow.decide"
}
},
"left": {
"listen": "left",
"do": {
"call": "code",
"ref": "test_flow_definition:RoundTripFlow.left"
}
}
}
}
""",
"""
schema: crewai.flow/v1
name: RoundTripFlow
methods:
begin:
start: true
do:
call: code
ref: test_flow_definition:RoundTripFlow.begin
decide:
listen: begin
router: true
do:
call: code
ref: test_flow_definition:RoundTripFlow.decide
left:
listen: left
do:
call: code
ref: test_flow_definition:RoundTripFlow.left
""",
]
for round_trip in round_trips:
assert round_trip.to_dict() == definition.to_dict()
assert round_trip.methods["decide"].router is True
assert round_trip.methods["decide"].listen == "begin"
for declaration in declarations:
loaded = flow_definition.FlowDefinition.from_declaration(contents=declaration)
assert loaded.name == expected.name
assert loaded.methods["decide"].router is True
assert loaded.methods["decide"].listen == "begin"
def test_flow_definition_from_declaration_accepts_contents():
@@ -654,20 +704,41 @@ def test_flow_definition_from_declaration_accepts_contents():
},
},
}
definition = flow_definition.FlowDefinition.from_dict(data)
definition = flow_definition.FlowDefinition.from_declaration(contents=data)
contents = [
definition,
data,
definition.to_json(),
definition.to_yaml(),
"""
{
"schema": "crewai.flow/v1",
"name": "DeclarationFlow",
"methods": {
"begin": {
"start": true,
"do": {
"call": "expression",
"expr": "'started'"
}
}
}
}
""",
"""
schema: crewai.flow/v1
name: DeclarationFlow
methods:
begin:
start: true
do:
call: expression
expr: "'started'"
""",
]
expected = definition.to_dict()
for content in contents:
loaded = flow_definition.FlowDefinition.from_declaration(contents=content)
assert loaded.to_dict() == expected
assert loaded.to_dict() == definition.to_dict()
def test_flow_definition_from_declaration_rejects_empty_file(tmp_path: Path):
declaration_path = tmp_path / "flow.crewai"
@@ -686,7 +757,7 @@ def test_flow_definition_from_declaration_rejects_falsey_non_mapping_contents(
def test_flow_definition_from_declaration_accepts_paths(tmp_path: Path):
definition = flow_definition.FlowDefinition.from_dict(
definition = flow_definition.FlowDefinition.from_declaration(contents=
{
"schema": "crewai.flow/v1",
"name": "DeclarationFlow",
@@ -702,7 +773,19 @@ def test_flow_definition_from_declaration_accepts_paths(tmp_path: Path):
}
)
declaration_path = tmp_path / "flow.crewai"
declaration_path.write_text(definition.to_yaml(), encoding="utf-8")
declaration_path.write_text(
"""
schema: crewai.flow/v1
name: DeclarationFlow
methods:
begin:
start: true
do:
call: expression
expr: "'started'"
""",
encoding="utf-8",
)
path_inputs = [
declaration_path,
str(declaration_path),
@@ -711,7 +794,9 @@ def test_flow_definition_from_declaration_accepts_paths(tmp_path: Path):
for path_input in path_inputs:
loaded = flow_definition.FlowDefinition.from_declaration(path=path_input)
assert loaded.to_dict() == definition.to_dict()
assert loaded.name == definition.name
assert loaded.methods["begin"].is_start is True
assert loaded.methods["begin"].do.call == "expression"
assert loaded.source_path == declaration_path.resolve()
@@ -744,8 +829,8 @@ def test_flow_definition_from_declaration_prefers_contents_over_path(
assert loaded.source_path is None
def test_each_action_round_trips_declaration_serialization():
definition = flow_definition.FlowDefinition.from_dict(
def test_each_action_loads_from_declaration():
definition = flow_definition.FlowDefinition.from_declaration(contents=
{
"schema": "crewai.flow/v1",
"name": "EachFlow",
@@ -783,22 +868,13 @@ def test_each_action_round_trips_declaration_serialization():
}
)
round_trips = [
flow_definition.FlowDefinition.from_declaration(contents=definition.to_json()),
flow_definition.FlowDefinition.from_declaration(contents=definition.to_yaml()),
]
for round_trip in round_trips:
assert round_trip.to_dict() == definition.to_dict()
assert round_trip.methods["process_rows"].description == (
"Process every loaded row."
)
assert round_trip.methods["process_rows"].do.call == "each"
assert definition.methods["process_rows"].description == "Process every loaded row."
assert definition.methods["process_rows"].do.call == "each"
def test_flow_definition_rejects_invalid_method_names():
with pytest.raises(ValueError, match="Flow method names must match"):
flow_definition.FlowDefinition.from_dict(
flow_definition.FlowDefinition.from_declaration(contents=
{
"schema": "crewai.flow/v1",
"name": "InvalidMethodNameFlow",
@@ -1009,7 +1085,7 @@ def test_flow_definition_accepts_explicit_router_events():
def test_flow_definition_ignores_legacy_diagnostics_loaded_from_contract():
definition = flow_definition.FlowDefinition.from_dict(
definition = flow_definition.FlowDefinition.from_declaration(contents=
{
"schema": "crewai.flow/v1",
"name": "LoadedDiagnosticsFlow",
@@ -1042,7 +1118,7 @@ def test_flow_definition_ignores_legacy_diagnostics_loaded_from_contract():
def test_router_start_false_without_listen_is_allowed(caplog):
caplog.set_level(logging.ERROR, logger="crewai.flow.flow_definition")
flow_definition.FlowDefinition.from_dict(
flow_definition.FlowDefinition.from_declaration(contents=
{
"schema": "crewai.flow/v1",
"name": "LoadedFlow",
@@ -1118,7 +1194,7 @@ def test_dynamic_router_string_listener_is_valid_contract():
def test_static_string_listener_is_allowed_by_contract():
definition = flow_definition.FlowDefinition.from_dict(
definition = flow_definition.FlowDefinition.from_declaration(contents=
{
"schema": "crewai.flow/v1",
"name": "TypoFlow",
@@ -1138,7 +1214,7 @@ def test_static_string_listener_is_allowed_by_contract():
def test_start_false_not_classified_as_start_method():
definition = flow_definition.FlowDefinition.from_dict(
definition = flow_definition.FlowDefinition.from_declaration(contents=
{
"schema": "crewai.flow/v1",
"name": "ExplicitNonStartFlow",
@@ -1202,7 +1278,7 @@ def test_flow_definition_cache_is_not_reused_by_subclasses():
def test_flow_definition_allows_router_without_trigger(caplog):
caplog.set_level(logging.WARNING, logger="crewai.flow.flow_definition")
flow_definition.FlowDefinition.from_dict(
flow_definition.FlowDefinition.from_declaration(contents=
{
"schema": "crewai.flow/v1",
"name": "LoadedFlow",

View File

@@ -477,7 +477,7 @@ def assert_parity(flow_cls, yaml_str, inputs=None, ordered=True):
class_result, class_events = _run_with_events(class_flow, inputs)
definition = FlowDefinition.from_declaration(contents=yaml_str)
definition_flow = Flow.from_definition(definition)
definition_flow = Flow.from_declaration(contents=definition)
definition_result, definition_events = _run_with_events(definition_flow, inputs)
assert definition_result == class_result
@@ -537,7 +537,7 @@ def test_cyclic_flow_parity():
def test_definition_flow_events_use_definition_name():
definition = FlowDefinition.from_declaration(contents=CHAIN_YAML)
flow = Flow.from_definition(definition)
flow = Flow.from_declaration(contents=definition)
_, events = _run_with_events(flow)
assert events
assert all(flow_name == "ChainFlow" for _, _, flow_name in events)
@@ -545,7 +545,7 @@ def test_definition_flow_events_use_definition_name():
def test_definition_method_without_action_is_invalid():
with pytest.raises(ValidationError, match="do"):
FlowDefinition.from_dict(
FlowDefinition.from_declaration(contents=
{
"schema": "crewai.flow/v1",
"name": "NoActions",
@@ -554,8 +554,8 @@ def test_definition_method_without_action_is_invalid():
)
def test_from_definition_unresolvable_ref_raises():
definition = FlowDefinition.from_dict(
def test_from_declaration_unresolvable_ref_raises():
definition = FlowDefinition.from_declaration(contents=
{
"schema": "crewai.flow/v1",
"name": "BadRefs",
@@ -569,11 +569,11 @@ def test_from_definition_unresolvable_ref_raises():
)
with pytest.raises(ValueError, match="unresolvable actions.*begin"):
Flow.from_definition(definition)
Flow.from_declaration(contents=definition)
def test_from_definition_malformed_ref_raises():
definition = FlowDefinition.from_dict(
def test_from_declaration_malformed_ref_raises():
definition = FlowDefinition.from_declaration(contents=
{
"schema": "crewai.flow/v1",
"name": "MalformedRefs",
@@ -582,11 +582,11 @@ def test_from_definition_malformed_ref_raises():
)
with pytest.raises(ValueError, match="expected 'module:qualname'"):
Flow.from_definition(definition)
Flow.from_declaration(contents=definition)
def test_from_definition_local_scope_ref_raises():
definition = FlowDefinition.from_dict(
def test_from_declaration_local_scope_ref_raises():
definition = FlowDefinition.from_declaration(contents=
{
"schema": "crewai.flow/v1",
"name": "LocalRefs",
@@ -600,7 +600,7 @@ def test_from_definition_local_scope_ref_raises():
)
with pytest.raises(ValueError, match="expected 'module:qualname'"):
Flow.from_definition(definition)
Flow.from_declaration(contents=definition)
def test_flow_definition_stamps_refs():
@@ -610,7 +610,7 @@ def test_flow_definition_stamps_refs():
assert definition.methods["shout"].do.ref == f"{__name__}:ChainFlow.shout"
def test_from_definition_runs_tool_action_with_static_inputs():
def test_from_declaration_runs_tool_action_with_static_inputs():
yaml_str = f"""
schema: crewai.flow/v1
name: ToolFlow
@@ -625,13 +625,13 @@ methods:
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
flow = Flow.from_declaration(contents=yaml_str)
assert flow.kickoff() == "found:ai agents"
def test_tool_action_round_trips_with_inputs():
definition = FlowDefinition.from_dict(
definition = FlowDefinition.from_declaration(contents=
{
"schema": "crewai.flow/v1",
"name": "ToolFlow",
@@ -648,12 +648,12 @@ def test_tool_action_round_trips_with_inputs():
}
)
assert definition.to_dict()["methods"]["search"]["do"] == {
"call": "tool",
"ref": f"{__name__}:StaticSearchTool",
"with": {"search_query": "ai agents"},
}
assert Flow.from_definition(definition).kickoff() == "search:ai agents"
action = definition.methods["search"].do
assert action.call == "tool"
assert action.ref == f"{__name__}:StaticSearchTool"
assert action.with_ == {"search_query": "ai agents"}
assert Flow.from_declaration(contents=definition).kickoff() == "search:ai agents"
def test_tool_action_renders_cel_inputs_at_runtime():
@@ -676,13 +676,13 @@ methods:
listen: begin
"""
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
flow = Flow.from_declaration(contents=yaml_str)
assert flow.kickoff(inputs={"topic": "ai"}) == "found:ai agents"
def test_tool_action_treats_embedded_cel_marker_as_literal():
definition = FlowDefinition.from_dict(
definition = FlowDefinition.from_declaration(contents=
{
"schema": "crewai.flow/v1",
"name": "ToolFlow",
@@ -702,11 +702,11 @@ def test_tool_action_treats_embedded_cel_marker_as_literal():
}
)
assert Flow.from_definition(definition).kickoff() == "p}x:wrapped ${'a}b'} value"
assert Flow.from_declaration(contents=definition).kickoff() == "p}x:wrapped ${'a}b'} value"
def test_tool_action_treats_marker_with_trailing_text_as_literal():
definition = FlowDefinition.from_dict(
definition = FlowDefinition.from_declaration(contents=
{
"schema": "crewai.flow/v1",
"name": "ToolFlow",
@@ -726,12 +726,12 @@ def test_tool_action_treats_marker_with_trailing_text_as_literal():
}
)
assert Flow.from_definition(definition).kickoff() == "p:${state.topic} extra"
assert Flow.from_declaration(contents=definition).kickoff() == "p:${state.topic} extra"
def test_tool_action_rejects_adjacent_markers_as_invalid_cel():
with pytest.raises(ValidationError, match="invalid CEL expression"):
FlowDefinition.from_dict(
FlowDefinition.from_declaration(contents=
{
"schema": "crewai.flow/v1",
"name": "ToolFlow",
@@ -753,7 +753,7 @@ def test_tool_action_rejects_adjacent_markers_as_invalid_cel():
def test_tool_action_accepts_braces_in_full_cel_marker():
definition = FlowDefinition.from_dict(
definition = FlowDefinition.from_declaration(contents=
{
"schema": "crewai.flow/v1",
"name": "ToolFlow",
@@ -773,7 +773,7 @@ def test_tool_action_accepts_braces_in_full_cel_marker():
}
)
assert Flow.from_definition(definition).kickoff() == "p}x:ai agents"
assert Flow.from_declaration(contents=definition).kickoff() == "p}x:ai agents"
def test_tool_action_renders_latest_output_by_method_name():
@@ -795,7 +795,7 @@ methods:
listen: begin
"""
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
flow = Flow.from_declaration(contents=yaml_str)
assert flow.kickoff() == "search:hello agents"
@@ -820,7 +820,7 @@ methods:
listen: build_query
"""
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
flow = Flow.from_declaration(contents=yaml_str)
assert flow.kickoff() == "found:ai agents news"
@@ -840,7 +840,7 @@ methods:
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
flow = Flow.from_declaration(contents=yaml_str)
assert (
flow.kickoff(inputs={"limit": 2, "domains": ["crewai.com", "example.com"]})
@@ -873,7 +873,7 @@ methods:
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
flow = Flow.from_declaration(contents=yaml_str)
assert flow.kickoff(inputs={"question": "What is CrewAI?"}) == {
"agent": "Analyst",
@@ -911,7 +911,7 @@ methods:
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
flow = Flow.from_declaration(contents=yaml_str)
assert flow.kickoff(inputs={"questions": ["one", "two"]}) == [
"Analyst:one",
@@ -920,7 +920,7 @@ methods:
def test_agent_action_round_trips_with_inline_definition():
definition = FlowDefinition.from_dict(
definition = FlowDefinition.from_declaration(contents=
{
"schema": "crewai.flow/v1",
"name": "AgentFlow",
@@ -942,17 +942,16 @@ def test_agent_action_round_trips_with_inline_definition():
}
)
round_trip = FlowDefinition.from_declaration(contents=definition.to_yaml())
action = round_trip.to_dict()["methods"]["answer"]["do"]
action = definition.methods["answer"].do
assert action["call"] == "agent"
assert action["with"]["role"] == "Analyst"
assert action["with"]["input"] == "${state.question}"
assert action["with"]["settings"] == {"verbose": True}
assert action.call == "agent"
assert action.with_.role == "Analyst"
assert action.with_.input == "${state.question}"
assert action.with_.settings == {"verbose": True}
def test_agent_action_json_schema_describes_inline_agent_definitions():
schema_defs = FlowDefinition.json_schema()["$defs"]
schema_defs = FlowDefinition.model_json_schema(by_alias=True)["$defs"]
assert set(schema_defs["AgentDefinition"]["properties"]) >= {
"role",
@@ -966,7 +965,7 @@ def test_agent_action_json_schema_describes_inline_agent_definitions():
def test_agent_action_rejects_non_string_input_in_definition():
with pytest.raises(ValidationError, match="agent.input must be a string"):
FlowDefinition.from_dict(
FlowDefinition.from_declaration(contents=
{
"schema": "crewai.flow/v1",
"name": "AgentFlow",
@@ -1047,7 +1046,7 @@ methods:
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
flow = Flow.from_declaration(contents=yaml_str)
assert flow.kickoff(inputs={"topic": "AI"}) == {
"crew": "inline_research",
@@ -1123,7 +1122,7 @@ methods:
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
flow = Flow.from_declaration(contents=yaml_str)
assert flow.kickoff(inputs={"topic": "AI"}) == {
"crew": "referenced_research",
@@ -1197,7 +1196,7 @@ methods:
other_cwd.mkdir()
monkeypatch.chdir(other_cwd)
flow = Flow.from_definition(FlowDefinition.from_declaration(path=flow_path))
flow = Flow.from_declaration(path=flow_path)
assert flow.kickoff(inputs={"topic": "AI"}) == {
"crew": "relative_research",
@@ -1222,7 +1221,7 @@ methods:
"""
flow_path.write_text(yaml_str, encoding="utf-8")
flow = Flow.from_definition(FlowDefinition.from_declaration(path=flow_path))
flow = Flow.from_declaration(path=flow_path)
with pytest.raises(
ValueError,
@@ -1232,7 +1231,7 @@ methods:
def test_crew_action_round_trips_with_inline_definition():
definition = FlowDefinition.from_dict(
definition = FlowDefinition.from_declaration(contents=
{
"schema": "crewai.flow/v1",
"name": "CrewFlow",
@@ -1266,20 +1265,16 @@ def test_crew_action_round_trips_with_inline_definition():
}
)
assert definition.to_dict()["methods"]["research"]["do"]["call"] == "crew"
assert (
definition.to_dict()["methods"]["research"]["do"]["with"]["agents"][
"researcher"
]["role"]
== "Researcher"
)
assert definition.to_dict()["methods"]["research"]["do"]["inputs"] == {
"topic": "${state.topic}"
}
action = definition.methods["research"].do
assert action.call == "crew"
assert action.with_ is not None
assert action.with_.agents["researcher"].role == "Researcher"
assert action.inputs == {"topic": "${state.topic}"}
def test_crew_action_normalizes_named_agent_list_definition():
definition = FlowDefinition.from_dict(
definition = FlowDefinition.from_declaration(contents=
{
"schema": "crewai.flow/v1",
"name": "CrewFlow",
@@ -1311,16 +1306,15 @@ def test_crew_action_normalizes_named_agent_list_definition():
}
)
assert (
definition.to_dict()["methods"]["research"]["do"]["with"]["agents"][
"researcher"
]["role"]
== "Researcher"
)
action = definition.methods["research"].do
assert action.call == "crew"
assert action.with_ is not None
assert action.with_.agents["researcher"].role == "Researcher"
def test_crew_action_json_schema_describes_inline_crew_definitions():
schema_defs = FlowDefinition.json_schema()["$defs"]
schema_defs = FlowDefinition.model_json_schema(by_alias=True)["$defs"]
agents_schema = schema_defs["CrewDefinition"]["properties"]["agents"]
assert set(schema_defs["CrewDefinition"]["properties"]) >= {
@@ -1345,7 +1339,7 @@ def test_crew_action_json_schema_describes_inline_crew_definitions():
def test_crew_action_rejects_incomplete_inline_agent_definition():
with pytest.raises(ValidationError, match="goal"):
FlowDefinition.from_dict(
FlowDefinition.from_declaration(contents=
{
"schema": "crewai.flow/v1",
"name": "CrewFlow",
@@ -1378,7 +1372,7 @@ def test_crew_action_rejects_incomplete_inline_agent_definition():
def test_crew_action_rejects_python_ref_field():
with pytest.raises(ValidationError, match="ref"):
FlowDefinition.from_dict(
FlowDefinition.from_declaration(contents=
{
"schema": "crewai.flow/v1",
"name": "CrewFlow",
@@ -1397,7 +1391,7 @@ def test_crew_action_rejects_python_ref_field():
def test_crew_action_rejects_non_mapping_inputs_in_definition():
with pytest.raises(ValidationError, match="crew.inputs must be a mapping"):
FlowDefinition.from_dict(
FlowDefinition.from_declaration(contents=
{
"schema": "crewai.flow/v1",
"name": "CrewFlow",
@@ -1463,7 +1457,7 @@ methods:
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
flow = Flow.from_declaration(contents=yaml_str)
assert flow.kickoff(inputs={"name": "hello"}) == "hello!"
@@ -1482,7 +1476,7 @@ methods:
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
flow = Flow.from_declaration(contents=yaml_str)
assert flow.kickoff(inputs={"value": "ok"}) == "callable:ok"
@@ -1506,7 +1500,7 @@ methods:
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
flow = Flow.from_declaration(contents=yaml_str)
assert flow.kickoff(inputs={"rows": ["a", "b"]}) == [
"normalized:a",
@@ -1533,7 +1527,7 @@ methods:
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
flow = Flow.from_declaration(contents=yaml_str)
caller_thread_id = threading.get_ident()
assert flow.kickoff(inputs={"rows": ["a"]}) == ["process_rows:a"]
@@ -1560,7 +1554,7 @@ methods:
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
flow = Flow.from_declaration(contents=yaml_str)
assert flow.kickoff(inputs={"rows": ["a", "b"]}) == ["async:a", "async:b"]
@@ -1582,7 +1576,7 @@ methods:
FlowScriptExecutionDisabledError,
match="CREWAI_ALLOW_FLOW_SCRIPT_EXECUTION=1",
) as exc_info:
Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
Flow.from_declaration(contents=yaml_str)
assert "methods with unresolvable actions" not in str(exc_info.value)
@@ -1606,7 +1600,7 @@ methods:
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
flow = Flow.from_declaration(contents=yaml_str)
assert flow.kickoff(inputs={"raw_score": 3.2}) == "rounded:4"
assert flow.state["rounded"] == 4
@@ -1635,7 +1629,7 @@ methods:
listen: seed
"""
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
flow = Flow.from_declaration(contents=yaml_str)
assert flow.kickoff() == "alpha:alpha"
assert flow.state["input_matches_output"] is True
@@ -1673,7 +1667,7 @@ methods:
listen: seed
"""
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
flow = Flow.from_declaration(contents=yaml_str)
assert flow.kickoff(inputs={"rows": [" a ", " b "]}) == ["global:a", "global:b"]
@@ -1705,7 +1699,7 @@ methods:
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
flow = Flow.from_declaration(contents=yaml_str)
assert flow.kickoff(inputs={"rows": ["a", "b"]}) == [
{"row": "a", "normalized": "saved:a"},
@@ -1734,7 +1728,7 @@ methods:
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
flow = Flow.from_declaration(contents=yaml_str)
assert flow.kickoff(inputs={"rows": ["a", "b"]}) == ["a", "b"]
assert flow._method_outputs == [
@@ -1772,7 +1766,7 @@ methods:
listen: seed
"""
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
flow = Flow.from_declaration(contents=yaml_str)
assert flow.kickoff(inputs={"rows": ["a", "b"]}) == [
"local:a",
@@ -1811,7 +1805,7 @@ methods:
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
flow = Flow.from_declaration(contents=yaml_str)
assert flow.kickoff(
inputs={
@@ -1845,7 +1839,7 @@ methods:
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
flow = Flow.from_declaration(contents=yaml_str)
assert flow.kickoff(inputs={"rows": [{"kind": "keep", "value": "a"}]}) == ["a"]
@@ -1872,7 +1866,7 @@ methods:
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
flow = Flow.from_declaration(contents=yaml_str)
assert flow.kickoff(
inputs={
@@ -1902,7 +1896,7 @@ methods:
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
flow = Flow.from_declaration(contents=yaml_str)
with pytest.raises(ValueError, match="if expression must evaluate to a boolean"):
flow.kickoff(inputs={"rows": [{"value": "truthy"}]})
@@ -1932,7 +1926,7 @@ methods:
listen: process_rows
"""
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
flow = Flow.from_declaration(contents=yaml_str)
events = []
with crewai_event_bus.scoped_handlers():
@@ -1958,7 +1952,7 @@ methods:
],
)
def test_each_action_rejects_non_list_inputs(expr, inputs):
definition = FlowDefinition.from_dict(
definition = FlowDefinition.from_declaration(contents=
{
"schema": "crewai.flow/v1",
"name": "EachFlow",
@@ -1979,7 +1973,7 @@ def test_each_action_rejects_non_list_inputs(expr, inputs):
},
}
)
flow = Flow.from_definition(definition)
flow = Flow.from_declaration(contents=definition)
with pytest.raises(ValueError, match="each.in must evaluate to an array"):
flow.kickoff(inputs=inputs)
@@ -2009,7 +2003,7 @@ def test_each_action_rejects_non_list_inputs(expr, inputs):
)
def test_each_action_validates_step_shape(action_do):
with pytest.raises(ValidationError):
FlowDefinition.from_dict(
FlowDefinition.from_declaration(contents=
{
"schema": "crewai.flow/v1",
"name": "EachFlow",
@@ -2029,7 +2023,7 @@ def test_each_action_validates_step_shape(action_do):
def test_if_clauses_are_rejected_at_method_level():
with pytest.raises(ValidationError):
FlowDefinition.from_dict(
FlowDefinition.from_declaration(contents=
{
"schema": "crewai.flow/v1",
"name": "TopLevelIfFlow",
@@ -2049,7 +2043,7 @@ def test_if_clauses_are_rejected_at_method_level():
def test_each_action_rejects_nested_each_actions():
with pytest.raises(ValidationError):
FlowDefinition.from_dict(
FlowDefinition.from_declaration(contents=
{
"schema": "crewai.flow/v1",
"name": "EachFlow",
@@ -2103,14 +2097,14 @@ methods:
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
flow = Flow.from_declaration(contents=yaml_str)
with pytest.raises(RuntimeError, match="bad row"):
flow.kickoff(inputs={"rows": ["ok", "bad"]})
def test_expression_action_round_trips():
definition = FlowDefinition.from_dict(
definition = FlowDefinition.from_declaration(contents=
{
"schema": "crewai.flow/v1",
"name": "ExpressionFlow",
@@ -2126,15 +2120,15 @@ def test_expression_action_round_trips():
}
)
assert definition.to_dict()["methods"]["classify"]["do"] == {
"call": "expression",
"expr": "state.score >= 80 ? 'qualified' : 'nurture'",
}
assert Flow.from_definition(definition).kickoff(inputs={"score": 90}) == "qualified"
action = definition.methods["classify"].do
assert action.call == "expression"
assert action.expr == "state.score >= 80 ? 'qualified' : 'nurture'"
assert Flow.from_declaration(contents=definition).kickoff(inputs={"score": 90}) == "qualified"
def test_explicit_cel_fields_accept_expression_markers():
definition = FlowDefinition.from_dict(
definition = FlowDefinition.from_declaration(contents=
{
"schema": "crewai.flow/v1",
"name": "ExpressionFlow",
@@ -2150,7 +2144,7 @@ def test_explicit_cel_fields_accept_expression_markers():
}
)
assert Flow.from_definition(definition).kickoff(inputs={"score": 90}) == "qualified"
assert Flow.from_declaration(contents=definition).kickoff(inputs={"score": 90}) == "qualified"
def test_expression_local_context_recurses_into_dataclass_values():
@@ -2226,10 +2220,10 @@ methods:
definition = FlowDefinition.from_declaration(contents=yaml_str)
assert Flow.from_definition(definition).kickoff(
assert Flow.from_declaration(contents=definition).kickoff(
inputs={"direction": "left"}
) == "took-left"
assert Flow.from_definition(definition).kickoff(
assert Flow.from_declaration(contents=definition).kickoff(
inputs={"direction": "right"}
) == "took-right"
@@ -2267,7 +2261,7 @@ methods:
def test_tool_action_requires_module_qualname_ref():
definition = FlowDefinition.from_dict(
definition = FlowDefinition.from_declaration(contents=
{
"schema": "crewai.flow/v1",
"name": "ToolFlow",
@@ -2285,7 +2279,7 @@ def test_tool_action_requires_module_qualname_ref():
)
with pytest.raises(ValueError, match="expected 'module:qualname'"):
Flow.from_definition(definition)
Flow.from_declaration(contents=definition)
def test_pydantic_state_from_ref_parity():
@@ -2297,7 +2291,7 @@ def test_pydantic_state_from_ref_parity():
def test_pydantic_state_default_overlay():
flow = Flow.from_definition(
flow = Flow.from_declaration(contents=
FlowDefinition.from_declaration(contents=PYDANTIC_STATE_OVERLAY_YAML)
)
result = flow.kickoff()
@@ -2306,7 +2300,7 @@ def test_pydantic_state_default_overlay():
def test_json_schema_state():
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=JSON_SCHEMA_STATE_YAML))
flow = Flow.from_declaration(contents=JSON_SCHEMA_STATE_YAML)
result = flow.kickoff()
assert result == "count=1"
assert flow.state.count == 1
@@ -2315,13 +2309,13 @@ def test_json_schema_state():
def test_json_schema_state_validates_inputs():
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=JSON_SCHEMA_STATE_YAML))
flow = Flow.from_declaration(contents=JSON_SCHEMA_STATE_YAML)
with pytest.raises(ValueError, match="Invalid inputs"):
flow.kickoff(inputs={"count": "not-a-number"})
def test_json_schema_state_required_fields_can_come_from_kickoff_inputs():
flow = Flow.from_definition(
flow = Flow.from_declaration(contents=
FlowDefinition.from_declaration(contents=JSON_SCHEMA_REQUIRED_INPUT_STATE_YAML)
)
@@ -2333,7 +2327,7 @@ def test_json_schema_state_required_fields_can_come_from_kickoff_inputs():
def test_pydantic_state_falls_back_to_json_schema_when_ref_unimportable():
flow = Flow.from_definition(
flow = Flow.from_declaration(contents=
FlowDefinition.from_declaration(contents=PYDANTIC_REF_WITH_SCHEMA_FALLBACK_YAML)
)
result = flow.kickoff()
@@ -2343,7 +2337,7 @@ def test_pydantic_state_falls_back_to_json_schema_when_ref_unimportable():
def test_pydantic_state_without_ref_or_schema_falls_back_to_dict(caplog):
with caplog.at_level("ERROR"):
flow = Flow.from_definition(
flow = Flow.from_declaration(contents=
FlowDefinition.from_declaration(contents=UNRESOLVABLE_STATE_YAML)
)
assert "falling back to dict state" in caplog.text
@@ -2357,13 +2351,13 @@ def test_pydantic_state_without_ref_or_schema_falls_back_to_dict(caplog):
def test_dict_state_is_a_copy_of_default_plus_id():
definition = FlowDefinition.from_declaration(contents=DICT_STATE_YAML)
flow = Flow.from_definition(definition)
flow = Flow.from_declaration(contents=definition)
assert flow.state["count"] == 5
assert flow.state["id"]
flow.kickoff()
assert flow.state["begin_ran"] is True
second = Flow.from_definition(definition)
second = Flow.from_declaration(contents=definition)
assert second.state["count"] == 5
assert "begin_ran" not in second.state
assert second.state["id"] != flow.state["id"]
@@ -2372,7 +2366,7 @@ def test_dict_state_is_a_copy_of_default_plus_id():
def test_unknown_state_type_falls_back_to_dict(caplog):
with caplog.at_level("WARNING"):
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=UNKNOWN_STATE_YAML))
flow = Flow.from_declaration(contents=UNKNOWN_STATE_YAML)
assert "falling back to dict state" in caplog.text
result = flow.kickoff()
@@ -2445,7 +2439,7 @@ def _run_capturing_flow_lifecycle(yaml_str, event_types):
def capture(source, event):
events.append(event)
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
flow = Flow.from_declaration(contents=yaml_str)
result = flow.kickoff()
return flow, result, events
@@ -2483,13 +2477,13 @@ def test_config_suppress_flow_events_from_declaration():
def test_config_max_method_calls_from_declaration():
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=CAPPED_LOOP_YAML))
flow = Flow.from_declaration(contents=CAPPED_LOOP_YAML)
with pytest.raises(RecursionError, match="has been called 2 times"):
flow.kickoff()
def test_config_stream_from_declaration():
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=STREAMING_CHAIN_YAML))
flow = Flow.from_declaration(contents=STREAMING_CHAIN_YAML)
streaming = flow.kickoff()
assert isinstance(streaming, FlowStreamingOutput)
for _ in streaming:
@@ -2521,24 +2515,24 @@ config:
location: {tmp_path}
"""
)
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
flow = Flow.from_declaration(contents=yaml_str)
assert isinstance(flow.checkpoint, CheckpointConfig)
assert flow.checkpoint.location == str(tmp_path)
def test_config_input_provider_from_declaration():
flow = Flow.from_definition(
flow = Flow.from_declaration(contents=
FlowDefinition.from_declaration(contents=INPUT_PROVIDER_CHAIN_YAML)
)
assert isinstance(flow.input_provider, StubInputProvider)
def test_round_trip_config_equivalence():
def test_definition_config_equivalence():
class_flow = ConfiguredFlow()
definition = FlowDefinition.from_declaration(
contents=ConfiguredFlow.flow_definition().to_yaml()
contents=ConfiguredFlow.flow_definition()
)
definition_flow = Flow.from_definition(definition)
definition_flow = Flow.from_declaration(contents=definition)
assert definition.config.suppress_flow_events is True
assert definition.config.max_method_calls == 5
@@ -2555,7 +2549,7 @@ def test_round_trip_config_equivalence():
def test_unknown_schema_rejected():
with pytest.raises(ValidationError, match="schema"):
FlowDefinition.from_dict(
FlowDefinition.from_declaration(contents=
{
"schema": "crewai.flow/v2",
"name": "FutureSchema",
@@ -2709,7 +2703,7 @@ class MethodPersistedFlow(Flow):
def test_flow_level_persist_from_declaration_saves_once_per_method():
yaml_str = _flow_level_persist_yaml("yaml-flow-level")
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
flow = Flow.from_declaration(contents=yaml_str)
result = flow.kickoff()
assert result == "two"
@@ -2721,7 +2715,7 @@ def test_flow_level_persist_from_declaration_saves_once_per_method():
def test_method_level_persist_from_declaration_saves_only_that_method():
yaml_str = _method_level_persist_yaml("yaml-method-level")
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
flow = Flow.from_declaration(contents=yaml_str)
flow.kickoff()
assert _saved_methods("yaml-method-level") == ["first"]
@@ -2750,7 +2744,7 @@ methods:
persist:
enabled: false
"""
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
flow = Flow.from_declaration(contents=yaml_str)
flow.kickoff()
assert _saved_methods("yaml-opt-out") == ["first"]
@@ -2759,11 +2753,11 @@ methods:
def test_persist_restore_by_id_from_declaration():
yaml_str = _flow_level_persist_yaml("yaml-restore")
flow1 = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
flow1 = Flow.from_declaration(contents=yaml_str)
flow1.kickoff()
assert flow1.state["count"] == 2
flow2 = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
flow2 = Flow.from_declaration(contents=yaml_str)
flow2.kickoff(inputs={"id": flow1.state["id"]})
assert flow2.state["count"] == 4
@@ -2782,13 +2776,13 @@ def test_method_level_persist_decorator_saves_only_that_method():
assert _saved_methods("method-decorator")[before:] == ["first"]
def test_round_trip_persist_equivalence():
def test_definition_persist_equivalence():
definition = FlowDefinition.from_declaration(
contents=ClassPersistedFlow.flow_definition().to_yaml()
contents=ClassPersistedFlow.flow_definition()
)
before = len(DefinitionStoreBackend.saves["class-decorator"])
flow = Flow.from_definition(definition)
flow = Flow.from_declaration(contents=definition)
flow.kickoff()
assert _saved_methods("class-decorator")[before:] == ["first", "second"]
@@ -2818,7 +2812,7 @@ methods:
persistence_type: DefinitionStoreBackend
store: yaml-mixed-method
"""
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
flow = Flow.from_declaration(contents=yaml_str)
flow.kickoff()
assert _saved_methods("yaml-mixed-flow") == ["first"]
@@ -2967,7 +2961,7 @@ methods:
def test_human_feedback_from_declaration_default_outcome_routes():
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=REVIEW_YAML))
flow = Flow.from_declaration(contents=REVIEW_YAML)
with patch.object(flow, "_request_human_feedback", return_value="") as request:
result = flow.kickoff()
@@ -2979,7 +2973,7 @@ def test_human_feedback_from_declaration_default_outcome_routes():
def test_human_feedback_from_declaration_collapses_and_routes():
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=REVIEW_YAML))
flow = Flow.from_declaration(contents=REVIEW_YAML)
with (
patch.object(flow, "_request_human_feedback", return_value="ship it"),
@@ -2991,13 +2985,13 @@ def test_human_feedback_from_declaration_collapses_and_routes():
assert [r.outcome for r in flow.human_feedback_history] == ["approved"]
def test_round_trip_human_feedback_equivalence():
def test_definition_human_feedback_equivalence():
class_flow = ReviewFlow()
with patch.object(class_flow, "_request_human_feedback", return_value=""):
class_result = class_flow.kickoff()
definition = FlowDefinition.from_declaration(contents=ReviewFlow.flow_definition().to_yaml())
twin = Flow.from_definition(definition)
definition = FlowDefinition.from_declaration(contents=ReviewFlow.flow_definition())
twin = Flow.from_declaration(contents=definition)
with patch.object(twin, "_request_human_feedback", return_value=""):
twin_result = twin.kickoff()
@@ -3012,7 +3006,7 @@ def test_round_trip_human_feedback_equivalence():
def test_human_feedback_pending_and_resume_from_declaration():
definition = FlowDefinition.from_declaration(contents=PENDING_REVIEW_YAML)
flow = Flow.from_definition(definition)
flow = Flow.from_declaration(contents=definition)
pending = flow.kickoff()
assert isinstance(pending, HumanFeedbackPending)
@@ -3057,7 +3051,7 @@ methods:
return "from-config"
provider = RecordingProvider()
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
flow = Flow.from_declaration(contents=yaml_str)
previous = flow_config.hitl_provider
flow_config.hitl_provider = provider
@@ -3160,7 +3154,7 @@ methods:
message: "Review:"
provider: {__name__}:_NeedsArgsProvider
"""
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
flow = Flow.from_declaration(contents=yaml_str)
with pytest.raises(
ValueError, match="cannot instantiate human_feedback.provider ref"
@@ -3181,7 +3175,7 @@ methods:
message: "Review:"
provider: missing_module_xyz:Provider
"""
flow = Flow.from_definition(FlowDefinition.from_declaration(contents=yaml_str))
flow = Flow.from_declaration(contents=yaml_str)
with pytest.raises(
ValueError, match="unresolvable human_feedback.provider ref"
@@ -3194,7 +3188,7 @@ def _checkpoint_chain_flow(tmp_path):
from crewai.state.runtime import RuntimeState
definition = FlowDefinition.from_declaration(contents=CHAIN_YAML)
flow = Flow.from_definition(definition)
flow = Flow.from_declaration(contents=definition)
result = flow.kickoff()
assert result == "confirmed:True"

View File

@@ -80,7 +80,7 @@ class ComplexFlow(Flow):
def _attach_flow_definition(
flow_class: type[Flow], methods: dict[str, dict[str, object]]
) -> None:
flow_class._flow_definition = FlowDefinition.from_dict(
flow_class._flow_definition = FlowDefinition.from_declaration(contents=
{
"schema": "crewai.flow/v1",
"name": flow_class.__name__,
@@ -130,7 +130,7 @@ def test_build_flow_structure_from_flow_class():
def test_build_flow_structure_from_flow_definition():
"""Test building visualization directly from a FlowDefinition."""
definition = FlowDefinition.from_dict(
definition = FlowDefinition.from_declaration(contents=
{
"schema": "crewai.flow/v1",
"name": "DefinedFlow",
@@ -374,7 +374,7 @@ def test_topological_path_counting():
assert len(structure["edges"]) > 0
def test_class_metadata_comes_from_definition():
def test_class_metadata_comes_from_declaration():
"""Test that nodes include only definition-derived class metadata."""
flow = SimpleFlow()
structure = build_flow_structure(flow)