diff --git a/lib/crewai/src/crewai/mcp/__init__.py b/lib/crewai/src/crewai/mcp/__init__.py index e078919fd..eb19f586d 100644 --- a/lib/crewai/src/crewai/mcp/__init__.py +++ b/lib/crewai/src/crewai/mcp/__init__.py @@ -18,6 +18,7 @@ from crewai.mcp.filters import ( create_dynamic_tool_filter, create_static_tool_filter, ) +from crewai.mcp.security import MCPSecurityConfig, MCPSecurityManager from crewai.mcp.tool_resolver import MCPToolResolver from crewai.mcp.transports.base import BaseTransport, TransportType @@ -25,6 +26,8 @@ from crewai.mcp.transports.base import BaseTransport, TransportType __all__ = [ "BaseTransport", "MCPClient", + "MCPSecurityConfig", + "MCPSecurityManager", "MCPServerConfig", "MCPServerHTTP", "MCPServerSSE", diff --git a/lib/crewai/src/crewai/mcp/client.py b/lib/crewai/src/crewai/mcp/client.py index 2b5d75371..f25a77112 100644 --- a/lib/crewai/src/crewai/mcp/client.py +++ b/lib/crewai/src/crewai/mcp/client.py @@ -27,6 +27,7 @@ from crewai.events.types.mcp_events import ( MCPToolExecutionFailedEvent, MCPToolExecutionStartedEvent, ) +from crewai.mcp.security import MCPSecurityManager from crewai.mcp.transports.base import BaseTransport from crewai.mcp.transports.http import HTTPTransport from crewai.mcp.transports.sse import SSETransport @@ -77,6 +78,7 @@ class MCPClient: max_retries: int = MCP_MAX_RETRIES, cache_tools_list: bool = False, logger: logging.Logger | None = None, + security_manager: MCPSecurityManager | None = None, ) -> None: """Initialize MCP client. @@ -88,6 +90,8 @@ class MCPClient: max_retries: Maximum retry attempts for operations. cache_tools_list: Whether to cache tool list results. logger: Optional logger instance. + security_manager: Optional security manager for message signing + and tool integrity verification. """ self.transport = transport self.connect_timeout = connect_timeout @@ -95,6 +99,7 @@ class MCPClient: self.discovery_timeout = discovery_timeout self.max_retries = max_retries self.cache_tools_list = cache_tools_list + self.security_manager = security_manager # self._logger = logger or logging.getLogger(__name__) self._session: Any = None self._initialized = False @@ -439,6 +444,10 @@ class MCPClient: ) -> Any: """Call a tool on the MCP server. + When a :class:`MCPSecurityManager` is attached, the outgoing JSON-RPC + message is signed before transmission so the server can verify the + caller's identity and detect tampering. + Args: tool_name: Name of the tool to call. arguments: Tool arguments. @@ -452,6 +461,21 @@ class MCPClient: arguments = arguments or {} cleaned_arguments = self._clean_tool_arguments(arguments) + # Sign the outgoing message when a security manager is present + if self.security_manager is not None: + message = { + "jsonrpc": "2.0", + "method": "tools/call", + "params": { + "name": tool_name, + "arguments": cleaned_arguments, + }, + } + signed_envelope = self.security_manager.sign_message(message) + # Attach the signed envelope as metadata so the server can verify + if signed_envelope is not message: + cleaned_arguments["_mcps_envelope"] = signed_envelope + # Get server info for events server_name, server_url, transport_type = self._get_server_info() diff --git a/lib/crewai/src/crewai/mcp/config.py b/lib/crewai/src/crewai/mcp/config.py index 775f9403d..934fb8cac 100644 --- a/lib/crewai/src/crewai/mcp/config.py +++ b/lib/crewai/src/crewai/mcp/config.py @@ -4,9 +4,12 @@ This module provides Pydantic models for configuring MCP servers with various transport types, similar to OpenAI's Agents SDK. """ +from __future__ import annotations + from pydantic import BaseModel, Field from crewai.mcp.filters import ToolFilter +from crewai.mcp.security import MCPSecurityConfig class MCPServerStdio(BaseModel): @@ -48,6 +51,13 @@ class MCPServerStdio(BaseModel): default=False, description="Whether to cache the tool list for faster subsequent access.", ) + security: MCPSecurityConfig | None = Field( + default=None, + description=( + "Optional security configuration for cryptographic identity, " + "message signing, and tool integrity verification." + ), + ) class MCPServerHTTP(BaseModel): @@ -85,6 +95,13 @@ class MCPServerHTTP(BaseModel): default=False, description="Whether to cache the tool list for faster subsequent access.", ) + security: MCPSecurityConfig | None = Field( + default=None, + description=( + "Optional security configuration for cryptographic identity, " + "message signing, and tool integrity verification." + ), + ) class MCPServerSSE(BaseModel): @@ -118,6 +135,13 @@ class MCPServerSSE(BaseModel): default=False, description="Whether to cache the tool list for faster subsequent access.", ) + security: MCPSecurityConfig | None = Field( + default=None, + description=( + "Optional security configuration for cryptographic identity, " + "message signing, and tool integrity verification." + ), + ) # Type alias for all MCP server configurations diff --git a/lib/crewai/src/crewai/mcp/security.py b/lib/crewai/src/crewai/mcp/security.py new file mode 100644 index 000000000..d0a9c68fe --- /dev/null +++ b/lib/crewai/src/crewai/mcp/security.py @@ -0,0 +1,360 @@ +"""MCP security layer for CrewAI agents. + +This module provides cryptographic identity, message signing, tool integrity +verification, and replay protection for MCP tool calls, addressing the security +gaps described in OWASP MCP Top 10. + +It wraps the ``mcp-secure`` library (optional dependency) and exposes: + +- :class:`MCPSecurityConfig` -- Pydantic model for security settings. +- :class:`MCPSecurityManager` -- Stateful manager handling key generation, + passport creation/signing, message signing/verification, tool integrity + checks, and nonce-based replay protection. + +When ``mcp-secure`` is not installed, :meth:`MCPSecurityManager.is_available` +returns ``False`` and all operations gracefully degrade to no-ops. + +Example: + ```python + from crewai.mcp.security import MCPSecurityConfig, MCPSecurityManager + + config = MCPSecurityConfig( + agent_name="my-agent", + agent_version="1.0.0", + capabilities=["read", "write"], + ) + manager = MCPSecurityManager(config) + + # Sign a tool call message + message = {"jsonrpc": "2.0", "method": "tools/call", "id": 1} + envelope = manager.sign_message(message) + + # Verify a tool definition + tool = {"name": "read_file", "description": "...", "inputSchema": {}} + signature = "..." # from server + is_valid = manager.verify_tool(tool, signature) + ``` +""" + +from __future__ import annotations + +import logging +from typing import Any + +from pydantic import BaseModel, Field + + +logger = logging.getLogger(__name__) + + +def _mcp_secure_available() -> bool: + """Check whether the ``mcp-secure`` package is importable.""" + try: + import mcp_secure # noqa: F401 + + return True + except ImportError: + return False + + +class MCPSecurityConfig(BaseModel): + """Configuration for MCP security features. + + All fields have sensible defaults so that a minimal configuration like + ``MCPSecurityConfig()`` auto-generates keys and creates an unsigned + passport. For production use, supply a Trust Authority key pair so that + passports are cryptographically signed and verifiable. + + Example: + ```python + # Minimal -- keys auto-generated, passport self-signed + config = MCPSecurityConfig(agent_name="researcher") + + # Production -- TA-signed passport, tool verification enabled + config = MCPSecurityConfig( + agent_name="researcher", + agent_version="2.0.0", + capabilities=["search", "summarize"], + ta_public_key="", + ta_private_key="", + verify_tool_signatures=True, + sign_messages=True, + ) + ``` + """ + + agent_name: str = Field( + default="crewai-agent", + description="Human-readable name for the agent passport.", + ) + agent_version: str = Field( + default="1.0.0", + description="Semantic version included in the agent passport.", + ) + capabilities: list[str] = Field( + default_factory=list, + description="List of capability strings for the agent passport.", + ) + + # Agent key pair -- auto-generated when left as ``None``. + private_key: str | None = Field( + default=None, + description=( + "PEM-encoded ECDSA P-256 private key for signing messages. " + "Auto-generated if not provided." + ), + ) + public_key: str | None = Field( + default=None, + description=( + "PEM-encoded ECDSA P-256 public key matching the private key. " + "Auto-generated if not provided." + ), + ) + + # Trust Authority key pair -- used to sign and verify passports. + ta_private_key: str | None = Field( + default=None, + description=( + "PEM-encoded private key of the Trust Authority. " + "When provided, passports are TA-signed." + ), + ) + ta_public_key: str | None = Field( + default=None, + description=( + "PEM-encoded public key of the Trust Authority. " + "When provided, incoming passports can be verified." + ), + ) + + sign_messages: bool = Field( + default=True, + description="Whether to sign outgoing MCP JSON-RPC messages.", + ) + verify_tool_signatures: bool = Field( + default=True, + description="Whether to verify tool definition signatures from the server.", + ) + + +class MCPSecurityManager: + """Manages MCP security operations for a single agent. + + Handles key generation, passport lifecycle, message signing/verification, + tool integrity checks, and replay protection via nonce tracking. + + The manager is safe to share across MCP clients that belong to the same + agent; each call to :meth:`sign_message` generates a unique nonce. + """ + + def __init__(self, config: MCPSecurityConfig) -> None: + self._config = config + self._passport: dict[str, Any] | None = None + self._nonce_store: Any | None = None + self._private_key: str | None = config.private_key + self._public_key: str | None = config.public_key + + if _mcp_secure_available(): + self._initialize() + + @staticmethod + def is_available() -> bool: + """Return ``True`` when the ``mcp-secure`` package is installed.""" + return _mcp_secure_available() + + @property + def passport(self) -> dict[str, Any] | None: + """Return the current agent passport, or ``None`` if unavailable.""" + return self._passport + + @property + def passport_id(self) -> str | None: + """Return the passport ID, or ``None`` if no passport exists.""" + if self._passport is None: + return None + return self._passport.get("passport_id") + + @property + def public_key(self) -> str | None: + """Return the agent's public key.""" + return self._public_key + + # ------------------------------------------------------------------ + # Initialization + # ------------------------------------------------------------------ + + def _initialize(self) -> None: + """Generate keys (if needed), create passport, and set up nonce store.""" + from mcp_secure import ( + NonceStore, + create_passport, + generate_key_pair, + sign_passport, + ) + + # Generate agent keys if not provided + if self._private_key is None or self._public_key is None: + keys = generate_key_pair() + self._private_key = keys["private_key"] + self._public_key = keys["public_key"] + + # Create passport + passport = create_passport( + name=self._config.agent_name, + version=self._config.agent_version, + public_key=self._public_key, + capabilities=self._config.capabilities, + ) + + # Sign with Trust Authority key if available + if self._config.ta_private_key is not None: + passport = sign_passport(passport, self._config.ta_private_key) + + self._passport = passport + self._nonce_store = NonceStore() + + # ------------------------------------------------------------------ + # Message signing / verification + # ------------------------------------------------------------------ + + def sign_message(self, message: dict[str, Any]) -> dict[str, Any]: + """Sign an outgoing MCP JSON-RPC message. + + Returns the original *message* unchanged when security is unavailable + or message signing is disabled. + + Args: + message: The JSON-RPC message dict to sign. + + Returns: + A signed envelope dict (when signing is active) or the original + message (when signing is inactive). + """ + if not self.is_available() or not self._config.sign_messages: + return message + + if self._passport is None or self._private_key is None: + return message + + from mcp_secure import sign_message + + return sign_message(message, self._passport["passport_id"], self._private_key) + + def verify_message(self, envelope: dict[str, Any], sender_public_key: str) -> bool: + """Verify an incoming signed MCP message envelope. + + Args: + envelope: The signed envelope dict. + sender_public_key: PEM-encoded public key of the sender. + + Returns: + ``True`` if the envelope signature and nonce are valid. + """ + if not self.is_available(): + return True + + from mcp_secure import verify_message + + result = verify_message(envelope, sender_public_key) + if not result.get("valid", False): + logger.warning("MCP message verification failed: %s", result.get("error")) + return False + + # Replay protection + nonce = envelope.get("nonce") + if nonce is not None and self._nonce_store is not None: + if not self._nonce_store.check(nonce): + logger.warning("MCP message replay detected (duplicate nonce)") + return False + + return True + + # ------------------------------------------------------------------ + # Tool integrity + # ------------------------------------------------------------------ + + def sign_tool(self, tool_definition: dict[str, Any]) -> str | None: + """Sign a tool definition. + + Args: + tool_definition: Tool definition dict (name, description, inputSchema). + + Returns: + Signature string, or ``None`` when security is unavailable. + """ + if not self.is_available() or self._private_key is None: + return None + + from mcp_secure import sign_tool + + return sign_tool(tool_definition, self._private_key) + + def verify_tool( + self, + tool_definition: dict[str, Any], + signature: str, + signer_public_key: str | None = None, + ) -> bool: + """Verify a tool definition signature. + + Args: + tool_definition: Tool definition dict. + signature: Signature string to verify. + signer_public_key: PEM-encoded public key of the signer. + Falls back to the agent's own public key if ``None``. + + Returns: + ``True`` if the signature is valid, ``False`` otherwise. + Always returns ``True`` when security is unavailable or + tool verification is disabled. + """ + if not self.is_available() or not self._config.verify_tool_signatures: + return True + + key = signer_public_key or self._public_key + if key is None: + return True + + from mcp_secure import verify_tool + + return verify_tool(tool_definition, signature, key) + + # ------------------------------------------------------------------ + # Passport verification + # ------------------------------------------------------------------ + + def verify_passport( + self, passport: dict[str, Any], ta_public_key: str | None = None + ) -> bool: + """Verify a remote passport signature. + + Args: + passport: Passport dict to verify. + ta_public_key: Trust Authority public key. Falls back to the + configured ``ta_public_key`` if ``None``. + + Returns: + ``True`` if the passport signature is valid. + """ + if not self.is_available(): + return True + + key = ta_public_key or self._config.ta_public_key + if key is None: + return True + + from mcp_secure import verify_passport_signature + + return verify_passport_signature(passport, key) + + # ------------------------------------------------------------------ + # Cleanup + # ------------------------------------------------------------------ + + def destroy(self) -> None: + """Release resources held by the security manager.""" + if self._nonce_store is not None: + self._nonce_store.destroy() + self._nonce_store = None + self._passport = None diff --git a/lib/crewai/src/crewai/mcp/tool_resolver.py b/lib/crewai/src/crewai/mcp/tool_resolver.py index 2ef7364ac..84fc5d307 100644 --- a/lib/crewai/src/crewai/mcp/tool_resolver.py +++ b/lib/crewai/src/crewai/mcp/tool_resolver.py @@ -23,6 +23,7 @@ from crewai.mcp.config import ( MCPServerSSE, MCPServerStdio, ) +from crewai.mcp.security import MCPSecurityConfig, MCPSecurityManager from crewai.mcp.transports.http import HTTPTransport from crewai.mcp.transports.sse import SSETransport from crewai.mcp.transports.stdio import StdioTransport @@ -324,9 +325,19 @@ class MCPToolResolver: from crewai.tools.mcp_native_tool import MCPNativeTool discovery_transport, server_name = self._create_transport(mcp_config) + + # Build a security manager when the config has security settings + security_manager: MCPSecurityManager | None = None + security_config: MCPSecurityConfig | None = getattr( + mcp_config, "security", None + ) + if security_config is not None and MCPSecurityManager.is_available(): + security_manager = MCPSecurityManager(security_config) + discovery_client = MCPClient( transport=discovery_transport, cache_tools_list=mcp_config.cache_tools_list, + security_manager=security_manager, ) async def _setup_client_and_list_tools() -> list[dict[str, Any]]: @@ -404,6 +415,7 @@ class MCPToolResolver: return MCPClient( transport=transport, cache_tools_list=mcp_config.cache_tools_list, + security_manager=security_manager, ) tools = [] diff --git a/lib/crewai/tests/mcp/test_mcp_security.py b/lib/crewai/tests/mcp/test_mcp_security.py new file mode 100644 index 000000000..8ab7198e4 --- /dev/null +++ b/lib/crewai/tests/mcp/test_mcp_security.py @@ -0,0 +1,490 @@ +"""Tests for MCP security integration (issue #4875). + +Covers: +- MCPSecurityConfig model validation +- MCPSecurityManager key generation, passport creation, message signing, + tool integrity verification, replay protection, and graceful degradation +- Integration with MCPClient and MCPToolResolver via security_manager +- Config models accepting optional security field +""" + +from __future__ import annotations + +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from crewai.mcp.config import MCPServerHTTP, MCPServerSSE, MCPServerStdio +from crewai.mcp.security import MCPSecurityConfig, MCPSecurityManager + + +# --------------------------------------------------------------------------- +# MCPSecurityConfig tests +# --------------------------------------------------------------------------- + + +class TestMCPSecurityConfig: + """Tests for the MCPSecurityConfig Pydantic model.""" + + def test_default_values(self): + config = MCPSecurityConfig() + assert config.agent_name == "crewai-agent" + assert config.agent_version == "1.0.0" + assert config.capabilities == [] + assert config.private_key is None + assert config.public_key is None + assert config.ta_private_key is None + assert config.ta_public_key is None + assert config.sign_messages is True + assert config.verify_tool_signatures is True + + def test_custom_values(self): + config = MCPSecurityConfig( + agent_name="researcher", + agent_version="2.0.0", + capabilities=["search", "summarize"], + sign_messages=False, + verify_tool_signatures=False, + ) + assert config.agent_name == "researcher" + assert config.agent_version == "2.0.0" + assert config.capabilities == ["search", "summarize"] + assert config.sign_messages is False + assert config.verify_tool_signatures is False + + +# --------------------------------------------------------------------------- +# MCPSecurityManager tests +# --------------------------------------------------------------------------- + + +class TestMCPSecurityManager: + """Tests for the MCPSecurityManager class.""" + + def test_is_available(self): + """mcp-secure is installed in the test env so this should be True.""" + assert MCPSecurityManager.is_available() is True + + def test_auto_generates_keys(self): + config = MCPSecurityConfig(agent_name="test-agent") + manager = MCPSecurityManager(config) + + assert manager.public_key is not None + assert manager._private_key is not None + assert manager.passport is not None + assert manager.passport_id is not None + + def test_uses_provided_keys(self): + from mcp_secure import generate_key_pair + + keys = generate_key_pair() + config = MCPSecurityConfig( + agent_name="test-agent", + private_key=keys["private_key"], + public_key=keys["public_key"], + ) + manager = MCPSecurityManager(config) + + assert manager._private_key == keys["private_key"] + assert manager.public_key == keys["public_key"] + + def test_passport_creation(self): + config = MCPSecurityConfig( + agent_name="researcher", + agent_version="2.0.0", + capabilities=["search"], + ) + manager = MCPSecurityManager(config) + passport = manager.passport + + assert passport is not None + assert passport["agent"]["name"] == "researcher" + assert passport["agent"]["version"] == "2.0.0" + assert "search" in passport["agent"]["capabilities"] + + def test_passport_signed_by_ta(self): + from mcp_secure import generate_key_pair, verify_passport_signature + + ta_keys = generate_key_pair() + config = MCPSecurityConfig( + agent_name="test-agent", + ta_private_key=ta_keys["private_key"], + ta_public_key=ta_keys["public_key"], + ) + manager = MCPSecurityManager(config) + + assert manager.passport is not None + assert "signature" in manager.passport + assert verify_passport_signature(manager.passport, ta_keys["public_key"]) + + # -- Message signing / verification ------------------------------------ + + def test_sign_message(self): + config = MCPSecurityConfig(agent_name="test-agent") + manager = MCPSecurityManager(config) + + message = { + "jsonrpc": "2.0", + "method": "tools/call", + "id": 1, + "params": {"name": "read_file", "arguments": {"path": "/tmp/test"}}, + } + envelope = manager.sign_message(message) + + # Envelope must contain MCPS fields + assert "mcps" in envelope + assert "passport_id" in envelope + assert "timestamp" in envelope + assert "nonce" in envelope + assert "signature" in envelope + assert "message" in envelope + + def test_sign_message_disabled(self): + config = MCPSecurityConfig(agent_name="test-agent", sign_messages=False) + manager = MCPSecurityManager(config) + + message = {"jsonrpc": "2.0", "method": "tools/list", "id": 1} + result = manager.sign_message(message) + + # Should return the original message unchanged + assert result is message + + def test_verify_own_message(self): + config = MCPSecurityConfig(agent_name="test-agent") + manager = MCPSecurityManager(config) + + message = {"jsonrpc": "2.0", "method": "tools/call", "id": 1} + envelope = manager.sign_message(message) + + assert manager.verify_message(envelope, manager.public_key) + + def test_verify_tampered_message_fails(self): + config = MCPSecurityConfig(agent_name="test-agent") + manager = MCPSecurityManager(config) + + message = {"jsonrpc": "2.0", "method": "tools/call", "id": 1} + envelope = manager.sign_message(message) + + # Tamper with the message inside the envelope + envelope["message"]["method"] = "tools/evil" + + assert manager.verify_message(envelope, manager.public_key) is False + + def test_replay_protection(self): + """Same nonce used twice should be detected as replay.""" + config = MCPSecurityConfig(agent_name="test-agent") + manager = MCPSecurityManager(config) + + message = {"jsonrpc": "2.0", "method": "tools/call", "id": 1} + envelope = manager.sign_message(message) + + # First verification should pass + assert manager.verify_message(envelope, manager.public_key) is True + # Second verification with same nonce should fail (replay) + assert manager.verify_message(envelope, manager.public_key) is False + + def test_unique_nonces(self): + """Each signed message should have a unique nonce.""" + config = MCPSecurityConfig(agent_name="test-agent") + manager = MCPSecurityManager(config) + + message = {"jsonrpc": "2.0", "method": "tools/call", "id": 1} + envelope1 = manager.sign_message(message) + envelope2 = manager.sign_message(message) + + assert envelope1["nonce"] != envelope2["nonce"] + + # -- Tool integrity ---------------------------------------------------- + + def test_sign_and_verify_tool(self): + config = MCPSecurityConfig(agent_name="test-agent") + manager = MCPSecurityManager(config) + + tool = { + "name": "read_file", + "description": "Read a file from disk", + "inputSchema": {"type": "object", "properties": {"path": {"type": "string"}}}, + } + signature = manager.sign_tool(tool) + assert signature is not None + assert manager.verify_tool(tool, signature) is True + + def test_verify_tampered_tool_fails(self): + config = MCPSecurityConfig(agent_name="test-agent") + manager = MCPSecurityManager(config) + + tool = { + "name": "read_file", + "description": "Read a file from disk", + "inputSchema": {"type": "object"}, + } + signature = manager.sign_tool(tool) + + # Tamper with the tool definition + tool["description"] = "EVIL: delete all files" + assert manager.verify_tool(tool, signature) is False + + def test_verify_tool_disabled(self): + config = MCPSecurityConfig( + agent_name="test-agent", verify_tool_signatures=False + ) + manager = MCPSecurityManager(config) + + tool = {"name": "anything", "description": "whatever"} + # Should always return True when verification is disabled + assert manager.verify_tool(tool, "invalid-signature") is True + + # -- Passport verification --------------------------------------------- + + def test_verify_passport(self): + from mcp_secure import generate_key_pair + + ta_keys = generate_key_pair() + config = MCPSecurityConfig( + agent_name="test-agent", + ta_private_key=ta_keys["private_key"], + ta_public_key=ta_keys["public_key"], + ) + manager = MCPSecurityManager(config) + + assert manager.verify_passport(manager.passport, ta_keys["public_key"]) is True + + # -- Cleanup ----------------------------------------------------------- + + def test_destroy(self): + config = MCPSecurityConfig(agent_name="test-agent") + manager = MCPSecurityManager(config) + + assert manager.passport is not None + manager.destroy() + assert manager.passport is None + + # -- Graceful degradation when mcp-secure not installed ---------------- + + def test_graceful_degradation_sign_message(self): + """sign_message returns the original message when mcp-secure is unavailable.""" + config = MCPSecurityConfig(agent_name="test-agent") + + with patch("crewai.mcp.security._mcp_secure_available", return_value=False): + manager = MCPSecurityManager(config) + message = {"jsonrpc": "2.0", "method": "tools/call", "id": 1} + result = manager.sign_message(message) + assert result is message + + def test_graceful_degradation_verify_message(self): + """verify_message returns True when mcp-secure is unavailable.""" + config = MCPSecurityConfig(agent_name="test-agent") + + with patch("crewai.mcp.security._mcp_secure_available", return_value=False): + manager = MCPSecurityManager(config) + assert manager.verify_message({}, "any-key") is True + + def test_graceful_degradation_verify_tool(self): + """verify_tool returns True when mcp-secure is unavailable.""" + config = MCPSecurityConfig(agent_name="test-agent") + + with patch("crewai.mcp.security._mcp_secure_available", return_value=False): + manager = MCPSecurityManager(config) + assert manager.verify_tool({"name": "x"}, "sig") is True + + def test_graceful_degradation_sign_tool(self): + """sign_tool returns None when mcp-secure is unavailable.""" + config = MCPSecurityConfig(agent_name="test-agent") + + with patch("crewai.mcp.security._mcp_secure_available", return_value=False): + manager = MCPSecurityManager(config) + assert manager.sign_tool({"name": "x"}) is None + + +# --------------------------------------------------------------------------- +# Config models accept security field +# --------------------------------------------------------------------------- + + +class TestConfigSecurityField: + """Verify all MCP config models accept the optional security field.""" + + def test_stdio_config_with_security(self): + security = MCPSecurityConfig(agent_name="stdio-agent") + config = MCPServerStdio( + command="python", + args=["server.py"], + security=security, + ) + assert config.security is not None + assert config.security.agent_name == "stdio-agent" + + def test_http_config_with_security(self): + security = MCPSecurityConfig(agent_name="http-agent") + config = MCPServerHTTP( + url="https://example.com/mcp", + security=security, + ) + assert config.security is not None + assert config.security.agent_name == "http-agent" + + def test_sse_config_with_security(self): + security = MCPSecurityConfig(agent_name="sse-agent") + config = MCPServerSSE( + url="https://example.com/mcp/sse", + security=security, + ) + assert config.security is not None + assert config.security.agent_name == "sse-agent" + + def test_config_without_security(self): + """Security is optional -- configs without it should still work.""" + config = MCPServerHTTP(url="https://example.com/mcp") + assert config.security is None + + +# --------------------------------------------------------------------------- +# MCPClient integration +# --------------------------------------------------------------------------- + + +class TestMCPClientSecurityIntegration: + """Verify MCPClient passes signed envelopes when security_manager is set.""" + + @pytest.mark.asyncio + async def test_call_tool_signs_message(self): + """call_tool should attach _mcps_envelope when security is enabled.""" + from crewai.mcp.client import MCPClient + + config = MCPSecurityConfig(agent_name="test-agent") + manager = MCPSecurityManager(config) + + transport = MagicMock() + transport.connected = True + transport.transport_type = MagicMock() + transport.transport_type.value = "http" + transport.url = "https://example.com/mcp" + + client = MCPClient(transport=transport, security_manager=manager) + client._initialized = True + + # Mock session.call_tool + mock_result = MagicMock() + mock_result.isError = False + mock_result.content = [MagicMock(text="success")] + client._session = MagicMock() + client._session.call_tool = AsyncMock(return_value=mock_result) + + result = await client.call_tool("read_file", {"path": "/tmp/test"}) + + # Verify call_tool was called with arguments containing _mcps_envelope + call_args = client._session.call_tool.call_args + actual_args = call_args[0][1] # second positional arg = arguments dict + assert "_mcps_envelope" in actual_args + envelope = actual_args["_mcps_envelope"] + assert "signature" in envelope + assert "nonce" in envelope + assert "passport_id" in envelope + + @pytest.mark.asyncio + async def test_call_tool_without_security(self): + """call_tool should NOT attach _mcps_envelope when security is None.""" + from crewai.mcp.client import MCPClient + + transport = MagicMock() + transport.connected = True + transport.transport_type = MagicMock() + transport.transport_type.value = "http" + transport.url = "https://example.com/mcp" + + client = MCPClient(transport=transport, security_manager=None) + client._initialized = True + + mock_result = MagicMock() + mock_result.isError = False + mock_result.content = [MagicMock(text="success")] + client._session = MagicMock() + client._session.call_tool = AsyncMock(return_value=mock_result) + + await client.call_tool("read_file", {"path": "/tmp/test"}) + + call_args = client._session.call_tool.call_args + actual_args = call_args[0][1] + assert "_mcps_envelope" not in actual_args + + +# --------------------------------------------------------------------------- +# MCPToolResolver integration +# --------------------------------------------------------------------------- + + +class TestToolResolverSecurityIntegration: + """Verify MCPToolResolver creates a security manager from config.""" + + def test_resolver_creates_security_manager(self): + """When config has security, resolver should pass security_manager to MCPClient.""" + security = MCPSecurityConfig(agent_name="test-agent") + http_config = MCPServerHTTP( + url="https://example.com/mcp", + security=security, + ) + + mock_tool_definitions = [ + { + "name": "test_tool", + "description": "A test tool", + "inputSchema": {"type": "object"}, + } + ] + + with patch("crewai.mcp.tool_resolver.MCPClient") as mock_client_class: + mock_client = AsyncMock() + mock_client.list_tools = AsyncMock(return_value=mock_tool_definitions) + mock_client.connected = False + mock_client.connect = AsyncMock() + mock_client.disconnect = AsyncMock() + mock_client_class.return_value = mock_client + + from crewai.agent.core import Agent + + agent = Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + mcps=[http_config], + ) + tools = agent.get_mcp_tools([http_config]) + + # The MCPClient constructor should have received a security_manager + call_kwargs = mock_client_class.call_args.kwargs + assert "security_manager" in call_kwargs + assert call_kwargs["security_manager"] is not None + assert isinstance(call_kwargs["security_manager"], MCPSecurityManager) + + def test_resolver_no_security_when_not_configured(self): + """When config has no security, security_manager should be None.""" + http_config = MCPServerHTTP(url="https://example.com/mcp") + + mock_tool_definitions = [ + { + "name": "test_tool", + "description": "A test tool", + "inputSchema": {"type": "object"}, + } + ] + + with patch("crewai.mcp.tool_resolver.MCPClient") as mock_client_class: + mock_client = AsyncMock() + mock_client.list_tools = AsyncMock(return_value=mock_tool_definitions) + mock_client.connected = False + mock_client.connect = AsyncMock() + mock_client.disconnect = AsyncMock() + mock_client_class.return_value = mock_client + + from crewai.agent.core import Agent + + agent = Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + mcps=[http_config], + ) + tools = agent.get_mcp_tools([http_config]) + + call_kwargs = mock_client_class.call_args.kwargs + assert call_kwargs.get("security_manager") is None