diff --git a/lib/cli/src/crewai_cli/create_json_crew.py b/lib/cli/src/crewai_cli/create_json_crew.py index 0f980cc77..353127491 100644 --- a/lib/cli/src/crewai_cli/create_json_crew.py +++ b/lib/cli/src/crewai_cli/create_json_crew.py @@ -705,6 +705,9 @@ def _agent_to_jsonc(agent: dict[str, Any]) -> str: // Example: "role": "Senior {{industry}} Researcher" "role": {json.dumps(agent["role"])}, + // Optional custom Agent subclass + // "type": {{"python": "my_project.agents.CustomAgent"}}, + // The agent's primary objective "goal": {json.dumps(agent["goal"])}, @@ -728,7 +731,9 @@ def _agent_to_jsonc(agent: dict[str, Any]) -> str: // Optional agent-level guardrail — validates this agent's final output. // String guardrails are checked by an LLM and can reject/retry output. + // Python refs must point to module-level functions/classes in trusted code. // "guardrail": "Only answer with information supported by retrieved evidence.", + // "step_callback": {{"python": "my_project.callbacks.on_agent_step"}}, // "guardrail_max_retries": 2, // Advanced agent options: @@ -786,11 +791,20 @@ def _task_to_json_fragment(task: dict[str, Any]) -> str: lines.append("") lines.append(" // Advanced task options:") lines.append(" // Docs: https://docs.crewai.com/concepts/tasks") - lines.append(' // "output_json": null,') + lines.append(' // "type": "ConditionalTask",') + lines.append( + ' // "condition": { "python": "my_project.conditions.should_run" },' + ) + lines.append( + ' // "output_json": { "python": "my_project.models.ReportOutput" },' + ) lines.append(' // "output_pydantic": null,') lines.append(' // "response_model": null,') + lines.append( + ' // "converter_cls": { "python": "my_project.converters.CustomConverter" },' + ) lines.append(' // "markdown": false,') - lines.append(' // "input_files": [],') + lines.append(' // "input_files": { "brief": "data/brief.txt" },') lines.append(' // "security_config": {},') lines.append("") lines.append(" // Which agent handles this task") @@ -874,7 +888,11 @@ def _crew_to_jsonc( // Advanced crew options: // Docs: https://docs.crewai.com/concepts/crews + // For hierarchical crews, manager_agent can reference an agents/.jsonc file + // that is not included in the "agents" list. // "manager_agent": "{agents[0]["name"]}", + // "before_kickoff_callbacks": [{{"python": "my_project.callbacks.before_kickoff"}}], + // "after_kickoff_callbacks": [{{"python": "my_project.callbacks.after_kickoff"}}], // "function_calling_llm": "openai/gpt-4o-mini", // "max_rpm": null, // "cache": true, diff --git a/lib/cli/tests/test_create_crew.py b/lib/cli/tests/test_create_crew.py index 060a3c24f..c6ed1a039 100644 --- a/lib/cli/tests/test_create_crew.py +++ b/lib/cli/tests/test_create_crew.py @@ -721,9 +721,30 @@ def test_json_create_provider_preselects_default_model(tmp_path, monkeypatch): assert '"guardrail_max_retries": 2' in crew_template assert "Docs: https://docs.crewai.com/concepts/tasks" in crew_template assert '"output_pydantic": null' in crew_template + assert '"type": "ConditionalTask"' in crew_template + assert '"condition": { "python": "my_project.conditions.should_run" }' in ( + crew_template + ) + assert '"output_json": { "python": "my_project.models.ReportOutput" }' in ( + crew_template + ) + assert ( + '"converter_cls": { "python": "my_project.converters.CustomConverter" }' + in crew_template + ) assert '"markdown": false' in crew_template + assert '"input_files": { "brief": "data/brief.txt" }' in crew_template assert "Docs: https://docs.crewai.com/concepts/crews" in crew_template + assert "manager_agent can reference an agents/.jsonc file" in crew_template assert '"manager_agent": "researcher"' in crew_template + assert ( + '"before_kickoff_callbacks": [{"python": ' + '"my_project.callbacks.before_kickoff"}]' + ) in crew_template + assert ( + '"after_kickoff_callbacks": [{"python": ' + '"my_project.callbacks.after_kickoff"}]' + ) in crew_template assert '"output_log_file": "crew.log"' in crew_template assert "Crew-level LLM fields also accept object form" in crew_template assert '"chat_llm": {"model": "llama3", "provider": "ollama"' in ( @@ -740,7 +761,13 @@ def test_json_create_provider_preselects_default_model(tmp_path, monkeypatch): agent_template ) assert '"role": "Senior {industry} Researcher"' in agent_template + assert '"type": {"python": "my_project.agents.CustomAgent"}' in agent_template assert "Optional agent-level guardrail" in agent_template + assert "Python refs must point to module-level functions/classes" in agent_template + assert ( + '"step_callback": {"python": "my_project.callbacks.on_agent_step"}' + in agent_template + ) assert '"guardrail_max_retries": 2' in agent_template assert "Docs: https://docs.crewai.com/concepts/agents" in agent_template assert '"reasoning": true' in agent_template diff --git a/lib/crewai/src/crewai/project/crew_loader.py b/lib/crewai/src/crewai/project/crew_loader.py index fce183f77..e106e60d3 100644 --- a/lib/crewai/src/crewai/project/crew_loader.py +++ b/lib/crewai/src/crewai/project/crew_loader.py @@ -11,6 +11,7 @@ from crewai.project.json_loader import ( JSONProjectError, JSONProjectValidationError, _crew_kwargs_from_definition, + _task_class_from_definition, _task_kwargs_from_definition, load_json_crew_project, ) @@ -26,16 +27,14 @@ def load_crew( default inputs. Agent definitions are resolved from individual ``.jsonc`` / ``.json`` files inside an ``agents/`` directory. """ - from crewai import Agent, Crew, Task + from crewai import Crew, Task crew_path = Path(source) project = load_json_crew_project(crew_path, agents_dir=agents_dir) - agents_map: dict[str, Any] = {} - for name in project.agent_names: - agent_def = project.agents[name] + def build_agent(agent_def: Any) -> Any: try: - agents_map[name] = Agent(**agent_def.kwargs) + return agent_def.agent_class(**agent_def.kwargs) except ValidationError as exc: raise JSONProjectError( f"{agent_def.path}: validation failed: {exc}" @@ -45,11 +44,16 @@ def load_crew( f"{agent_def.path}: failed to load agent: {exc}" ) from exc + agents_map: dict[str, Any] = {} + for name, agent_def in project.agents.items(): + agents_map[name] = build_agent(agent_def) + tasks_list: list[Task] = [] task_name_map: dict[str, Task] = {} for index, task_defn in enumerate(project.task_definitions): source_label = f"{crew_path}: tasks[{index}]" + task_class = _task_class_from_definition(task_defn, f"{source_label}: type") task_kwargs = _task_kwargs_from_definition( task_defn, agents_map=agents_map, @@ -58,9 +62,13 @@ def load_crew( project_root=crew_path.parent, ) try: - task = Task(**task_kwargs) + task = task_class(**task_kwargs) except ValidationError as exc: raise JSONProjectError(f"{source_label}: validation failed: {exc}") from exc + except Exception as exc: + raise JSONProjectError( + f"{source_label}: failed to load task: {exc}" + ) from exc tasks_list.append(task) task_name = task_defn.get("name") @@ -69,7 +77,7 @@ def load_crew( crew_kwargs = _crew_kwargs_from_definition( project.definition, - agents=list(agents_map.values()), + agents=[agents_map[name] for name in project.agent_names], tasks=tasks_list, agents_map=agents_map, source=crew_path, diff --git a/lib/crewai/src/crewai/project/json_loader.py b/lib/crewai/src/crewai/project/json_loader.py index 88d00ee26..b0b3bca16 100644 --- a/lib/crewai/src/crewai/project/json_loader.py +++ b/lib/crewai/src/crewai/project/json_loader.py @@ -7,9 +7,9 @@ import json import logging from pathlib import Path import re -from typing import Any +from typing import Any, cast -from pydantic import ValidationError +from pydantic import BaseModel, ValidationError logger = logging.getLogger(__name__) @@ -71,6 +71,66 @@ _CREW_RUNTIME_FIELDS = { JSON_PROJECT_EXTENSIONS = (".jsonc", ".json") +PYTHON_REF_KEY = "python" + +_AGENT_TYPE_ALIASES = { + "agent", + "Agent", + "crewai.Agent", + "crewai.agent.core.Agent", +} +_TASK_TYPE_ALIASES = { + "task", + "Task", + "crewai.Task", + "crewai.task.Task", +} +_CONDITIONAL_TASK_TYPE_ALIASES = { + "conditional", + "conditional_task", + "ConditionalTask", + "crewai.tasks.conditional_task.ConditionalTask", +} +_URI_RE = re.compile(r"^[A-Za-z][A-Za-z0-9+.-]*:") + +_AGENT_CALLABLE_FIELDS = {"guardrail", "step_callback"} +_AGENT_CALLABLE_LIST_FIELDS = {"callbacks"} +_TASK_CALLABLE_FIELDS = {"callback", "condition", "guardrail"} +_TASK_CALLABLE_LIST_FIELDS = {"guardrails"} +_TASK_MODEL_CLASS_FIELDS = {"output_json", "output_pydantic", "response_model"} +_CREW_CALLABLE_FIELDS = {"step_callback", "task_callback"} +_CREW_CALLABLE_LIST_FIELDS = {"before_kickoff_callbacks", "after_kickoff_callbacks"} +_AGENT_OBJECT_REF_FIELDS = { + "agent_executor", + "checkpoint", + "embedder", + "function_calling_llm", + "i18n", + "knowledge", + "knowledge_config", + "knowledge_sources", + "knowledge_storage", + "llm", + "memory", + "planning_config", + "security_config", + "skills", +} +_TASK_OBJECT_REF_FIELDS = {"security_config"} +_CREW_OBJECT_REF_FIELDS = { + "chat_llm", + "checkpoint", + "embedder", + "function_calling_llm", + "knowledge", + "knowledge_sources", + "manager_agent", + "manager_llm", + "memory", + "planning_llm", + "security_config", + "skills", +} @dataclass(frozen=True) @@ -81,6 +141,7 @@ class JSONAgentDefinition: path: Path definition: dict[str, Any] kwargs: dict[str, Any] + agent_class: type[Any] @dataclass(frozen=True) @@ -136,15 +197,19 @@ def load_jsonc_file(source: str | Path) -> Any: def load_agent(source: str | Path) -> Any: """Load an existing ``Agent`` from a ``.json`` / ``.jsonc`` definition file.""" - from crewai import Agent - path = Path(source) defn = _expect_object(load_jsonc_file(path), path) root = path.parent.parent if path.parent.name == "agents" else Path.cwd() - agent_kwargs = _agent_kwargs_from_definition(defn, path, project_root=root) + agent_class = _agent_class_from_definition(defn, f"{path}: type") + agent_kwargs = _agent_kwargs_from_definition( + defn, + path, + agent_class=agent_class, + project_root=root, + ) try: - return Agent(**agent_kwargs) + return agent_class(**agent_kwargs) except ValidationError as exc: raise JSONProjectError(_format_validation_error(path, exc)) from exc except Exception as exc: @@ -207,6 +272,7 @@ def load_json_crew_project( {"inputs"}, ) ) + fail_many(_python_reference_definition_errors(defn, crew_path)) agent_names = defn.get("agents", []) if not isinstance(agent_names, list) or not agent_names: @@ -215,10 +281,13 @@ def load_json_crew_project( agents_dir = Path(agents_dir) agent_definitions: dict[str, JSONAgentDefinition] = {} - for agent_name in agent_names: + + def load_agent_definition(agent_name: str) -> None: if not isinstance(agent_name, str) or not agent_name: fail(f"{crew_path}: each agent reference must be a non-empty string") - continue + return + if agent_name in agent_definitions: + return agent_file = find_json_project_file(agents_dir, agent_name) if agent_file is None: message = ( @@ -232,46 +301,69 @@ def load_json_crew_project( ) else: raise FileNotFoundError(message) - continue + return try: agent_defn = _expect_object(load_jsonc_file(agent_file), agent_file) + agent_class = _agent_class_from_definition( + agent_defn, + f"{agent_file}: type", + resolve_python_refs=not collect_errors, + ) agent_kwargs = _agent_kwargs_from_definition( agent_defn, agent_file, + agent_class=agent_class, # Validation must never execute project code (custom tools). resolve_tools=not collect_errors, + resolve_python_refs=not collect_errors, project_root=crew_path.parent, ) except Exception as exc: if collect_errors: errors.append(str(exc)) - continue + return raise agent_definitions[agent_name] = JSONAgentDefinition( name=agent_name, path=agent_file, definition=agent_defn, kwargs=agent_kwargs, + agent_class=agent_class, ) + for agent_name in agent_names: + load_agent_definition(agent_name) + + manager_agent = defn.get("manager_agent") + if manager_agent is not None: + if isinstance(manager_agent, str) and manager_agent: + load_agent_definition(manager_agent) + elif _is_python_ref(manager_agent): + pass + else: + fail( + f"{crew_path}: 'manager_agent' must be an agent definition name " + f'or a {{"{PYTHON_REF_KEY}": "module.agent"}} reference' + ) + + known_agents = set(agent_definitions) + task_defs = defn.get("tasks", []) if not isinstance(task_defs, list) or not task_defs: fail(f"{crew_path}: 'tasks' must be a non-empty list") task_defs = [] known_tasks: set[str] = set() - known_agents = {name for name in agent_names if isinstance(name, str)} for index, task_defn in enumerate(task_defs): task_path = f"{crew_path}: tasks[{index}]" if not isinstance(task_defn, dict): fail(f"{task_path} must be an object") continue fail_many( - _field_errors( + _task_definition_errors( task_defn, - _task_allowed_fields(), - _TASK_RUNTIME_FIELDS, task_path, + resolve_python_refs=not collect_errors, ) ) missing_required = [ @@ -284,7 +376,8 @@ def load_json_crew_project( agent_ref = task_defn.get("agent") if agent_ref is not None and agent_ref not in known_agents: fail( - f"{task_path} references agent '{agent_ref}' which is not in the crew agents list" + f"{task_path} references agent '{agent_ref}' which does not match " + "a loaded agent definition" ) fail_many( @@ -422,19 +515,180 @@ def _expect_object(value: Any, source: str | Path) -> dict[str, Any]: return value +def _is_python_ref(value: Any) -> bool: + return isinstance(value, dict) and PYTHON_REF_KEY in value + + +def _python_ref_errors(value: Any, source: str | Path) -> list[str]: + if not isinstance(value, dict): + return [ + f"{source}: Python reference must be an object like " + f'{{"{PYTHON_REF_KEY}": "module.attribute"}}' + ] + if set(value) != {PYTHON_REF_KEY}: + return [ + f"{source}: Python reference objects must only contain '{PYTHON_REF_KEY}'" + ] + path = value.get(PYTHON_REF_KEY) + if not isinstance(path, str) or not path.strip(): + return [f"{source}: Python reference '{PYTHON_REF_KEY}' must be a string"] + if "." not in path: + return [ + f"{source}: Python reference '{path}' must be a dotted import path " + "like 'module.attribute'" + ] + return [] + + +def _python_ref_path(value: Any, source: str | Path) -> str: + errors = _python_ref_errors(value, source) + if errors: + raise JSONProjectValidationError(errors) + path = cast(str, value[PYTHON_REF_KEY]) + return path.strip() + + +def _resolve_python_ref( + value: Any, + source: str | Path, + *, + expected: str, +) -> Any: + from crewai.utilities.import_utils import import_and_validate_definition + + path = _python_ref_path(value, source) + try: + resolved = import_and_validate_definition(path) + except Exception as exc: + raise JSONProjectError(f"{source}: failed to import '{path}': {exc}") from exc + + if expected == "any": + return resolved + if expected == "callable" and not callable(resolved): + raise JSONProjectError(f"{source}: Python reference '{path}' is not callable") + if expected == "class" and not isinstance(resolved, type): + raise JSONProjectError(f"{source}: Python reference '{path}' is not a class") + return resolved + + +def _resolve_python_class( + value: Any, + source: str | Path, + *, + base_class: type[Any] | None = None, +) -> type[Any]: + cls = cast(type[Any], _resolve_python_ref(value, source, expected="class")) + if base_class is not None and not issubclass(cls, base_class): + raise JSONProjectError( + f"{source}: Python reference '{_python_ref_path(value, source)}' " + f"must be a subclass of {base_class.__module__}.{base_class.__name__}" + ) + return cls + + +def _agent_class_from_definition( + defn: dict[str, Any], + source: str | Path, + *, + resolve_python_refs: bool = True, +) -> type[Any]: + from crewai import Agent + + agent_class = cast(type[Any], Agent) + type_value = defn.get("type") + if type_value is None: + return agent_class + if isinstance(type_value, str) and type_value in _AGENT_TYPE_ALIASES: + return agent_class + if _is_python_ref(type_value): + if not resolve_python_refs: + errors = _python_ref_errors(type_value, source) + if errors: + raise JSONProjectValidationError(errors) + return agent_class + from crewai.agents.agent_builder.base_agent import BaseAgent + + return _resolve_python_class(type_value, source, base_class=BaseAgent) + if isinstance(type_value, str): + raise JSONProjectError( + f"{source}: unsupported agent type '{type_value}'. Use 'Agent' or " + f'{{"{PYTHON_REF_KEY}": "module.CustomAgent"}}.' + ) + raise JSONProjectValidationError(_python_ref_errors(type_value, source)) + + +def _task_class_from_definition( + defn: dict[str, Any], + source: str | Path, + *, + resolve_python_refs: bool = True, +) -> type[Any]: + from crewai import Task + + task_class = cast(type[Any], Task) + type_value = defn.get("type") + if type_value is None: + return task_class + if isinstance(type_value, str) and type_value in _TASK_TYPE_ALIASES: + return task_class + if isinstance(type_value, str) and type_value in _CONDITIONAL_TASK_TYPE_ALIASES: + from crewai.tasks.conditional_task import ConditionalTask + + return cast(type[Any], ConditionalTask) + if _is_python_ref(type_value): + if not resolve_python_refs: + errors = _python_ref_errors(type_value, source) + if errors: + raise JSONProjectValidationError(errors) + return task_class + return _resolve_python_class(type_value, source, base_class=task_class) + if isinstance(type_value, str): + raise JSONProjectError( + f"{source}: unsupported task type '{type_value}'. Use 'Task', " + f"'ConditionalTask', or " + f'{{"{PYTHON_REF_KEY}": "module.CustomTask"}}.' + ) + raise JSONProjectValidationError(_python_ref_errors(type_value, source)) + + +def _model_fields_for(model_cls: type[Any], source: str | Path) -> set[str]: + fields = getattr(model_cls, "model_fields", None) + if not isinstance(fields, dict): + raise JSONProjectError( + f"{source}: {model_cls.__module__}.{model_cls.__name__} must be a " + "Pydantic model class" + ) + return set(fields) + + +def _definition_has_python_type(defn: dict[str, Any]) -> bool: + return _is_python_ref(defn.get("type")) + + def _agent_kwargs_from_definition( defn: dict[str, Any], path: Path | str, *, + agent_class: type[Any] | None = None, resolve_tools: bool = True, + resolve_python_refs: bool = True, project_root: Path | None = None, ) -> dict[str, Any]: + agent_class = agent_class or _agent_class_from_definition( + defn, + f"{path}: type", + resolve_python_refs=resolve_python_refs, + ) + allowed_fields = _agent_allowed_fields(agent_class) + extra_allowed = {"settings", "type"} + skip_unknown = _definition_has_python_type(defn) and not resolve_python_refs errors = _field_errors( defn, - _agent_allowed_fields(), + allowed_fields, _AGENT_RUNTIME_FIELDS, path, - {"settings"}, + extra_allowed, + skip_unknown=skip_unknown, ) for required in ("role", "goal", "backstory"): if required not in defn: @@ -450,21 +704,26 @@ def _agent_kwargs_from_definition( errors.extend( _field_errors( settings, - _agent_allowed_fields(), + allowed_fields, _AGENT_RUNTIME_FIELDS, f"{path}: settings", + skip_unknown=skip_unknown, ) ) + errors.extend(_python_reference_definition_errors(defn, path)) + if isinstance(settings, dict): + errors.extend( + _python_reference_definition_errors(settings, f"{path}: settings") + ) if errors: raise JSONProjectValidationError(errors) - agent_kwargs = { - key: value for key, value in defn.items() if key in _agent_allowed_fields() - } + agent_kwargs = {key: value for key, value in defn.items() if key in allowed_fields} agent_kwargs.update(settings) if resolve_tools: _resolve_tool_fields(agent_kwargs, project_root=project_root) + _resolve_agent_python_refs(agent_kwargs, path) else: # Validation/deploy mode: check tool declarations structurally without # importing or instantiating anything — custom: tools execute @@ -484,24 +743,28 @@ def _task_kwargs_from_definition( source: str, project_root: Path | None = None, ) -> dict[str, Any]: + task_class = _task_class_from_definition(task_defn, f"{source}: type") + allowed_fields = _task_allowed_fields(task_class) errors = _field_errors( task_defn, - _task_allowed_fields(), + allowed_fields, _TASK_RUNTIME_FIELDS, source, + {"type"}, ) if errors: raise JSONProjectValidationError(errors) task_kwargs = { - key: value for key, value in task_defn.items() if key in _task_allowed_fields() + key: value for key, value in task_defn.items() if key in allowed_fields } agent_ref = task_kwargs.get("agent") if agent_ref is not None and isinstance(agent_ref, str): if agent_ref not in agents_map: raise JSONProjectError( - f"{source} references agent '{agent_ref}' which is not in the crew agents list" + f"{source} references agent '{agent_ref}' which does not match " + "a loaded agent definition" ) task_kwargs["agent"] = agents_map[agent_ref] @@ -518,6 +781,13 @@ def _task_kwargs_from_definition( task_kwargs["context"] = context_tasks _resolve_tool_fields(task_kwargs, project_root=project_root) + _resolve_task_python_refs(task_kwargs, source) + if "input_files" in task_kwargs: + task_kwargs["input_files"] = _normalize_input_files( + task_kwargs["input_files"], + source, + project_root, + ) return task_kwargs @@ -548,10 +818,12 @@ def _crew_kwargs_from_definition( if isinstance(manager_agent, str): if manager_agent not in agents_map: raise JSONProjectError( - f"{source}: manager_agent '{manager_agent}' is not in the crew agents list" + f"{source}: manager_agent '{manager_agent}' does not match an agent definition" ) crew_kwargs["manager_agent"] = agents_map[manager_agent] + _resolve_crew_python_refs(crew_kwargs, source) + return crew_kwargs @@ -561,6 +833,8 @@ def _resolve_tool_fields( tools = kwargs.get("tools") if tools is not None: kwargs["tools"] = _resolve_tools(tools, project_root=project_root) + if "mcps" in kwargs: + kwargs["mcps"] = _resolve_mcp_python_refs(kwargs["mcps"]) def _field_errors( @@ -569,11 +843,17 @@ def _field_errors( runtime_fields: set[str], source: str | Path, extra_allowed: set[str] | None = None, + *, + skip_unknown: bool = False, ) -> list[str]: extra_allowed = extra_allowed or set() keys = set(data) runtime = sorted(keys & runtime_fields) - unknown = sorted(keys - allowed_fields - runtime_fields - extra_allowed) + unknown = ( + [] + if skip_unknown + else sorted(keys - allowed_fields - runtime_fields - extra_allowed) + ) errors: list[str] = [] if runtime: @@ -586,16 +866,16 @@ def _field_errors( return errors -def _agent_allowed_fields() -> set[str]: +def _agent_allowed_fields(agent_class: type[Any] | None = None) -> set[str]: from crewai import Agent - return set(Agent.model_fields) - _AGENT_RUNTIME_FIELDS + return _model_fields_for(agent_class or Agent, "agent type") - _AGENT_RUNTIME_FIELDS -def _task_allowed_fields() -> set[str]: +def _task_allowed_fields(task_class: type[Any] | None = None) -> set[str]: from crewai import Task - return set(Task.model_fields) - _TASK_RUNTIME_FIELDS + return _model_fields_for(task_class or Task, "task type") - _TASK_RUNTIME_FIELDS def _crew_allowed_fields() -> set[str]: @@ -604,6 +884,417 @@ def _crew_allowed_fields() -> set[str]: return set(Crew.model_fields) - _CREW_RUNTIME_FIELDS +def _task_definition_errors( + task_defn: dict[str, Any], + source: str | Path, + *, + resolve_python_refs: bool, +) -> list[str]: + skip_unknown = _definition_has_python_type(task_defn) and not resolve_python_refs + try: + task_class = _task_class_from_definition( + task_defn, + f"{source}: type", + resolve_python_refs=resolve_python_refs, + ) + except JSONProjectValidationError as exc: + return exc.errors + except JSONProjectError as exc: + return [str(exc)] + + errors = _field_errors( + task_defn, + _task_allowed_fields(task_class), + _TASK_RUNTIME_FIELDS, + source, + {"type"}, + skip_unknown=skip_unknown, + ) + errors.extend(_python_reference_definition_errors(task_defn, source)) + return errors + + +def _python_reference_definition_errors( + defn: dict[str, Any], + source: str | Path, +) -> list[str]: + errors: list[str] = [] + for field in ( + _AGENT_CALLABLE_FIELDS + | _AGENT_CALLABLE_LIST_FIELDS + | _TASK_CALLABLE_FIELDS + | _TASK_CALLABLE_LIST_FIELDS + | _TASK_MODEL_CLASS_FIELDS + | _CREW_CALLABLE_FIELDS + | _CREW_CALLABLE_LIST_FIELDS + | {"converter_cls", "executor_class"} + ): + if field not in defn: + continue + errors.extend(_python_reference_value_errors(defn[field], f"{source}: {field}")) + + for field in ( + _AGENT_OBJECT_REF_FIELDS | _TASK_OBJECT_REF_FIELDS | _CREW_OBJECT_REF_FIELDS + ): + if field not in defn: + continue + errors.extend( + _python_reference_value_errors_recursive(defn[field], f"{source}: {field}") + ) + + errors.extend( + _embedder_python_ref_errors(defn.get("embedder"), f"{source}: embedder") + ) + errors.extend(_a2a_python_ref_errors(defn.get("a2a"), f"{source}: a2a")) + errors.extend(_mcp_python_ref_errors(defn.get("mcps"), f"{source}: mcps")) + + type_value = defn.get("type") + if _is_python_ref(type_value): + errors.extend(_python_ref_errors(type_value, f"{source}: type")) + return errors + + +def _python_reference_value_errors(value: Any, source: str | Path) -> list[str]: + errors: list[str] = [] + if _is_python_ref(value): + return _python_ref_errors(value, source) + if isinstance(value, list): + for index, item in enumerate(value): + if _is_python_ref(item): + errors.extend(_python_ref_errors(item, f"{source}[{index}]")) + return errors + + +def _python_reference_value_errors_recursive( + value: Any, source: str | Path +) -> list[str]: + if _is_python_ref(value): + return _python_ref_errors(value, source) + errors: list[str] = [] + if isinstance(value, list): + for index, item in enumerate(value): + errors.extend( + _python_reference_value_errors_recursive(item, f"{source}[{index}]") + ) + elif isinstance(value, dict): + for key, item in value.items(): + errors.extend( + _python_reference_value_errors_recursive(item, f"{source}.{key}") + ) + return errors + + +def _embedder_python_ref_errors(value: Any, source: str | Path) -> list[str]: + if not isinstance(value, dict): + return [] + config = value.get("config") + if not isinstance(config, dict): + return [] + embedding_callable = config.get("embedding_callable") + if _is_python_ref(embedding_callable): + return _python_ref_errors( + embedding_callable, f"{source}.config.embedding_callable" + ) + return [] + + +def _a2a_python_ref_errors(value: Any, source: str | Path) -> list[str]: + configs = value if isinstance(value, list) else [value] + errors: list[str] = [] + for index, config in enumerate(configs): + if not isinstance(config, dict): + continue + response_model = config.get("response_model") + if _is_python_ref(response_model): + errors.extend( + _python_ref_errors(response_model, f"{source}[{index}].response_model") + ) + return errors + + +def _mcp_python_ref_errors(value: Any, source: str | Path) -> list[str]: + if not isinstance(value, list): + return [] + errors: list[str] = [] + for index, config in enumerate(value): + if not isinstance(config, dict): + continue + tool_filter = config.get("tool_filter") + if _is_python_ref(tool_filter): + errors.extend( + _python_ref_errors(tool_filter, f"{source}[{index}].tool_filter") + ) + elif isinstance(tool_filter, dict) and tool_filter.get("type") == "static": + for key in ("allowed_tool_names", "blocked_tool_names"): + names = tool_filter.get(key) + if names is not None and not _is_string_list(names): + errors.append( + f"{source}[{index}].tool_filter.{key} must be a list of strings" + ) + return errors + + +def _resolve_agent_python_refs(kwargs: dict[str, Any], source: str | Path) -> None: + _resolve_callable_fields( + kwargs, + source, + scalar_fields=_AGENT_CALLABLE_FIELDS, + list_fields=_AGENT_CALLABLE_LIST_FIELDS, + ) + if _is_python_ref(kwargs.get("executor_class")): + kwargs["executor_class"] = _resolve_python_class( + kwargs["executor_class"], f"{source}: executor_class" + ) + if "embedder" in kwargs: + kwargs["embedder"] = _resolve_embedder_python_refs(kwargs["embedder"], source) + if "a2a" in kwargs: + kwargs["a2a"] = _resolve_a2a_python_refs(kwargs["a2a"], source) + _resolve_object_reference_fields(kwargs, source, _AGENT_OBJECT_REF_FIELDS) + + +def _resolve_task_python_refs(kwargs: dict[str, Any], source: str | Path) -> None: + _resolve_callable_fields( + kwargs, + source, + scalar_fields=_TASK_CALLABLE_FIELDS, + list_fields=_TASK_CALLABLE_LIST_FIELDS, + ) + for field in _TASK_MODEL_CLASS_FIELDS: + if _is_python_ref(kwargs.get(field)): + kwargs[field] = _resolve_model_class(kwargs[field], f"{source}: {field}") + if _is_python_ref(kwargs.get("converter_cls")): + from crewai.utilities.converter import Converter + + kwargs["converter_cls"] = _resolve_python_class( + kwargs["converter_cls"], + f"{source}: converter_cls", + base_class=Converter, + ) + elif isinstance(kwargs.get("converter_cls"), str): + raise JSONProjectError( + f"{source}: converter_cls must use " + f'{{"{PYTHON_REF_KEY}": "module.ConverterSubclass"}}' + ) + _resolve_object_reference_fields(kwargs, source, _TASK_OBJECT_REF_FIELDS) + + +def _resolve_crew_python_refs(kwargs: dict[str, Any], source: str | Path) -> None: + _resolve_callable_fields( + kwargs, + source, + scalar_fields=_CREW_CALLABLE_FIELDS, + list_fields=_CREW_CALLABLE_LIST_FIELDS, + ) + if "embedder" in kwargs: + kwargs["embedder"] = _resolve_embedder_python_refs(kwargs["embedder"], source) + _resolve_object_reference_fields(kwargs, source, _CREW_OBJECT_REF_FIELDS) + + +def _resolve_object_reference_fields( + kwargs: dict[str, Any], + source: str | Path, + fields: set[str], +) -> None: + for field in fields: + if field not in kwargs: + continue + kwargs[field] = _resolve_python_refs_recursively( + kwargs[field], f"{source}: {field}" + ) + + +def _resolve_python_refs_recursively(value: Any, source: str | Path) -> Any: + if _is_python_ref(value): + return _resolve_python_ref(value, source, expected="any") + if isinstance(value, list): + return [ + _resolve_python_refs_recursively(item, f"{source}[{index}]") + for index, item in enumerate(value) + ] + if isinstance(value, dict): + return { + key: _resolve_python_refs_recursively(item, f"{source}.{key}") + for key, item in value.items() + } + return value + + +def _resolve_callable_fields( + kwargs: dict[str, Any], + source: str | Path, + *, + scalar_fields: set[str], + list_fields: set[str], +) -> None: + for field in scalar_fields: + if _is_python_ref(kwargs.get(field)): + kwargs[field] = _resolve_python_ref( + kwargs[field], + f"{source}: {field}", + expected="callable", + ) + for field in list_fields: + value = kwargs.get(field) + if not isinstance(value, list): + continue + kwargs[field] = [ + _resolve_python_ref( + item, f"{source}: {field}[{index}]", expected="callable" + ) + if _is_python_ref(item) + else item + for index, item in enumerate(value) + ] + + +def _resolve_model_class(value: Any, source: str | Path) -> type[BaseModel]: + return _resolve_python_class(value, source, base_class=BaseModel) + + +def _resolve_embedder_python_refs(value: Any, source: str | Path) -> Any: + if not isinstance(value, dict): + return value + config = value.get("config") + if not isinstance(config, dict): + return value + embedding_callable = config.get("embedding_callable") + if not _is_python_ref(embedding_callable): + return value + + from crewai.rag.embeddings.providers.custom.embedding_callable import ( + CustomEmbeddingFunction, + ) + + normalized = dict(value) + normalized_config = dict(config) + normalized_config["embedding_callable"] = _resolve_python_class( + embedding_callable, + f"{source}: embedder.config.embedding_callable", + base_class=CustomEmbeddingFunction, + ) + normalized["config"] = normalized_config + return normalized + + +def _resolve_a2a_python_refs(value: Any, source: str | Path) -> Any: + if isinstance(value, list): + return [ + _resolve_a2a_python_refs(item, f"{source}: a2a[{index}]") + for index, item in enumerate(value) + ] + if not isinstance(value, dict): + return value + response_model = value.get("response_model") + if response_model is None: + return value + + normalized = dict(value) + if _is_python_ref(response_model): + normalized["response_model"] = _resolve_model_class( + response_model, + f"{source}: a2a.response_model", + ) + elif isinstance(response_model, dict): + from crewai.utilities.pydantic_schema_utils import create_model_from_schema + + normalized["response_model"] = create_model_from_schema(response_model) + return normalized + + +def _resolve_mcp_python_refs(value: Any) -> Any: + if not isinstance(value, list): + return value + return [ + _resolve_mcp_config_python_refs(config, index) + if isinstance(config, dict) + else config + for index, config in enumerate(value) + ] + + +def _resolve_mcp_config_python_refs( + config: dict[str, Any], index: int +) -> dict[str, Any]: + tool_filter = config.get("tool_filter") + if tool_filter is None: + return config + normalized = dict(config) + if _is_python_ref(tool_filter): + normalized["tool_filter"] = _resolve_python_ref( + tool_filter, + f"mcps[{index}].tool_filter", + expected="callable", + ) + elif isinstance(tool_filter, dict) and tool_filter.get("type") == "static": + from crewai.mcp.filters import create_static_tool_filter + + allowed_tool_names = tool_filter.get("allowed_tool_names") + blocked_tool_names = tool_filter.get("blocked_tool_names") + if allowed_tool_names is not None and not _is_string_list(allowed_tool_names): + raise JSONProjectValidationError( + [ + f"mcps[{index}].tool_filter.allowed_tool_names must be a list of strings" + ] + ) + if blocked_tool_names is not None and not _is_string_list(blocked_tool_names): + raise JSONProjectValidationError( + [ + f"mcps[{index}].tool_filter.blocked_tool_names must be a list of strings" + ] + ) + normalized["tool_filter"] = create_static_tool_filter( + allowed_tool_names=allowed_tool_names, + blocked_tool_names=blocked_tool_names, + ) + return normalized + + +def _is_string_list(value: Any) -> bool: + return isinstance(value, list) and all(isinstance(item, str) for item in value) + + +def _normalize_input_files( + value: Any, + source: str | Path, + project_root: Path | None, +) -> Any: + if value is None: + return value + if not isinstance(value, dict): + raise JSONProjectValidationError( + [f"{source}: input_files must be an object mapping names to file specs"] + ) + + normalized: dict[str, Any] = {} + for name, file_spec in value.items(): + if isinstance(file_spec, str): + normalized[name] = { + "source": _resolve_project_path(file_spec, project_root) + } + continue + if isinstance(file_spec, dict): + normalized_spec = dict(file_spec) + for field in ("source", "path"): + field_value = normalized_spec.get(field) + if isinstance(field_value, str): + normalized_spec[field] = _resolve_project_path( + field_value, project_root + ) + normalized[name] = normalized_spec + continue + normalized[name] = file_spec + return normalized + + +def _resolve_project_path(value: str, project_root: Path | None) -> str: + if not value or _URI_RE.match(value): + return value + path = Path(value) + if path.is_absolute(): + return value + return str(((project_root or Path.cwd()) / path).resolve()) + + def _format_validation_error(path: str | Path, exc: ValidationError) -> str: return f"{path}: validation failed: {exc}" diff --git a/lib/crewai/tests/project/test_crew_loader.py b/lib/crewai/tests/project/test_crew_loader.py index 04af84372..c4d9f75fa 100644 --- a/lib/crewai/tests/project/test_crew_loader.py +++ b/lib/crewai/tests/project/test_crew_loader.py @@ -12,6 +12,36 @@ from crewai.project.json_loader import JSONProjectError, JSONProjectValidationEr from crewai.project.crew_loader import load_crew +def _write_python_defs(tmp_path: Path) -> None: + module = tmp_path / "json_refs.py" + module.write_text( + "from pydantic import BaseModel\n" + "from crewai import Agent, Task\n" + "from crewai.security.security_config import SecurityConfig\n" + "from crewai.utilities.converter import Converter\n" + "\n" + "def always_true(_context):\n" + " return True\n" + "\n" + "def task_callback(output):\n" + " return output\n" + "\n" + "class SpecialAgent(Agent):\n" + " specialty: str = 'general'\n" + "\n" + "class SpecialTask(Task):\n" + " priority: int = 0\n" + "\n" + "class ReportModel(BaseModel):\n" + " summary: str\n" + "\n" + "class SpecialConverter(Converter):\n" + " pass\n" + "\n" + "security_config = SecurityConfig(fingerprint='agent-seed')\n" + ) + + def _write_agent(agents_dir: Path, name: str, **overrides) -> Path: defn = { "role": f"{name} role", @@ -30,6 +60,15 @@ def _write_crew(project_dir: Path, crew_def: dict) -> Path: return f +def _input_file_path(value) -> Path: + if isinstance(value, dict): + source = value.get("source", value) + else: + source = getattr(value, "source", value) + path = getattr(source, "path", source) + return Path(str(path)) + + class TestLoadCrew: def test_minimal_crew(self, tmp_path: Path): agents_dir = tmp_path / "agents" @@ -139,6 +178,38 @@ class TestLoadCrew: from crewai import Process assert crew.process == Process.hierarchical + def test_crew_hierarchical_manager_agent_from_separate_agent_file( + self, tmp_path: Path + ): + agents_dir = tmp_path / "agents" + agents_dir.mkdir() + _write_agent(agents_dir, "worker") + _write_agent(agents_dir, "manager") + + crew_def = { + "name": "hier_manager_crew", + "agents": ["worker"], + "tasks": [ + { + "name": "work", + "description": "Do work", + "expected_output": "Work done", + "agent": "manager", + } + ], + "process": "hierarchical", + "manager_agent": "manager", + } + crew_file = _write_crew(tmp_path, crew_def) + + crew, _ = load_crew(crew_file) + + assert len(crew.agents) == 1 + assert crew.agents[0].role == "worker role" + assert crew.manager_agent is not None + assert crew.manager_agent.role == "manager role" + assert crew.tasks[0].agent is crew.manager_agent + def test_crew_accepts_llm_config_objects(self, tmp_path: Path): agents_dir = tmp_path / "agents" agents_dir.mkdir() @@ -289,6 +360,156 @@ class TestLoadCrew: assert task.guardrail == "Return a summary field." assert task.allow_crewai_trigger_context is False + def test_crew_loads_conditional_task_with_python_condition( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ): + _write_python_defs(tmp_path) + monkeypatch.syspath_prepend(str(tmp_path)) + agents_dir = tmp_path / "agents" + agents_dir.mkdir() + _write_agent(agents_dir, "worker") + + crew_def = { + "name": "conditional_crew", + "agents": ["worker"], + "tasks": [ + { + "name": "first", + "description": "First task", + "expected_output": "First output", + "agent": "worker", + }, + { + "type": "ConditionalTask", + "name": "second", + "description": "Second task", + "expected_output": "Second output", + "agent": "worker", + "condition": {"python": "json_refs.always_true"}, + }, + ], + } + crew_file = _write_crew(tmp_path, crew_def) + + crew, _ = load_crew(crew_file) + + from crewai.tasks.conditional_task import ConditionalTask + + assert isinstance(crew.tasks[1], ConditionalTask) + assert crew.tasks[1].should_execute(None) + + def test_crew_loads_custom_agent_and_task_types( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ): + _write_python_defs(tmp_path) + monkeypatch.syspath_prepend(str(tmp_path)) + agents_dir = tmp_path / "agents" + agents_dir.mkdir() + _write_agent( + agents_dir, + "specialist", + type={"python": "json_refs.SpecialAgent"}, + security_config={"python": "json_refs.security_config"}, + specialty="research", + ) + + crew_def = { + "name": "custom_types_crew", + "agents": ["specialist"], + "tasks": [ + { + "type": {"python": "json_refs.SpecialTask"}, + "name": "prioritized", + "description": "Do prioritized work", + "expected_output": "Prioritized output", + "agent": "specialist", + "priority": 7, + } + ], + } + crew_file = _write_crew(tmp_path, crew_def) + + crew, _ = load_crew(crew_file) + + assert crew.agents[0].__class__.__name__ == "SpecialAgent" + assert crew.agents[0].specialty == "research" + from crewai.security.fingerprint import Fingerprint + + assert crew.agents[0].security_config.fingerprint == Fingerprint.generate( + seed="agent-seed" + ) + assert crew.tasks[0].__class__.__name__ == "SpecialTask" + assert crew.tasks[0].priority == 7 + + def test_crew_loads_python_ref_task_fields( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ): + _write_python_defs(tmp_path) + monkeypatch.syspath_prepend(str(tmp_path)) + agents_dir = tmp_path / "agents" + agents_dir.mkdir() + _write_agent(agents_dir, "writer") + + crew_def = { + "name": "python_refs_crew", + "agents": ["writer"], + "tasks": [ + { + "name": "write", + "description": "Write something", + "expected_output": "Written content", + "agent": "writer", + "callback": {"python": "json_refs.task_callback"}, + "output_json": {"python": "json_refs.ReportModel"}, + "converter_cls": {"python": "json_refs.SpecialConverter"}, + } + ], + } + crew_file = _write_crew(tmp_path, crew_def) + + crew, _ = load_crew(crew_file) + task = crew.tasks[0] + + assert task.callback.__name__ == "task_callback" + assert task.output_json.__name__ == "ReportModel" + assert "summary" in task.output_json.model_fields + assert task.converter_cls.__name__ == "SpecialConverter" + + def test_crew_loads_project_relative_input_files(self, tmp_path: Path): + agents_dir = tmp_path / "agents" + agents_dir.mkdir() + _write_agent(agents_dir, "reader") + data_dir = tmp_path / "data" + data_dir.mkdir() + brief_path = data_dir / "brief.txt" + spec_path = data_dir / "spec.md" + brief_path.write_text("brief") + spec_path.write_text("spec") + + crew_def = { + "name": "input_files_crew", + "agents": ["reader"], + "tasks": [ + { + "name": "read", + "description": "Read files", + "expected_output": "File summary", + "agent": "reader", + "input_files": { + "brief": "data/brief.txt", + "spec": {"source": "data/spec.md"}, + }, + } + ], + } + crew_file = _write_crew(tmp_path, crew_def) + + crew, _ = load_crew(crew_file) + input_files = crew.tasks[0].input_files + + assert _input_file_path(input_files["brief"]) == brief_path + assert _input_file_path(input_files["spec"]) == spec_path + def test_missing_agent_file_raises(self, tmp_path: Path): agents_dir = tmp_path / "agents" agents_dir.mkdir() diff --git a/lib/crewai/tests/project/test_json_loader.py b/lib/crewai/tests/project/test_json_loader.py index be8a034d1..0da719c5a 100644 --- a/lib/crewai/tests/project/test_json_loader.py +++ b/lib/crewai/tests/project/test_json_loader.py @@ -248,6 +248,33 @@ class TestLoadAgent: assert len(agent.tools or []) == 1 assert agent.tools[0].name == "echo" + def test_load_agent_accepts_static_mcp_tool_filter(self, tmp_path: Path): + agent_def = { + "role": "MCP User", + "goal": "Use MCP tools", + "backstory": "MCP expert.", + "mcps": [ + { + "command": "python", + "args": ["server.py"], + "tool_filter": { + "type": "static", + "allowed_tool_names": ["read_file"], + "blocked_tool_names": ["delete_file"], + }, + } + ], + } + agent_file = tmp_path / "agent.json" + agent_file.write_text(json.dumps(agent_def)) + + agent = load_agent(agent_file) + + tool_filter = agent.mcps[0].tool_filter + assert tool_filter({"name": "read_file"}) + assert not tool_filter({"name": "delete_file"}) + assert not tool_filter({"name": "write_file"}) + def test_load_agent_rejects_runtime_fields(self, tmp_path: Path): agent_def = { "id": "00000000-0000-4000-8000-000000000000", @@ -399,6 +426,33 @@ class TestValidationDoesNotExecuteTools: assert not sentinel.exists(), "validation must not execute tools/.py" assert project.agent_names == ["worker"] + def test_validate_does_not_import_python_refs( + self, tmp_path, monkeypatch: pytest.MonkeyPatch + ): + from crewai.project.json_loader import validate_crew_project + + sentinel = tmp_path / "python_ref_executed.txt" + (tmp_path / "callbacks.py").write_text( + "from pathlib import Path\n" + f"Path({str(sentinel)!r}).write_text('boom')\n" + "def step_callback(*_args, **_kwargs):\n" + " return None\n" + ) + monkeypatch.syspath_prepend(str(tmp_path)) + sys.modules.pop("callbacks", None) + crew_path = self._write_project( + tmp_path, + tool_line='{"tool_type": "some.module.Tool"}', + ) + agent_file = tmp_path / "agents" / "worker.jsonc" + agent_def = json.loads(agent_file.read_text()) + agent_def["step_callback"] = {"python": "callbacks.step_callback"} + agent_file.write_text(json.dumps(agent_def)) + + validate_crew_project(crew_path, tmp_path / "agents") + + assert not sentinel.exists(), "validation must not import Python refs" + def test_validate_reports_missing_custom_tool_file(self, tmp_path): from crewai.project.json_loader import ( JSONProjectValidationError,