From 9d44d0a5e5b267a3ebfad872746e483b2b2d93bb Mon Sep 17 00:00:00 2001 From: Vinicius Brasil Date: Tue, 16 Jun 2026 11:00:07 -0700 Subject: [PATCH 1/4] Serialize concrete Pydantic subclasses (#6187) --- lib/crewai/src/crewai/utilities/serialization.py | 2 +- lib/crewai/tests/utilities/test_serialization.py | 14 ++++++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/lib/crewai/src/crewai/utilities/serialization.py b/lib/crewai/src/crewai/utilities/serialization.py index 1b9e76588..8bc38b6f8 100644 --- a/lib/crewai/src/crewai/utilities/serialization.py +++ b/lib/crewai/src/crewai/utilities/serialization.py @@ -99,7 +99,7 @@ def to_serializable( if isinstance(obj, BaseModel): try: return to_serializable( - obj=obj.model_dump(mode="json", exclude=exclude), + obj=obj.model_dump(mode="json", exclude=exclude, serialize_as_any=True), max_depth=max_depth, _current_depth=_current_depth + 1, _ancestors=new_ancestors, diff --git a/lib/crewai/tests/utilities/test_serialization.py b/lib/crewai/tests/utilities/test_serialization.py index 8ec68ead8..93817f285 100644 --- a/lib/crewai/tests/utilities/test_serialization.py +++ b/lib/crewai/tests/utilities/test_serialization.py @@ -21,6 +21,10 @@ class Person(BaseModel): skills: List[str] +class Container(BaseModel): + payload: BaseModel | None = None + + @dataclass class DataclassPerson: name: str @@ -114,6 +118,16 @@ def test_pydantic_model_serialization(): ) +def test_polymorphic_field_serializes_concrete_subclass(): + container = Container( + payload=Address(street="1 Main", city="Tech City", country="Pythonia") + ) + + assert to_serializable(container) == { + "payload": {"street": "1 Main", "city": "Tech City", "country": "Pythonia"} + } + + def test_dataclass_serialization_recurses_into_nested_values(): person = DataclassPerson( name="Ada", From a6cf52ec7ee87237c67558b6cccf2baf970966e7 Mon Sep 17 00:00:00 2001 From: Vinicius Brasil Date: Tue, 16 Jun 2026 11:51:22 -0700 Subject: [PATCH 2/4] Add inline crew definition loading (#6183) --- lib/crewai/src/crewai/project/__init__.py | 10 ++ .../src/crewai/project/crew_definition.py | 129 ++++++++++++++++++ lib/crewai/src/crewai/project/crew_loader.py | 90 ++++++++++-- lib/crewai/src/crewai/project/json_loader.py | 111 +++++++++------ lib/crewai/tests/project/test_crew_loader.py | 87 +++++++++++- 5 files changed, 376 insertions(+), 51 deletions(-) create mode 100644 lib/crewai/src/crewai/project/crew_definition.py diff --git a/lib/crewai/src/crewai/project/__init__.py b/lib/crewai/src/crewai/project/__init__.py index fabbbbe76..151b616f9 100644 --- a/lib/crewai/src/crewai/project/__init__.py +++ b/lib/crewai/src/crewai/project/__init__.py @@ -14,12 +14,22 @@ from crewai.project.annotations import ( tool, ) from crewai.project.crew_base import CrewBase +from crewai.project.crew_definition import ( + CrewAgentDefinition, + CrewDefinition, + CrewTaskDefinition, + PythonReferenceDefinition, +) from crewai.project.crew_loader import load_crew, load_crew_and_kickoff from crewai.project.json_loader import load_agent, strip_jsonc_comments __all__ = [ + "CrewAgentDefinition", "CrewBase", + "CrewDefinition", + "CrewTaskDefinition", + "PythonReferenceDefinition", "after_kickoff", "agent", "before_kickoff", diff --git a/lib/crewai/src/crewai/project/crew_definition.py b/lib/crewai/src/crewai/project/crew_definition.py new file mode 100644 index 000000000..059878b5c --- /dev/null +++ b/lib/crewai/src/crewai/project/crew_definition.py @@ -0,0 +1,129 @@ +"""Definition models for inline CrewAI crew configurations.""" + +from __future__ import annotations + +from typing import Any, TypeAlias + +from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator + + +__all__ = [ + "CrewAgentDefinition", + "CrewDefinition", + "CrewTaskDefinition", + "PythonReferenceDefinition", +] + + +class PythonReferenceDefinition(BaseModel): + """Dotted Python reference used by crew definitions.""" + + python: str + + @field_validator("python") + @classmethod + def _validate_python_ref(cls, value: str) -> str: + path = value.strip() + if not path: + raise ValueError("Python reference 'python' must be a string") + if "." not in path: + raise ValueError( + f"Python reference '{path}' must be a dotted import path " + "like 'module.attribute'" + ) + return path + + +class CrewAgentDefinition(BaseModel): + """Inline agent definition used by a crew definition.""" + + model_config = ConfigDict(extra="allow") + + role: str + goal: str + backstory: str + type: str | PythonReferenceDefinition | None = None + settings: dict[str, Any] = Field(default_factory=dict) + + @field_validator("settings", mode="before") + @classmethod + def _validate_settings(cls, value: Any) -> Any: + if value is not None and not isinstance(value, dict): + raise ValueError("agent.settings must be a mapping") + return value or {} + + +class CrewTaskDefinition(BaseModel): + """Task definition used by a crew definition.""" + + model_config = ConfigDict(extra="allow") + + description: str + expected_output: str + name: str | None = None + agent: str | None = None + context: list[str] | None = None + type: str | PythonReferenceDefinition | None = None + + +_CrewAgentsInput: TypeAlias = dict[str, CrewAgentDefinition] | list[dict[str, Any]] + + +class CrewDefinition(BaseModel): + """In-memory JSON/YAML crew definition with inline agents and tasks.""" + + model_config = ConfigDict(extra="allow") + + agents: dict[str, CrewAgentDefinition] + tasks: list[CrewTaskDefinition] + inputs: dict[str, Any] = Field(default_factory=dict) + manager_agent: str | PythonReferenceDefinition | None = None + + @field_validator("inputs", mode="before") + @classmethod + def _validate_inputs(cls, value: Any) -> Any: + if value is not None and not isinstance(value, dict): + raise ValueError("crew.inputs must be a mapping") + return value or {} + + @field_validator( + "agents", + mode="before", + json_schema_input_type=_CrewAgentsInput, + ) + @classmethod + def _validate_inline_agents(cls, value: Any) -> Any: + if isinstance(value, dict): + return value + if not isinstance(value, list): + return value + + agents: dict[str, Any] = {} + for index, item in enumerate(value): + if not isinstance(item, dict): + raise ValueError(f"agents[{index}] must be an inline agent mapping") + + if "name" in item: + name = item["name"] + if not isinstance(name, str) or not name: + raise ValueError(f"agents[{index}].name must be a non-empty string") + agents[name] = {key: val for key, val in item.items() if key != "name"} + continue + + if len(item) != 1: + raise ValueError( + f"agents[{index}] must include a name field or be a one-key mapping" + ) + name, definition = next(iter(item.items())) + agents[str(name)] = definition + + return agents + + @model_validator(mode="after") + def _validate_inline_shape(self) -> CrewDefinition: + if not self.agents: + raise ValueError("crew action requires inline agent definitions") + + if not self.tasks: + raise ValueError("crew action requires a non-empty tasks list") + return self diff --git a/lib/crewai/src/crewai/project/crew_loader.py b/lib/crewai/src/crewai/project/crew_loader.py index e106e60d3..f4ee0d077 100644 --- a/lib/crewai/src/crewai/project/crew_loader.py +++ b/lib/crewai/src/crewai/project/crew_loader.py @@ -7,10 +7,15 @@ from typing import Any from pydantic import ValidationError +from crewai.project.crew_definition import CrewDefinition from crewai.project.json_loader import ( + JSONAgentDefinition, + JSONCrewProject, JSONProjectError, JSONProjectValidationError, + _AgentDefinitionSource, _crew_kwargs_from_definition, + _load_json_crew_project_definition, _task_class_from_definition, _task_kwargs_from_definition, load_json_crew_project, @@ -27,12 +32,73 @@ def load_crew( default inputs. Agent definitions are resolved from individual ``.jsonc`` / ``.json`` files inside an ``agents/`` directory. """ - from crewai import Crew, Task - crew_path = Path(source) project = load_json_crew_project(crew_path, agents_dir=agents_dir) + return _load_crew_project(project, project_root=crew_path.parent) - def build_agent(agent_def: Any) -> Any: + +def load_crew_from_definition( + definition: CrewDefinition | dict[str, Any], + *, + source: str | Path = "", + project_root: str | Path | None = None, +) -> tuple[Any, dict[str, Any]]: + """Load a ``Crew`` from an in-memory JSON/YAML crew definition.""" + root = Path(project_root) if project_root is not None else Path.cwd() + source_label = str(source) + crew_definition = ( + definition + if isinstance(definition, CrewDefinition) + else CrewDefinition.model_validate(definition) + ) + definition_data = crew_definition.model_dump(mode="python", exclude_none=True) + project = _crew_project_from_definition( + definition_data, + source=source_label, + project_root=root, + ) + return _load_crew_project(project, project_root=root) + + +def _crew_project_from_definition( + definition: dict[str, Any], + *, + source: str, + project_root: Path, +) -> JSONCrewProject: + agent_bodies: dict[str, Any] = definition["agents"] + agent_names = list(agent_bodies) + manager_agent = definition.get("manager_agent") + if isinstance(manager_agent, str): + agent_names = [name for name in agent_names if name != manager_agent] + + def load_agent_definition_source(agent_name: str) -> _AgentDefinitionSource | None: + body = agent_bodies.get(agent_name) + if body is None: + return None + return body, f"{source}: agents.{agent_name}" + + return _load_json_crew_project_definition( + {**definition, "agents": agent_names}, + source=source, + agents_dir=project_root / "agents", + project_root=project_root, + load_agent_definition_source=load_agent_definition_source, + missing_agent_hint=None, + collect_errors=False, + ) + + +def _load_crew_project( + project: JSONCrewProject, + *, + project_root: Path, +) -> tuple[Any, dict[str, Any]]: + from crewai import Crew, Task + + source_label = str(project.crew_path) + + def build_agent(agent_def: JSONAgentDefinition) -> Any: try: return agent_def.agent_class(**agent_def.kwargs) except ValidationError as exc: @@ -52,22 +118,22 @@ def load_crew( task_name_map: dict[str, Task] = {} for index, task_defn in enumerate(project.task_definitions): - source_label = f"{crew_path}: tasks[{index}]" - task_class = _task_class_from_definition(task_defn, f"{source_label}: type") + task_source = f"{source_label}: tasks[{index}]" + task_class = _task_class_from_definition(task_defn, f"{task_source}: type") task_kwargs = _task_kwargs_from_definition( task_defn, agents_map=agents_map, task_name_map=task_name_map, - source=source_label, - project_root=crew_path.parent, + source=task_source, + project_root=project_root, ) try: task = task_class(**task_kwargs) except ValidationError as exc: - raise JSONProjectError(f"{source_label}: validation failed: {exc}") from exc + raise JSONProjectError(f"{task_source}: validation failed: {exc}") from exc except Exception as exc: raise JSONProjectError( - f"{source_label}: failed to load task: {exc}" + f"{task_source}: failed to load task: {exc}" ) from exc tasks_list.append(task) @@ -80,17 +146,17 @@ def load_crew( agents=[agents_map[name] for name in project.agent_names], tasks=tasks_list, agents_map=agents_map, - source=crew_path, + source=source_label, ) try: crew = Crew(**crew_kwargs) except ValidationError as exc: - raise JSONProjectError(f"{crew_path}: validation failed: {exc}") from exc + raise JSONProjectError(f"{source_label}: validation failed: {exc}") from exc except JSONProjectValidationError: raise except Exception as exc: - raise JSONProjectError(f"{crew_path}: failed to load crew: {exc}") from exc + raise JSONProjectError(f"{source_label}: failed to load crew: {exc}") from exc return crew, project.definition.get("inputs", {}) diff --git a/lib/crewai/src/crewai/project/json_loader.py b/lib/crewai/src/crewai/project/json_loader.py index b0b3bca16..ab13d881e 100644 --- a/lib/crewai/src/crewai/project/json_loader.py +++ b/lib/crewai/src/crewai/project/json_loader.py @@ -2,6 +2,7 @@ from __future__ import annotations +from collections.abc import Callable from dataclasses import dataclass import json import logging @@ -156,6 +157,10 @@ class JSONCrewProject: task_definitions: list[dict[str, Any]] +_AgentDefinitionSource = tuple[dict[str, Any], str | Path] +_AgentDefinitionLoader = Callable[[str], _AgentDefinitionSource | None] + + def find_json_project_file(directory: str | Path, stem: str) -> Path | None: """Return ``stem.jsonc`` or ``stem.json``, preferring JSONC.""" root = Path(directory) @@ -230,7 +235,7 @@ def load_json_crew_project( *, collect_errors: bool = False, ) -> JSONCrewProject: - """Parse and structurally validate a JSON crew project. + """Load and structurally validate a JSON crew project from files. When ``collect_errors`` is true, all discoverable structural errors are returned as a single ``JSONProjectValidationError`` for deploy validation. @@ -239,7 +244,46 @@ def load_json_crew_project( crew_path = Path(source) if agents_dir is None: agents_dir = crew_path.parent / "agents" + agents_dir = Path(agents_dir) + def load_agent_definition_source(agent_name: str) -> _AgentDefinitionSource | None: + agent_file = find_json_project_file(agents_dir, agent_name) + if agent_file is None: + return None + return _expect_object(load_jsonc_file(agent_file), agent_file), agent_file + + try: + defn = _expect_object(load_jsonc_file(crew_path), crew_path) + except Exception as exc: + if collect_errors: + raise JSONProjectValidationError([str(exc)]) from exc + raise + + return _load_json_crew_project_definition( + defn, + source=crew_path, + agents_dir=agents_dir, + project_root=crew_path.parent, + load_agent_definition_source=load_agent_definition_source, + missing_agent_hint=( + f"not found in {agents_dir} " + f"(tried {{agent_name}}.jsonc and {{agent_name}}.json)" + ), + collect_errors=collect_errors, + ) + + +def _load_json_crew_project_definition( + defn: dict[str, Any], + *, + source: str | Path, + agents_dir: str | Path, + project_root: Path, + load_agent_definition_source: _AgentDefinitionLoader, + missing_agent_hint: str | None, + collect_errors: bool, +) -> JSONCrewProject: + """Structurally validate a parsed JSON crew project definition.""" errors: list[str] = [] def fail(message: str, exc_type: type[Exception] = JSONProjectError) -> None: @@ -256,67 +300,58 @@ def load_json_crew_project( return raise JSONProjectValidationError(messages) - try: - defn = _expect_object(load_jsonc_file(crew_path), crew_path) - except Exception as exc: - if collect_errors: - raise JSONProjectValidationError([str(exc)]) from exc - raise - fail_many( _field_errors( defn, _crew_allowed_fields(), _CREW_RUNTIME_FIELDS, - crew_path, + source, {"inputs"}, ) ) - fail_many(_python_reference_definition_errors(defn, crew_path)) + fail_many(_python_reference_definition_errors(defn, source)) agent_names = defn.get("agents", []) if not isinstance(agent_names, list) or not agent_names: - fail(f"{crew_path}: 'agents' must be a non-empty list") + fail(f"{source}: 'agents' must be a non-empty list") agent_names = [] - agents_dir = Path(agents_dir) agent_definitions: dict[str, JSONAgentDefinition] = {} def load_agent_definition(agent_name: str) -> None: if not isinstance(agent_name, str) or not agent_name: - fail(f"{crew_path}: each agent reference must be a non-empty string") + fail(f"{source}: each agent reference must be a non-empty string") return if agent_name in agent_definitions: return - agent_file = find_json_project_file(agents_dir, agent_name) - if agent_file is None: - message = ( - f"Agent definition for '{agent_name}' not found in {agents_dir} " - f"(tried {agent_name}.jsonc and {agent_name}.json)" - ) - if collect_errors: - errors.append( - f"{crew_path}: agent '{agent_name}' not found in {agents_dir} " - f"(tried {agent_name}.jsonc and {agent_name}.json)" - ) - else: - raise FileNotFoundError(message) - return try: - agent_defn = _expect_object(load_jsonc_file(agent_file), agent_file) + loaded_agent = load_agent_definition_source(agent_name) + if loaded_agent is None: + hint = ( + missing_agent_hint.format(agent_name=agent_name) + if missing_agent_hint is not None + else "not found in provided agent definitions" + ) + message = f"Agent definition for '{agent_name}' {hint}" + if collect_errors: + errors.append(f"{source}: agent '{agent_name}' {hint}") + else: + raise FileNotFoundError(message) + return + agent_defn, agent_source = loaded_agent agent_class = _agent_class_from_definition( agent_defn, - f"{agent_file}: type", + f"{agent_source}: type", resolve_python_refs=not collect_errors, ) agent_kwargs = _agent_kwargs_from_definition( agent_defn, - agent_file, + agent_source, agent_class=agent_class, # Validation must never execute project code (custom tools). resolve_tools=not collect_errors, resolve_python_refs=not collect_errors, - project_root=crew_path.parent, + project_root=project_root, ) except Exception as exc: if collect_errors: @@ -325,7 +360,7 @@ def load_json_crew_project( raise agent_definitions[agent_name] = JSONAgentDefinition( name=agent_name, - path=agent_file, + path=Path(str(agent_source)), definition=agent_defn, kwargs=agent_kwargs, agent_class=agent_class, @@ -342,7 +377,7 @@ def load_json_crew_project( pass else: fail( - f"{crew_path}: 'manager_agent' must be an agent definition name " + f"{source}: 'manager_agent' must be an agent definition name " f'or a {{"{PYTHON_REF_KEY}": "module.agent"}} reference' ) @@ -350,12 +385,12 @@ def load_json_crew_project( task_defs = defn.get("tasks", []) if not isinstance(task_defs, list) or not task_defs: - fail(f"{crew_path}: 'tasks' must be a non-empty list") + fail(f"{source}: 'tasks' must be a non-empty list") task_defs = [] known_tasks: set[str] = set() for index, task_defn in enumerate(task_defs): - task_path = f"{crew_path}: tasks[{index}]" + task_path = f"{source}: tasks[{index}]" if not isinstance(task_defn, dict): fail(f"{task_path} must be an object") continue @@ -381,7 +416,7 @@ def load_json_crew_project( ) fail_many( - _tool_definition_errors(task_defn.get("tools"), task_path, crew_path.parent) + _tool_definition_errors(task_defn.get("tools"), task_path, project_root) ) context_names = task_defn.get("context") @@ -406,8 +441,8 @@ def load_json_crew_project( raise JSONProjectValidationError(errors) return JSONCrewProject( - crew_path=crew_path, - agents_dir=agents_dir, + crew_path=Path(str(source)), + agents_dir=Path(str(agents_dir)), definition=defn, agent_names=list(agent_names), agents=agent_definitions, diff --git a/lib/crewai/tests/project/test_crew_loader.py b/lib/crewai/tests/project/test_crew_loader.py index c4d9f75fa..367bdbd30 100644 --- a/lib/crewai/tests/project/test_crew_loader.py +++ b/lib/crewai/tests/project/test_crew_loader.py @@ -9,7 +9,7 @@ import pytest from crewai.llms.base_llm import BaseLLM from crewai.project.json_loader import JSONProjectError, JSONProjectValidationError -from crewai.project.crew_loader import load_crew +from crewai.project.crew_loader import load_crew, load_crew_from_definition def _write_python_defs(tmp_path: Path) -> None: @@ -70,6 +70,91 @@ def _input_file_path(value) -> Path: class TestLoadCrew: + def test_load_crew_from_inline_definition(self): + crew, inputs = load_crew_from_definition( + { + "name": "inline_crew", + "agents": { + "researcher": { + "role": "Researcher", + "goal": "Research {topic}", + "backstory": "Knows things.", + } + }, + "tasks": [ + { + "name": "research", + "description": "Research {topic}", + "expected_output": "Findings about {topic}", + "agent": "researcher", + } + ], + "inputs": {"topic": "AI"}, + } + ) + + assert crew.name == "inline_crew" + assert crew.agents[0].role == "Researcher" + assert crew.tasks[0].description == "Research {topic}" + assert inputs == {"topic": "AI"} + + def test_inline_definition_accepts_null_inputs(self): + _, inputs = load_crew_from_definition( + { + "agents": { + "researcher": { + "role": "Researcher", + "goal": "Research", + "backstory": "Knows things.", + } + }, + "tasks": [ + { + "description": "Research", + "expected_output": "Findings", + "agent": "researcher", + } + ], + "inputs": None, + } + ) + + assert inputs == {} + + def test_inline_hierarchical_manager_agent_is_not_duplicated(self): + crew, _ = load_crew_from_definition( + { + "name": "inline_hier_manager_crew", + "agents": { + "worker": { + "role": "Worker", + "goal": "Do work", + "backstory": "Does things.", + }, + "manager": { + "role": "Manager", + "goal": "Coordinate work", + "backstory": "Keeps the work moving.", + }, + }, + "tasks": [ + { + "description": "Do work", + "expected_output": "Work done", + "agent": "manager", + } + ], + "process": "hierarchical", + "manager_agent": "manager", + } + ) + + assert len(crew.agents) == 1 + assert crew.agents[0].role == "Worker" + assert crew.manager_agent is not None + assert crew.manager_agent.role == "Manager" + assert crew.tasks[0].agent is crew.manager_agent + def test_minimal_crew(self, tmp_path: Path): agents_dir = tmp_path / "agents" agents_dir.mkdir() From 4eb90ffbf31bde962e01894046ade65a84b17d7e Mon Sep 17 00:00:00 2001 From: Vinicius Brasil Date: Tue, 16 Jun 2026 12:59:48 -0700 Subject: [PATCH 3/4] Add crew actions to FlowDefinition (#6184) --- lib/crewai/src/crewai/flow/flow_definition.py | 17 +- .../src/crewai/flow/runtime/_actions.py | 21 ++ lib/crewai/tests/test_flow_definition.py | 1 + lib/crewai/tests/test_flow_from_definition.py | 246 ++++++++++++++++++ 4 files changed, 284 insertions(+), 1 deletion(-) diff --git a/lib/crewai/src/crewai/flow/flow_definition.py b/lib/crewai/src/crewai/flow/flow_definition.py index ae8be4ec5..b8a32d68e 100644 --- a/lib/crewai/src/crewai/flow/flow_definition.py +++ b/lib/crewai/src/crewai/flow/flow_definition.py @@ -28,6 +28,7 @@ from crewai.flow.conversational_definition import ( FlowConversationalDefinition, FlowConversationalRouterDefinition, ) +from crewai.project.crew_definition import CrewDefinition logger = logging.getLogger(__name__) @@ -41,6 +42,7 @@ __all__ = [ "FlowConfigDefinition", "FlowConversationalDefinition", "FlowConversationalRouterDefinition", + "FlowCrewActionDefinition", "FlowDefinition", "FlowDefinitionCondition", "FlowDefinitionDiagnostic", @@ -176,6 +178,15 @@ class FlowToolActionDefinition(BaseModel): with_: dict[str, Any] | None = Field(default=None, alias="with") +class FlowCrewActionDefinition(BaseModel): + """A Flow method action that builds and kicks off a CrewAI crew.""" + + model_config = ConfigDict(populate_by_name=True, extra="forbid") + + call: TypingLiteral["crew"] + with_: CrewDefinition = Field(alias="with") + + class FlowExpressionActionDefinition(BaseModel): """A Flow method action that evaluates a CEL expression.""" @@ -186,7 +197,10 @@ class FlowExpressionActionDefinition(BaseModel): FlowInnerActionDefinition = ( - FlowCodeActionDefinition | FlowToolActionDefinition | FlowExpressionActionDefinition + FlowCodeActionDefinition + | FlowToolActionDefinition + | FlowCrewActionDefinition + | FlowExpressionActionDefinition ) @@ -236,6 +250,7 @@ class FlowEachActionDefinition(BaseModel): FlowActionDefinition = ( FlowCodeActionDefinition | FlowToolActionDefinition + | FlowCrewActionDefinition | FlowExpressionActionDefinition | FlowEachActionDefinition ) diff --git a/lib/crewai/src/crewai/flow/runtime/_actions.py b/lib/crewai/src/crewai/flow/runtime/_actions.py index 480fbb982..97333e209 100644 --- a/lib/crewai/src/crewai/flow/runtime/_actions.py +++ b/lib/crewai/src/crewai/flow/runtime/_actions.py @@ -11,6 +11,7 @@ from typing import TYPE_CHECKING, Any, Protocol, cast from crewai.flow.flow_definition import ( FlowActionDefinition, FlowCodeActionDefinition, + FlowCrewActionDefinition, FlowEachActionDefinition, FlowEachInnerActionDefinition, FlowExpressionActionDefinition, @@ -104,6 +105,25 @@ class ToolAction: ) from e +class CrewAction: + definition_type = FlowCrewActionDefinition + + def __init__(self, flow: Flow[Any], definition: FlowCrewActionDefinition) -> None: + self.flow = flow + self.definition = definition + + async def run(self, *_args: Any, **kwargs: Any) -> Any: + from crewai.project.crew_loader import load_crew_from_definition + + local_context = _pop_local_context(kwargs) + crew_definition = self.definition.with_ + inputs = render_with_block( + self.flow, crew_definition.inputs, local_context=local_context + ) + crew, _ = load_crew_from_definition(crew_definition, source="crew action") + return await crew.kickoff_async(inputs=inputs) + + class ExpressionAction: definition_type = FlowExpressionActionDefinition @@ -177,6 +197,7 @@ _ACTION_TYPES: tuple[_ActionType, ...] = ( EachAction, CodeAction, ToolAction, + CrewAction, ExpressionAction, ) diff --git a/lib/crewai/tests/test_flow_definition.py b/lib/crewai/tests/test_flow_definition.py index e2b3a7ad4..946ebd336 100644 --- a/lib/crewai/tests/test_flow_definition.py +++ b/lib/crewai/tests/test_flow_definition.py @@ -41,6 +41,7 @@ def test_flow_public_exports_are_explicit(): "FlowConfigDefinition", "FlowConversationalDefinition", "FlowConversationalRouterDefinition", + "FlowCrewActionDefinition", "FlowDefinition", "FlowDefinitionCondition", "FlowDefinitionDiagnostic", diff --git a/lib/crewai/tests/test_flow_from_definition.py b/lib/crewai/tests/test_flow_from_definition.py index aac114c4d..69bb96816 100644 --- a/lib/crewai/tests/test_flow_from_definition.py +++ b/lib/crewai/tests/test_flow_from_definition.py @@ -765,6 +765,252 @@ methods: ) +def test_crew_action_runs_inline_yaml_definition(monkeypatch: pytest.MonkeyPatch): + from crewai import Crew + + async def fake_kickoff_async( + self: Crew, inputs: dict[str, Any] | None = None, **_kwargs: Any + ) -> dict[str, Any]: + return { + "crew": self.name, + "agents": [agent.role for agent in self.agents], + "tasks": [task.description for task in self.tasks], + "inputs": inputs, + } + + monkeypatch.setattr(Crew, "kickoff_async", fake_kickoff_async) + + yaml_str = """ +schema: crewai.flow/v1 +name: CrewFlow +methods: + research: + do: + call: crew + with: + name: inline_research + agents: + researcher: + role: Researcher + goal: Research {topic} + backstory: Knows things. + tasks: + - name: research_task + description: Research {topic} + expected_output: Findings about {topic} + agent: researcher + inputs: + topic: "${state.topic}" + start: true +""" + + flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str)) + + assert flow.kickoff(inputs={"topic": "AI"}) == { + "crew": "inline_research", + "agents": ["Researcher"], + "tasks": ["Research {topic}"], + "inputs": {"topic": "AI"}, + } + + +def test_crew_action_round_trips_with_inline_definition(): + definition = FlowDefinition.from_dict( + { + "schema": "crewai.flow/v1", + "name": "CrewFlow", + "methods": { + "research": { + "start": True, + "do": { + "call": "crew", + "with": { + "name": "inline_research", + "agents": { + "researcher": { + "role": "Researcher", + "goal": "Research {topic}", + "backstory": "Knows things.", + } + }, + "tasks": [ + { + "name": "research_task", + "description": "Research {topic}", + "expected_output": "Findings about {topic}", + "agent": "researcher", + } + ], + "inputs": {"topic": "${state.topic}"}, + }, + }, + } + }, + } + ) + + assert definition.to_dict()["methods"]["research"]["do"]["call"] == "crew" + assert ( + definition.to_dict()["methods"]["research"]["do"]["with"]["agents"][ + "researcher" + ]["role"] + == "Researcher" + ) + + +def test_crew_action_normalizes_named_agent_list_definition(): + definition = FlowDefinition.from_dict( + { + "schema": "crewai.flow/v1", + "name": "CrewFlow", + "methods": { + "research": { + "start": True, + "do": { + "call": "crew", + "with": { + "agents": [ + { + "name": "researcher", + "role": "Researcher", + "goal": "Research {topic}", + "backstory": "Knows things.", + } + ], + "tasks": [ + { + "description": "Research {topic}", + "expected_output": "Findings about {topic}", + "agent": "researcher", + } + ], + }, + }, + } + }, + } + ) + + assert ( + definition.to_dict()["methods"]["research"]["do"]["with"]["agents"][ + "researcher" + ]["role"] + == "Researcher" + ) + + +def test_crew_action_json_schema_describes_inline_crew_definitions(): + schema_defs = FlowDefinition.json_schema()["$defs"] + agents_schema = schema_defs["CrewDefinition"]["properties"]["agents"] + + assert set(schema_defs["CrewDefinition"]["properties"]) >= { + "agents", + "tasks", + "inputs", + } + assert {option["type"] for option in agents_schema["anyOf"]} == {"array", "object"} + assert set(schema_defs["CrewAgentDefinition"]["properties"]) >= { + "role", + "goal", + "backstory", + "settings", + } + assert set(schema_defs["CrewTaskDefinition"]["properties"]) >= { + "description", + "expected_output", + "agent", + "context", + } + + +def test_crew_action_rejects_incomplete_inline_agent_definition(): + with pytest.raises(ValidationError, match="goal"): + FlowDefinition.from_dict( + { + "schema": "crewai.flow/v1", + "name": "CrewFlow", + "methods": { + "research": { + "start": True, + "do": { + "call": "crew", + "with": { + "agents": { + "researcher": { + "role": "Researcher", + "backstory": "Knows things.", + } + }, + "tasks": [ + { + "description": "Research", + "expected_output": "Findings", + "agent": "researcher", + } + ], + }, + }, + } + }, + } + ) + + +def test_crew_action_rejects_ref(): + with pytest.raises(ValidationError, match="ref"): + FlowDefinition.from_dict( + { + "schema": "crewai.flow/v1", + "name": "CrewFlow", + "methods": { + "research": { + "start": True, + "do": { + "call": "crew", + "ref": "project.crew:build_crew", + "with": {"inputs": {"topic": "AI"}}, + }, + } + }, + } + ) + + +def test_crew_action_rejects_non_mapping_inputs_in_definition(): + with pytest.raises(ValidationError, match="crew.inputs must be a mapping"): + FlowDefinition.from_dict( + { + "schema": "crewai.flow/v1", + "name": "CrewFlow", + "methods": { + "research": { + "start": True, + "do": { + "call": "crew", + "with": { + "agents": { + "researcher": { + "role": "Researcher", + "goal": "Research", + "backstory": "Knows things.", + } + }, + "tasks": [ + { + "description": "Research", + "expected_output": "Findings", + "agent": "researcher", + } + ], + "inputs": "topic", + }, + }, + } + }, + } + ) + + def test_tool_action_reports_invalid_cel_expression(): yaml_str = f""" schema: crewai.flow/v1 From 06ada68083f98fb83b40fa12b196c5e1578fa01a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Moura?= Date: Tue, 16 Jun 2026 18:45:26 -0300 Subject: [PATCH 4/4] Enhance crew loading and validation logic (#6182) * Enhance crew loading and validation logic - Updated `crew_loader.py` to pass the project root when loading task and agent definitions, improving the handling of Python references. - Refactored `json_loader.py` to include additional validation for Python references, ensuring they are resolved within the project root and enforcing depth limits. - Added tests in `test_crew_loader.py` and `test_json_loader.py` to validate rejection of unsafe Python references and input files outside the project root. - Improved error handling for JSON project validation, ensuring clearer feedback for invalid configurations. * Refactor tests for hierarchical verbose manager agent - Removed `@pytest.mark.vcr()` decorators from `test_hierarchical_verbose_manager_agent` and `test_hierarchical_verbose_false_manager_agent`. - Introduced mocking for task outputs in both tests to simulate execution without relying on external dependencies. - Ensured that the `crew.kickoff()` method is called within a context that patches the `Task.execute_sync` method, improving test isolation and reliability. * Fix JSON loader PR review comments * Fix JSON loader project root after rebase * Handle UNC paths in JSON input files --- lib/crewai/src/crewai/project/crew_loader.py | 7 +- lib/crewai/src/crewai/project/json_loader.py | 543 +++++++++++++++++-- lib/crewai/tests/project/test_crew_loader.py | 235 ++++++++ lib/crewai/tests/project/test_json_loader.py | 48 ++ lib/crewai/tests/test_crew.py | 18 +- 5 files changed, 786 insertions(+), 65 deletions(-) diff --git a/lib/crewai/src/crewai/project/crew_loader.py b/lib/crewai/src/crewai/project/crew_loader.py index f4ee0d077..1afa1e828 100644 --- a/lib/crewai/src/crewai/project/crew_loader.py +++ b/lib/crewai/src/crewai/project/crew_loader.py @@ -119,7 +119,11 @@ def _load_crew_project( for index, task_defn in enumerate(project.task_definitions): task_source = f"{source_label}: tasks[{index}]" - task_class = _task_class_from_definition(task_defn, f"{task_source}: type") + task_class = _task_class_from_definition( + task_defn, + f"{task_source}: type", + project_root=project_root, + ) task_kwargs = _task_kwargs_from_definition( task_defn, agents_map=agents_map, @@ -147,6 +151,7 @@ def _load_crew_project( tasks=tasks_list, agents_map=agents_map, source=source_label, + project_root=project_root, ) try: diff --git a/lib/crewai/src/crewai/project/json_loader.py b/lib/crewai/src/crewai/project/json_loader.py index ab13d881e..107eb8c0c 100644 --- a/lib/crewai/src/crewai/project/json_loader.py +++ b/lib/crewai/src/crewai/project/json_loader.py @@ -4,11 +4,15 @@ from __future__ import annotations from collections.abc import Callable from dataclasses import dataclass +import importlib +import inspect import json import logging -from pathlib import Path +from pathlib import Path, PureWindowsPath import re +import sys from typing import Any, cast +from urllib.parse import unquote, urlparse from pydantic import BaseModel, ValidationError @@ -93,6 +97,9 @@ _CONDITIONAL_TASK_TYPE_ALIASES = { "crewai.tasks.conditional_task.ConditionalTask", } _URI_RE = re.compile(r"^[A-Za-z][A-Za-z0-9+.-]*:") +_WINDOWS_DRIVE_PATH_RE = re.compile(r"^[A-Za-z]:") +_WINDOWS_UNC_PATH_RE = re.compile(r"^(?:\\\\|//)[^\\/]+[\\/][^\\/]+(?:[\\/]|$)") +_MAX_PYTHON_REF_DEPTH = 64 _AGENT_CALLABLE_FIELDS = {"guardrail", "step_callback"} _AGENT_CALLABLE_LIST_FIELDS = {"callbacks"} @@ -204,8 +211,12 @@ def load_agent(source: str | Path) -> Any: """Load an existing ``Agent`` from a ``.json`` / ``.jsonc`` definition file.""" path = Path(source) defn = _expect_object(load_jsonc_file(path), path) - root = path.parent.parent if path.parent.name == "agents" else Path.cwd() - agent_class = _agent_class_from_definition(defn, f"{path}: type") + root = path.parent.parent if path.parent.name == "agents" else path.parent + agent_class = _agent_class_from_definition( + defn, + f"{path}: type", + project_root=root, + ) agent_kwargs = _agent_kwargs_from_definition( defn, path, @@ -343,6 +354,7 @@ def _load_json_crew_project_definition( agent_defn, f"{agent_source}: type", resolve_python_refs=not collect_errors, + project_root=project_root, ) agent_kwargs = _agent_kwargs_from_definition( agent_defn, @@ -399,6 +411,7 @@ def _load_json_crew_project_definition( task_defn, task_path, resolve_python_refs=not collect_errors, + project_root=project_root, ) ) missing_required = [ @@ -567,11 +580,17 @@ def _python_ref_errors(value: Any, source: str | Path) -> list[str]: path = value.get(PYTHON_REF_KEY) if not isinstance(path, str) or not path.strip(): return [f"{source}: Python reference '{PYTHON_REF_KEY}' must be a string"] + path = path.strip() if "." not in path: return [ f"{source}: Python reference '{path}' must be a dotted import path " "like 'module.attribute'" ] + if not all(part.isidentifier() for part in path.split(".")): + return [ + f"{source}: Python reference '{path}' must contain only valid " + "Python identifiers separated by dots" + ] return [] @@ -588,17 +607,37 @@ def _resolve_python_ref( source: str | Path, *, expected: str, + project_root: Path | None, ) -> Any: - from crewai.utilities.import_utils import import_and_validate_definition - path = _python_ref_path(value, source) try: - resolved = import_and_validate_definition(path) + resolved = _import_project_python_reference(path, source, project_root) + except JSONProjectError: + raise except Exception as exc: - raise JSONProjectError(f"{source}: failed to import '{path}': {exc}") from exc + logger.debug( + "Failed to resolve JSON Python reference %r from %s", + path, + source, + exc_info=True, + ) + raise JSONProjectError( + f"{source}: failed to import Python reference '{path}'" + ) from exc if expected == "any": return resolved + if expected == "object": + if ( + isinstance(resolved, type) + or inspect.ismodule(resolved) + or inspect.isroutine(resolved) + ): + raise JSONProjectError( + f"{source}: Python reference '{path}' is not a supported object " + "reference" + ) + return resolved if expected == "callable" and not callable(resolved): raise JSONProjectError(f"{source}: Python reference '{path}' is not callable") if expected == "class" and not isinstance(resolved, type): @@ -606,13 +645,172 @@ def _resolve_python_ref( return resolved +def _import_project_python_reference( + path: str, + source: str | Path, + project_root: Path | None, +) -> Any: + module_path, _, attr = path.rpartition(".") + root = _project_root(project_root) + _project_module_file(module_path, root, source) + displaced_modules = _evict_external_cached_modules(module_path, root) + + logger.info( + "Resolving JSON Python reference '%s' for %s from project root %s", + path, + source, + root, + ) + + inserted_sys_path = False + root_str = str(root) + if not sys.path or sys.path[0] != root_str: + sys.path.insert(0, root_str) + inserted_sys_path = True + + try: + try: + module = importlib.import_module(module_path) + except Exception as exc: + logger.debug( + "Failed to import JSON Python reference module %r from %s", + module_path, + source, + exc_info=True, + ) + raise JSONProjectError( + f"{source}: failed to import Python reference '{path}'" + ) from exc + + if not _module_is_project_local(module, root): + raise JSONProjectError( + f"{source}: Python reference '{path}' resolved outside project root" + ) + if not hasattr(module, attr): + raise JSONProjectError( + f"{source}: Python reference '{path}' could not be resolved" + ) + return getattr(module, attr) + finally: + if inserted_sys_path: + try: + sys.path.remove(root_str) + except ValueError: + logger.debug( + "Project root %s was already removed from sys.path while " + "resolving JSON Python reference %r", + root, + path, + ) + _restore_external_cached_modules(displaced_modules, root) + + +def _project_root(project_root: Path | None) -> Path: + return (project_root or Path.cwd()).resolve() + + +def _project_module_file( + module_path: str, + project_root: Path, + source: str | Path, +) -> Path: + module_rel = Path(*module_path.split(".")) + candidates = [ + project_root / module_rel.with_suffix(".py"), + project_root / module_rel / "__init__.py", + ] + for candidate in candidates: + resolved = candidate.resolve() + if resolved.is_file() and _is_relative_to(resolved, project_root): + return resolved + raise JSONProjectError( + f"{source}: Python references in JSON configs must point to modules inside the " + f"project root; '{module_path}' was not found under {project_root}" + ) + + +def _evict_external_cached_modules( + module_path: str, + project_root: Path, +) -> dict[str, Any]: + displaced_modules: dict[str, Any] = {} + parts = module_path.split(".") + for index in range(len(parts), 0, -1): + prefix = ".".join(parts[:index]) + module = sys.modules.get(prefix) + if module is None or _module_is_project_local(module, project_root): + continue + displaced_modules[prefix] = module + logger.debug( + "Evicting cached module %r before resolving JSON Python reference " + "from project root %s", + prefix, + project_root, + ) + sys.modules.pop(prefix, None) + return displaced_modules + + +def _restore_external_cached_modules( + displaced_modules: dict[str, Any], + project_root: Path, +) -> None: + if not displaced_modules: + return + + displaced_prefixes = tuple(displaced_modules) + for name, module in list(sys.modules.items()): + if not any( + name == prefix or name.startswith(f"{prefix}.") + for prefix in displaced_prefixes + ): + continue + if _module_is_project_local(module, project_root): + logger.debug( + "Removing project-local module %r before restoring cached module", + name, + ) + sys.modules.pop(name, None) + + for name in sorted(displaced_modules, key=lambda value: value.count(".")): + logger.debug( + "Restoring cached module %r after JSON Python reference import", + name, + ) + sys.modules[name] = displaced_modules[name] + + +def _module_is_project_local(module: Any, project_root: Path) -> bool: + module_file = getattr(module, "__file__", None) + if module_file and _is_relative_to(Path(module_file).resolve(), project_root): + return True + module_paths = getattr(module, "__path__", None) + if module_paths is not None: + return any( + _is_relative_to(Path(path).resolve(), project_root) for path in module_paths + ) + return False + + +def _is_relative_to(path: Path, root: Path) -> bool: + try: + path.relative_to(root) + except ValueError: + return False + return True + + def _resolve_python_class( value: Any, source: str | Path, *, base_class: type[Any] | None = None, + project_root: Path | None, ) -> type[Any]: - cls = cast(type[Any], _resolve_python_ref(value, source, expected="class")) + cls = cast( + type[Any], + _resolve_python_ref(value, source, expected="class", project_root=project_root), + ) if base_class is not None and not issubclass(cls, base_class): raise JSONProjectError( f"{source}: Python reference '{_python_ref_path(value, source)}' " @@ -626,6 +824,7 @@ def _agent_class_from_definition( source: str | Path, *, resolve_python_refs: bool = True, + project_root: Path | None = None, ) -> type[Any]: from crewai import Agent @@ -643,7 +842,12 @@ def _agent_class_from_definition( return agent_class from crewai.agents.agent_builder.base_agent import BaseAgent - return _resolve_python_class(type_value, source, base_class=BaseAgent) + return _resolve_python_class( + type_value, + source, + base_class=BaseAgent, + project_root=project_root, + ) if isinstance(type_value, str): raise JSONProjectError( f"{source}: unsupported agent type '{type_value}'. Use 'Agent' or " @@ -657,6 +861,7 @@ def _task_class_from_definition( source: str | Path, *, resolve_python_refs: bool = True, + project_root: Path | None = None, ) -> type[Any]: from crewai import Task @@ -676,7 +881,12 @@ def _task_class_from_definition( if errors: raise JSONProjectValidationError(errors) return task_class - return _resolve_python_class(type_value, source, base_class=task_class) + return _resolve_python_class( + type_value, + source, + base_class=task_class, + project_root=project_root, + ) if isinstance(type_value, str): raise JSONProjectError( f"{source}: unsupported task type '{type_value}'. Use 'Task', " @@ -713,6 +923,7 @@ def _agent_kwargs_from_definition( defn, f"{path}: type", resolve_python_refs=resolve_python_refs, + project_root=project_root, ) allowed_fields = _agent_allowed_fields(agent_class) extra_allowed = {"settings", "type"} @@ -757,8 +968,8 @@ def _agent_kwargs_from_definition( agent_kwargs = {key: value for key, value in defn.items() if key in allowed_fields} agent_kwargs.update(settings) if resolve_tools: - _resolve_tool_fields(agent_kwargs, project_root=project_root) - _resolve_agent_python_refs(agent_kwargs, path) + _resolve_tool_fields(agent_kwargs, path, project_root=project_root) + _resolve_agent_python_refs(agent_kwargs, path, project_root) else: # Validation/deploy mode: check tool declarations structurally without # importing or instantiating anything — custom: tools execute @@ -778,7 +989,11 @@ def _task_kwargs_from_definition( source: str, project_root: Path | None = None, ) -> dict[str, Any]: - task_class = _task_class_from_definition(task_defn, f"{source}: type") + task_class = _task_class_from_definition( + task_defn, + f"{source}: type", + project_root=project_root, + ) allowed_fields = _task_allowed_fields(task_class) errors = _field_errors( task_defn, @@ -815,8 +1030,8 @@ def _task_kwargs_from_definition( context_tasks.append(task_name_map[ctx_name]) task_kwargs["context"] = context_tasks - _resolve_tool_fields(task_kwargs, project_root=project_root) - _resolve_task_python_refs(task_kwargs, source) + _resolve_tool_fields(task_kwargs, source, project_root=project_root) + _resolve_task_python_refs(task_kwargs, source, project_root) if "input_files" in task_kwargs: task_kwargs["input_files"] = _normalize_input_files( task_kwargs["input_files"], @@ -832,6 +1047,7 @@ def _crew_kwargs_from_definition( tasks: list[Any], agents_map: dict[str, Any], source: Path | str, + project_root: Path | None = None, ) -> dict[str, Any]: errors = _field_errors( defn, @@ -857,19 +1073,25 @@ def _crew_kwargs_from_definition( ) crew_kwargs["manager_agent"] = agents_map[manager_agent] - _resolve_crew_python_refs(crew_kwargs, source) + _resolve_crew_python_refs(crew_kwargs, source, project_root) return crew_kwargs def _resolve_tool_fields( - kwargs: dict[str, Any], project_root: Path | None = None + kwargs: dict[str, Any], + source: str | Path, + project_root: Path | None = None, ) -> None: tools = kwargs.get("tools") if tools is not None: kwargs["tools"] = _resolve_tools(tools, project_root=project_root) if "mcps" in kwargs: - kwargs["mcps"] = _resolve_mcp_python_refs(kwargs["mcps"]) + kwargs["mcps"] = _resolve_mcp_python_refs( + kwargs["mcps"], + f"{source}: mcps", + project_root, + ) def _field_errors( @@ -924,6 +1146,7 @@ def _task_definition_errors( source: str | Path, *, resolve_python_refs: bool, + project_root: Path | None, ) -> list[str]: skip_unknown = _definition_has_python_type(task_defn) and not resolve_python_refs try: @@ -931,6 +1154,7 @@ def _task_definition_errors( task_defn, f"{source}: type", resolve_python_refs=resolve_python_refs, + project_root=project_root, ) except JSONProjectValidationError as exc: return exc.errors @@ -946,6 +1170,13 @@ def _task_definition_errors( skip_unknown=skip_unknown, ) errors.extend(_python_reference_definition_errors(task_defn, source)) + errors.extend( + _input_files_definition_errors( + task_defn.get("input_files"), + f"{source}: input_files", + project_root, + ) + ) return errors @@ -1001,20 +1232,33 @@ def _python_reference_value_errors(value: Any, source: str | Path) -> list[str]: def _python_reference_value_errors_recursive( - value: Any, source: str | Path + value: Any, source: str | Path, depth: int = 0 ) -> list[str]: + if depth > _MAX_PYTHON_REF_DEPTH: + return [ + f"{source}: Python reference nesting exceeds maximum depth " + f"{_MAX_PYTHON_REF_DEPTH}" + ] if _is_python_ref(value): return _python_ref_errors(value, source) errors: list[str] = [] if isinstance(value, list): for index, item in enumerate(value): errors.extend( - _python_reference_value_errors_recursive(item, f"{source}[{index}]") + _python_reference_value_errors_recursive( + item, + f"{source}[{index}]", + depth + 1, + ) ) elif isinstance(value, dict): for key, item in value.items(): errors.extend( - _python_reference_value_errors_recursive(item, f"{source}.{key}") + _python_reference_value_errors_recursive( + item, + f"{source}.{key}", + depth + 1, + ) ) return errors @@ -1069,34 +1313,54 @@ def _mcp_python_ref_errors(value: Any, source: str | Path) -> list[str]: return errors -def _resolve_agent_python_refs(kwargs: dict[str, Any], source: str | Path) -> None: +def _resolve_agent_python_refs( + kwargs: dict[str, Any], + source: str | Path, + project_root: Path | None, +) -> None: _resolve_callable_fields( kwargs, source, scalar_fields=_AGENT_CALLABLE_FIELDS, list_fields=_AGENT_CALLABLE_LIST_FIELDS, + project_root=project_root, ) if _is_python_ref(kwargs.get("executor_class")): kwargs["executor_class"] = _resolve_python_class( - kwargs["executor_class"], f"{source}: executor_class" + kwargs["executor_class"], + f"{source}: executor_class", + project_root=project_root, ) if "embedder" in kwargs: - kwargs["embedder"] = _resolve_embedder_python_refs(kwargs["embedder"], source) + kwargs["embedder"] = _resolve_embedder_python_refs( + kwargs["embedder"], source, project_root + ) if "a2a" in kwargs: - kwargs["a2a"] = _resolve_a2a_python_refs(kwargs["a2a"], source) - _resolve_object_reference_fields(kwargs, source, _AGENT_OBJECT_REF_FIELDS) + kwargs["a2a"] = _resolve_a2a_python_refs(kwargs["a2a"], source, project_root) + _resolve_object_reference_fields( + kwargs, source, _AGENT_OBJECT_REF_FIELDS, project_root + ) -def _resolve_task_python_refs(kwargs: dict[str, Any], source: str | Path) -> None: +def _resolve_task_python_refs( + kwargs: dict[str, Any], + source: str | Path, + project_root: Path | None, +) -> None: _resolve_callable_fields( kwargs, source, scalar_fields=_TASK_CALLABLE_FIELDS, list_fields=_TASK_CALLABLE_LIST_FIELDS, + project_root=project_root, ) for field in _TASK_MODEL_CLASS_FIELDS: if _is_python_ref(kwargs.get(field)): - kwargs[field] = _resolve_model_class(kwargs[field], f"{source}: {field}") + kwargs[field] = _resolve_model_class( + kwargs[field], + f"{source}: {field}", + project_root, + ) if _is_python_ref(kwargs.get("converter_cls")): from crewai.utilities.converter import Converter @@ -1104,51 +1368,93 @@ def _resolve_task_python_refs(kwargs: dict[str, Any], source: str | Path) -> Non kwargs["converter_cls"], f"{source}: converter_cls", base_class=Converter, + project_root=project_root, ) elif isinstance(kwargs.get("converter_cls"), str): raise JSONProjectError( f"{source}: converter_cls must use " f'{{"{PYTHON_REF_KEY}": "module.ConverterSubclass"}}' ) - _resolve_object_reference_fields(kwargs, source, _TASK_OBJECT_REF_FIELDS) + _resolve_object_reference_fields( + kwargs, source, _TASK_OBJECT_REF_FIELDS, project_root + ) -def _resolve_crew_python_refs(kwargs: dict[str, Any], source: str | Path) -> None: +def _resolve_crew_python_refs( + kwargs: dict[str, Any], + source: str | Path, + project_root: Path | None, +) -> None: _resolve_callable_fields( kwargs, source, scalar_fields=_CREW_CALLABLE_FIELDS, list_fields=_CREW_CALLABLE_LIST_FIELDS, + project_root=project_root, ) if "embedder" in kwargs: - kwargs["embedder"] = _resolve_embedder_python_refs(kwargs["embedder"], source) - _resolve_object_reference_fields(kwargs, source, _CREW_OBJECT_REF_FIELDS) + kwargs["embedder"] = _resolve_embedder_python_refs( + kwargs["embedder"], source, project_root + ) + _resolve_object_reference_fields( + kwargs, source, _CREW_OBJECT_REF_FIELDS, project_root + ) def _resolve_object_reference_fields( kwargs: dict[str, Any], source: str | Path, fields: set[str], + project_root: Path | None, ) -> None: for field in fields: if field not in kwargs: continue kwargs[field] = _resolve_python_refs_recursively( - kwargs[field], f"{source}: {field}" + kwargs[field], + f"{source}: {field}", + project_root, ) -def _resolve_python_refs_recursively(value: Any, source: str | Path) -> Any: +def _resolve_python_refs_recursively( + value: Any, + source: str | Path, + project_root: Path | None, + depth: int = 0, +) -> Any: + if depth > _MAX_PYTHON_REF_DEPTH: + raise JSONProjectValidationError( + [ + f"{source}: Python reference nesting exceeds maximum depth " + f"{_MAX_PYTHON_REF_DEPTH}" + ] + ) if _is_python_ref(value): - return _resolve_python_ref(value, source, expected="any") + return _resolve_python_ref( + value, + source, + expected="object", + project_root=project_root, + ) if isinstance(value, list): return [ - _resolve_python_refs_recursively(item, f"{source}[{index}]") + _resolve_python_refs_recursively( + item, + f"{source}[{index}]", + project_root, + depth + 1, + ) for index, item in enumerate(value) ] if isinstance(value, dict): return { - key: _resolve_python_refs_recursively(item, f"{source}.{key}") + key: _resolve_python_refs_recursively( + item, + f"{source}.{key}", + project_root, + depth + 1, + ) for key, item in value.items() } return value @@ -1160,6 +1466,7 @@ def _resolve_callable_fields( *, scalar_fields: set[str], list_fields: set[str], + project_root: Path | None, ) -> None: for field in scalar_fields: if _is_python_ref(kwargs.get(field)): @@ -1167,6 +1474,7 @@ def _resolve_callable_fields( kwargs[field], f"{source}: {field}", expected="callable", + project_root=project_root, ) for field in list_fields: value = kwargs.get(field) @@ -1174,7 +1482,10 @@ def _resolve_callable_fields( continue kwargs[field] = [ _resolve_python_ref( - item, f"{source}: {field}[{index}]", expected="callable" + item, + f"{source}: {field}[{index}]", + expected="callable", + project_root=project_root, ) if _is_python_ref(item) else item @@ -1182,11 +1493,24 @@ def _resolve_callable_fields( ] -def _resolve_model_class(value: Any, source: str | Path) -> type[BaseModel]: - return _resolve_python_class(value, source, base_class=BaseModel) +def _resolve_model_class( + value: Any, + source: str | Path, + project_root: Path | None, +) -> type[BaseModel]: + return _resolve_python_class( + value, + source, + base_class=BaseModel, + project_root=project_root, + ) -def _resolve_embedder_python_refs(value: Any, source: str | Path) -> Any: +def _resolve_embedder_python_refs( + value: Any, + source: str | Path, + project_root: Path | None, +) -> Any: if not isinstance(value, dict): return value config = value.get("config") @@ -1206,15 +1530,24 @@ def _resolve_embedder_python_refs(value: Any, source: str | Path) -> Any: embedding_callable, f"{source}: embedder.config.embedding_callable", base_class=CustomEmbeddingFunction, + project_root=project_root, ) normalized["config"] = normalized_config return normalized -def _resolve_a2a_python_refs(value: Any, source: str | Path) -> Any: +def _resolve_a2a_python_refs( + value: Any, + source: str | Path, + project_root: Path | None, +) -> Any: if isinstance(value, list): return [ - _resolve_a2a_python_refs(item, f"{source}: a2a[{index}]") + _resolve_a2a_python_refs( + item, + f"{source}: a2a[{index}]", + project_root, + ) for index, item in enumerate(value) ] if not isinstance(value, dict): @@ -1228,6 +1561,7 @@ def _resolve_a2a_python_refs(value: Any, source: str | Path) -> Any: normalized["response_model"] = _resolve_model_class( response_model, f"{source}: a2a.response_model", + project_root, ) elif isinstance(response_model, dict): from crewai.utilities.pydantic_schema_utils import create_model_from_schema @@ -1236,11 +1570,19 @@ def _resolve_a2a_python_refs(value: Any, source: str | Path) -> Any: return normalized -def _resolve_mcp_python_refs(value: Any) -> Any: +def _resolve_mcp_python_refs( + value: Any, + source: str | Path, + project_root: Path | None, +) -> Any: if not isinstance(value, list): return value return [ - _resolve_mcp_config_python_refs(config, index) + _resolve_mcp_config_python_refs( + config, + f"{source}[{index}]", + project_root, + ) if isinstance(config, dict) else config for index, config in enumerate(value) @@ -1248,7 +1590,9 @@ def _resolve_mcp_python_refs(value: Any) -> Any: def _resolve_mcp_config_python_refs( - config: dict[str, Any], index: int + config: dict[str, Any], + source: str | Path, + project_root: Path | None, ) -> dict[str, Any]: tool_filter = config.get("tool_filter") if tool_filter is None: @@ -1257,8 +1601,9 @@ def _resolve_mcp_config_python_refs( if _is_python_ref(tool_filter): normalized["tool_filter"] = _resolve_python_ref( tool_filter, - f"mcps[{index}].tool_filter", + f"{source}.tool_filter", expected="callable", + project_root=project_root, ) elif isinstance(tool_filter, dict) and tool_filter.get("type") == "static": from crewai.mcp.filters import create_static_tool_filter @@ -1267,15 +1612,11 @@ def _resolve_mcp_config_python_refs( blocked_tool_names = tool_filter.get("blocked_tool_names") if allowed_tool_names is not None and not _is_string_list(allowed_tool_names): raise JSONProjectValidationError( - [ - f"mcps[{index}].tool_filter.allowed_tool_names must be a list of strings" - ] + [f"{source}.tool_filter.allowed_tool_names must be a list of strings"] ) if blocked_tool_names is not None and not _is_string_list(blocked_tool_names): raise JSONProjectValidationError( - [ - f"mcps[{index}].tool_filter.blocked_tool_names must be a list of strings" - ] + [f"{source}.tool_filter.blocked_tool_names must be a list of strings"] ) normalized["tool_filter"] = create_static_tool_filter( allowed_tool_names=allowed_tool_names, @@ -1304,7 +1645,11 @@ def _normalize_input_files( for name, file_spec in value.items(): if isinstance(file_spec, str): normalized[name] = { - "source": _resolve_project_path(file_spec, project_root) + "source": _resolve_project_path( + file_spec, + project_root, + f"{source}: input_files.{name}", + ) } continue if isinstance(file_spec, dict): @@ -1313,7 +1658,9 @@ def _normalize_input_files( field_value = normalized_spec.get(field) if isinstance(field_value, str): normalized_spec[field] = _resolve_project_path( - field_value, project_root + field_value, + project_root, + f"{source}: input_files.{name}.{field}", ) normalized[name] = normalized_spec continue @@ -1321,13 +1668,89 @@ def _normalize_input_files( return normalized -def _resolve_project_path(value: str, project_root: Path | None) -> str: - if not value or _URI_RE.match(value): +def _input_files_definition_errors( + value: Any, + source: str | Path, + project_root: Path | None, +) -> list[str]: + if value is None: + return [] + if not isinstance(value, dict): + return [f"{source} must be an object mapping names to file specs"] + + errors: list[str] = [] + for name, file_spec in value.items(): + if isinstance(file_spec, str): + try: + _resolve_project_path(file_spec, project_root, f"{source}.{name}") + except JSONProjectValidationError as exc: + errors.extend(exc.errors) + continue + if isinstance(file_spec, dict): + for field in ("source", "path"): + field_value = file_spec.get(field) + if not isinstance(field_value, str): + continue + try: + _resolve_project_path( + field_value, + project_root, + f"{source}.{name}.{field}", + ) + except JSONProjectValidationError as exc: + errors.extend(exc.errors) + return errors + + +def _resolve_project_path( + value: str, + project_root: Path | None, + source: str | Path, +) -> str: + if not value: return value - path = Path(value) - if path.is_absolute(): + root = _project_root(project_root) + parsed = urlparse(value) + path_value = value + if ( + parsed.scheme + and parsed.scheme.lower() != "file" + and not _WINDOWS_DRIVE_PATH_RE.match(value) + ): return value - return str(((project_root or Path.cwd()) / path).resolve()) + if parsed.scheme.lower() == "file": + if parsed.netloc not in {"", "localhost"}: + raise JSONProjectValidationError( + [f"{source}: file URI '{value}' must point to a local project path"] + ) + path_value = unquote(parsed.path) + if re.match(r"^/[A-Za-z]:", path_value): + path_value = path_value[1:] + path = Path(path_value) + elif _URI_RE.match(value): + path = Path(path_value) + else: + path = Path(path_value) + if ( + _looks_like_windows_absolute_path(path_value) + or _WINDOWS_DRIVE_PATH_RE.match(path_value) + ) and not path.is_absolute(): + raise JSONProjectValidationError( + [f"{source}: path '{value}' resolves outside the project root {root}"] + ) + resolved = path.resolve() if path.is_absolute() else (root / path).resolve() + if not _is_relative_to(resolved, root): + raise JSONProjectValidationError( + [f"{source}: path '{value}' resolves outside the project root {root}"] + ) + return str(resolved) + + +def _looks_like_windows_absolute_path(value: str) -> bool: + if _WINDOWS_UNC_PATH_RE.match(value): + return True + windows_path = PureWindowsPath(value) + return windows_path.is_absolute() def _format_validation_error(path: str | Path, exc: ValidationError) -> str: diff --git a/lib/crewai/tests/project/test_crew_loader.py b/lib/crewai/tests/project/test_crew_loader.py index 367bdbd30..f8cb806ad 100644 --- a/lib/crewai/tests/project/test_crew_loader.py +++ b/lib/crewai/tests/project/test_crew_loader.py @@ -4,6 +4,8 @@ from __future__ import annotations import json from pathlib import Path +import sys +import types import pytest @@ -560,6 +562,98 @@ class TestLoadCrew: assert "summary" in task.output_json.model_fields assert task.converter_cls.__name__ == "SpecialConverter" + def test_crew_rejects_stdlib_python_ref_for_agent_callback( + self, tmp_path: Path + ): + agents_dir = tmp_path / "agents" + agents_dir.mkdir() + _write_agent( + agents_dir, + "worker", + step_callback={"python": "os.system"}, + ) + + crew_def = { + "name": "unsafe_callback_crew", + "agents": ["worker"], + "tasks": [ + { + "name": "work", + "description": "Do work", + "expected_output": "Work done", + "agent": "worker", + } + ], + } + crew_file = _write_crew(tmp_path, crew_def) + + with pytest.raises(JSONProjectError, match="project root"): + load_crew(crew_file) + + def test_crew_rejects_stdlib_python_ref_for_mcp_tool_filter( + self, tmp_path: Path + ): + agents_dir = tmp_path / "agents" + agents_dir.mkdir() + _write_agent( + agents_dir, + "worker", + mcps=[ + { + "command": "python", + "args": ["server.py"], + "tool_filter": {"python": "os.system"}, + } + ], + ) + + crew_def = { + "name": "unsafe_mcp_filter_crew", + "agents": ["worker"], + "tasks": [ + { + "name": "work", + "description": "Do work", + "expected_output": "Work done", + "agent": "worker", + } + ], + } + crew_file = _write_crew(tmp_path, crew_def) + + with pytest.raises(JSONProjectError, match="project root"): + load_crew(crew_file) + + def test_crew_rejects_callable_python_ref_for_object_field( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ): + _write_python_defs(tmp_path) + monkeypatch.syspath_prepend(str(tmp_path)) + agents_dir = tmp_path / "agents" + agents_dir.mkdir() + _write_agent( + agents_dir, + "worker", + security_config={"python": "json_refs.always_true"}, + ) + + crew_def = { + "name": "unsafe_object_ref_crew", + "agents": ["worker"], + "tasks": [ + { + "name": "work", + "description": "Do work", + "expected_output": "Work done", + "agent": "worker", + } + ], + } + crew_file = _write_crew(tmp_path, crew_def) + + with pytest.raises(JSONProjectError, match="supported object reference"): + load_crew(crew_file) + def test_crew_loads_project_relative_input_files(self, tmp_path: Path): agents_dir = tmp_path / "agents" agents_dir.mkdir() @@ -595,6 +689,147 @@ class TestLoadCrew: assert _input_file_path(input_files["brief"]) == brief_path assert _input_file_path(input_files["spec"]) == spec_path + def test_crew_rejects_relative_input_file_outside_project(self, tmp_path: Path): + agents_dir = tmp_path / "agents" + agents_dir.mkdir() + _write_agent(agents_dir, "reader") + + crew_def = { + "name": "unsafe_input_files_crew", + "agents": ["reader"], + "tasks": [ + { + "name": "read", + "description": "Read files", + "expected_output": "File summary", + "agent": "reader", + "input_files": {"secret": "../secret.txt"}, + } + ], + } + crew_file = _write_crew(tmp_path, crew_def) + + with pytest.raises(JSONProjectValidationError, match="outside the project root"): + load_crew(crew_file) + + def test_crew_rejects_absolute_input_file_outside_project(self, tmp_path: Path): + agents_dir = tmp_path / "agents" + agents_dir.mkdir() + _write_agent(agents_dir, "reader") + outside_path = tmp_path.parent / "secret.txt" + + crew_def = { + "name": "unsafe_absolute_input_files_crew", + "agents": ["reader"], + "tasks": [ + { + "name": "read", + "description": "Read files", + "expected_output": "File summary", + "agent": "reader", + "input_files": {"secret": str(outside_path)}, + } + ], + } + crew_file = _write_crew(tmp_path, crew_def) + + with pytest.raises(JSONProjectValidationError, match="outside the project root"): + load_crew(crew_file) + + def test_crew_rejects_file_uri_input_file_outside_project(self, tmp_path: Path): + agents_dir = tmp_path / "agents" + agents_dir.mkdir() + _write_agent(agents_dir, "reader") + outside_uri = (tmp_path.parent / "secret.txt").as_uri() + + crew_def = { + "name": "unsafe_file_uri_input_files_crew", + "agents": ["reader"], + "tasks": [ + { + "name": "read", + "description": "Read files", + "expected_output": "File summary", + "agent": "reader", + "input_files": {"secret": outside_uri}, + } + ], + } + crew_file = _write_crew(tmp_path, crew_def) + + with pytest.raises(JSONProjectValidationError, match="outside the project root"): + load_crew(crew_file) + + @pytest.mark.parametrize( + "outside_path", + [ + r"C:\Users\alice\.ssh\id_rsa", + "C:/Users/alice/.ssh/id_rsa", + r"\\server\share\secret.txt", + "//server/share/secret.txt", + ], + ) + def test_crew_rejects_windows_input_file_outside_project( + self, tmp_path: Path, outside_path: str + ): + agents_dir = tmp_path / "agents" + agents_dir.mkdir() + _write_agent(agents_dir, "reader") + + crew_def = { + "name": "unsafe_windows_input_files_crew", + "agents": ["reader"], + "tasks": [ + { + "name": "read", + "description": "Read files", + "expected_output": "File summary", + "agent": "reader", + "input_files": {"secret": outside_path}, + } + ], + } + crew_file = _write_crew(tmp_path, crew_def) + + with pytest.raises(JSONProjectValidationError, match="outside the project root"): + load_crew(crew_file) + + def test_crew_restores_external_module_cache_after_project_ref( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ): + _write_python_defs(tmp_path) + external_module = types.ModuleType("json_refs") + external_module.__file__ = str(tmp_path.parent / "json_refs.py") + external_module.marker = "external" + monkeypatch.setitem(sys.modules, "json_refs", external_module) + + agents_dir = tmp_path / "agents" + agents_dir.mkdir() + _write_agent( + agents_dir, + "worker", + step_callback={"python": "json_refs.task_callback"}, + ) + + crew_def = { + "name": "cache_restore_crew", + "agents": ["worker"], + "tasks": [ + { + "name": "work", + "description": "Do work", + "expected_output": "Work done", + "agent": "worker", + } + ], + } + crew_file = _write_crew(tmp_path, crew_def) + + crew, _ = load_crew(crew_file) + + assert crew.agents[0].step_callback.__name__ == "task_callback" + assert sys.modules["json_refs"] is external_module + def test_missing_agent_file_raises(self, tmp_path: Path): agents_dir = tmp_path / "agents" agents_dir.mkdir() diff --git a/lib/crewai/tests/project/test_json_loader.py b/lib/crewai/tests/project/test_json_loader.py index 0da719c5a..3acfabf5d 100644 --- a/lib/crewai/tests/project/test_json_loader.py +++ b/lib/crewai/tests/project/test_json_loader.py @@ -11,6 +11,7 @@ import pytest from crewai.llms.base_llm import BaseLLM from crewai.project.json_loader import ( JSONProjectValidationError, + _looks_like_windows_absolute_path, find_json_project_file, load_agent, strip_jsonc_comments, @@ -74,6 +75,31 @@ def test_find_json_project_file_prefers_jsonc(tmp_path: Path): assert find_json_project_file(tmp_path, "agent") == jsonc_path +@pytest.mark.parametrize( + "path_value", + [ + r"C:\Users\alice\.ssh\id_rsa", + "C:/Users/alice/.ssh/id_rsa", + r"\\server\share\secret.txt", + "//server/share/secret.txt", + ], +) +def test_windows_absolute_path_detection(path_value: str): + assert _looks_like_windows_absolute_path(path_value) + + +@pytest.mark.parametrize( + "path_value", + [ + r"folder\file.txt", + "folder/file.txt", + r"\server\share\secret.txt", + ], +) +def test_windows_absolute_path_detection_ignores_relative_paths(path_value: str): + assert not _looks_like_windows_absolute_path(path_value) + + class TestLoadAgent: def test_load_minimal_agent(self, tmp_path: Path): agent_def = { @@ -480,6 +506,28 @@ class TestValidationDoesNotExecuteTools: assert "Invalid custom tool name" in str(exc_info.value) + def test_validate_rejects_deep_python_ref_nesting(self, tmp_path): + from crewai.project.json_loader import validate_crew_project + + crew_path = self._write_project( + tmp_path, + tool_line='{"tool_type": "some.module.Tool"}', + ) + agent_file = tmp_path / "agents" / "worker.jsonc" + agent_def = json.loads(agent_file.read_text()) + nested: dict[str, object] = {} + current = nested + for _ in range(70): + child: dict[str, object] = {} + current["nested"] = child + current = child + current["ref"] = {"python": "callbacks.step_callback"} + agent_def["security_config"] = nested + agent_file.write_text(json.dumps(agent_def)) + + with pytest.raises(JSONProjectValidationError, match="maximum depth"): + validate_crew_project(crew_path, tmp_path / "agents") + class TestCustomToolPathSafety: @pytest.mark.parametrize( diff --git a/lib/crewai/tests/test_crew.py b/lib/crewai/tests/test_crew.py index 8ce25774e..5b06685d8 100644 --- a/lib/crewai/tests/test_crew.py +++ b/lib/crewai/tests/test_crew.py @@ -3830,7 +3830,6 @@ def test_crew_testing_function(researcher): assert isinstance(received_events[1], CrewTestCompletedEvent) -@pytest.mark.vcr() def test_hierarchical_verbose_manager_agent(researcher, writer): task = Task( description="Come up with a list of 5 interesting ideas to explore for an article, then write one amazing paragraph highlight for each idea that showcases how good an article about this topic could be. Return the list of ideas with their paragraph and your notes.", @@ -3845,13 +3844,18 @@ def test_hierarchical_verbose_manager_agent(researcher, writer): verbose=True, ) - crew.kickoff() + mock_task_output = TaskOutput( + description="Mock description", raw="mocked output", agent="mocked agent", messages=[] + ) + task.output = mock_task_output + + with patch.object(Task, "execute_sync", return_value=mock_task_output): + crew.kickoff() assert crew.manager_agent is not None assert crew.manager_agent.verbose -@pytest.mark.vcr() def test_hierarchical_verbose_false_manager_agent(researcher, writer): task = Task( description="Come up with a list of 5 interesting ideas to explore for an article, then write one amazing paragraph highlight for each idea that showcases how good an article about this topic could be. Return the list of ideas with their paragraph and your notes.", @@ -3866,7 +3870,13 @@ def test_hierarchical_verbose_false_manager_agent(researcher, writer): verbose=False, ) - crew.kickoff() + mock_task_output = TaskOutput( + description="Mock description", raw="mocked output", agent="mocked agent", messages=[] + ) + task.output = mock_task_output + + with patch.object(Task, "execute_sync", return_value=mock_task_output): + crew.kickoff() assert crew.manager_agent is not None assert not crew.manager_agent.verbose