Lorenze/feat mcp first class support (#3850)

* WIP transport support mcp

* refactor: streamline MCP tool loading and error handling

* linted

* Self type from typing with typing_extensions in MCP transport modules

* added tests for mcp setup

* added tests for mcp setup

* docs: enhance MCP overview with detailed integration examples and structured configurations

* feat: implement MCP event handling and logging in event listener and client

- Added MCP event types and handlers for connection and tool execution events.
- Enhanced MCPClient to emit events on connection status and tool execution.
- Updated ConsoleFormatter to handle MCP event logging.
- Introduced new MCP event types for better integration and monitoring.
This commit is contained in:
Lorenze Jay
2025-11-06 17:45:16 -08:00
committed by GitHub
parent 9e5906c52f
commit 6f36d7003b
20 changed files with 2841 additions and 41 deletions

View File

@@ -11,9 +11,13 @@ The [Model Context Protocol](https://modelcontextprotocol.io/introduction) (MCP)
CrewAI offers **two approaches** for MCP integration:
### Simple DSL Integration** (Recommended)
### 🚀 **Simple DSL Integration** (Recommended)
Use the `mcps` field directly on agents for seamless MCP tool integration:
Use the `mcps` field directly on agents for seamless MCP tool integration. The DSL supports both **string references** (for quick setup) and **structured configurations** (for full control).
#### String-Based References (Quick Setup)
Perfect for remote HTTPS servers and CrewAI AMP marketplace:
```python
from crewai import Agent
@@ -32,6 +36,46 @@ agent = Agent(
# MCP tools are now automatically available to your agent!
```
#### Structured Configurations (Full Control)
For complete control over connection settings, tool filtering, and all transport types:
```python
from crewai import Agent
from crewai.mcp import MCPServerStdio, MCPServerHTTP, MCPServerSSE
from crewai.mcp.filters import create_static_tool_filter
agent = Agent(
role="Advanced Research Analyst",
goal="Research with full control over MCP connections",
backstory="Expert researcher with advanced tool access",
mcps=[
# Stdio transport for local servers
MCPServerStdio(
command="npx",
args=["-y", "@modelcontextprotocol/server-filesystem"],
env={"API_KEY": "your_key"},
tool_filter=create_static_tool_filter(
allowed_tool_names=["read_file", "list_directory"]
),
cache_tools_list=True,
),
# HTTP/Streamable HTTP transport for remote servers
MCPServerHTTP(
url="https://api.example.com/mcp",
headers={"Authorization": "Bearer your_token"},
streamable=True,
cache_tools_list=True,
),
# SSE transport for real-time streaming
MCPServerSSE(
url="https://stream.example.com/mcp/sse",
headers={"Authorization": "Bearer your_token"},
),
]
)
```
### 🔧 **Advanced: MCPServerAdapter** (For Complex Scenarios)
For advanced use cases requiring manual connection management, the `crewai-tools` library provides the `MCPServerAdapter` class.
@@ -68,12 +112,14 @@ uv pip install 'crewai-tools[mcp]'
## Quick Start: Simple DSL Integration
The easiest way to integrate MCP servers is using the `mcps` field on your agents:
The easiest way to integrate MCP servers is using the `mcps` field on your agents. You can use either string references or structured configurations.
### Quick Start with String References
```python
from crewai import Agent, Task, Crew
# Create agent with MCP tools
# Create agent with MCP tools using string references
research_agent = Agent(
role="Research Analyst",
goal="Find and analyze information using advanced search tools",
@@ -96,13 +142,53 @@ crew = Crew(agents=[research_agent], tasks=[research_task])
result = crew.kickoff()
```
### Quick Start with Structured Configurations
```python
from crewai import Agent, Task, Crew
from crewai.mcp import MCPServerStdio, MCPServerHTTP, MCPServerSSE
# Create agent with structured MCP configurations
research_agent = Agent(
role="Research Analyst",
goal="Find and analyze information using advanced search tools",
backstory="Expert researcher with access to multiple data sources",
mcps=[
# Local stdio server
MCPServerStdio(
command="python",
args=["local_server.py"],
env={"API_KEY": "your_key"},
),
# Remote HTTP server
MCPServerHTTP(
url="https://api.research.com/mcp",
headers={"Authorization": "Bearer your_token"},
),
]
)
# Create task
research_task = Task(
description="Research the latest developments in AI agent frameworks",
expected_output="Comprehensive research report with citations",
agent=research_agent
)
# Create and run crew
crew = Crew(agents=[research_agent], tasks=[research_task])
result = crew.kickoff()
```
That's it! The MCP tools are automatically discovered and available to your agent.
## MCP Reference Formats
The `mcps` field supports various reference formats for maximum flexibility:
The `mcps` field supports both **string references** (for quick setup) and **structured configurations** (for full control). You can mix both formats in the same list.
### External MCP Servers
### String-Based References
#### External MCP Servers
```python
mcps=[
@@ -117,7 +203,7 @@ mcps=[
]
```
### CrewAI AMP Marketplace
#### CrewAI AMP Marketplace
```python
mcps=[
@@ -133,17 +219,166 @@ mcps=[
]
```
### Mixed References
### Structured Configurations
#### Stdio Transport (Local Servers)
Perfect for local MCP servers that run as processes:
```python
from crewai.mcp import MCPServerStdio
from crewai.mcp.filters import create_static_tool_filter
mcps=[
"https://external-api.com/mcp", # External server
"https://weather.service.com/mcp#forecast", # Specific external tool
"crewai-amp:financial-insights", # AMP service
"crewai-amp:data-analysis#sentiment_tool" # Specific AMP tool
MCPServerStdio(
command="npx",
args=["-y", "@modelcontextprotocol/server-filesystem"],
env={"API_KEY": "your_key"},
tool_filter=create_static_tool_filter(
allowed_tool_names=["read_file", "write_file"]
),
cache_tools_list=True,
),
# Python-based server
MCPServerStdio(
command="python",
args=["path/to/server.py"],
env={"UV_PYTHON": "3.12", "API_KEY": "your_key"},
),
]
```
#### HTTP/Streamable HTTP Transport (Remote Servers)
For remote MCP servers over HTTP/HTTPS:
```python
from crewai.mcp import MCPServerHTTP
mcps=[
# Streamable HTTP (default)
MCPServerHTTP(
url="https://api.example.com/mcp",
headers={"Authorization": "Bearer your_token"},
streamable=True,
cache_tools_list=True,
),
# Standard HTTP
MCPServerHTTP(
url="https://api.example.com/mcp",
headers={"Authorization": "Bearer your_token"},
streamable=False,
),
]
```
#### SSE Transport (Real-Time Streaming)
For remote servers using Server-Sent Events:
```python
from crewai.mcp import MCPServerSSE
mcps=[
MCPServerSSE(
url="https://stream.example.com/mcp/sse",
headers={"Authorization": "Bearer your_token"},
cache_tools_list=True,
),
]
```
### Mixed References
You can combine string references and structured configurations:
```python
from crewai.mcp import MCPServerStdio, MCPServerHTTP
mcps=[
# String references
"https://external-api.com/mcp", # External server
"crewai-amp:financial-insights", # AMP service
# Structured configurations
MCPServerStdio(
command="npx",
args=["-y", "@modelcontextprotocol/server-filesystem"],
),
MCPServerHTTP(
url="https://api.example.com/mcp",
headers={"Authorization": "Bearer token"},
),
]
```
### Tool Filtering
Structured configurations support advanced tool filtering:
```python
from crewai.mcp import MCPServerStdio
from crewai.mcp.filters import create_static_tool_filter, create_dynamic_tool_filter, ToolFilterContext
# Static filtering (allow/block lists)
static_filter = create_static_tool_filter(
allowed_tool_names=["read_file", "write_file"],
blocked_tool_names=["delete_file"],
)
# Dynamic filtering (context-aware)
def dynamic_filter(context: ToolFilterContext, tool: dict) -> bool:
# Block dangerous tools for certain agent roles
if context.agent.role == "Code Reviewer":
if "delete" in tool.get("name", "").lower():
return False
return True
mcps=[
MCPServerStdio(
command="npx",
args=["-y", "@modelcontextprotocol/server-filesystem"],
tool_filter=static_filter, # or dynamic_filter
),
]
```
## Configuration Parameters
Each transport type supports specific configuration options:
### MCPServerStdio Parameters
- **`command`** (required): Command to execute (e.g., `"python"`, `"node"`, `"npx"`, `"uvx"`)
- **`args`** (optional): List of command arguments (e.g., `["server.py"]` or `["-y", "@mcp/server"]`)
- **`env`** (optional): Dictionary of environment variables to pass to the process
- **`tool_filter`** (optional): Tool filter function for filtering available tools
- **`cache_tools_list`** (optional): Whether to cache the tool list for faster subsequent access (default: `False`)
### MCPServerHTTP Parameters
- **`url`** (required): Server URL (e.g., `"https://api.example.com/mcp"`)
- **`headers`** (optional): Dictionary of HTTP headers for authentication or other purposes
- **`streamable`** (optional): Whether to use streamable HTTP transport (default: `True`)
- **`tool_filter`** (optional): Tool filter function for filtering available tools
- **`cache_tools_list`** (optional): Whether to cache the tool list for faster subsequent access (default: `False`)
### MCPServerSSE Parameters
- **`url`** (required): Server URL (e.g., `"https://api.example.com/mcp/sse"`)
- **`headers`** (optional): Dictionary of HTTP headers for authentication or other purposes
- **`tool_filter`** (optional): Tool filter function for filtering available tools
- **`cache_tools_list`** (optional): Whether to cache the tool list for faster subsequent access (default: `False`)
### Common Parameters
All transport types support:
- **`tool_filter`**: Filter function to control which tools are available. Can be:
- `None` (default): All tools are available
- Static filter: Created with `create_static_tool_filter()` for allow/block lists
- Dynamic filter: Created with `create_dynamic_tool_filter()` for context-aware filtering
- **`cache_tools_list`**: When `True`, caches the tool list after first discovery to improve performance on subsequent connections
## Key Features
- 🔄 **Automatic Tool Discovery**: Tools are automatically discovered and integrated
@@ -152,26 +387,47 @@ mcps=[
- 🛡️ **Error Resilience**: Graceful handling of unavailable servers
- ⏱️ **Timeout Protection**: Built-in timeouts prevent hanging connections
- 📊 **Transparent Integration**: Works seamlessly with existing CrewAI features
- 🔧 **Full Transport Support**: Stdio, HTTP/Streamable HTTP, and SSE transports
- 🎯 **Advanced Filtering**: Static and dynamic tool filtering capabilities
- 🔐 **Flexible Authentication**: Support for headers, environment variables, and query parameters
## Error Handling
The MCP DSL integration is designed to be resilient:
The MCP DSL integration is designed to be resilient and handles failures gracefully:
```python
from crewai import Agent
from crewai.mcp import MCPServerStdio, MCPServerHTTP
agent = Agent(
role="Resilient Agent",
goal="Continue working despite server issues",
backstory="Agent that handles failures gracefully",
mcps=[
# String references
"https://reliable-server.com/mcp", # Will work
"https://unreachable-server.com/mcp", # Will be skipped gracefully
"https://slow-server.com/mcp", # Will timeout gracefully
"crewai-amp:working-service" # Will work
"crewai-amp:working-service", # Will work
# Structured configs
MCPServerStdio(
command="python",
args=["reliable_server.py"], # Will work
),
MCPServerHTTP(
url="https://slow-server.com/mcp", # Will timeout gracefully
),
]
)
# Agent will use tools from working servers and log warnings for failing ones
```
All connection errors are handled gracefully:
- **Connection failures**: Logged as warnings, agent continues with available tools
- **Timeout errors**: Connections timeout after 30 seconds (configurable)
- **Authentication errors**: Logged clearly for debugging
- **Invalid configurations**: Validation errors are raised at agent creation time
## Advanced: MCPServerAdapter
For complex scenarios requiring manual connection management, use the `MCPServerAdapter` class from `crewai-tools`. Using a Python context manager (`with` statement) is the recommended approach as it automatically handles starting and stopping the connection to the MCP server.