Compare commits

...

2 Commits

Author SHA1 Message Date
Devin AI
3158e50a8e fix: prevent circular reference in signed envelope and remove unused variable
- Use dict(cleaned_arguments) copy in message dict to avoid circular
  reference when envelope is stored back into cleaned_arguments
- Remove unused 'tools' variable in test_resolver_no_security_when_not_configured

Co-Authored-By: João <joao@crewai.com>
2026-03-14 17:57:21 +00:00
Devin AI
909ebd869b feat: add MCP security layer for message signing, tool integrity, and replay protection
Addresses issue #4875 - MCP tool calling has no per-message authentication
or integrity verification.

Adds:
- MCPSecurityConfig: Pydantic model for security settings (agent identity,
  key pairs, Trust Authority keys, signing/verification toggles)
- MCPSecurityManager: Stateful manager handling ECDSA P-256 key generation,
  agent passport creation/signing, message signing/verification, tool
  integrity checks, and nonce-based replay protection
- Integration into MCPClient, MCPToolResolver, and all config models
  (MCPServerStdio, MCPServerHTTP, MCPServerSSE)
- 30 comprehensive tests covering all security features and graceful
  degradation when mcp-secure is not installed

Uses the mcp-secure library (optional dependency) which implements the
IETF draft-sharif-mcps-secure-mcp specification.

Co-Authored-By: João <joao@crewai.com>
2026-03-14 17:47:51 +00:00
6 changed files with 916 additions and 0 deletions

View File

@@ -18,6 +18,7 @@ from crewai.mcp.filters import (
create_dynamic_tool_filter,
create_static_tool_filter,
)
from crewai.mcp.security import MCPSecurityConfig, MCPSecurityManager
from crewai.mcp.tool_resolver import MCPToolResolver
from crewai.mcp.transports.base import BaseTransport, TransportType
@@ -25,6 +26,8 @@ from crewai.mcp.transports.base import BaseTransport, TransportType
__all__ = [
"BaseTransport",
"MCPClient",
"MCPSecurityConfig",
"MCPSecurityManager",
"MCPServerConfig",
"MCPServerHTTP",
"MCPServerSSE",

View File

@@ -27,6 +27,7 @@ from crewai.events.types.mcp_events import (
MCPToolExecutionFailedEvent,
MCPToolExecutionStartedEvent,
)
from crewai.mcp.security import MCPSecurityManager
from crewai.mcp.transports.base import BaseTransport
from crewai.mcp.transports.http import HTTPTransport
from crewai.mcp.transports.sse import SSETransport
@@ -77,6 +78,7 @@ class MCPClient:
max_retries: int = MCP_MAX_RETRIES,
cache_tools_list: bool = False,
logger: logging.Logger | None = None,
security_manager: MCPSecurityManager | None = None,
) -> None:
"""Initialize MCP client.
@@ -88,6 +90,8 @@ class MCPClient:
max_retries: Maximum retry attempts for operations.
cache_tools_list: Whether to cache tool list results.
logger: Optional logger instance.
security_manager: Optional security manager for message signing
and tool integrity verification.
"""
self.transport = transport
self.connect_timeout = connect_timeout
@@ -95,6 +99,7 @@ class MCPClient:
self.discovery_timeout = discovery_timeout
self.max_retries = max_retries
self.cache_tools_list = cache_tools_list
self.security_manager = security_manager
# self._logger = logger or logging.getLogger(__name__)
self._session: Any = None
self._initialized = False
@@ -439,6 +444,10 @@ class MCPClient:
) -> Any:
"""Call a tool on the MCP server.
When a :class:`MCPSecurityManager` is attached, the outgoing JSON-RPC
message is signed before transmission so the server can verify the
caller's identity and detect tampering.
Args:
tool_name: Name of the tool to call.
arguments: Tool arguments.
@@ -452,6 +461,24 @@ class MCPClient:
arguments = arguments or {}
cleaned_arguments = self._clean_tool_arguments(arguments)
# Sign the outgoing message when a security manager is present.
# We use a shallow copy of cleaned_arguments inside the message dict
# so that storing the envelope back into cleaned_arguments does not
# create a circular reference (envelope -> message -> params -> arguments).
if self.security_manager is not None:
message = {
"jsonrpc": "2.0",
"method": "tools/call",
"params": {
"name": tool_name,
"arguments": dict(cleaned_arguments),
},
}
signed_envelope = self.security_manager.sign_message(message)
# Attach the signed envelope as metadata so the server can verify
if signed_envelope is not message:
cleaned_arguments["_mcps_envelope"] = signed_envelope
# Get server info for events
server_name, server_url, transport_type = self._get_server_info()

View File

@@ -4,9 +4,12 @@ This module provides Pydantic models for configuring MCP servers with
various transport types, similar to OpenAI's Agents SDK.
"""
from __future__ import annotations
from pydantic import BaseModel, Field
from crewai.mcp.filters import ToolFilter
from crewai.mcp.security import MCPSecurityConfig
class MCPServerStdio(BaseModel):
@@ -48,6 +51,13 @@ class MCPServerStdio(BaseModel):
default=False,
description="Whether to cache the tool list for faster subsequent access.",
)
security: MCPSecurityConfig | None = Field(
default=None,
description=(
"Optional security configuration for cryptographic identity, "
"message signing, and tool integrity verification."
),
)
class MCPServerHTTP(BaseModel):
@@ -85,6 +95,13 @@ class MCPServerHTTP(BaseModel):
default=False,
description="Whether to cache the tool list for faster subsequent access.",
)
security: MCPSecurityConfig | None = Field(
default=None,
description=(
"Optional security configuration for cryptographic identity, "
"message signing, and tool integrity verification."
),
)
class MCPServerSSE(BaseModel):
@@ -118,6 +135,13 @@ class MCPServerSSE(BaseModel):
default=False,
description="Whether to cache the tool list for faster subsequent access.",
)
security: MCPSecurityConfig | None = Field(
default=None,
description=(
"Optional security configuration for cryptographic identity, "
"message signing, and tool integrity verification."
),
)
# Type alias for all MCP server configurations

View File

@@ -0,0 +1,360 @@
"""MCP security layer for CrewAI agents.
This module provides cryptographic identity, message signing, tool integrity
verification, and replay protection for MCP tool calls, addressing the security
gaps described in OWASP MCP Top 10.
It wraps the ``mcp-secure`` library (optional dependency) and exposes:
- :class:`MCPSecurityConfig` -- Pydantic model for security settings.
- :class:`MCPSecurityManager` -- Stateful manager handling key generation,
passport creation/signing, message signing/verification, tool integrity
checks, and nonce-based replay protection.
When ``mcp-secure`` is not installed, :meth:`MCPSecurityManager.is_available`
returns ``False`` and all operations gracefully degrade to no-ops.
Example:
```python
from crewai.mcp.security import MCPSecurityConfig, MCPSecurityManager
config = MCPSecurityConfig(
agent_name="my-agent",
agent_version="1.0.0",
capabilities=["read", "write"],
)
manager = MCPSecurityManager(config)
# Sign a tool call message
message = {"jsonrpc": "2.0", "method": "tools/call", "id": 1}
envelope = manager.sign_message(message)
# Verify a tool definition
tool = {"name": "read_file", "description": "...", "inputSchema": {}}
signature = "..." # from server
is_valid = manager.verify_tool(tool, signature)
```
"""
from __future__ import annotations
import logging
from typing import Any
from pydantic import BaseModel, Field
logger = logging.getLogger(__name__)
def _mcp_secure_available() -> bool:
"""Check whether the ``mcp-secure`` package is importable."""
try:
import mcp_secure # noqa: F401
return True
except ImportError:
return False
class MCPSecurityConfig(BaseModel):
"""Configuration for MCP security features.
All fields have sensible defaults so that a minimal configuration like
``MCPSecurityConfig()`` auto-generates keys and creates an unsigned
passport. For production use, supply a Trust Authority key pair so that
passports are cryptographically signed and verifiable.
Example:
```python
# Minimal -- keys auto-generated, passport self-signed
config = MCPSecurityConfig(agent_name="researcher")
# Production -- TA-signed passport, tool verification enabled
config = MCPSecurityConfig(
agent_name="researcher",
agent_version="2.0.0",
capabilities=["search", "summarize"],
ta_public_key="<PEM public key>",
ta_private_key="<PEM private key>",
verify_tool_signatures=True,
sign_messages=True,
)
```
"""
agent_name: str = Field(
default="crewai-agent",
description="Human-readable name for the agent passport.",
)
agent_version: str = Field(
default="1.0.0",
description="Semantic version included in the agent passport.",
)
capabilities: list[str] = Field(
default_factory=list,
description="List of capability strings for the agent passport.",
)
# Agent key pair -- auto-generated when left as ``None``.
private_key: str | None = Field(
default=None,
description=(
"PEM-encoded ECDSA P-256 private key for signing messages. "
"Auto-generated if not provided."
),
)
public_key: str | None = Field(
default=None,
description=(
"PEM-encoded ECDSA P-256 public key matching the private key. "
"Auto-generated if not provided."
),
)
# Trust Authority key pair -- used to sign and verify passports.
ta_private_key: str | None = Field(
default=None,
description=(
"PEM-encoded private key of the Trust Authority. "
"When provided, passports are TA-signed."
),
)
ta_public_key: str | None = Field(
default=None,
description=(
"PEM-encoded public key of the Trust Authority. "
"When provided, incoming passports can be verified."
),
)
sign_messages: bool = Field(
default=True,
description="Whether to sign outgoing MCP JSON-RPC messages.",
)
verify_tool_signatures: bool = Field(
default=True,
description="Whether to verify tool definition signatures from the server.",
)
class MCPSecurityManager:
"""Manages MCP security operations for a single agent.
Handles key generation, passport lifecycle, message signing/verification,
tool integrity checks, and replay protection via nonce tracking.
The manager is safe to share across MCP clients that belong to the same
agent; each call to :meth:`sign_message` generates a unique nonce.
"""
def __init__(self, config: MCPSecurityConfig) -> None:
self._config = config
self._passport: dict[str, Any] | None = None
self._nonce_store: Any | None = None
self._private_key: str | None = config.private_key
self._public_key: str | None = config.public_key
if _mcp_secure_available():
self._initialize()
@staticmethod
def is_available() -> bool:
"""Return ``True`` when the ``mcp-secure`` package is installed."""
return _mcp_secure_available()
@property
def passport(self) -> dict[str, Any] | None:
"""Return the current agent passport, or ``None`` if unavailable."""
return self._passport
@property
def passport_id(self) -> str | None:
"""Return the passport ID, or ``None`` if no passport exists."""
if self._passport is None:
return None
return self._passport.get("passport_id")
@property
def public_key(self) -> str | None:
"""Return the agent's public key."""
return self._public_key
# ------------------------------------------------------------------
# Initialization
# ------------------------------------------------------------------
def _initialize(self) -> None:
"""Generate keys (if needed), create passport, and set up nonce store."""
from mcp_secure import (
NonceStore,
create_passport,
generate_key_pair,
sign_passport,
)
# Generate agent keys if not provided
if self._private_key is None or self._public_key is None:
keys = generate_key_pair()
self._private_key = keys["private_key"]
self._public_key = keys["public_key"]
# Create passport
passport = create_passport(
name=self._config.agent_name,
version=self._config.agent_version,
public_key=self._public_key,
capabilities=self._config.capabilities,
)
# Sign with Trust Authority key if available
if self._config.ta_private_key is not None:
passport = sign_passport(passport, self._config.ta_private_key)
self._passport = passport
self._nonce_store = NonceStore()
# ------------------------------------------------------------------
# Message signing / verification
# ------------------------------------------------------------------
def sign_message(self, message: dict[str, Any]) -> dict[str, Any]:
"""Sign an outgoing MCP JSON-RPC message.
Returns the original *message* unchanged when security is unavailable
or message signing is disabled.
Args:
message: The JSON-RPC message dict to sign.
Returns:
A signed envelope dict (when signing is active) or the original
message (when signing is inactive).
"""
if not self.is_available() or not self._config.sign_messages:
return message
if self._passport is None or self._private_key is None:
return message
from mcp_secure import sign_message
return sign_message(message, self._passport["passport_id"], self._private_key)
def verify_message(self, envelope: dict[str, Any], sender_public_key: str) -> bool:
"""Verify an incoming signed MCP message envelope.
Args:
envelope: The signed envelope dict.
sender_public_key: PEM-encoded public key of the sender.
Returns:
``True`` if the envelope signature and nonce are valid.
"""
if not self.is_available():
return True
from mcp_secure import verify_message
result = verify_message(envelope, sender_public_key)
if not result.get("valid", False):
logger.warning("MCP message verification failed: %s", result.get("error"))
return False
# Replay protection
nonce = envelope.get("nonce")
if nonce is not None and self._nonce_store is not None:
if not self._nonce_store.check(nonce):
logger.warning("MCP message replay detected (duplicate nonce)")
return False
return True
# ------------------------------------------------------------------
# Tool integrity
# ------------------------------------------------------------------
def sign_tool(self, tool_definition: dict[str, Any]) -> str | None:
"""Sign a tool definition.
Args:
tool_definition: Tool definition dict (name, description, inputSchema).
Returns:
Signature string, or ``None`` when security is unavailable.
"""
if not self.is_available() or self._private_key is None:
return None
from mcp_secure import sign_tool
return sign_tool(tool_definition, self._private_key)
def verify_tool(
self,
tool_definition: dict[str, Any],
signature: str,
signer_public_key: str | None = None,
) -> bool:
"""Verify a tool definition signature.
Args:
tool_definition: Tool definition dict.
signature: Signature string to verify.
signer_public_key: PEM-encoded public key of the signer.
Falls back to the agent's own public key if ``None``.
Returns:
``True`` if the signature is valid, ``False`` otherwise.
Always returns ``True`` when security is unavailable or
tool verification is disabled.
"""
if not self.is_available() or not self._config.verify_tool_signatures:
return True
key = signer_public_key or self._public_key
if key is None:
return True
from mcp_secure import verify_tool
return verify_tool(tool_definition, signature, key)
# ------------------------------------------------------------------
# Passport verification
# ------------------------------------------------------------------
def verify_passport(
self, passport: dict[str, Any], ta_public_key: str | None = None
) -> bool:
"""Verify a remote passport signature.
Args:
passport: Passport dict to verify.
ta_public_key: Trust Authority public key. Falls back to the
configured ``ta_public_key`` if ``None``.
Returns:
``True`` if the passport signature is valid.
"""
if not self.is_available():
return True
key = ta_public_key or self._config.ta_public_key
if key is None:
return True
from mcp_secure import verify_passport_signature
return verify_passport_signature(passport, key)
# ------------------------------------------------------------------
# Cleanup
# ------------------------------------------------------------------
def destroy(self) -> None:
"""Release resources held by the security manager."""
if self._nonce_store is not None:
self._nonce_store.destroy()
self._nonce_store = None
self._passport = None

View File

@@ -23,6 +23,7 @@ from crewai.mcp.config import (
MCPServerSSE,
MCPServerStdio,
)
from crewai.mcp.security import MCPSecurityConfig, MCPSecurityManager
from crewai.mcp.transports.http import HTTPTransport
from crewai.mcp.transports.sse import SSETransport
from crewai.mcp.transports.stdio import StdioTransport
@@ -324,9 +325,19 @@ class MCPToolResolver:
from crewai.tools.mcp_native_tool import MCPNativeTool
discovery_transport, server_name = self._create_transport(mcp_config)
# Build a security manager when the config has security settings
security_manager: MCPSecurityManager | None = None
security_config: MCPSecurityConfig | None = getattr(
mcp_config, "security", None
)
if security_config is not None and MCPSecurityManager.is_available():
security_manager = MCPSecurityManager(security_config)
discovery_client = MCPClient(
transport=discovery_transport,
cache_tools_list=mcp_config.cache_tools_list,
security_manager=security_manager,
)
async def _setup_client_and_list_tools() -> list[dict[str, Any]]:
@@ -404,6 +415,7 @@ class MCPToolResolver:
return MCPClient(
transport=transport,
cache_tools_list=mcp_config.cache_tools_list,
security_manager=security_manager,
)
tools = []

View File

@@ -0,0 +1,490 @@
"""Tests for MCP security integration (issue #4875).
Covers:
- MCPSecurityConfig model validation
- MCPSecurityManager key generation, passport creation, message signing,
tool integrity verification, replay protection, and graceful degradation
- Integration with MCPClient and MCPToolResolver via security_manager
- Config models accepting optional security field
"""
from __future__ import annotations
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from crewai.mcp.config import MCPServerHTTP, MCPServerSSE, MCPServerStdio
from crewai.mcp.security import MCPSecurityConfig, MCPSecurityManager
# ---------------------------------------------------------------------------
# MCPSecurityConfig tests
# ---------------------------------------------------------------------------
class TestMCPSecurityConfig:
"""Tests for the MCPSecurityConfig Pydantic model."""
def test_default_values(self):
config = MCPSecurityConfig()
assert config.agent_name == "crewai-agent"
assert config.agent_version == "1.0.0"
assert config.capabilities == []
assert config.private_key is None
assert config.public_key is None
assert config.ta_private_key is None
assert config.ta_public_key is None
assert config.sign_messages is True
assert config.verify_tool_signatures is True
def test_custom_values(self):
config = MCPSecurityConfig(
agent_name="researcher",
agent_version="2.0.0",
capabilities=["search", "summarize"],
sign_messages=False,
verify_tool_signatures=False,
)
assert config.agent_name == "researcher"
assert config.agent_version == "2.0.0"
assert config.capabilities == ["search", "summarize"]
assert config.sign_messages is False
assert config.verify_tool_signatures is False
# ---------------------------------------------------------------------------
# MCPSecurityManager tests
# ---------------------------------------------------------------------------
class TestMCPSecurityManager:
"""Tests for the MCPSecurityManager class."""
def test_is_available(self):
"""mcp-secure is installed in the test env so this should be True."""
assert MCPSecurityManager.is_available() is True
def test_auto_generates_keys(self):
config = MCPSecurityConfig(agent_name="test-agent")
manager = MCPSecurityManager(config)
assert manager.public_key is not None
assert manager._private_key is not None
assert manager.passport is not None
assert manager.passport_id is not None
def test_uses_provided_keys(self):
from mcp_secure import generate_key_pair
keys = generate_key_pair()
config = MCPSecurityConfig(
agent_name="test-agent",
private_key=keys["private_key"],
public_key=keys["public_key"],
)
manager = MCPSecurityManager(config)
assert manager._private_key == keys["private_key"]
assert manager.public_key == keys["public_key"]
def test_passport_creation(self):
config = MCPSecurityConfig(
agent_name="researcher",
agent_version="2.0.0",
capabilities=["search"],
)
manager = MCPSecurityManager(config)
passport = manager.passport
assert passport is not None
assert passport["agent"]["name"] == "researcher"
assert passport["agent"]["version"] == "2.0.0"
assert "search" in passport["agent"]["capabilities"]
def test_passport_signed_by_ta(self):
from mcp_secure import generate_key_pair, verify_passport_signature
ta_keys = generate_key_pair()
config = MCPSecurityConfig(
agent_name="test-agent",
ta_private_key=ta_keys["private_key"],
ta_public_key=ta_keys["public_key"],
)
manager = MCPSecurityManager(config)
assert manager.passport is not None
assert "signature" in manager.passport
assert verify_passport_signature(manager.passport, ta_keys["public_key"])
# -- Message signing / verification ------------------------------------
def test_sign_message(self):
config = MCPSecurityConfig(agent_name="test-agent")
manager = MCPSecurityManager(config)
message = {
"jsonrpc": "2.0",
"method": "tools/call",
"id": 1,
"params": {"name": "read_file", "arguments": {"path": "/tmp/test"}},
}
envelope = manager.sign_message(message)
# Envelope must contain MCPS fields
assert "mcps" in envelope
assert "passport_id" in envelope
assert "timestamp" in envelope
assert "nonce" in envelope
assert "signature" in envelope
assert "message" in envelope
def test_sign_message_disabled(self):
config = MCPSecurityConfig(agent_name="test-agent", sign_messages=False)
manager = MCPSecurityManager(config)
message = {"jsonrpc": "2.0", "method": "tools/list", "id": 1}
result = manager.sign_message(message)
# Should return the original message unchanged
assert result is message
def test_verify_own_message(self):
config = MCPSecurityConfig(agent_name="test-agent")
manager = MCPSecurityManager(config)
message = {"jsonrpc": "2.0", "method": "tools/call", "id": 1}
envelope = manager.sign_message(message)
assert manager.verify_message(envelope, manager.public_key)
def test_verify_tampered_message_fails(self):
config = MCPSecurityConfig(agent_name="test-agent")
manager = MCPSecurityManager(config)
message = {"jsonrpc": "2.0", "method": "tools/call", "id": 1}
envelope = manager.sign_message(message)
# Tamper with the message inside the envelope
envelope["message"]["method"] = "tools/evil"
assert manager.verify_message(envelope, manager.public_key) is False
def test_replay_protection(self):
"""Same nonce used twice should be detected as replay."""
config = MCPSecurityConfig(agent_name="test-agent")
manager = MCPSecurityManager(config)
message = {"jsonrpc": "2.0", "method": "tools/call", "id": 1}
envelope = manager.sign_message(message)
# First verification should pass
assert manager.verify_message(envelope, manager.public_key) is True
# Second verification with same nonce should fail (replay)
assert manager.verify_message(envelope, manager.public_key) is False
def test_unique_nonces(self):
"""Each signed message should have a unique nonce."""
config = MCPSecurityConfig(agent_name="test-agent")
manager = MCPSecurityManager(config)
message = {"jsonrpc": "2.0", "method": "tools/call", "id": 1}
envelope1 = manager.sign_message(message)
envelope2 = manager.sign_message(message)
assert envelope1["nonce"] != envelope2["nonce"]
# -- Tool integrity ----------------------------------------------------
def test_sign_and_verify_tool(self):
config = MCPSecurityConfig(agent_name="test-agent")
manager = MCPSecurityManager(config)
tool = {
"name": "read_file",
"description": "Read a file from disk",
"inputSchema": {"type": "object", "properties": {"path": {"type": "string"}}},
}
signature = manager.sign_tool(tool)
assert signature is not None
assert manager.verify_tool(tool, signature) is True
def test_verify_tampered_tool_fails(self):
config = MCPSecurityConfig(agent_name="test-agent")
manager = MCPSecurityManager(config)
tool = {
"name": "read_file",
"description": "Read a file from disk",
"inputSchema": {"type": "object"},
}
signature = manager.sign_tool(tool)
# Tamper with the tool definition
tool["description"] = "EVIL: delete all files"
assert manager.verify_tool(tool, signature) is False
def test_verify_tool_disabled(self):
config = MCPSecurityConfig(
agent_name="test-agent", verify_tool_signatures=False
)
manager = MCPSecurityManager(config)
tool = {"name": "anything", "description": "whatever"}
# Should always return True when verification is disabled
assert manager.verify_tool(tool, "invalid-signature") is True
# -- Passport verification ---------------------------------------------
def test_verify_passport(self):
from mcp_secure import generate_key_pair
ta_keys = generate_key_pair()
config = MCPSecurityConfig(
agent_name="test-agent",
ta_private_key=ta_keys["private_key"],
ta_public_key=ta_keys["public_key"],
)
manager = MCPSecurityManager(config)
assert manager.verify_passport(manager.passport, ta_keys["public_key"]) is True
# -- Cleanup -----------------------------------------------------------
def test_destroy(self):
config = MCPSecurityConfig(agent_name="test-agent")
manager = MCPSecurityManager(config)
assert manager.passport is not None
manager.destroy()
assert manager.passport is None
# -- Graceful degradation when mcp-secure not installed ----------------
def test_graceful_degradation_sign_message(self):
"""sign_message returns the original message when mcp-secure is unavailable."""
config = MCPSecurityConfig(agent_name="test-agent")
with patch("crewai.mcp.security._mcp_secure_available", return_value=False):
manager = MCPSecurityManager(config)
message = {"jsonrpc": "2.0", "method": "tools/call", "id": 1}
result = manager.sign_message(message)
assert result is message
def test_graceful_degradation_verify_message(self):
"""verify_message returns True when mcp-secure is unavailable."""
config = MCPSecurityConfig(agent_name="test-agent")
with patch("crewai.mcp.security._mcp_secure_available", return_value=False):
manager = MCPSecurityManager(config)
assert manager.verify_message({}, "any-key") is True
def test_graceful_degradation_verify_tool(self):
"""verify_tool returns True when mcp-secure is unavailable."""
config = MCPSecurityConfig(agent_name="test-agent")
with patch("crewai.mcp.security._mcp_secure_available", return_value=False):
manager = MCPSecurityManager(config)
assert manager.verify_tool({"name": "x"}, "sig") is True
def test_graceful_degradation_sign_tool(self):
"""sign_tool returns None when mcp-secure is unavailable."""
config = MCPSecurityConfig(agent_name="test-agent")
with patch("crewai.mcp.security._mcp_secure_available", return_value=False):
manager = MCPSecurityManager(config)
assert manager.sign_tool({"name": "x"}) is None
# ---------------------------------------------------------------------------
# Config models accept security field
# ---------------------------------------------------------------------------
class TestConfigSecurityField:
"""Verify all MCP config models accept the optional security field."""
def test_stdio_config_with_security(self):
security = MCPSecurityConfig(agent_name="stdio-agent")
config = MCPServerStdio(
command="python",
args=["server.py"],
security=security,
)
assert config.security is not None
assert config.security.agent_name == "stdio-agent"
def test_http_config_with_security(self):
security = MCPSecurityConfig(agent_name="http-agent")
config = MCPServerHTTP(
url="https://example.com/mcp",
security=security,
)
assert config.security is not None
assert config.security.agent_name == "http-agent"
def test_sse_config_with_security(self):
security = MCPSecurityConfig(agent_name="sse-agent")
config = MCPServerSSE(
url="https://example.com/mcp/sse",
security=security,
)
assert config.security is not None
assert config.security.agent_name == "sse-agent"
def test_config_without_security(self):
"""Security is optional -- configs without it should still work."""
config = MCPServerHTTP(url="https://example.com/mcp")
assert config.security is None
# ---------------------------------------------------------------------------
# MCPClient integration
# ---------------------------------------------------------------------------
class TestMCPClientSecurityIntegration:
"""Verify MCPClient passes signed envelopes when security_manager is set."""
@pytest.mark.asyncio
async def test_call_tool_signs_message(self):
"""call_tool should attach _mcps_envelope when security is enabled."""
from crewai.mcp.client import MCPClient
config = MCPSecurityConfig(agent_name="test-agent")
manager = MCPSecurityManager(config)
transport = MagicMock()
transport.connected = True
transport.transport_type = MagicMock()
transport.transport_type.value = "http"
transport.url = "https://example.com/mcp"
client = MCPClient(transport=transport, security_manager=manager)
client._initialized = True
# Mock session.call_tool
mock_result = MagicMock()
mock_result.isError = False
mock_result.content = [MagicMock(text="success")]
client._session = MagicMock()
client._session.call_tool = AsyncMock(return_value=mock_result)
result = await client.call_tool("read_file", {"path": "/tmp/test"})
# Verify call_tool was called with arguments containing _mcps_envelope
call_args = client._session.call_tool.call_args
actual_args = call_args[0][1] # second positional arg = arguments dict
assert "_mcps_envelope" in actual_args
envelope = actual_args["_mcps_envelope"]
assert "signature" in envelope
assert "nonce" in envelope
assert "passport_id" in envelope
@pytest.mark.asyncio
async def test_call_tool_without_security(self):
"""call_tool should NOT attach _mcps_envelope when security is None."""
from crewai.mcp.client import MCPClient
transport = MagicMock()
transport.connected = True
transport.transport_type = MagicMock()
transport.transport_type.value = "http"
transport.url = "https://example.com/mcp"
client = MCPClient(transport=transport, security_manager=None)
client._initialized = True
mock_result = MagicMock()
mock_result.isError = False
mock_result.content = [MagicMock(text="success")]
client._session = MagicMock()
client._session.call_tool = AsyncMock(return_value=mock_result)
await client.call_tool("read_file", {"path": "/tmp/test"})
call_args = client._session.call_tool.call_args
actual_args = call_args[0][1]
assert "_mcps_envelope" not in actual_args
# ---------------------------------------------------------------------------
# MCPToolResolver integration
# ---------------------------------------------------------------------------
class TestToolResolverSecurityIntegration:
"""Verify MCPToolResolver creates a security manager from config."""
def test_resolver_creates_security_manager(self):
"""When config has security, resolver should pass security_manager to MCPClient."""
security = MCPSecurityConfig(agent_name="test-agent")
http_config = MCPServerHTTP(
url="https://example.com/mcp",
security=security,
)
mock_tool_definitions = [
{
"name": "test_tool",
"description": "A test tool",
"inputSchema": {"type": "object"},
}
]
with patch("crewai.mcp.tool_resolver.MCPClient") as mock_client_class:
mock_client = AsyncMock()
mock_client.list_tools = AsyncMock(return_value=mock_tool_definitions)
mock_client.connected = False
mock_client.connect = AsyncMock()
mock_client.disconnect = AsyncMock()
mock_client_class.return_value = mock_client
from crewai.agent.core import Agent
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
mcps=[http_config],
)
tools = agent.get_mcp_tools([http_config])
# The MCPClient constructor should have received a security_manager
call_kwargs = mock_client_class.call_args.kwargs
assert "security_manager" in call_kwargs
assert call_kwargs["security_manager"] is not None
assert isinstance(call_kwargs["security_manager"], MCPSecurityManager)
def test_resolver_no_security_when_not_configured(self):
"""When config has no security, security_manager should be None."""
http_config = MCPServerHTTP(url="https://example.com/mcp")
mock_tool_definitions = [
{
"name": "test_tool",
"description": "A test tool",
"inputSchema": {"type": "object"},
}
]
with patch("crewai.mcp.tool_resolver.MCPClient") as mock_client_class:
mock_client = AsyncMock()
mock_client.list_tools = AsyncMock(return_value=mock_tool_definitions)
mock_client.connected = False
mock_client.connect = AsyncMock()
mock_client.disconnect = AsyncMock()
mock_client_class.return_value = mock_client
from crewai.agent.core import Agent
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
mcps=[http_config],
)
agent.get_mcp_tools([http_config])
call_kwargs = mock_client_class.call_args.kwargs
assert call_kwargs.get("security_manager") is None