Compare commits

...

2 Commits

Author SHA1 Message Date
Iris Clawd
3bd055fcf3 feat: add command allowlist validation for MCP stdio transport
Add an optional allowed_commands parameter to StdioTransport that
validates the command basename against an allowlist before spawning
a subprocess. This provides defense-in-depth against configuration-
driven command injection as MCP server discovery becomes more dynamic.

- DEFAULT_ALLOWED_COMMANDS includes common runtimes: python, python3,
  node, npx, uvx, uv, deno, docker
- Validation checks os.path.basename(command) for cross-platform support
- Users can extend the allowlist, pass a custom set, or set
  allowed_commands=None to disable the check entirely
- No breaking change: all currently documented MCP server examples use
  commands in the default allowlist
- MCPServerStdio config model updated with allowed_commands field
- tool_resolver passes allowed_commands through to StdioTransport

Closes #5080
2026-03-30 22:08:51 +00:00
Lorenze Jay
bb9bcd6823 refactor: remove unused and methods from (#5172)
This commit cleans up the  class by removing the  and  methods, which are no longer needed. The changes help streamline the code and improve maintainability.
2026-03-30 15:01:58 -07:00
8 changed files with 167 additions and 56 deletions

View File

@@ -1966,37 +1966,6 @@ class AgentExecutor(Flow[AgentExecutorState], CrewAgentExecutorMixin):
"original_tool": original_tool,
}
def _extract_tool_name(self, tool_call: Any) -> str:
"""Extract tool name from various tool call formats."""
if hasattr(tool_call, "function"):
return sanitize_tool_name(tool_call.function.name)
if hasattr(tool_call, "function_call") and tool_call.function_call:
return sanitize_tool_name(tool_call.function_call.name)
if hasattr(tool_call, "name"):
return sanitize_tool_name(tool_call.name)
if isinstance(tool_call, dict):
func_info = tool_call.get("function", {})
return sanitize_tool_name(
func_info.get("name", "") or tool_call.get("name", "unknown")
)
return "unknown"
@router(execute_native_tool)
def check_native_todo_completion(
self,
) -> Literal["todo_satisfied", "todo_not_satisfied"]:
"""Check if the native tool execution satisfied the active todo.
Similar to check_todo_completion but for native tool execution path.
"""
current_todo = self.state.todos.current_todo
if not current_todo:
return "todo_not_satisfied"
# For native tools, any tool execution satisfies the todo
return "todo_satisfied"
@listen("initialized")
def continue_iteration(self) -> Literal["check_iteration"]:
"""Bridge listener that connects iteration loop back to iteration check."""

View File

@@ -7,6 +7,7 @@ various transport types, similar to OpenAI's Agents SDK.
from pydantic import BaseModel, Field
from crewai.mcp.filters import ToolFilter
from crewai.mcp.transports.stdio import DEFAULT_ALLOWED_COMMANDS
class MCPServerStdio(BaseModel):
@@ -44,6 +45,14 @@ class MCPServerStdio(BaseModel):
default=None,
description="Optional tool filter for filtering available tools.",
)
allowed_commands: frozenset[str] | None = Field(
default=DEFAULT_ALLOWED_COMMANDS,
description=(
"Optional frozenset of allowed command basenames for security validation. "
"Defaults to common runtimes (python, node, npx, uvx, uv, deno, docker). "
"Set to None to disable the allowlist check."
),
)
cache_tools_list: bool = Field(
default=False,
description="Whether to cache the tool list for faster subsequent access.",

View File

@@ -292,6 +292,7 @@ class MCPToolResolver:
command=mcp_config.command,
args=mcp_config.args,
env=mcp_config.env,
allowed_commands=mcp_config.allowed_commands,
)
server_name = f"{mcp_config.command}_{'_'.join(mcp_config.args)}"
elif isinstance(mcp_config, MCPServerHTTP):

View File

@@ -3,11 +3,12 @@
from crewai.mcp.transports.base import BaseTransport, TransportType
from crewai.mcp.transports.http import HTTPTransport
from crewai.mcp.transports.sse import SSETransport
from crewai.mcp.transports.stdio import StdioTransport
from crewai.mcp.transports.stdio import DEFAULT_ALLOWED_COMMANDS, StdioTransport
__all__ = [
"BaseTransport",
"DEFAULT_ALLOWED_COMMANDS",
"HTTPTransport",
"SSETransport",
"StdioTransport",

View File

@@ -9,6 +9,22 @@ from typing_extensions import Self
from crewai.mcp.transports.base import BaseTransport, TransportType
# Default allowlist for common MCP server runtimes.
# Covers the vast majority of MCP server launch commands.
# Pass ``allowed_commands=None`` to disable validation entirely.
DEFAULT_ALLOWED_COMMANDS: frozenset[str] = frozenset(
{
"python",
"python3",
"node",
"npx",
"uvx",
"uv",
"deno",
"docker",
}
)
class StdioTransport(BaseTransport):
"""Stdio transport for connecting to local MCP servers.
@@ -34,6 +50,7 @@ class StdioTransport(BaseTransport):
command: str,
args: list[str] | None = None,
env: dict[str, str] | None = None,
allowed_commands: frozenset[str] | None = DEFAULT_ALLOWED_COMMANDS,
**kwargs: Any,
) -> None:
"""Initialize stdio transport.
@@ -42,9 +59,24 @@ class StdioTransport(BaseTransport):
command: Command to execute (e.g., "python", "node", "npx").
args: Command arguments (e.g., ["server.py"] or ["-y", "@mcp/server"]).
env: Environment variables to pass to the process.
allowed_commands: Optional frozenset of allowed command basenames.
Defaults to ``DEFAULT_ALLOWED_COMMANDS`` which includes common
runtimes (python, node, npx, uvx, uv, deno, docker). Pass
``None`` to disable the check entirely.
**kwargs: Additional transport options.
"""
super().__init__(**kwargs)
if allowed_commands is not None:
base_command = os.path.basename(command)
if base_command not in allowed_commands:
raise ValueError(
f"Command '{command}' is not in the allowed commands list: "
f"{sorted(allowed_commands)}. "
f"To allow this command, add it to allowed_commands or pass "
f"allowed_commands=None to disable this check."
)
self.command = command
self.args = args or []
self.env = env or {}

View File

@@ -879,30 +879,6 @@ class TestNativeToolExecution:
assert len(tool_messages) == 1
assert tool_messages[0]["tool_call_id"] == "call_1"
def test_check_native_todo_completion_requires_current_todo(
self, mock_dependencies
):
from crewai.utilities.planning_types import TodoList
executor = AgentExecutor(**mock_dependencies)
# No current todo → not satisfied
executor.state.todos = TodoList(items=[])
assert executor.check_native_todo_completion() == "todo_not_satisfied"
# With a current todo that has tool_to_use → satisfied
running = TodoItem(
step_number=1,
description="Use the expected tool",
tool_to_use="expected_tool",
status="running",
)
executor.state.todos = TodoList(items=[running])
assert executor.check_native_todo_completion() == "todo_satisfied"
# With a current todo without tool_to_use → still satisfied
running.tool_to_use = None
assert executor.check_native_todo_completion() == "todo_satisfied"
class TestPlannerObserver:

View File

@@ -0,0 +1,30 @@
"""Tests for MCPServerStdio allowed_commands config integration."""
import pytest
from crewai.mcp.config import MCPServerStdio
from crewai.mcp.transports.stdio import DEFAULT_ALLOWED_COMMANDS
class TestMCPServerStdioConfig:
"""Tests for the allowed_commands field on MCPServerStdio."""
def test_default_allowed_commands(self):
"""MCPServerStdio should default to DEFAULT_ALLOWED_COMMANDS."""
config = MCPServerStdio(command="python", args=["server.py"])
assert config.allowed_commands == DEFAULT_ALLOWED_COMMANDS
def test_custom_allowed_commands(self):
"""Users can override allowed_commands in config."""
custom = frozenset({"my-runtime"})
config = MCPServerStdio(
command="my-runtime", args=[], allowed_commands=custom
)
assert config.allowed_commands == custom
def test_none_allowed_commands(self):
"""Users can disable the allowlist via config."""
config = MCPServerStdio(
command="anything", args=[], allowed_commands=None
)
assert config.allowed_commands is None

View File

@@ -0,0 +1,93 @@
"""Tests for StdioTransport command allowlist validation."""
import pytest
from crewai.mcp.transports.stdio import DEFAULT_ALLOWED_COMMANDS, StdioTransport
class TestStdioTransportAllowlist:
"""Tests for the command allowlist feature."""
def test_default_allowed_commands_contains_common_runtimes(self):
"""DEFAULT_ALLOWED_COMMANDS should include all common MCP server runtimes."""
expected = {"python", "python3", "node", "npx", "uvx", "uv", "deno", "docker"}
assert expected == DEFAULT_ALLOWED_COMMANDS
def test_allowed_command_passes_validation(self):
"""Commands in the default allowlist should be accepted."""
for cmd in DEFAULT_ALLOWED_COMMANDS:
transport = StdioTransport(command=cmd, args=["server.py"])
assert transport.command == cmd
def test_allowed_command_with_full_path(self):
"""Full paths to allowed commands should pass (basename is checked)."""
transport = StdioTransport(command="/usr/bin/python3", args=["server.py"])
assert transport.command == "/usr/bin/python3"
def test_disallowed_command_raises_value_error(self):
"""Commands not in the allowlist should raise ValueError."""
with pytest.raises(ValueError, match="not in the allowed commands list"):
StdioTransport(command="malicious-binary", args=["--evil"])
def test_disallowed_command_with_full_path_raises(self):
"""Full paths to disallowed commands should also be rejected."""
with pytest.raises(ValueError, match="not in the allowed commands list"):
StdioTransport(command="/tmp/evil/script", args=[])
def test_allowed_commands_none_disables_validation(self):
"""Setting allowed_commands=None should disable the check entirely."""
transport = StdioTransport(
command="any-custom-binary",
args=["--flag"],
allowed_commands=None,
)
assert transport.command == "any-custom-binary"
def test_custom_allowlist(self):
"""Users should be able to pass a custom allowlist."""
custom = frozenset({"my-server", "python"})
# Allowed
transport = StdioTransport(
command="my-server", args=[], allowed_commands=custom
)
assert transport.command == "my-server"
# Not allowed
with pytest.raises(ValueError, match="not in the allowed commands list"):
StdioTransport(command="node", args=[], allowed_commands=custom)
def test_extended_allowlist(self):
"""Users should be able to extend the default allowlist."""
extended = DEFAULT_ALLOWED_COMMANDS | frozenset({"my-custom-runtime"})
transport = StdioTransport(
command="my-custom-runtime", args=[], allowed_commands=extended
)
assert transport.command == "my-custom-runtime"
# Original defaults still work
transport2 = StdioTransport(
command="python", args=["server.py"], allowed_commands=extended
)
assert transport2.command == "python"
def test_error_message_includes_sorted_allowed_commands(self):
"""The error message should list the allowed commands for discoverability."""
with pytest.raises(ValueError) as exc_info:
StdioTransport(command="bad-cmd", args=[])
error_msg = str(exc_info.value)
assert "bad-cmd" in error_msg
assert "allowed_commands=None" in error_msg
def test_args_and_env_still_work(self):
"""Existing args and env functionality should be unaffected."""
transport = StdioTransport(
command="python",
args=["server.py", "--port", "8080"],
env={"API_KEY": "test123"},
)
assert transport.command == "python"
assert transport.args == ["server.py", "--port", "8080"]
assert transport.env == {"API_KEY": "test123"}