Add inline crew definition loading (#6183)

This commit is contained in:
Vinicius Brasil
2026-06-16 11:51:22 -07:00
committed by GitHub
parent 9d44d0a5e5
commit a6cf52ec7e
5 changed files with 376 additions and 51 deletions

View File

@@ -14,12 +14,22 @@ from crewai.project.annotations import (
tool,
)
from crewai.project.crew_base import CrewBase
from crewai.project.crew_definition import (
CrewAgentDefinition,
CrewDefinition,
CrewTaskDefinition,
PythonReferenceDefinition,
)
from crewai.project.crew_loader import load_crew, load_crew_and_kickoff
from crewai.project.json_loader import load_agent, strip_jsonc_comments
__all__ = [
"CrewAgentDefinition",
"CrewBase",
"CrewDefinition",
"CrewTaskDefinition",
"PythonReferenceDefinition",
"after_kickoff",
"agent",
"before_kickoff",

View File

@@ -0,0 +1,129 @@
"""Definition models for inline CrewAI crew configurations."""
from __future__ import annotations
from typing import Any, TypeAlias
from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator
__all__ = [
"CrewAgentDefinition",
"CrewDefinition",
"CrewTaskDefinition",
"PythonReferenceDefinition",
]
class PythonReferenceDefinition(BaseModel):
"""Dotted Python reference used by crew definitions."""
python: str
@field_validator("python")
@classmethod
def _validate_python_ref(cls, value: str) -> str:
path = value.strip()
if not path:
raise ValueError("Python reference 'python' must be a string")
if "." not in path:
raise ValueError(
f"Python reference '{path}' must be a dotted import path "
"like 'module.attribute'"
)
return path
class CrewAgentDefinition(BaseModel):
"""Inline agent definition used by a crew definition."""
model_config = ConfigDict(extra="allow")
role: str
goal: str
backstory: str
type: str | PythonReferenceDefinition | None = None
settings: dict[str, Any] = Field(default_factory=dict)
@field_validator("settings", mode="before")
@classmethod
def _validate_settings(cls, value: Any) -> Any:
if value is not None and not isinstance(value, dict):
raise ValueError("agent.settings must be a mapping")
return value or {}
class CrewTaskDefinition(BaseModel):
"""Task definition used by a crew definition."""
model_config = ConfigDict(extra="allow")
description: str
expected_output: str
name: str | None = None
agent: str | None = None
context: list[str] | None = None
type: str | PythonReferenceDefinition | None = None
_CrewAgentsInput: TypeAlias = dict[str, CrewAgentDefinition] | list[dict[str, Any]]
class CrewDefinition(BaseModel):
"""In-memory JSON/YAML crew definition with inline agents and tasks."""
model_config = ConfigDict(extra="allow")
agents: dict[str, CrewAgentDefinition]
tasks: list[CrewTaskDefinition]
inputs: dict[str, Any] = Field(default_factory=dict)
manager_agent: str | PythonReferenceDefinition | None = None
@field_validator("inputs", mode="before")
@classmethod
def _validate_inputs(cls, value: Any) -> Any:
if value is not None and not isinstance(value, dict):
raise ValueError("crew.inputs must be a mapping")
return value or {}
@field_validator(
"agents",
mode="before",
json_schema_input_type=_CrewAgentsInput,
)
@classmethod
def _validate_inline_agents(cls, value: Any) -> Any:
if isinstance(value, dict):
return value
if not isinstance(value, list):
return value
agents: dict[str, Any] = {}
for index, item in enumerate(value):
if not isinstance(item, dict):
raise ValueError(f"agents[{index}] must be an inline agent mapping")
if "name" in item:
name = item["name"]
if not isinstance(name, str) or not name:
raise ValueError(f"agents[{index}].name must be a non-empty string")
agents[name] = {key: val for key, val in item.items() if key != "name"}
continue
if len(item) != 1:
raise ValueError(
f"agents[{index}] must include a name field or be a one-key mapping"
)
name, definition = next(iter(item.items()))
agents[str(name)] = definition
return agents
@model_validator(mode="after")
def _validate_inline_shape(self) -> CrewDefinition:
if not self.agents:
raise ValueError("crew action requires inline agent definitions")
if not self.tasks:
raise ValueError("crew action requires a non-empty tasks list")
return self

View File

@@ -7,10 +7,15 @@ from typing import Any
from pydantic import ValidationError
from crewai.project.crew_definition import CrewDefinition
from crewai.project.json_loader import (
JSONAgentDefinition,
JSONCrewProject,
JSONProjectError,
JSONProjectValidationError,
_AgentDefinitionSource,
_crew_kwargs_from_definition,
_load_json_crew_project_definition,
_task_class_from_definition,
_task_kwargs_from_definition,
load_json_crew_project,
@@ -27,12 +32,73 @@ def load_crew(
default inputs. Agent definitions are resolved from individual
``<name>.jsonc`` / ``<name>.json`` files inside an ``agents/`` directory.
"""
from crewai import Crew, Task
crew_path = Path(source)
project = load_json_crew_project(crew_path, agents_dir=agents_dir)
return _load_crew_project(project, project_root=crew_path.parent)
def build_agent(agent_def: Any) -> Any:
def load_crew_from_definition(
definition: CrewDefinition | dict[str, Any],
*,
source: str | Path = "<inline crew>",
project_root: str | Path | None = None,
) -> tuple[Any, dict[str, Any]]:
"""Load a ``Crew`` from an in-memory JSON/YAML crew definition."""
root = Path(project_root) if project_root is not None else Path.cwd()
source_label = str(source)
crew_definition = (
definition
if isinstance(definition, CrewDefinition)
else CrewDefinition.model_validate(definition)
)
definition_data = crew_definition.model_dump(mode="python", exclude_none=True)
project = _crew_project_from_definition(
definition_data,
source=source_label,
project_root=root,
)
return _load_crew_project(project, project_root=root)
def _crew_project_from_definition(
definition: dict[str, Any],
*,
source: str,
project_root: Path,
) -> JSONCrewProject:
agent_bodies: dict[str, Any] = definition["agents"]
agent_names = list(agent_bodies)
manager_agent = definition.get("manager_agent")
if isinstance(manager_agent, str):
agent_names = [name for name in agent_names if name != manager_agent]
def load_agent_definition_source(agent_name: str) -> _AgentDefinitionSource | None:
body = agent_bodies.get(agent_name)
if body is None:
return None
return body, f"{source}: agents.{agent_name}"
return _load_json_crew_project_definition(
{**definition, "agents": agent_names},
source=source,
agents_dir=project_root / "agents",
project_root=project_root,
load_agent_definition_source=load_agent_definition_source,
missing_agent_hint=None,
collect_errors=False,
)
def _load_crew_project(
project: JSONCrewProject,
*,
project_root: Path,
) -> tuple[Any, dict[str, Any]]:
from crewai import Crew, Task
source_label = str(project.crew_path)
def build_agent(agent_def: JSONAgentDefinition) -> Any:
try:
return agent_def.agent_class(**agent_def.kwargs)
except ValidationError as exc:
@@ -52,22 +118,22 @@ def load_crew(
task_name_map: dict[str, Task] = {}
for index, task_defn in enumerate(project.task_definitions):
source_label = f"{crew_path}: tasks[{index}]"
task_class = _task_class_from_definition(task_defn, f"{source_label}: type")
task_source = f"{source_label}: tasks[{index}]"
task_class = _task_class_from_definition(task_defn, f"{task_source}: type")
task_kwargs = _task_kwargs_from_definition(
task_defn,
agents_map=agents_map,
task_name_map=task_name_map,
source=source_label,
project_root=crew_path.parent,
source=task_source,
project_root=project_root,
)
try:
task = task_class(**task_kwargs)
except ValidationError as exc:
raise JSONProjectError(f"{source_label}: validation failed: {exc}") from exc
raise JSONProjectError(f"{task_source}: validation failed: {exc}") from exc
except Exception as exc:
raise JSONProjectError(
f"{source_label}: failed to load task: {exc}"
f"{task_source}: failed to load task: {exc}"
) from exc
tasks_list.append(task)
@@ -80,17 +146,17 @@ def load_crew(
agents=[agents_map[name] for name in project.agent_names],
tasks=tasks_list,
agents_map=agents_map,
source=crew_path,
source=source_label,
)
try:
crew = Crew(**crew_kwargs)
except ValidationError as exc:
raise JSONProjectError(f"{crew_path}: validation failed: {exc}") from exc
raise JSONProjectError(f"{source_label}: validation failed: {exc}") from exc
except JSONProjectValidationError:
raise
except Exception as exc:
raise JSONProjectError(f"{crew_path}: failed to load crew: {exc}") from exc
raise JSONProjectError(f"{source_label}: failed to load crew: {exc}") from exc
return crew, project.definition.get("inputs", {})

View File

@@ -2,6 +2,7 @@
from __future__ import annotations
from collections.abc import Callable
from dataclasses import dataclass
import json
import logging
@@ -156,6 +157,10 @@ class JSONCrewProject:
task_definitions: list[dict[str, Any]]
_AgentDefinitionSource = tuple[dict[str, Any], str | Path]
_AgentDefinitionLoader = Callable[[str], _AgentDefinitionSource | None]
def find_json_project_file(directory: str | Path, stem: str) -> Path | None:
"""Return ``stem.jsonc`` or ``stem.json``, preferring JSONC."""
root = Path(directory)
@@ -230,7 +235,7 @@ def load_json_crew_project(
*,
collect_errors: bool = False,
) -> JSONCrewProject:
"""Parse and structurally validate a JSON crew project.
"""Load and structurally validate a JSON crew project from files.
When ``collect_errors`` is true, all discoverable structural errors are
returned as a single ``JSONProjectValidationError`` for deploy validation.
@@ -239,7 +244,46 @@ def load_json_crew_project(
crew_path = Path(source)
if agents_dir is None:
agents_dir = crew_path.parent / "agents"
agents_dir = Path(agents_dir)
def load_agent_definition_source(agent_name: str) -> _AgentDefinitionSource | None:
agent_file = find_json_project_file(agents_dir, agent_name)
if agent_file is None:
return None
return _expect_object(load_jsonc_file(agent_file), agent_file), agent_file
try:
defn = _expect_object(load_jsonc_file(crew_path), crew_path)
except Exception as exc:
if collect_errors:
raise JSONProjectValidationError([str(exc)]) from exc
raise
return _load_json_crew_project_definition(
defn,
source=crew_path,
agents_dir=agents_dir,
project_root=crew_path.parent,
load_agent_definition_source=load_agent_definition_source,
missing_agent_hint=(
f"not found in {agents_dir} "
f"(tried {{agent_name}}.jsonc and {{agent_name}}.json)"
),
collect_errors=collect_errors,
)
def _load_json_crew_project_definition(
defn: dict[str, Any],
*,
source: str | Path,
agents_dir: str | Path,
project_root: Path,
load_agent_definition_source: _AgentDefinitionLoader,
missing_agent_hint: str | None,
collect_errors: bool,
) -> JSONCrewProject:
"""Structurally validate a parsed JSON crew project definition."""
errors: list[str] = []
def fail(message: str, exc_type: type[Exception] = JSONProjectError) -> None:
@@ -256,67 +300,58 @@ def load_json_crew_project(
return
raise JSONProjectValidationError(messages)
try:
defn = _expect_object(load_jsonc_file(crew_path), crew_path)
except Exception as exc:
if collect_errors:
raise JSONProjectValidationError([str(exc)]) from exc
raise
fail_many(
_field_errors(
defn,
_crew_allowed_fields(),
_CREW_RUNTIME_FIELDS,
crew_path,
source,
{"inputs"},
)
)
fail_many(_python_reference_definition_errors(defn, crew_path))
fail_many(_python_reference_definition_errors(defn, source))
agent_names = defn.get("agents", [])
if not isinstance(agent_names, list) or not agent_names:
fail(f"{crew_path}: 'agents' must be a non-empty list")
fail(f"{source}: 'agents' must be a non-empty list")
agent_names = []
agents_dir = Path(agents_dir)
agent_definitions: dict[str, JSONAgentDefinition] = {}
def load_agent_definition(agent_name: str) -> None:
if not isinstance(agent_name, str) or not agent_name:
fail(f"{crew_path}: each agent reference must be a non-empty string")
fail(f"{source}: each agent reference must be a non-empty string")
return
if agent_name in agent_definitions:
return
agent_file = find_json_project_file(agents_dir, agent_name)
if agent_file is None:
message = (
f"Agent definition for '{agent_name}' not found in {agents_dir} "
f"(tried {agent_name}.jsonc and {agent_name}.json)"
)
if collect_errors:
errors.append(
f"{crew_path}: agent '{agent_name}' not found in {agents_dir} "
f"(tried {agent_name}.jsonc and {agent_name}.json)"
)
else:
raise FileNotFoundError(message)
return
try:
agent_defn = _expect_object(load_jsonc_file(agent_file), agent_file)
loaded_agent = load_agent_definition_source(agent_name)
if loaded_agent is None:
hint = (
missing_agent_hint.format(agent_name=agent_name)
if missing_agent_hint is not None
else "not found in provided agent definitions"
)
message = f"Agent definition for '{agent_name}' {hint}"
if collect_errors:
errors.append(f"{source}: agent '{agent_name}' {hint}")
else:
raise FileNotFoundError(message)
return
agent_defn, agent_source = loaded_agent
agent_class = _agent_class_from_definition(
agent_defn,
f"{agent_file}: type",
f"{agent_source}: type",
resolve_python_refs=not collect_errors,
)
agent_kwargs = _agent_kwargs_from_definition(
agent_defn,
agent_file,
agent_source,
agent_class=agent_class,
# Validation must never execute project code (custom tools).
resolve_tools=not collect_errors,
resolve_python_refs=not collect_errors,
project_root=crew_path.parent,
project_root=project_root,
)
except Exception as exc:
if collect_errors:
@@ -325,7 +360,7 @@ def load_json_crew_project(
raise
agent_definitions[agent_name] = JSONAgentDefinition(
name=agent_name,
path=agent_file,
path=Path(str(agent_source)),
definition=agent_defn,
kwargs=agent_kwargs,
agent_class=agent_class,
@@ -342,7 +377,7 @@ def load_json_crew_project(
pass
else:
fail(
f"{crew_path}: 'manager_agent' must be an agent definition name "
f"{source}: 'manager_agent' must be an agent definition name "
f'or a {{"{PYTHON_REF_KEY}": "module.agent"}} reference'
)
@@ -350,12 +385,12 @@ def load_json_crew_project(
task_defs = defn.get("tasks", [])
if not isinstance(task_defs, list) or not task_defs:
fail(f"{crew_path}: 'tasks' must be a non-empty list")
fail(f"{source}: 'tasks' must be a non-empty list")
task_defs = []
known_tasks: set[str] = set()
for index, task_defn in enumerate(task_defs):
task_path = f"{crew_path}: tasks[{index}]"
task_path = f"{source}: tasks[{index}]"
if not isinstance(task_defn, dict):
fail(f"{task_path} must be an object")
continue
@@ -381,7 +416,7 @@ def load_json_crew_project(
)
fail_many(
_tool_definition_errors(task_defn.get("tools"), task_path, crew_path.parent)
_tool_definition_errors(task_defn.get("tools"), task_path, project_root)
)
context_names = task_defn.get("context")
@@ -406,8 +441,8 @@ def load_json_crew_project(
raise JSONProjectValidationError(errors)
return JSONCrewProject(
crew_path=crew_path,
agents_dir=agents_dir,
crew_path=Path(str(source)),
agents_dir=Path(str(agents_dir)),
definition=defn,
agent_names=list(agent_names),
agents=agent_definitions,

View File

@@ -9,7 +9,7 @@ import pytest
from crewai.llms.base_llm import BaseLLM
from crewai.project.json_loader import JSONProjectError, JSONProjectValidationError
from crewai.project.crew_loader import load_crew
from crewai.project.crew_loader import load_crew, load_crew_from_definition
def _write_python_defs(tmp_path: Path) -> None:
@@ -70,6 +70,91 @@ def _input_file_path(value) -> Path:
class TestLoadCrew:
def test_load_crew_from_inline_definition(self):
crew, inputs = load_crew_from_definition(
{
"name": "inline_crew",
"agents": {
"researcher": {
"role": "Researcher",
"goal": "Research {topic}",
"backstory": "Knows things.",
}
},
"tasks": [
{
"name": "research",
"description": "Research {topic}",
"expected_output": "Findings about {topic}",
"agent": "researcher",
}
],
"inputs": {"topic": "AI"},
}
)
assert crew.name == "inline_crew"
assert crew.agents[0].role == "Researcher"
assert crew.tasks[0].description == "Research {topic}"
assert inputs == {"topic": "AI"}
def test_inline_definition_accepts_null_inputs(self):
_, inputs = load_crew_from_definition(
{
"agents": {
"researcher": {
"role": "Researcher",
"goal": "Research",
"backstory": "Knows things.",
}
},
"tasks": [
{
"description": "Research",
"expected_output": "Findings",
"agent": "researcher",
}
],
"inputs": None,
}
)
assert inputs == {}
def test_inline_hierarchical_manager_agent_is_not_duplicated(self):
crew, _ = load_crew_from_definition(
{
"name": "inline_hier_manager_crew",
"agents": {
"worker": {
"role": "Worker",
"goal": "Do work",
"backstory": "Does things.",
},
"manager": {
"role": "Manager",
"goal": "Coordinate work",
"backstory": "Keeps the work moving.",
},
},
"tasks": [
{
"description": "Do work",
"expected_output": "Work done",
"agent": "manager",
}
],
"process": "hierarchical",
"manager_agent": "manager",
}
)
assert len(crew.agents) == 1
assert crew.agents[0].role == "Worker"
assert crew.manager_agent is not None
assert crew.manager_agent.role == "Manager"
assert crew.tasks[0].agent is crew.manager_agent
def test_minimal_crew(self, tmp_path: Path):
agents_dir = tmp_path / "agents"
agents_dir.mkdir()