Compare commits

...

1 Commits

Author SHA1 Message Date
Devin AI
1d1c68792a feat: Add SSL verification option (verify parameter) to MCPServerHTTP and MCPServerSSE
This commit adds a 'verify' parameter to MCPServerHTTP and MCPServerSSE
configurations, allowing users to disable SSL certificate verification
or specify a custom CA bundle path when connecting to MCP servers.

This is useful for enterprise environments where:
- Corporate proxies intercept HTTPS traffic with self-signed certificates
- Internal certificate authorities are not in the default trust store
- Development/staging environments use self-signed certificates

Changes:
- Add 'verify' parameter to MCPServerHTTP and MCPServerSSE config classes
  (default: True, accepts bool or str for CA bundle path)
- Add 'verify' parameter to HTTPTransport and SSETransport classes
- Create custom httpx_client_factory that preserves MCP defaults while
  allowing SSL verification customization
- Update agent/core.py to pass verify parameter when creating transports
- Add comprehensive tests for the new functionality

Closes #4100

Co-Authored-By: João <joao@crewai.com>
2025-12-16 15:18:17 +00:00
8 changed files with 4682 additions and 4169 deletions

View File

@@ -912,12 +912,14 @@ class Agent(BaseAgent):
url=mcp_config.url,
headers=mcp_config.headers,
streamable=mcp_config.streamable,
verify=mcp_config.verify,
)
server_name = self._extract_server_name(mcp_config.url)
elif isinstance(mcp_config, MCPServerSSE):
transport = SSETransport(
url=mcp_config.url,
headers=mcp_config.headers,
verify=mcp_config.verify,
)
server_name = self._extract_server_name(mcp_config.url)
else:

View File

@@ -63,6 +63,18 @@ class MCPServerHTTP(BaseModel):
headers={"Authorization": "Bearer ..."},
cache_tools_list=True,
)
# Disable SSL verification (for self-signed certificates)
mcp_server = MCPServerHTTP(
url="https://internal-server.example.com/mcp",
verify=False,
)
# Use custom CA bundle
mcp_server = MCPServerHTTP(
url="https://internal-server.example.com/mcp",
verify="/path/to/ca-bundle.crt",
)
```
"""
@@ -77,6 +89,11 @@ class MCPServerHTTP(BaseModel):
default=True,
description="Whether to use streamable HTTP transport (default: True).",
)
verify: bool | str = Field(
default=True,
description="SSL certificate verification. Set to False to disable verification, "
"or provide a path to a CA bundle file.",
)
tool_filter: ToolFilter | None = Field(
default=None,
description="Optional tool filter for filtering available tools.",
@@ -99,6 +116,18 @@ class MCPServerSSE(BaseModel):
url="https://api.example.com/mcp/sse",
headers={"Authorization": "Bearer ..."},
)
# Disable SSL verification (for self-signed certificates)
mcp_server = MCPServerSSE(
url="https://internal-server.example.com/mcp/sse",
verify=False,
)
# Use custom CA bundle
mcp_server = MCPServerSSE(
url="https://internal-server.example.com/mcp/sse",
verify="/path/to/ca-bundle.crt",
)
```
"""
@@ -110,6 +139,11 @@ class MCPServerSSE(BaseModel):
default=None,
description="Optional HTTP headers for authentication or other purposes.",
)
verify: bool | str = Field(
default=True,
description="SSL certificate verification. Set to False to disable verification, "
"or provide a path to a CA bundle file.",
)
tool_filter: ToolFilter | None = Field(
default=None,
description="Optional tool filter for filtering available tools.",

View File

@@ -3,6 +3,7 @@
import asyncio
from typing import Any
import httpx
from typing_extensions import Self
@@ -16,6 +17,48 @@ except ImportError:
from crewai.mcp.transports.base import BaseTransport, TransportType
def _create_httpx_client_factory(
verify: bool | str,
) -> Any:
"""Create a custom httpx client factory with SSL verification settings.
This factory preserves MCP's default client settings (follow_redirects, timeout)
while allowing customization of SSL verification.
Args:
verify: SSL verification setting. True for default verification,
False to disable, or a path to a CA bundle file.
Returns:
A factory function compatible with MCP's McpHttpClientFactory protocol.
"""
def factory(
headers: dict[str, str] | None = None,
timeout: httpx.Timeout | None = None,
auth: httpx.Auth | None = None,
) -> httpx.AsyncClient:
kwargs: dict[str, Any] = {
"follow_redirects": True,
"verify": verify,
}
if timeout is None:
kwargs["timeout"] = httpx.Timeout(30.0)
else:
kwargs["timeout"] = timeout
if headers is not None:
kwargs["headers"] = headers
if auth is not None:
kwargs["auth"] = auth
return httpx.AsyncClient(**kwargs)
return factory
class HTTPTransport(BaseTransport):
"""HTTP/Streamable HTTP transport for connecting to remote MCP servers.
@@ -30,6 +73,12 @@ class HTTPTransport(BaseTransport):
)
async with transport:
# Use transport...
# With SSL verification disabled
transport = HTTPTransport(
url="https://internal-server.example.com/mcp",
verify=False
)
```
"""
@@ -38,6 +87,7 @@ class HTTPTransport(BaseTransport):
url: str,
headers: dict[str, str] | None = None,
streamable: bool = True,
verify: bool | str = True,
**kwargs: Any,
) -> None:
"""Initialize HTTP transport.
@@ -46,12 +96,15 @@ class HTTPTransport(BaseTransport):
url: Server URL (e.g., "https://api.example.com/mcp").
headers: Optional HTTP headers.
streamable: Whether to use streamable HTTP (default: True).
verify: SSL certificate verification. Set to False to disable,
or provide a path to a CA bundle file (default: True).
**kwargs: Additional transport options.
"""
super().__init__(**kwargs)
self.url = url
self.headers = headers or {}
self.streamable = streamable
self.verify = verify
self._transport_context: Any = None
@property
@@ -75,11 +128,17 @@ class HTTPTransport(BaseTransport):
try:
from mcp.client.streamable_http import streamablehttp_client
self._transport_context = streamablehttp_client(
self.url,
headers=self.headers if self.headers else None,
terminate_on_close=True,
)
client_kwargs: dict[str, Any] = {
"headers": self.headers if self.headers else None,
"terminate_on_close": True,
}
if self.verify is not True:
client_kwargs["httpx_client_factory"] = _create_httpx_client_factory(
self.verify
)
self._transport_context = streamablehttp_client(self.url, **client_kwargs)
try:
read, write, _ = await asyncio.wait_for(

View File

@@ -2,11 +2,54 @@
from typing import Any
import httpx
from typing_extensions import Self
from crewai.mcp.transports.base import BaseTransport, TransportType
def _create_httpx_client_factory(
verify: bool | str,
) -> Any:
"""Create a custom httpx client factory with SSL verification settings.
This factory preserves MCP's default client settings (follow_redirects, timeout)
while allowing customization of SSL verification.
Args:
verify: SSL verification setting. True for default verification,
False to disable, or a path to a CA bundle file.
Returns:
A factory function compatible with MCP's McpHttpClientFactory protocol.
"""
def factory(
headers: dict[str, str] | None = None,
timeout: httpx.Timeout | None = None,
auth: httpx.Auth | None = None,
) -> httpx.AsyncClient:
kwargs: dict[str, Any] = {
"follow_redirects": True,
"verify": verify,
}
if timeout is None:
kwargs["timeout"] = httpx.Timeout(30.0)
else:
kwargs["timeout"] = timeout
if headers is not None:
kwargs["headers"] = headers
if auth is not None:
kwargs["auth"] = auth
return httpx.AsyncClient(**kwargs)
return factory
class SSETransport(BaseTransport):
"""SSE transport for connecting to remote MCP servers.
@@ -21,6 +64,12 @@ class SSETransport(BaseTransport):
)
async with transport:
# Use transport...
# With SSL verification disabled
transport = SSETransport(
url="https://internal-server.example.com/mcp/sse",
verify=False
)
```
"""
@@ -28,6 +77,7 @@ class SSETransport(BaseTransport):
self,
url: str,
headers: dict[str, str] | None = None,
verify: bool | str = True,
**kwargs: Any,
) -> None:
"""Initialize SSE transport.
@@ -35,11 +85,14 @@ class SSETransport(BaseTransport):
Args:
url: Server URL (e.g., "https://api.example.com/mcp/sse").
headers: Optional HTTP headers.
verify: SSL certificate verification. Set to False to disable,
or provide a path to a CA bundle file (default: True).
**kwargs: Additional transport options.
"""
super().__init__(**kwargs)
self.url = url
self.headers = headers or {}
self.verify = verify
self._transport_context: Any = None
@property
@@ -63,10 +116,16 @@ class SSETransport(BaseTransport):
try:
from mcp.client.sse import sse_client
self._transport_context = sse_client(
self.url,
headers=self.headers if self.headers else None,
)
client_kwargs: dict[str, Any] = {
"headers": self.headers if self.headers else None,
}
if self.verify is not True:
client_kwargs["httpx_client_factory"] = _create_httpx_client_factory(
self.verify
)
self._transport_context = sse_client(self.url, **client_kwargs)
read, write = await self._transport_context.__aenter__()

View File

@@ -0,0 +1,73 @@
"""Tests for HTTP transport."""
import pytest
from crewai.mcp.transports.http import HTTPTransport, _create_httpx_client_factory
def test_http_transport_verify_default():
"""Test HTTPTransport has verify=True by default."""
transport = HTTPTransport(url="http://localhost:9999/mcp")
assert transport.verify is True
def test_http_transport_verify_false():
"""Test HTTPTransport with verify=False."""
transport = HTTPTransport(
url="http://localhost:9999/mcp",
verify=False,
)
assert transport.verify is False
def test_http_transport_verify_ca_bundle():
"""Test HTTPTransport with custom CA bundle path."""
transport = HTTPTransport(
url="http://localhost:9999/mcp",
verify="/path/to/ca-bundle.crt",
)
assert transport.verify == "/path/to/ca-bundle.crt"
def test_http_transport_streamable_default():
"""Test HTTPTransport has streamable=True by default."""
transport = HTTPTransport(url="http://localhost:9999/mcp")
assert transport.streamable is True
def test_create_httpx_client_factory_returns_async_client():
"""Test _create_httpx_client_factory returns an AsyncClient."""
import httpx
factory = _create_httpx_client_factory(verify=False)
client = factory()
assert isinstance(client, httpx.AsyncClient)
def test_create_httpx_client_factory_preserves_mcp_defaults():
"""Test _create_httpx_client_factory preserves MCP default settings."""
factory = _create_httpx_client_factory(verify=False)
client = factory(headers={"X-Test": "value"})
assert client.follow_redirects is True
assert client.timeout.connect == 30.0
assert client.headers.get("X-Test") == "value"
def test_create_httpx_client_factory_with_custom_timeout():
"""Test _create_httpx_client_factory respects custom timeout."""
import httpx
factory = _create_httpx_client_factory(verify=False)
custom_timeout = httpx.Timeout(60.0)
client = factory(timeout=custom_timeout)
assert client.timeout.connect == 60.0
def test_create_httpx_client_factory_with_auth():
"""Test _create_httpx_client_factory passes auth parameter."""
import httpx
factory = _create_httpx_client_factory(verify=False)
auth = httpx.BasicAuth(username="user", password="pass")
client = factory(auth=auth)
assert client.auth is not None

View File

@@ -198,3 +198,187 @@ async def test_mcp_tool_execution_in_async_context(mock_tool_definitions):
assert result == "test result"
mock_client.call_tool.assert_called()
def test_http_mcp_config_verify_default():
"""Test MCPServerHTTP has verify=True by default."""
http_config = MCPServerHTTP(url="https://api.example.com/mcp")
assert http_config.verify is True
def test_http_mcp_config_verify_false():
"""Test MCPServerHTTP with verify=False."""
http_config = MCPServerHTTP(
url="https://api.example.com/mcp",
verify=False,
)
assert http_config.verify is False
def test_http_mcp_config_verify_ca_bundle():
"""Test MCPServerHTTP with custom CA bundle path."""
http_config = MCPServerHTTP(
url="https://api.example.com/mcp",
verify="/path/to/ca-bundle.crt",
)
assert http_config.verify == "/path/to/ca-bundle.crt"
def test_sse_mcp_config_verify_default():
"""Test MCPServerSSE has verify=True by default."""
sse_config = MCPServerSSE(url="https://api.example.com/mcp/sse")
assert sse_config.verify is True
def test_sse_mcp_config_verify_false():
"""Test MCPServerSSE with verify=False."""
sse_config = MCPServerSSE(
url="https://api.example.com/mcp/sse",
verify=False,
)
assert sse_config.verify is False
def test_sse_mcp_config_verify_ca_bundle():
"""Test MCPServerSSE with custom CA bundle path."""
sse_config = MCPServerSSE(
url="https://api.example.com/mcp/sse",
verify="/path/to/ca-bundle.crt",
)
assert sse_config.verify == "/path/to/ca-bundle.crt"
def test_agent_with_http_mcp_config_verify_false(mock_tool_definitions):
"""Test agent setup with MCPServerHTTP configuration with verify=False."""
http_config = MCPServerHTTP(
url="https://api.example.com/mcp",
headers={"Authorization": "Bearer test_token"},
verify=False,
)
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
mcps=[http_config],
)
with patch("crewai.agent.core.MCPClient") as mock_client_class:
mock_client = AsyncMock()
mock_client.list_tools = AsyncMock(return_value=mock_tool_definitions)
mock_client.connected = False
mock_client.connect = AsyncMock()
mock_client.disconnect = AsyncMock()
mock_client_class.return_value = mock_client
tools = agent.get_mcp_tools([http_config])
assert len(tools) == 2
assert all(isinstance(tool, BaseTool) for tool in tools)
mock_client_class.assert_called_once()
call_args = mock_client_class.call_args
transport = call_args.kwargs["transport"]
assert transport.url == "https://api.example.com/mcp"
assert transport.headers == {"Authorization": "Bearer test_token"}
assert transport.verify is False
def test_agent_with_http_mcp_config_verify_ca_bundle(mock_tool_definitions):
"""Test agent setup with MCPServerHTTP configuration with custom CA bundle."""
http_config = MCPServerHTTP(
url="https://api.example.com/mcp",
verify="/path/to/ca-bundle.crt",
)
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
mcps=[http_config],
)
with patch("crewai.agent.core.MCPClient") as mock_client_class:
mock_client = AsyncMock()
mock_client.list_tools = AsyncMock(return_value=mock_tool_definitions)
mock_client.connected = False
mock_client.connect = AsyncMock()
mock_client.disconnect = AsyncMock()
mock_client_class.return_value = mock_client
tools = agent.get_mcp_tools([http_config])
assert len(tools) == 2
mock_client_class.assert_called_once()
call_args = mock_client_class.call_args
transport = call_args.kwargs["transport"]
assert transport.verify == "/path/to/ca-bundle.crt"
def test_agent_with_sse_mcp_config_verify_false(mock_tool_definitions):
"""Test agent setup with MCPServerSSE configuration with verify=False."""
sse_config = MCPServerSSE(
url="https://api.example.com/mcp/sse",
headers={"Authorization": "Bearer test_token"},
verify=False,
)
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
mcps=[sse_config],
)
with patch("crewai.agent.core.MCPClient") as mock_client_class:
mock_client = AsyncMock()
mock_client.list_tools = AsyncMock(return_value=mock_tool_definitions)
mock_client.connected = False
mock_client.connect = AsyncMock()
mock_client.disconnect = AsyncMock()
mock_client_class.return_value = mock_client
tools = agent.get_mcp_tools([sse_config])
assert len(tools) == 2
assert all(isinstance(tool, BaseTool) for tool in tools)
mock_client_class.assert_called_once()
call_args = mock_client_class.call_args
transport = call_args.kwargs["transport"]
assert transport.url == "https://api.example.com/mcp/sse"
assert transport.headers == {"Authorization": "Bearer test_token"}
assert transport.verify is False
def test_agent_with_sse_mcp_config_verify_ca_bundle(mock_tool_definitions):
"""Test agent setup with MCPServerSSE configuration with custom CA bundle."""
sse_config = MCPServerSSE(
url="https://api.example.com/mcp/sse",
verify="/path/to/ca-bundle.crt",
)
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
mcps=[sse_config],
)
with patch("crewai.agent.core.MCPClient") as mock_client_class:
mock_client = AsyncMock()
mock_client.list_tools = AsyncMock(return_value=mock_tool_definitions)
mock_client.connected = False
mock_client.connect = AsyncMock()
mock_client.disconnect = AsyncMock()
mock_client_class.return_value = mock_client
tools = agent.get_mcp_tools([sse_config])
assert len(tools) == 2
mock_client_class.assert_called_once()
call_args = mock_client_class.call_args
transport = call_args.kwargs["transport"]
assert transport.verify == "/path/to/ca-bundle.crt"

View File

@@ -2,7 +2,7 @@
import pytest
from crewai.mcp.transports.sse import SSETransport
from crewai.mcp.transports.sse import SSETransport, _create_httpx_client_factory
@pytest.mark.asyncio
@@ -19,4 +19,46 @@ async def test_sse_transport_connect_does_not_pass_invalid_args():
with pytest.raises(ConnectionError) as exc_info:
await transport.connect()
assert "unexpected keyword argument" not in str(exc_info.value)
assert "unexpected keyword argument" not in str(exc_info.value)
def test_sse_transport_verify_default():
"""Test SSETransport has verify=True by default."""
transport = SSETransport(url="http://localhost:9999/sse")
assert transport.verify is True
def test_sse_transport_verify_false():
"""Test SSETransport with verify=False."""
transport = SSETransport(
url="http://localhost:9999/sse",
verify=False,
)
assert transport.verify is False
def test_sse_transport_verify_ca_bundle():
"""Test SSETransport with custom CA bundle path."""
transport = SSETransport(
url="http://localhost:9999/sse",
verify="/path/to/ca-bundle.crt",
)
assert transport.verify == "/path/to/ca-bundle.crt"
def test_create_httpx_client_factory_returns_async_client():
"""Test _create_httpx_client_factory returns an AsyncClient."""
import httpx
factory = _create_httpx_client_factory(verify=False)
client = factory()
assert isinstance(client, httpx.AsyncClient)
def test_create_httpx_client_factory_preserves_mcp_defaults():
"""Test _create_httpx_client_factory preserves MCP default settings."""
factory = _create_httpx_client_factory(verify=False)
client = factory(headers={"X-Test": "value"})
assert client.follow_redirects is True
assert client.timeout.connect == 30.0
assert client.headers.get("X-Test") == "value"

8376
uv.lock generated

File diff suppressed because it is too large Load Diff