feat: add command allowlist validation for MCP stdio transport

Addresses #5080 - adds an optional allowed_commands parameter to
StdioTransport that validates commands against an allowlist before
spawning subprocesses.

- Add DEFAULT_ALLOWED_COMMANDS frozenset (python, python3, node, npx, uvx, deno)
- Add allowed_commands parameter to StdioTransport.__init__ with validation
- Add allowed_commands field to MCPServerStdio config model
- Forward allowed_commands through MCPToolResolver._create_transport
- Export DEFAULT_ALLOWED_COMMANDS from transports __init__.py
- Add 29 tests covering allowlist validation, opt-out, custom lists, and integration

Co-Authored-By: João <joao@crewai.com>
This commit is contained in:
Devin AI
2026-03-25 16:34:33 +00:00
parent 4d1c041cc1
commit 394a6df835
5 changed files with 217 additions and 1 deletions

View File

@@ -4,9 +4,11 @@ This module provides Pydantic models for configuring MCP servers with
various transport types, similar to OpenAI's Agents SDK.
"""
from pydantic import BaseModel, Field
from crewai.mcp.filters import ToolFilter
from crewai.mcp.transports.stdio import DEFAULT_ALLOWED_COMMANDS
class MCPServerStdio(BaseModel):
@@ -40,6 +42,13 @@ class MCPServerStdio(BaseModel):
default=None,
description="Environment variables to pass to the process.",
)
allowed_commands: frozenset[str] | None = Field(
default=DEFAULT_ALLOWED_COMMANDS,
description=(
"Optional set of allowed executable base names. "
"Defaults to DEFAULT_ALLOWED_COMMANDS. Set to None to disable."
),
)
tool_filter: ToolFilter | None = Field(
default=None,
description="Optional tool filter for filtering available tools.",

View File

@@ -292,6 +292,7 @@ class MCPToolResolver:
command=mcp_config.command,
args=mcp_config.args,
env=mcp_config.env,
allowed_commands=mcp_config.allowed_commands,
)
server_name = f"{mcp_config.command}_{'_'.join(mcp_config.args)}"
elif isinstance(mcp_config, MCPServerHTTP):

View File

@@ -3,10 +3,11 @@
from crewai.mcp.transports.base import BaseTransport, TransportType
from crewai.mcp.transports.http import HTTPTransport
from crewai.mcp.transports.sse import SSETransport
from crewai.mcp.transports.stdio import StdioTransport
from crewai.mcp.transports.stdio import DEFAULT_ALLOWED_COMMANDS, StdioTransport
__all__ = [
"DEFAULT_ALLOWED_COMMANDS",
"BaseTransport",
"HTTPTransport",
"SSETransport",

View File

@@ -1,6 +1,7 @@
"""Stdio transport for MCP servers running as local processes."""
import asyncio
from collections.abc import Set
import os
import subprocess
from typing import Any
@@ -10,6 +11,17 @@ from typing_extensions import Self
from crewai.mcp.transports.base import BaseTransport, TransportType
# Default allowlist of common MCP server runtimes.
DEFAULT_ALLOWED_COMMANDS: frozenset[str] = frozenset({
"python",
"python3",
"node",
"npx",
"uvx",
"deno",
})
class StdioTransport(BaseTransport):
"""Stdio transport for connecting to local MCP servers.
@@ -17,6 +29,11 @@ class StdioTransport(BaseTransport):
communicating via standard input/output streams. Supports Python,
Node.js, and other command-line servers.
An optional ``allowed_commands`` parameter restricts which executables
can be launched. It defaults to :data:`DEFAULT_ALLOWED_COMMANDS` which
covers the most common MCP server runtimes. Pass ``None`` to disable
the check entirely.
Example:
```python
transport = StdioTransport(
@@ -34,6 +51,7 @@ class StdioTransport(BaseTransport):
command: str,
args: list[str] | None = None,
env: dict[str, str] | None = None,
allowed_commands: Set[str] | None = DEFAULT_ALLOWED_COMMANDS,
**kwargs: Any,
) -> None:
"""Initialize stdio transport.
@@ -42,9 +60,22 @@ class StdioTransport(BaseTransport):
command: Command to execute (e.g., "python", "node", "npx").
args: Command arguments (e.g., ["server.py"] or ["-y", "@mcp/server"]).
env: Environment variables to pass to the process.
allowed_commands: Optional set of allowed executable base names.
Defaults to :data:`DEFAULT_ALLOWED_COMMANDS`. Pass ``None``
to disable the allowlist check.
**kwargs: Additional transport options.
"""
super().__init__(**kwargs)
if allowed_commands is not None:
base_command = os.path.basename(command)
if base_command not in allowed_commands:
raise ValueError(
f"Command '{command}' (resolved to '{base_command}') is not in "
f"the allowed commands list: {sorted(allowed_commands)}. "
f"Pass allowed_commands=None to disable this check."
)
self.command = command
self.args = args or []
self.env = env or {}

View File

@@ -0,0 +1,174 @@
"""Tests for StdioTransport command allowlist validation."""
from unittest.mock import AsyncMock, patch
import pytest
from crewai.mcp.config import MCPServerStdio
from crewai.mcp.transports.stdio import DEFAULT_ALLOWED_COMMANDS, StdioTransport
class TestDefaultAllowedCommands:
"""Verify the default allowlist contains expected runtimes."""
def test_default_allowlist_contains_common_runtimes(self):
expected = {"python", "python3", "node", "npx", "uvx", "deno"}
assert expected == DEFAULT_ALLOWED_COMMANDS
def test_default_allowlist_is_frozenset(self):
assert isinstance(DEFAULT_ALLOWED_COMMANDS, frozenset)
class TestStdioTransportAllowlist:
"""StdioTransport should validate commands against an allowlist."""
# -- Allowed commands (happy path) --
@pytest.mark.parametrize("cmd", sorted(DEFAULT_ALLOWED_COMMANDS))
def test_default_allowed_commands_accepted(self, cmd: str):
transport = StdioTransport(command=cmd)
assert transport.command == cmd
def test_allowed_command_with_full_path(self):
"""Full paths should be validated by their basename."""
transport = StdioTransport(command="/usr/bin/python3")
assert transport.command == "/usr/bin/python3"
def test_allowed_command_with_relative_path(self):
transport = StdioTransport(command="./venv/bin/python")
assert transport.command == "./venv/bin/python"
def test_allowed_command_with_windows_style_path_requires_none(self):
"""Windows-style backslash paths aren't parsed by os.path.basename on
POSIX, so they must opt out of the allowlist to work cross-platform."""
transport = StdioTransport(
command="C:\\Python311\\python", allowed_commands=None
)
assert transport.command == "C:\\Python311\\python"
# -- Blocked commands --
def test_blocked_command_raises_value_error(self):
with pytest.raises(ValueError, match="not in the allowed commands list"):
StdioTransport(command="curl")
def test_blocked_command_with_full_path(self):
with pytest.raises(ValueError, match="not in the allowed commands list"):
StdioTransport(command="/usr/bin/curl")
def test_error_message_includes_command_name(self):
with pytest.raises(ValueError, match="curl"):
StdioTransport(command="curl")
def test_error_message_includes_sorted_allowlist(self):
with pytest.raises(ValueError, match=r"\['deno', 'node', 'npx'"):
StdioTransport(command="bash")
def test_error_message_suggests_disabling_check(self):
with pytest.raises(ValueError, match="allowed_commands=None"):
StdioTransport(command="bash")
# -- Opt-out: allowed_commands=None --
def test_none_disables_allowlist_check(self):
transport = StdioTransport(command="anything-goes", allowed_commands=None)
assert transport.command == "anything-goes"
def test_none_allows_arbitrary_path(self):
transport = StdioTransport(
command="/opt/custom/my-server", allowed_commands=None
)
assert transport.command == "/opt/custom/my-server"
# -- Custom allowlist --
def test_custom_allowlist_accepts_listed_command(self):
custom = frozenset({"my-runtime"})
transport = StdioTransport(command="my-runtime", allowed_commands=custom)
assert transport.command == "my-runtime"
def test_custom_allowlist_rejects_unlisted_command(self):
custom = frozenset({"my-runtime"})
with pytest.raises(ValueError, match="not in the allowed commands list"):
StdioTransport(command="python", allowed_commands=custom)
def test_custom_allowlist_as_set(self):
"""Plain sets should also work (Set[str] type hint)."""
custom = {"ruby", "perl"}
transport = StdioTransport(command="ruby", allowed_commands=custom)
assert transport.command == "ruby"
# -- Other constructor args still work --
def test_args_and_env_still_set(self):
transport = StdioTransport(
command="python",
args=["server.py", "--port", "8080"],
env={"KEY": "val"},
)
assert transport.args == ["server.py", "--port", "8080"]
assert transport.env == {"KEY": "val"}
def test_default_args_and_env(self):
transport = StdioTransport(command="node")
assert transport.args == []
assert transport.env == {}
class TestMCPServerStdioAllowedCommands:
"""MCPServerStdio config should expose allowed_commands field."""
def test_default_allowed_commands_on_config(self):
config = MCPServerStdio(command="python", args=["server.py"])
assert config.allowed_commands == DEFAULT_ALLOWED_COMMANDS
def test_custom_allowed_commands_on_config(self):
custom = frozenset({"ruby"})
config = MCPServerStdio(
command="ruby", args=["server.rb"], allowed_commands=custom
)
assert config.allowed_commands == custom
def test_none_allowed_commands_on_config(self):
config = MCPServerStdio(
command="my-binary", args=[], allowed_commands=None
)
assert config.allowed_commands is None
class TestToolResolverPassesAllowedCommands:
"""_create_transport should forward allowed_commands to StdioTransport."""
def test_create_transport_passes_allowed_commands(self):
from crewai.mcp.tool_resolver import MCPToolResolver
config = MCPServerStdio(
command="python",
args=["server.py"],
allowed_commands=DEFAULT_ALLOWED_COMMANDS,
)
transport, server_name = MCPToolResolver._create_transport(config)
assert isinstance(transport, StdioTransport)
assert transport.command == "python"
def test_create_transport_with_none_allowed_commands(self):
from crewai.mcp.tool_resolver import MCPToolResolver
config = MCPServerStdio(
command="custom-binary",
args=["--flag"],
allowed_commands=None,
)
transport, server_name = MCPToolResolver._create_transport(config)
assert isinstance(transport, StdioTransport)
assert transport.command == "custom-binary"
def test_create_transport_rejects_blocked_command(self):
from crewai.mcp.tool_resolver import MCPToolResolver
config = MCPServerStdio(
command="curl",
args=["http://evil.com"],
)
with pytest.raises(ValueError, match="not in the allowed commands list"):
MCPToolResolver._create_transport(config)