Compare commits

...

1 Commits

Author SHA1 Message Date
Iris Clawd
3bd055fcf3 feat: add command allowlist validation for MCP stdio transport
Add an optional allowed_commands parameter to StdioTransport that
validates the command basename against an allowlist before spawning
a subprocess. This provides defense-in-depth against configuration-
driven command injection as MCP server discovery becomes more dynamic.

- DEFAULT_ALLOWED_COMMANDS includes common runtimes: python, python3,
  node, npx, uvx, uv, deno, docker
- Validation checks os.path.basename(command) for cross-platform support
- Users can extend the allowlist, pass a custom set, or set
  allowed_commands=None to disable the check entirely
- No breaking change: all currently documented MCP server examples use
  commands in the default allowlist
- MCPServerStdio config model updated with allowed_commands field
- tool_resolver passes allowed_commands through to StdioTransport

Closes #5080
2026-03-30 22:08:51 +00:00
6 changed files with 167 additions and 1 deletions

View File

@@ -7,6 +7,7 @@ various transport types, similar to OpenAI's Agents SDK.
from pydantic import BaseModel, Field
from crewai.mcp.filters import ToolFilter
from crewai.mcp.transports.stdio import DEFAULT_ALLOWED_COMMANDS
class MCPServerStdio(BaseModel):
@@ -44,6 +45,14 @@ class MCPServerStdio(BaseModel):
default=None,
description="Optional tool filter for filtering available tools.",
)
allowed_commands: frozenset[str] | None = Field(
default=DEFAULT_ALLOWED_COMMANDS,
description=(
"Optional frozenset of allowed command basenames for security validation. "
"Defaults to common runtimes (python, node, npx, uvx, uv, deno, docker). "
"Set to None to disable the allowlist check."
),
)
cache_tools_list: bool = Field(
default=False,
description="Whether to cache the tool list for faster subsequent access.",

View File

@@ -292,6 +292,7 @@ class MCPToolResolver:
command=mcp_config.command,
args=mcp_config.args,
env=mcp_config.env,
allowed_commands=mcp_config.allowed_commands,
)
server_name = f"{mcp_config.command}_{'_'.join(mcp_config.args)}"
elif isinstance(mcp_config, MCPServerHTTP):

View File

@@ -3,11 +3,12 @@
from crewai.mcp.transports.base import BaseTransport, TransportType
from crewai.mcp.transports.http import HTTPTransport
from crewai.mcp.transports.sse import SSETransport
from crewai.mcp.transports.stdio import StdioTransport
from crewai.mcp.transports.stdio import DEFAULT_ALLOWED_COMMANDS, StdioTransport
__all__ = [
"BaseTransport",
"DEFAULT_ALLOWED_COMMANDS",
"HTTPTransport",
"SSETransport",
"StdioTransport",

View File

@@ -9,6 +9,22 @@ from typing_extensions import Self
from crewai.mcp.transports.base import BaseTransport, TransportType
# Default allowlist for common MCP server runtimes.
# Covers the vast majority of MCP server launch commands.
# Pass ``allowed_commands=None`` to disable validation entirely.
DEFAULT_ALLOWED_COMMANDS: frozenset[str] = frozenset(
{
"python",
"python3",
"node",
"npx",
"uvx",
"uv",
"deno",
"docker",
}
)
class StdioTransport(BaseTransport):
"""Stdio transport for connecting to local MCP servers.
@@ -34,6 +50,7 @@ class StdioTransport(BaseTransport):
command: str,
args: list[str] | None = None,
env: dict[str, str] | None = None,
allowed_commands: frozenset[str] | None = DEFAULT_ALLOWED_COMMANDS,
**kwargs: Any,
) -> None:
"""Initialize stdio transport.
@@ -42,9 +59,24 @@ class StdioTransport(BaseTransport):
command: Command to execute (e.g., "python", "node", "npx").
args: Command arguments (e.g., ["server.py"] or ["-y", "@mcp/server"]).
env: Environment variables to pass to the process.
allowed_commands: Optional frozenset of allowed command basenames.
Defaults to ``DEFAULT_ALLOWED_COMMANDS`` which includes common
runtimes (python, node, npx, uvx, uv, deno, docker). Pass
``None`` to disable the check entirely.
**kwargs: Additional transport options.
"""
super().__init__(**kwargs)
if allowed_commands is not None:
base_command = os.path.basename(command)
if base_command not in allowed_commands:
raise ValueError(
f"Command '{command}' is not in the allowed commands list: "
f"{sorted(allowed_commands)}. "
f"To allow this command, add it to allowed_commands or pass "
f"allowed_commands=None to disable this check."
)
self.command = command
self.args = args or []
self.env = env or {}

View File

@@ -0,0 +1,30 @@
"""Tests for MCPServerStdio allowed_commands config integration."""
import pytest
from crewai.mcp.config import MCPServerStdio
from crewai.mcp.transports.stdio import DEFAULT_ALLOWED_COMMANDS
class TestMCPServerStdioConfig:
"""Tests for the allowed_commands field on MCPServerStdio."""
def test_default_allowed_commands(self):
"""MCPServerStdio should default to DEFAULT_ALLOWED_COMMANDS."""
config = MCPServerStdio(command="python", args=["server.py"])
assert config.allowed_commands == DEFAULT_ALLOWED_COMMANDS
def test_custom_allowed_commands(self):
"""Users can override allowed_commands in config."""
custom = frozenset({"my-runtime"})
config = MCPServerStdio(
command="my-runtime", args=[], allowed_commands=custom
)
assert config.allowed_commands == custom
def test_none_allowed_commands(self):
"""Users can disable the allowlist via config."""
config = MCPServerStdio(
command="anything", args=[], allowed_commands=None
)
assert config.allowed_commands is None

View File

@@ -0,0 +1,93 @@
"""Tests for StdioTransport command allowlist validation."""
import pytest
from crewai.mcp.transports.stdio import DEFAULT_ALLOWED_COMMANDS, StdioTransport
class TestStdioTransportAllowlist:
"""Tests for the command allowlist feature."""
def test_default_allowed_commands_contains_common_runtimes(self):
"""DEFAULT_ALLOWED_COMMANDS should include all common MCP server runtimes."""
expected = {"python", "python3", "node", "npx", "uvx", "uv", "deno", "docker"}
assert expected == DEFAULT_ALLOWED_COMMANDS
def test_allowed_command_passes_validation(self):
"""Commands in the default allowlist should be accepted."""
for cmd in DEFAULT_ALLOWED_COMMANDS:
transport = StdioTransport(command=cmd, args=["server.py"])
assert transport.command == cmd
def test_allowed_command_with_full_path(self):
"""Full paths to allowed commands should pass (basename is checked)."""
transport = StdioTransport(command="/usr/bin/python3", args=["server.py"])
assert transport.command == "/usr/bin/python3"
def test_disallowed_command_raises_value_error(self):
"""Commands not in the allowlist should raise ValueError."""
with pytest.raises(ValueError, match="not in the allowed commands list"):
StdioTransport(command="malicious-binary", args=["--evil"])
def test_disallowed_command_with_full_path_raises(self):
"""Full paths to disallowed commands should also be rejected."""
with pytest.raises(ValueError, match="not in the allowed commands list"):
StdioTransport(command="/tmp/evil/script", args=[])
def test_allowed_commands_none_disables_validation(self):
"""Setting allowed_commands=None should disable the check entirely."""
transport = StdioTransport(
command="any-custom-binary",
args=["--flag"],
allowed_commands=None,
)
assert transport.command == "any-custom-binary"
def test_custom_allowlist(self):
"""Users should be able to pass a custom allowlist."""
custom = frozenset({"my-server", "python"})
# Allowed
transport = StdioTransport(
command="my-server", args=[], allowed_commands=custom
)
assert transport.command == "my-server"
# Not allowed
with pytest.raises(ValueError, match="not in the allowed commands list"):
StdioTransport(command="node", args=[], allowed_commands=custom)
def test_extended_allowlist(self):
"""Users should be able to extend the default allowlist."""
extended = DEFAULT_ALLOWED_COMMANDS | frozenset({"my-custom-runtime"})
transport = StdioTransport(
command="my-custom-runtime", args=[], allowed_commands=extended
)
assert transport.command == "my-custom-runtime"
# Original defaults still work
transport2 = StdioTransport(
command="python", args=["server.py"], allowed_commands=extended
)
assert transport2.command == "python"
def test_error_message_includes_sorted_allowed_commands(self):
"""The error message should list the allowed commands for discoverability."""
with pytest.raises(ValueError) as exc_info:
StdioTransport(command="bad-cmd", args=[])
error_msg = str(exc_info.value)
assert "bad-cmd" in error_msg
assert "allowed_commands=None" in error_msg
def test_args_and_env_still_work(self):
"""Existing args and env functionality should be unaffected."""
transport = StdioTransport(
command="python",
args=["server.py", "--port", "8080"],
env={"API_KEY": "test123"},
)
assert transport.command == "python"
assert transport.args == ["server.py", "--port", "8080"]
assert transport.env == {"API_KEY": "test123"}