mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-03-15 16:28:14 +00:00
Compare commits
2 Commits
gl/chore/r
...
devin/1773
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3158e50a8e | ||
|
|
909ebd869b |
@@ -18,6 +18,7 @@ from crewai.mcp.filters import (
|
||||
create_dynamic_tool_filter,
|
||||
create_static_tool_filter,
|
||||
)
|
||||
from crewai.mcp.security import MCPSecurityConfig, MCPSecurityManager
|
||||
from crewai.mcp.tool_resolver import MCPToolResolver
|
||||
from crewai.mcp.transports.base import BaseTransport, TransportType
|
||||
|
||||
@@ -25,6 +26,8 @@ from crewai.mcp.transports.base import BaseTransport, TransportType
|
||||
__all__ = [
|
||||
"BaseTransport",
|
||||
"MCPClient",
|
||||
"MCPSecurityConfig",
|
||||
"MCPSecurityManager",
|
||||
"MCPServerConfig",
|
||||
"MCPServerHTTP",
|
||||
"MCPServerSSE",
|
||||
|
||||
@@ -27,6 +27,7 @@ from crewai.events.types.mcp_events import (
|
||||
MCPToolExecutionFailedEvent,
|
||||
MCPToolExecutionStartedEvent,
|
||||
)
|
||||
from crewai.mcp.security import MCPSecurityManager
|
||||
from crewai.mcp.transports.base import BaseTransport
|
||||
from crewai.mcp.transports.http import HTTPTransport
|
||||
from crewai.mcp.transports.sse import SSETransport
|
||||
@@ -77,6 +78,7 @@ class MCPClient:
|
||||
max_retries: int = MCP_MAX_RETRIES,
|
||||
cache_tools_list: bool = False,
|
||||
logger: logging.Logger | None = None,
|
||||
security_manager: MCPSecurityManager | None = None,
|
||||
) -> None:
|
||||
"""Initialize MCP client.
|
||||
|
||||
@@ -88,6 +90,8 @@ class MCPClient:
|
||||
max_retries: Maximum retry attempts for operations.
|
||||
cache_tools_list: Whether to cache tool list results.
|
||||
logger: Optional logger instance.
|
||||
security_manager: Optional security manager for message signing
|
||||
and tool integrity verification.
|
||||
"""
|
||||
self.transport = transport
|
||||
self.connect_timeout = connect_timeout
|
||||
@@ -95,6 +99,7 @@ class MCPClient:
|
||||
self.discovery_timeout = discovery_timeout
|
||||
self.max_retries = max_retries
|
||||
self.cache_tools_list = cache_tools_list
|
||||
self.security_manager = security_manager
|
||||
# self._logger = logger or logging.getLogger(__name__)
|
||||
self._session: Any = None
|
||||
self._initialized = False
|
||||
@@ -439,6 +444,10 @@ class MCPClient:
|
||||
) -> Any:
|
||||
"""Call a tool on the MCP server.
|
||||
|
||||
When a :class:`MCPSecurityManager` is attached, the outgoing JSON-RPC
|
||||
message is signed before transmission so the server can verify the
|
||||
caller's identity and detect tampering.
|
||||
|
||||
Args:
|
||||
tool_name: Name of the tool to call.
|
||||
arguments: Tool arguments.
|
||||
@@ -452,6 +461,24 @@ class MCPClient:
|
||||
arguments = arguments or {}
|
||||
cleaned_arguments = self._clean_tool_arguments(arguments)
|
||||
|
||||
# Sign the outgoing message when a security manager is present.
|
||||
# We use a shallow copy of cleaned_arguments inside the message dict
|
||||
# so that storing the envelope back into cleaned_arguments does not
|
||||
# create a circular reference (envelope -> message -> params -> arguments).
|
||||
if self.security_manager is not None:
|
||||
message = {
|
||||
"jsonrpc": "2.0",
|
||||
"method": "tools/call",
|
||||
"params": {
|
||||
"name": tool_name,
|
||||
"arguments": dict(cleaned_arguments),
|
||||
},
|
||||
}
|
||||
signed_envelope = self.security_manager.sign_message(message)
|
||||
# Attach the signed envelope as metadata so the server can verify
|
||||
if signed_envelope is not message:
|
||||
cleaned_arguments["_mcps_envelope"] = signed_envelope
|
||||
|
||||
# Get server info for events
|
||||
server_name, server_url, transport_type = self._get_server_info()
|
||||
|
||||
|
||||
@@ -4,9 +4,12 @@ This module provides Pydantic models for configuring MCP servers with
|
||||
various transport types, similar to OpenAI's Agents SDK.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from crewai.mcp.filters import ToolFilter
|
||||
from crewai.mcp.security import MCPSecurityConfig
|
||||
|
||||
|
||||
class MCPServerStdio(BaseModel):
|
||||
@@ -48,6 +51,13 @@ class MCPServerStdio(BaseModel):
|
||||
default=False,
|
||||
description="Whether to cache the tool list for faster subsequent access.",
|
||||
)
|
||||
security: MCPSecurityConfig | None = Field(
|
||||
default=None,
|
||||
description=(
|
||||
"Optional security configuration for cryptographic identity, "
|
||||
"message signing, and tool integrity verification."
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
class MCPServerHTTP(BaseModel):
|
||||
@@ -85,6 +95,13 @@ class MCPServerHTTP(BaseModel):
|
||||
default=False,
|
||||
description="Whether to cache the tool list for faster subsequent access.",
|
||||
)
|
||||
security: MCPSecurityConfig | None = Field(
|
||||
default=None,
|
||||
description=(
|
||||
"Optional security configuration for cryptographic identity, "
|
||||
"message signing, and tool integrity verification."
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
class MCPServerSSE(BaseModel):
|
||||
@@ -118,6 +135,13 @@ class MCPServerSSE(BaseModel):
|
||||
default=False,
|
||||
description="Whether to cache the tool list for faster subsequent access.",
|
||||
)
|
||||
security: MCPSecurityConfig | None = Field(
|
||||
default=None,
|
||||
description=(
|
||||
"Optional security configuration for cryptographic identity, "
|
||||
"message signing, and tool integrity verification."
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
# Type alias for all MCP server configurations
|
||||
|
||||
360
lib/crewai/src/crewai/mcp/security.py
Normal file
360
lib/crewai/src/crewai/mcp/security.py
Normal file
@@ -0,0 +1,360 @@
|
||||
"""MCP security layer for CrewAI agents.
|
||||
|
||||
This module provides cryptographic identity, message signing, tool integrity
|
||||
verification, and replay protection for MCP tool calls, addressing the security
|
||||
gaps described in OWASP MCP Top 10.
|
||||
|
||||
It wraps the ``mcp-secure`` library (optional dependency) and exposes:
|
||||
|
||||
- :class:`MCPSecurityConfig` -- Pydantic model for security settings.
|
||||
- :class:`MCPSecurityManager` -- Stateful manager handling key generation,
|
||||
passport creation/signing, message signing/verification, tool integrity
|
||||
checks, and nonce-based replay protection.
|
||||
|
||||
When ``mcp-secure`` is not installed, :meth:`MCPSecurityManager.is_available`
|
||||
returns ``False`` and all operations gracefully degrade to no-ops.
|
||||
|
||||
Example:
|
||||
```python
|
||||
from crewai.mcp.security import MCPSecurityConfig, MCPSecurityManager
|
||||
|
||||
config = MCPSecurityConfig(
|
||||
agent_name="my-agent",
|
||||
agent_version="1.0.0",
|
||||
capabilities=["read", "write"],
|
||||
)
|
||||
manager = MCPSecurityManager(config)
|
||||
|
||||
# Sign a tool call message
|
||||
message = {"jsonrpc": "2.0", "method": "tools/call", "id": 1}
|
||||
envelope = manager.sign_message(message)
|
||||
|
||||
# Verify a tool definition
|
||||
tool = {"name": "read_file", "description": "...", "inputSchema": {}}
|
||||
signature = "..." # from server
|
||||
is_valid = manager.verify_tool(tool, signature)
|
||||
```
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _mcp_secure_available() -> bool:
|
||||
"""Check whether the ``mcp-secure`` package is importable."""
|
||||
try:
|
||||
import mcp_secure # noqa: F401
|
||||
|
||||
return True
|
||||
except ImportError:
|
||||
return False
|
||||
|
||||
|
||||
class MCPSecurityConfig(BaseModel):
|
||||
"""Configuration for MCP security features.
|
||||
|
||||
All fields have sensible defaults so that a minimal configuration like
|
||||
``MCPSecurityConfig()`` auto-generates keys and creates an unsigned
|
||||
passport. For production use, supply a Trust Authority key pair so that
|
||||
passports are cryptographically signed and verifiable.
|
||||
|
||||
Example:
|
||||
```python
|
||||
# Minimal -- keys auto-generated, passport self-signed
|
||||
config = MCPSecurityConfig(agent_name="researcher")
|
||||
|
||||
# Production -- TA-signed passport, tool verification enabled
|
||||
config = MCPSecurityConfig(
|
||||
agent_name="researcher",
|
||||
agent_version="2.0.0",
|
||||
capabilities=["search", "summarize"],
|
||||
ta_public_key="<PEM public key>",
|
||||
ta_private_key="<PEM private key>",
|
||||
verify_tool_signatures=True,
|
||||
sign_messages=True,
|
||||
)
|
||||
```
|
||||
"""
|
||||
|
||||
agent_name: str = Field(
|
||||
default="crewai-agent",
|
||||
description="Human-readable name for the agent passport.",
|
||||
)
|
||||
agent_version: str = Field(
|
||||
default="1.0.0",
|
||||
description="Semantic version included in the agent passport.",
|
||||
)
|
||||
capabilities: list[str] = Field(
|
||||
default_factory=list,
|
||||
description="List of capability strings for the agent passport.",
|
||||
)
|
||||
|
||||
# Agent key pair -- auto-generated when left as ``None``.
|
||||
private_key: str | None = Field(
|
||||
default=None,
|
||||
description=(
|
||||
"PEM-encoded ECDSA P-256 private key for signing messages. "
|
||||
"Auto-generated if not provided."
|
||||
),
|
||||
)
|
||||
public_key: str | None = Field(
|
||||
default=None,
|
||||
description=(
|
||||
"PEM-encoded ECDSA P-256 public key matching the private key. "
|
||||
"Auto-generated if not provided."
|
||||
),
|
||||
)
|
||||
|
||||
# Trust Authority key pair -- used to sign and verify passports.
|
||||
ta_private_key: str | None = Field(
|
||||
default=None,
|
||||
description=(
|
||||
"PEM-encoded private key of the Trust Authority. "
|
||||
"When provided, passports are TA-signed."
|
||||
),
|
||||
)
|
||||
ta_public_key: str | None = Field(
|
||||
default=None,
|
||||
description=(
|
||||
"PEM-encoded public key of the Trust Authority. "
|
||||
"When provided, incoming passports can be verified."
|
||||
),
|
||||
)
|
||||
|
||||
sign_messages: bool = Field(
|
||||
default=True,
|
||||
description="Whether to sign outgoing MCP JSON-RPC messages.",
|
||||
)
|
||||
verify_tool_signatures: bool = Field(
|
||||
default=True,
|
||||
description="Whether to verify tool definition signatures from the server.",
|
||||
)
|
||||
|
||||
|
||||
class MCPSecurityManager:
|
||||
"""Manages MCP security operations for a single agent.
|
||||
|
||||
Handles key generation, passport lifecycle, message signing/verification,
|
||||
tool integrity checks, and replay protection via nonce tracking.
|
||||
|
||||
The manager is safe to share across MCP clients that belong to the same
|
||||
agent; each call to :meth:`sign_message` generates a unique nonce.
|
||||
"""
|
||||
|
||||
def __init__(self, config: MCPSecurityConfig) -> None:
|
||||
self._config = config
|
||||
self._passport: dict[str, Any] | None = None
|
||||
self._nonce_store: Any | None = None
|
||||
self._private_key: str | None = config.private_key
|
||||
self._public_key: str | None = config.public_key
|
||||
|
||||
if _mcp_secure_available():
|
||||
self._initialize()
|
||||
|
||||
@staticmethod
|
||||
def is_available() -> bool:
|
||||
"""Return ``True`` when the ``mcp-secure`` package is installed."""
|
||||
return _mcp_secure_available()
|
||||
|
||||
@property
|
||||
def passport(self) -> dict[str, Any] | None:
|
||||
"""Return the current agent passport, or ``None`` if unavailable."""
|
||||
return self._passport
|
||||
|
||||
@property
|
||||
def passport_id(self) -> str | None:
|
||||
"""Return the passport ID, or ``None`` if no passport exists."""
|
||||
if self._passport is None:
|
||||
return None
|
||||
return self._passport.get("passport_id")
|
||||
|
||||
@property
|
||||
def public_key(self) -> str | None:
|
||||
"""Return the agent's public key."""
|
||||
return self._public_key
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Initialization
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _initialize(self) -> None:
|
||||
"""Generate keys (if needed), create passport, and set up nonce store."""
|
||||
from mcp_secure import (
|
||||
NonceStore,
|
||||
create_passport,
|
||||
generate_key_pair,
|
||||
sign_passport,
|
||||
)
|
||||
|
||||
# Generate agent keys if not provided
|
||||
if self._private_key is None or self._public_key is None:
|
||||
keys = generate_key_pair()
|
||||
self._private_key = keys["private_key"]
|
||||
self._public_key = keys["public_key"]
|
||||
|
||||
# Create passport
|
||||
passport = create_passport(
|
||||
name=self._config.agent_name,
|
||||
version=self._config.agent_version,
|
||||
public_key=self._public_key,
|
||||
capabilities=self._config.capabilities,
|
||||
)
|
||||
|
||||
# Sign with Trust Authority key if available
|
||||
if self._config.ta_private_key is not None:
|
||||
passport = sign_passport(passport, self._config.ta_private_key)
|
||||
|
||||
self._passport = passport
|
||||
self._nonce_store = NonceStore()
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Message signing / verification
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def sign_message(self, message: dict[str, Any]) -> dict[str, Any]:
|
||||
"""Sign an outgoing MCP JSON-RPC message.
|
||||
|
||||
Returns the original *message* unchanged when security is unavailable
|
||||
or message signing is disabled.
|
||||
|
||||
Args:
|
||||
message: The JSON-RPC message dict to sign.
|
||||
|
||||
Returns:
|
||||
A signed envelope dict (when signing is active) or the original
|
||||
message (when signing is inactive).
|
||||
"""
|
||||
if not self.is_available() or not self._config.sign_messages:
|
||||
return message
|
||||
|
||||
if self._passport is None or self._private_key is None:
|
||||
return message
|
||||
|
||||
from mcp_secure import sign_message
|
||||
|
||||
return sign_message(message, self._passport["passport_id"], self._private_key)
|
||||
|
||||
def verify_message(self, envelope: dict[str, Any], sender_public_key: str) -> bool:
|
||||
"""Verify an incoming signed MCP message envelope.
|
||||
|
||||
Args:
|
||||
envelope: The signed envelope dict.
|
||||
sender_public_key: PEM-encoded public key of the sender.
|
||||
|
||||
Returns:
|
||||
``True`` if the envelope signature and nonce are valid.
|
||||
"""
|
||||
if not self.is_available():
|
||||
return True
|
||||
|
||||
from mcp_secure import verify_message
|
||||
|
||||
result = verify_message(envelope, sender_public_key)
|
||||
if not result.get("valid", False):
|
||||
logger.warning("MCP message verification failed: %s", result.get("error"))
|
||||
return False
|
||||
|
||||
# Replay protection
|
||||
nonce = envelope.get("nonce")
|
||||
if nonce is not None and self._nonce_store is not None:
|
||||
if not self._nonce_store.check(nonce):
|
||||
logger.warning("MCP message replay detected (duplicate nonce)")
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Tool integrity
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def sign_tool(self, tool_definition: dict[str, Any]) -> str | None:
|
||||
"""Sign a tool definition.
|
||||
|
||||
Args:
|
||||
tool_definition: Tool definition dict (name, description, inputSchema).
|
||||
|
||||
Returns:
|
||||
Signature string, or ``None`` when security is unavailable.
|
||||
"""
|
||||
if not self.is_available() or self._private_key is None:
|
||||
return None
|
||||
|
||||
from mcp_secure import sign_tool
|
||||
|
||||
return sign_tool(tool_definition, self._private_key)
|
||||
|
||||
def verify_tool(
|
||||
self,
|
||||
tool_definition: dict[str, Any],
|
||||
signature: str,
|
||||
signer_public_key: str | None = None,
|
||||
) -> bool:
|
||||
"""Verify a tool definition signature.
|
||||
|
||||
Args:
|
||||
tool_definition: Tool definition dict.
|
||||
signature: Signature string to verify.
|
||||
signer_public_key: PEM-encoded public key of the signer.
|
||||
Falls back to the agent's own public key if ``None``.
|
||||
|
||||
Returns:
|
||||
``True`` if the signature is valid, ``False`` otherwise.
|
||||
Always returns ``True`` when security is unavailable or
|
||||
tool verification is disabled.
|
||||
"""
|
||||
if not self.is_available() or not self._config.verify_tool_signatures:
|
||||
return True
|
||||
|
||||
key = signer_public_key or self._public_key
|
||||
if key is None:
|
||||
return True
|
||||
|
||||
from mcp_secure import verify_tool
|
||||
|
||||
return verify_tool(tool_definition, signature, key)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Passport verification
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def verify_passport(
|
||||
self, passport: dict[str, Any], ta_public_key: str | None = None
|
||||
) -> bool:
|
||||
"""Verify a remote passport signature.
|
||||
|
||||
Args:
|
||||
passport: Passport dict to verify.
|
||||
ta_public_key: Trust Authority public key. Falls back to the
|
||||
configured ``ta_public_key`` if ``None``.
|
||||
|
||||
Returns:
|
||||
``True`` if the passport signature is valid.
|
||||
"""
|
||||
if not self.is_available():
|
||||
return True
|
||||
|
||||
key = ta_public_key or self._config.ta_public_key
|
||||
if key is None:
|
||||
return True
|
||||
|
||||
from mcp_secure import verify_passport_signature
|
||||
|
||||
return verify_passport_signature(passport, key)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Cleanup
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def destroy(self) -> None:
|
||||
"""Release resources held by the security manager."""
|
||||
if self._nonce_store is not None:
|
||||
self._nonce_store.destroy()
|
||||
self._nonce_store = None
|
||||
self._passport = None
|
||||
@@ -23,6 +23,7 @@ from crewai.mcp.config import (
|
||||
MCPServerSSE,
|
||||
MCPServerStdio,
|
||||
)
|
||||
from crewai.mcp.security import MCPSecurityConfig, MCPSecurityManager
|
||||
from crewai.mcp.transports.http import HTTPTransport
|
||||
from crewai.mcp.transports.sse import SSETransport
|
||||
from crewai.mcp.transports.stdio import StdioTransport
|
||||
@@ -324,9 +325,19 @@ class MCPToolResolver:
|
||||
from crewai.tools.mcp_native_tool import MCPNativeTool
|
||||
|
||||
discovery_transport, server_name = self._create_transport(mcp_config)
|
||||
|
||||
# Build a security manager when the config has security settings
|
||||
security_manager: MCPSecurityManager | None = None
|
||||
security_config: MCPSecurityConfig | None = getattr(
|
||||
mcp_config, "security", None
|
||||
)
|
||||
if security_config is not None and MCPSecurityManager.is_available():
|
||||
security_manager = MCPSecurityManager(security_config)
|
||||
|
||||
discovery_client = MCPClient(
|
||||
transport=discovery_transport,
|
||||
cache_tools_list=mcp_config.cache_tools_list,
|
||||
security_manager=security_manager,
|
||||
)
|
||||
|
||||
async def _setup_client_and_list_tools() -> list[dict[str, Any]]:
|
||||
@@ -404,6 +415,7 @@ class MCPToolResolver:
|
||||
return MCPClient(
|
||||
transport=transport,
|
||||
cache_tools_list=mcp_config.cache_tools_list,
|
||||
security_manager=security_manager,
|
||||
)
|
||||
|
||||
tools = []
|
||||
|
||||
490
lib/crewai/tests/mcp/test_mcp_security.py
Normal file
490
lib/crewai/tests/mcp/test_mcp_security.py
Normal file
@@ -0,0 +1,490 @@
|
||||
"""Tests for MCP security integration (issue #4875).
|
||||
|
||||
Covers:
|
||||
- MCPSecurityConfig model validation
|
||||
- MCPSecurityManager key generation, passport creation, message signing,
|
||||
tool integrity verification, replay protection, and graceful degradation
|
||||
- Integration with MCPClient and MCPToolResolver via security_manager
|
||||
- Config models accepting optional security field
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from crewai.mcp.config import MCPServerHTTP, MCPServerSSE, MCPServerStdio
|
||||
from crewai.mcp.security import MCPSecurityConfig, MCPSecurityManager
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# MCPSecurityConfig tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestMCPSecurityConfig:
|
||||
"""Tests for the MCPSecurityConfig Pydantic model."""
|
||||
|
||||
def test_default_values(self):
|
||||
config = MCPSecurityConfig()
|
||||
assert config.agent_name == "crewai-agent"
|
||||
assert config.agent_version == "1.0.0"
|
||||
assert config.capabilities == []
|
||||
assert config.private_key is None
|
||||
assert config.public_key is None
|
||||
assert config.ta_private_key is None
|
||||
assert config.ta_public_key is None
|
||||
assert config.sign_messages is True
|
||||
assert config.verify_tool_signatures is True
|
||||
|
||||
def test_custom_values(self):
|
||||
config = MCPSecurityConfig(
|
||||
agent_name="researcher",
|
||||
agent_version="2.0.0",
|
||||
capabilities=["search", "summarize"],
|
||||
sign_messages=False,
|
||||
verify_tool_signatures=False,
|
||||
)
|
||||
assert config.agent_name == "researcher"
|
||||
assert config.agent_version == "2.0.0"
|
||||
assert config.capabilities == ["search", "summarize"]
|
||||
assert config.sign_messages is False
|
||||
assert config.verify_tool_signatures is False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# MCPSecurityManager tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestMCPSecurityManager:
|
||||
"""Tests for the MCPSecurityManager class."""
|
||||
|
||||
def test_is_available(self):
|
||||
"""mcp-secure is installed in the test env so this should be True."""
|
||||
assert MCPSecurityManager.is_available() is True
|
||||
|
||||
def test_auto_generates_keys(self):
|
||||
config = MCPSecurityConfig(agent_name="test-agent")
|
||||
manager = MCPSecurityManager(config)
|
||||
|
||||
assert manager.public_key is not None
|
||||
assert manager._private_key is not None
|
||||
assert manager.passport is not None
|
||||
assert manager.passport_id is not None
|
||||
|
||||
def test_uses_provided_keys(self):
|
||||
from mcp_secure import generate_key_pair
|
||||
|
||||
keys = generate_key_pair()
|
||||
config = MCPSecurityConfig(
|
||||
agent_name="test-agent",
|
||||
private_key=keys["private_key"],
|
||||
public_key=keys["public_key"],
|
||||
)
|
||||
manager = MCPSecurityManager(config)
|
||||
|
||||
assert manager._private_key == keys["private_key"]
|
||||
assert manager.public_key == keys["public_key"]
|
||||
|
||||
def test_passport_creation(self):
|
||||
config = MCPSecurityConfig(
|
||||
agent_name="researcher",
|
||||
agent_version="2.0.0",
|
||||
capabilities=["search"],
|
||||
)
|
||||
manager = MCPSecurityManager(config)
|
||||
passport = manager.passport
|
||||
|
||||
assert passport is not None
|
||||
assert passport["agent"]["name"] == "researcher"
|
||||
assert passport["agent"]["version"] == "2.0.0"
|
||||
assert "search" in passport["agent"]["capabilities"]
|
||||
|
||||
def test_passport_signed_by_ta(self):
|
||||
from mcp_secure import generate_key_pair, verify_passport_signature
|
||||
|
||||
ta_keys = generate_key_pair()
|
||||
config = MCPSecurityConfig(
|
||||
agent_name="test-agent",
|
||||
ta_private_key=ta_keys["private_key"],
|
||||
ta_public_key=ta_keys["public_key"],
|
||||
)
|
||||
manager = MCPSecurityManager(config)
|
||||
|
||||
assert manager.passport is not None
|
||||
assert "signature" in manager.passport
|
||||
assert verify_passport_signature(manager.passport, ta_keys["public_key"])
|
||||
|
||||
# -- Message signing / verification ------------------------------------
|
||||
|
||||
def test_sign_message(self):
|
||||
config = MCPSecurityConfig(agent_name="test-agent")
|
||||
manager = MCPSecurityManager(config)
|
||||
|
||||
message = {
|
||||
"jsonrpc": "2.0",
|
||||
"method": "tools/call",
|
||||
"id": 1,
|
||||
"params": {"name": "read_file", "arguments": {"path": "/tmp/test"}},
|
||||
}
|
||||
envelope = manager.sign_message(message)
|
||||
|
||||
# Envelope must contain MCPS fields
|
||||
assert "mcps" in envelope
|
||||
assert "passport_id" in envelope
|
||||
assert "timestamp" in envelope
|
||||
assert "nonce" in envelope
|
||||
assert "signature" in envelope
|
||||
assert "message" in envelope
|
||||
|
||||
def test_sign_message_disabled(self):
|
||||
config = MCPSecurityConfig(agent_name="test-agent", sign_messages=False)
|
||||
manager = MCPSecurityManager(config)
|
||||
|
||||
message = {"jsonrpc": "2.0", "method": "tools/list", "id": 1}
|
||||
result = manager.sign_message(message)
|
||||
|
||||
# Should return the original message unchanged
|
||||
assert result is message
|
||||
|
||||
def test_verify_own_message(self):
|
||||
config = MCPSecurityConfig(agent_name="test-agent")
|
||||
manager = MCPSecurityManager(config)
|
||||
|
||||
message = {"jsonrpc": "2.0", "method": "tools/call", "id": 1}
|
||||
envelope = manager.sign_message(message)
|
||||
|
||||
assert manager.verify_message(envelope, manager.public_key)
|
||||
|
||||
def test_verify_tampered_message_fails(self):
|
||||
config = MCPSecurityConfig(agent_name="test-agent")
|
||||
manager = MCPSecurityManager(config)
|
||||
|
||||
message = {"jsonrpc": "2.0", "method": "tools/call", "id": 1}
|
||||
envelope = manager.sign_message(message)
|
||||
|
||||
# Tamper with the message inside the envelope
|
||||
envelope["message"]["method"] = "tools/evil"
|
||||
|
||||
assert manager.verify_message(envelope, manager.public_key) is False
|
||||
|
||||
def test_replay_protection(self):
|
||||
"""Same nonce used twice should be detected as replay."""
|
||||
config = MCPSecurityConfig(agent_name="test-agent")
|
||||
manager = MCPSecurityManager(config)
|
||||
|
||||
message = {"jsonrpc": "2.0", "method": "tools/call", "id": 1}
|
||||
envelope = manager.sign_message(message)
|
||||
|
||||
# First verification should pass
|
||||
assert manager.verify_message(envelope, manager.public_key) is True
|
||||
# Second verification with same nonce should fail (replay)
|
||||
assert manager.verify_message(envelope, manager.public_key) is False
|
||||
|
||||
def test_unique_nonces(self):
|
||||
"""Each signed message should have a unique nonce."""
|
||||
config = MCPSecurityConfig(agent_name="test-agent")
|
||||
manager = MCPSecurityManager(config)
|
||||
|
||||
message = {"jsonrpc": "2.0", "method": "tools/call", "id": 1}
|
||||
envelope1 = manager.sign_message(message)
|
||||
envelope2 = manager.sign_message(message)
|
||||
|
||||
assert envelope1["nonce"] != envelope2["nonce"]
|
||||
|
||||
# -- Tool integrity ----------------------------------------------------
|
||||
|
||||
def test_sign_and_verify_tool(self):
|
||||
config = MCPSecurityConfig(agent_name="test-agent")
|
||||
manager = MCPSecurityManager(config)
|
||||
|
||||
tool = {
|
||||
"name": "read_file",
|
||||
"description": "Read a file from disk",
|
||||
"inputSchema": {"type": "object", "properties": {"path": {"type": "string"}}},
|
||||
}
|
||||
signature = manager.sign_tool(tool)
|
||||
assert signature is not None
|
||||
assert manager.verify_tool(tool, signature) is True
|
||||
|
||||
def test_verify_tampered_tool_fails(self):
|
||||
config = MCPSecurityConfig(agent_name="test-agent")
|
||||
manager = MCPSecurityManager(config)
|
||||
|
||||
tool = {
|
||||
"name": "read_file",
|
||||
"description": "Read a file from disk",
|
||||
"inputSchema": {"type": "object"},
|
||||
}
|
||||
signature = manager.sign_tool(tool)
|
||||
|
||||
# Tamper with the tool definition
|
||||
tool["description"] = "EVIL: delete all files"
|
||||
assert manager.verify_tool(tool, signature) is False
|
||||
|
||||
def test_verify_tool_disabled(self):
|
||||
config = MCPSecurityConfig(
|
||||
agent_name="test-agent", verify_tool_signatures=False
|
||||
)
|
||||
manager = MCPSecurityManager(config)
|
||||
|
||||
tool = {"name": "anything", "description": "whatever"}
|
||||
# Should always return True when verification is disabled
|
||||
assert manager.verify_tool(tool, "invalid-signature") is True
|
||||
|
||||
# -- Passport verification ---------------------------------------------
|
||||
|
||||
def test_verify_passport(self):
|
||||
from mcp_secure import generate_key_pair
|
||||
|
||||
ta_keys = generate_key_pair()
|
||||
config = MCPSecurityConfig(
|
||||
agent_name="test-agent",
|
||||
ta_private_key=ta_keys["private_key"],
|
||||
ta_public_key=ta_keys["public_key"],
|
||||
)
|
||||
manager = MCPSecurityManager(config)
|
||||
|
||||
assert manager.verify_passport(manager.passport, ta_keys["public_key"]) is True
|
||||
|
||||
# -- Cleanup -----------------------------------------------------------
|
||||
|
||||
def test_destroy(self):
|
||||
config = MCPSecurityConfig(agent_name="test-agent")
|
||||
manager = MCPSecurityManager(config)
|
||||
|
||||
assert manager.passport is not None
|
||||
manager.destroy()
|
||||
assert manager.passport is None
|
||||
|
||||
# -- Graceful degradation when mcp-secure not installed ----------------
|
||||
|
||||
def test_graceful_degradation_sign_message(self):
|
||||
"""sign_message returns the original message when mcp-secure is unavailable."""
|
||||
config = MCPSecurityConfig(agent_name="test-agent")
|
||||
|
||||
with patch("crewai.mcp.security._mcp_secure_available", return_value=False):
|
||||
manager = MCPSecurityManager(config)
|
||||
message = {"jsonrpc": "2.0", "method": "tools/call", "id": 1}
|
||||
result = manager.sign_message(message)
|
||||
assert result is message
|
||||
|
||||
def test_graceful_degradation_verify_message(self):
|
||||
"""verify_message returns True when mcp-secure is unavailable."""
|
||||
config = MCPSecurityConfig(agent_name="test-agent")
|
||||
|
||||
with patch("crewai.mcp.security._mcp_secure_available", return_value=False):
|
||||
manager = MCPSecurityManager(config)
|
||||
assert manager.verify_message({}, "any-key") is True
|
||||
|
||||
def test_graceful_degradation_verify_tool(self):
|
||||
"""verify_tool returns True when mcp-secure is unavailable."""
|
||||
config = MCPSecurityConfig(agent_name="test-agent")
|
||||
|
||||
with patch("crewai.mcp.security._mcp_secure_available", return_value=False):
|
||||
manager = MCPSecurityManager(config)
|
||||
assert manager.verify_tool({"name": "x"}, "sig") is True
|
||||
|
||||
def test_graceful_degradation_sign_tool(self):
|
||||
"""sign_tool returns None when mcp-secure is unavailable."""
|
||||
config = MCPSecurityConfig(agent_name="test-agent")
|
||||
|
||||
with patch("crewai.mcp.security._mcp_secure_available", return_value=False):
|
||||
manager = MCPSecurityManager(config)
|
||||
assert manager.sign_tool({"name": "x"}) is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Config models accept security field
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestConfigSecurityField:
|
||||
"""Verify all MCP config models accept the optional security field."""
|
||||
|
||||
def test_stdio_config_with_security(self):
|
||||
security = MCPSecurityConfig(agent_name="stdio-agent")
|
||||
config = MCPServerStdio(
|
||||
command="python",
|
||||
args=["server.py"],
|
||||
security=security,
|
||||
)
|
||||
assert config.security is not None
|
||||
assert config.security.agent_name == "stdio-agent"
|
||||
|
||||
def test_http_config_with_security(self):
|
||||
security = MCPSecurityConfig(agent_name="http-agent")
|
||||
config = MCPServerHTTP(
|
||||
url="https://example.com/mcp",
|
||||
security=security,
|
||||
)
|
||||
assert config.security is not None
|
||||
assert config.security.agent_name == "http-agent"
|
||||
|
||||
def test_sse_config_with_security(self):
|
||||
security = MCPSecurityConfig(agent_name="sse-agent")
|
||||
config = MCPServerSSE(
|
||||
url="https://example.com/mcp/sse",
|
||||
security=security,
|
||||
)
|
||||
assert config.security is not None
|
||||
assert config.security.agent_name == "sse-agent"
|
||||
|
||||
def test_config_without_security(self):
|
||||
"""Security is optional -- configs without it should still work."""
|
||||
config = MCPServerHTTP(url="https://example.com/mcp")
|
||||
assert config.security is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# MCPClient integration
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestMCPClientSecurityIntegration:
|
||||
"""Verify MCPClient passes signed envelopes when security_manager is set."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_call_tool_signs_message(self):
|
||||
"""call_tool should attach _mcps_envelope when security is enabled."""
|
||||
from crewai.mcp.client import MCPClient
|
||||
|
||||
config = MCPSecurityConfig(agent_name="test-agent")
|
||||
manager = MCPSecurityManager(config)
|
||||
|
||||
transport = MagicMock()
|
||||
transport.connected = True
|
||||
transport.transport_type = MagicMock()
|
||||
transport.transport_type.value = "http"
|
||||
transport.url = "https://example.com/mcp"
|
||||
|
||||
client = MCPClient(transport=transport, security_manager=manager)
|
||||
client._initialized = True
|
||||
|
||||
# Mock session.call_tool
|
||||
mock_result = MagicMock()
|
||||
mock_result.isError = False
|
||||
mock_result.content = [MagicMock(text="success")]
|
||||
client._session = MagicMock()
|
||||
client._session.call_tool = AsyncMock(return_value=mock_result)
|
||||
|
||||
result = await client.call_tool("read_file", {"path": "/tmp/test"})
|
||||
|
||||
# Verify call_tool was called with arguments containing _mcps_envelope
|
||||
call_args = client._session.call_tool.call_args
|
||||
actual_args = call_args[0][1] # second positional arg = arguments dict
|
||||
assert "_mcps_envelope" in actual_args
|
||||
envelope = actual_args["_mcps_envelope"]
|
||||
assert "signature" in envelope
|
||||
assert "nonce" in envelope
|
||||
assert "passport_id" in envelope
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_call_tool_without_security(self):
|
||||
"""call_tool should NOT attach _mcps_envelope when security is None."""
|
||||
from crewai.mcp.client import MCPClient
|
||||
|
||||
transport = MagicMock()
|
||||
transport.connected = True
|
||||
transport.transport_type = MagicMock()
|
||||
transport.transport_type.value = "http"
|
||||
transport.url = "https://example.com/mcp"
|
||||
|
||||
client = MCPClient(transport=transport, security_manager=None)
|
||||
client._initialized = True
|
||||
|
||||
mock_result = MagicMock()
|
||||
mock_result.isError = False
|
||||
mock_result.content = [MagicMock(text="success")]
|
||||
client._session = MagicMock()
|
||||
client._session.call_tool = AsyncMock(return_value=mock_result)
|
||||
|
||||
await client.call_tool("read_file", {"path": "/tmp/test"})
|
||||
|
||||
call_args = client._session.call_tool.call_args
|
||||
actual_args = call_args[0][1]
|
||||
assert "_mcps_envelope" not in actual_args
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# MCPToolResolver integration
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestToolResolverSecurityIntegration:
|
||||
"""Verify MCPToolResolver creates a security manager from config."""
|
||||
|
||||
def test_resolver_creates_security_manager(self):
|
||||
"""When config has security, resolver should pass security_manager to MCPClient."""
|
||||
security = MCPSecurityConfig(agent_name="test-agent")
|
||||
http_config = MCPServerHTTP(
|
||||
url="https://example.com/mcp",
|
||||
security=security,
|
||||
)
|
||||
|
||||
mock_tool_definitions = [
|
||||
{
|
||||
"name": "test_tool",
|
||||
"description": "A test tool",
|
||||
"inputSchema": {"type": "object"},
|
||||
}
|
||||
]
|
||||
|
||||
with patch("crewai.mcp.tool_resolver.MCPClient") as mock_client_class:
|
||||
mock_client = AsyncMock()
|
||||
mock_client.list_tools = AsyncMock(return_value=mock_tool_definitions)
|
||||
mock_client.connected = False
|
||||
mock_client.connect = AsyncMock()
|
||||
mock_client.disconnect = AsyncMock()
|
||||
mock_client_class.return_value = mock_client
|
||||
|
||||
from crewai.agent.core import Agent
|
||||
|
||||
agent = Agent(
|
||||
role="Test Agent",
|
||||
goal="Test goal",
|
||||
backstory="Test backstory",
|
||||
mcps=[http_config],
|
||||
)
|
||||
tools = agent.get_mcp_tools([http_config])
|
||||
|
||||
# The MCPClient constructor should have received a security_manager
|
||||
call_kwargs = mock_client_class.call_args.kwargs
|
||||
assert "security_manager" in call_kwargs
|
||||
assert call_kwargs["security_manager"] is not None
|
||||
assert isinstance(call_kwargs["security_manager"], MCPSecurityManager)
|
||||
|
||||
def test_resolver_no_security_when_not_configured(self):
|
||||
"""When config has no security, security_manager should be None."""
|
||||
http_config = MCPServerHTTP(url="https://example.com/mcp")
|
||||
|
||||
mock_tool_definitions = [
|
||||
{
|
||||
"name": "test_tool",
|
||||
"description": "A test tool",
|
||||
"inputSchema": {"type": "object"},
|
||||
}
|
||||
]
|
||||
|
||||
with patch("crewai.mcp.tool_resolver.MCPClient") as mock_client_class:
|
||||
mock_client = AsyncMock()
|
||||
mock_client.list_tools = AsyncMock(return_value=mock_tool_definitions)
|
||||
mock_client.connected = False
|
||||
mock_client.connect = AsyncMock()
|
||||
mock_client.disconnect = AsyncMock()
|
||||
mock_client_class.return_value = mock_client
|
||||
|
||||
from crewai.agent.core import Agent
|
||||
|
||||
agent = Agent(
|
||||
role="Test Agent",
|
||||
goal="Test goal",
|
||||
backstory="Test backstory",
|
||||
mcps=[http_config],
|
||||
)
|
||||
agent.get_mcp_tools([http_config])
|
||||
|
||||
call_kwargs = mock_client_class.call_args.kwargs
|
||||
assert call_kwargs.get("security_manager") is None
|
||||
Reference in New Issue
Block a user