fix: validate tool module imports in load_agent_from_repository to prevent RCE

Add an allowlist of trusted module prefixes (crewai_tools., crewai.tools.) and
a BaseTool subclass check before dynamically importing and instantiating tool
classes from the repository API response.

Previously, tool['module'] and tool['name'] from the API were passed directly
to importlib.import_module() and getattr() without any validation, allowing
a compromised or MITM'd API to import arbitrary Python modules (e.g.
subprocess.Popen) and achieve Remote Code Execution.

Fixes #5446

Co-Authored-By: João <joao@crewai.com>
This commit is contained in:
Devin AI
2026-04-14 07:33:22 +00:00
parent 0dba95e166
commit 8d68d35622
2 changed files with 322 additions and 4 deletions

View File

@@ -6,6 +6,7 @@ import concurrent.futures
import contextvars
from dataclasses import dataclass, field
from datetime import datetime
import importlib
import inspect
import json
import re
@@ -1085,6 +1086,22 @@ def _print_current_organization() -> None:
)
# Allowlist of trusted module prefixes for dynamic tool imports from the
# repository API. Only modules whose fully-qualified name starts with one of
# these prefixes are permitted. This prevents a compromised or spoofed API
# response from importing arbitrary Python modules (e.g. ``subprocess``,
# ``os``, ``shutil``) and achieving Remote Code Execution.
ALLOWED_TOOL_MODULE_PREFIXES: Final[tuple[str, ...]] = (
"crewai_tools.",
"crewai.tools.",
)
def _is_trusted_tool_module(module_path: str) -> bool:
"""Return True if *module_path* is within the trusted tool allowlist."""
return any(module_path.startswith(prefix) for prefix in ALLOWED_TOOL_MODULE_PREFIXES)
def load_agent_from_repository(from_repository: str) -> dict[str, Any]:
"""Load an agent from the repository.
@@ -1099,8 +1116,6 @@ def load_agent_from_repository(from_repository: str) -> dict[str, Any]:
"""
attributes: dict[str, Any] = {}
if from_repository:
import importlib
if callable(_create_plus_client_hook):
client = _create_plus_client_hook()
else:
@@ -1128,8 +1143,26 @@ def load_agent_from_repository(from_repository: str) -> dict[str, Any]:
attributes[key] = []
for tool in value:
try:
module = importlib.import_module(tool["module"])
tool_class = getattr(module, tool["name"])
tool_module = tool["module"]
tool_name = tool["name"]
if not _is_trusted_tool_module(tool_module):
raise AgentRepositoryError(
f"Untrusted tool module '{tool_module}'. "
f"Only modules starting with {ALLOWED_TOOL_MODULE_PREFIXES} are allowed."
)
module = importlib.import_module(tool_module)
tool_class = getattr(module, tool_name)
if not (
isinstance(tool_class, type)
and issubclass(tool_class, BaseTool)
):
raise AgentRepositoryError(
f"Tool '{tool_name}' from module '{tool_module}' "
f"is not a BaseTool subclass."
)
tool_value = tool_class(**tool["init_params"])

View File

@@ -11,15 +11,19 @@ from pydantic import BaseModel, Field
from crewai.tools.base_tool import BaseTool
from crewai.utilities.agent_utils import (
ALLOWED_TOOL_MODULE_PREFIXES,
_asummarize_chunks,
_estimate_token_count,
_extract_summary_tags,
_format_messages_for_summary,
_is_trusted_tool_module,
_split_messages_into_chunks,
convert_tools_to_openai_schema,
load_agent_from_repository,
parse_tool_call_args,
summarize_messages,
)
from crewai.utilities.errors import AgentRepositoryError
class CalculatorInput(BaseModel):
@@ -1033,3 +1037,284 @@ class TestParseToolCallArgs:
_, error = parse_tool_call_args("{bad json}", "tool", "call_7")
assert error is not None
assert set(error.keys()) == {"call_id", "func_name", "result", "from_cache", "original_tool"}
# ---------------------------------------------------------------------------
# Tests for RCE fix: module allowlist in load_agent_from_repository
# (Issue #5446)
# ---------------------------------------------------------------------------
class TestIsTrustedToolModule:
"""Tests for _is_trusted_tool_module allowlist validation."""
def test_crewai_tools_prefix_is_trusted(self) -> None:
assert _is_trusted_tool_module("crewai_tools.tools.file_read_tool") is True
def test_crewai_tools_submodule_is_trusted(self) -> None:
assert _is_trusted_tool_module("crewai_tools.tools.exa_tools.exa_search_tool") is True
def test_crewai_dot_tools_prefix_is_trusted(self) -> None:
assert _is_trusted_tool_module("crewai.tools.some_tool") is True
def test_subprocess_is_rejected(self) -> None:
assert _is_trusted_tool_module("subprocess") is False
def test_os_is_rejected(self) -> None:
assert _is_trusted_tool_module("os") is False
def test_shutil_is_rejected(self) -> None:
assert _is_trusted_tool_module("shutil") is False
def test_builtins_is_rejected(self) -> None:
assert _is_trusted_tool_module("builtins") is False
def test_arbitrary_dotted_module_is_rejected(self) -> None:
assert _is_trusted_tool_module("some_package.evil_module") is False
def test_empty_string_is_rejected(self) -> None:
assert _is_trusted_tool_module("") is False
def test_partial_prefix_match_is_rejected(self) -> None:
"""Ensure 'crewai_tools_evil' doesn't pass by prefix match."""
assert _is_trusted_tool_module("crewai_tools_evil.module") is False
def test_crewai_without_tools_is_rejected(self) -> None:
"""Only 'crewai.tools.' is trusted, not arbitrary crewai submodules."""
assert _is_trusted_tool_module("crewai.cli.some_module") is False
def test_crewai_tools_exact_no_dot_is_rejected(self) -> None:
"""'crewai_tools' alone (no trailing dot / submodule) is rejected."""
assert _is_trusted_tool_module("crewai_tools") is False
class TestAllowedToolModulePrefixes:
"""Verify the constant hasn't been accidentally changed."""
def test_contains_expected_prefixes(self) -> None:
assert "crewai_tools." in ALLOWED_TOOL_MODULE_PREFIXES
assert "crewai.tools." in ALLOWED_TOOL_MODULE_PREFIXES
def test_does_not_contain_dangerous_prefixes(self) -> None:
for prefix in ALLOWED_TOOL_MODULE_PREFIXES:
assert "subprocess" not in prefix
assert "os." not in prefix
assert "sys." not in prefix
class TestLoadAgentFromRepositoryModuleValidation:
"""Integration tests for the module-allowlist guard in load_agent_from_repository."""
def _make_mock_response(self, agent_json: dict) -> MagicMock:
resp = MagicMock()
resp.status_code = 200
resp.json.return_value = agent_json
return resp
@patch("crewai.utilities.agent_utils._print_current_organization")
@patch("crewai.utilities.agent_utils.asyncio.run")
@patch("crewai.utilities.agent_utils._create_plus_client_hook")
def test_rejects_subprocess_module(
self, mock_hook: MagicMock, mock_run: MagicMock, mock_print: MagicMock
) -> None:
"""Importing 'subprocess.Popen' must be blocked (the exact attack from issue #5446)."""
mock_client = MagicMock()
mock_hook.return_value = mock_client
mock_run.return_value = self._make_mock_response(
{
"role": "Helper",
"goal": "Help",
"backstory": "Backstory",
"tools": [
{
"module": "subprocess",
"name": "Popen",
"init_params": {"args": ["id"], "stdout": -1},
}
],
}
)
with pytest.raises(AgentRepositoryError, match="Untrusted tool module"):
load_agent_from_repository("my-agent")
@patch("crewai.utilities.agent_utils._print_current_organization")
@patch("crewai.utilities.agent_utils.asyncio.run")
@patch("crewai.utilities.agent_utils._create_plus_client_hook")
def test_rejects_os_module(
self, mock_hook: MagicMock, mock_run: MagicMock, mock_print: MagicMock
) -> None:
"""Importing 'os' module must be blocked."""
mock_client = MagicMock()
mock_hook.return_value = mock_client
mock_run.return_value = self._make_mock_response(
{
"role": "Helper",
"goal": "Help",
"backstory": "Backstory",
"tools": [
{
"module": "os",
"name": "system",
"init_params": {},
}
],
}
)
with pytest.raises(AgentRepositoryError, match="Untrusted tool module"):
load_agent_from_repository("my-agent")
@patch("crewai.utilities.agent_utils._print_current_organization")
@patch("crewai.utilities.agent_utils.asyncio.run")
@patch("crewai.utilities.agent_utils._create_plus_client_hook")
def test_rejects_arbitrary_module(
self, mock_hook: MagicMock, mock_run: MagicMock, mock_print: MagicMock
) -> None:
"""Arbitrary third-party modules must be blocked."""
mock_client = MagicMock()
mock_hook.return_value = mock_client
mock_run.return_value = self._make_mock_response(
{
"role": "Helper",
"goal": "Help",
"backstory": "Backstory",
"tools": [
{
"module": "some_evil_package.backdoor",
"name": "Exploit",
"init_params": {},
}
],
}
)
with pytest.raises(AgentRepositoryError, match="Untrusted tool module"):
load_agent_from_repository("my-agent")
@patch("crewai.utilities.agent_utils._print_current_organization")
@patch("crewai.utilities.agent_utils.asyncio.run")
@patch("crewai.utilities.agent_utils._create_plus_client_hook")
def test_rejects_non_basetool_class(
self, mock_hook: MagicMock, mock_run: MagicMock, mock_print: MagicMock
) -> None:
"""Even if the module is allowed, the resolved class must be a BaseTool subclass."""
mock_client = MagicMock()
mock_hook.return_value = mock_client
mock_run.return_value = self._make_mock_response(
{
"role": "Helper",
"goal": "Help",
"backstory": "Backstory",
"tools": [
{
"module": "crewai.tools.base_tool",
"name": "CrewStructuredTool",
"init_params": {},
}
],
}
)
# CrewStructuredTool is imported in base_tool but is not a BaseTool subclass
# (it's a LangChain-style tool). This should fail the subclass check.
with pytest.raises(AgentRepositoryError):
load_agent_from_repository("my-agent")
def test_accepts_trusted_crewai_tools_module(self, monkeypatch: pytest.MonkeyPatch) -> None:
"""A tool from 'crewai_tools.*' that is a BaseTool subclass should load fine."""
import crewai.utilities.agent_utils as _mod
class FakeTool(BaseTool):
name: str = "fake"
description: str = "A fake tool"
def _run(self, **kwargs: Any) -> str:
return "ok"
fake_module = MagicMock()
fake_module.FakeTool = FakeTool
monkeypatch.setattr(_mod, "_print_current_organization", lambda: None)
monkeypatch.setattr(
_mod, "_create_plus_client_hook", lambda: MagicMock()
)
monkeypatch.setattr(
_mod.asyncio, "run",
lambda _coro: self._make_mock_response(
{
"role": "Helper",
"goal": "Help",
"backstory": "Backstory",
"tools": [
{
"module": "crewai_tools.tools.fake_tool",
"name": "FakeTool",
"init_params": {},
}
],
}
),
)
monkeypatch.setattr(
_mod.importlib, "import_module", lambda _name: fake_module
)
result = load_agent_from_repository("my-agent")
assert "tools" in result
assert len(result["tools"]) == 1
assert isinstance(result["tools"][0], BaseTool)
@patch("crewai.utilities.agent_utils._print_current_organization")
@patch("crewai.utilities.agent_utils.asyncio.run")
@patch("crewai.utilities.agent_utils._create_plus_client_hook")
def test_non_tools_attributes_still_loaded(
self, mock_hook: MagicMock, mock_run: MagicMock, mock_print: MagicMock
) -> None:
"""Non-tool attributes from the API response should still be loaded normally."""
mock_client = MagicMock()
mock_hook.return_value = mock_client
mock_run.return_value = self._make_mock_response(
{
"role": "Researcher",
"goal": "Research things",
"backstory": "A curious agent",
}
)
result = load_agent_from_repository("my-agent")
assert result["role"] == "Researcher"
assert result["goal"] == "Research things"
assert result["backstory"] == "A curious agent"
@patch("crewai.utilities.agent_utils._print_current_organization")
@patch("crewai.utilities.agent_utils.asyncio.run")
@patch("crewai.utilities.agent_utils._create_plus_client_hook")
def test_first_malicious_tool_blocks_entire_load(
self, mock_hook: MagicMock, mock_run: MagicMock, mock_print: MagicMock
) -> None:
"""If any tool in the list is untrusted, an error is raised immediately."""
mock_client = MagicMock()
mock_hook.return_value = mock_client
mock_run.return_value = self._make_mock_response(
{
"role": "Helper",
"goal": "Help",
"backstory": "Backstory",
"tools": [
{
"module": "subprocess",
"name": "Popen",
"init_params": {"args": ["whoami"]},
},
{
"module": "crewai_tools.tools.file_read_tool",
"name": "FileReadTool",
"init_params": {},
},
],
}
)
with pytest.raises(AgentRepositoryError, match="Untrusted tool module"):
load_agent_from_repository("my-agent")