crewAI/docs/en/mcp/dsl-integration.mdx
Devin AI 873d501401 feat: Add MCP progress notifications and middleware headers support
Implements progress reporting and HTTP headers support for MCP tool integration
to address issue #3797.

Changes:
- Add MCPToolProgressEvent to event system for real-time progress tracking
- Extend MCPToolWrapper to support progress callbacks and event emission
- Add mcp_progress_enabled flag to Agent for opt-in progress notifications
- Add mcp_server_headers to Agent for middleware authentication/tracking
- Thread progress and headers configuration through Agent._get_external_mcp_tools
- Add comprehensive test coverage for progress and headers features
- Update MCP DSL documentation with progress and headers examples

Features:
- Progress notifications emitted as MCPToolProgressEvent via event bus
- Optional progress callback for custom progress handling
- HTTP headers passthrough for authentication and middleware integration
- Agent and task context included in progress events
- Opt-in design ensures backward compatibility

Tests:
- Unit tests for MCPToolWrapper progress and headers functionality
- Integration tests for Agent MCP configuration
- Mock-based tests to avoid network dependencies

Documentation:
- Added Progress Notifications section with examples
- Added Middleware Support with Headers section
- Included complete examples for common use cases

Fixes #3797

Co-Authored-By: João <joao@crewai.com>
2025-10-26 09:45:50 +00:00

---
title: MCP DSL Integration
description: Learn how to use CrewAI's simple DSL syntax to integrate MCP servers directly with your agents using the mcps field.
icon: code
mode: "wide"
---
## Overview
CrewAI's MCP DSL (Domain Specific Language) integration is the **simplest way** to connect your agents to MCP (Model Context Protocol) servers. Add an `mcps` field to your agent, and CrewAI discovers the server's tools and manages the connections for you.
<Info>
This is the **recommended approach** for most MCP use cases. For advanced scenarios requiring manual connection management, see [MCPServerAdapter](/en/mcp/overview#advanced-mcpserveradapter).
</Info>
## Basic Usage
Add MCP servers to your agent using the `mcps` field:
```python
from crewai import Agent

agent = Agent(
    role="Research Assistant",
    goal="Help with research and analysis tasks",
    backstory="Expert assistant with access to advanced research tools",
    mcps=[
        "https://mcp.exa.ai/mcp?api_key=your_key&profile=research"
    ]
)
# MCP tools are now automatically available!
# No need for manual connection management or tool configuration
```
## Supported Reference Formats
### External MCP Remote Servers
```python
# Basic HTTPS server
"https://api.example.com/mcp"
# Server with authentication
"https://mcp.exa.ai/mcp?api_key=your_key&profile=your_profile"
# Server with custom path
"https://services.company.com/api/v1/mcp"
```
### Specific Tool Selection
Use the `#` syntax to select specific tools from a server:
```python
# Get only the forecast tool from weather server
"https://weather.api.com/mcp#get_forecast"
# Get only the search tool from Exa
"https://mcp.exa.ai/mcp?api_key=your_key#web_search_exa"
```
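To illustrate how a reference with a `#` suffix divides into a server URL and a tool name, here is a minimal sketch using only the standard library. The helper `split_mcp_reference` is hypothetical and not part of CrewAI's API; CrewAI's own parsing may differ.

```python
from typing import Optional, Tuple
from urllib.parse import urldefrag


def split_mcp_reference(reference: str) -> Tuple[str, Optional[str]]:
    """Split an MCP reference into (server_url, tool_name or None).

    Hypothetical helper for illustration only; not a CrewAI function.
    """
    # urldefrag separates everything after '#' from the rest of the URL
    url, fragment = urldefrag(reference)
    return url, fragment or None


print(split_mcp_reference("https://weather.api.com/mcp#get_forecast"))
print(split_mcp_reference("https://api.example.com/mcp"))
```

Query parameters such as `api_key` stay with the server URL, because the fragment always comes last in a URL.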
### CrewAI AMP Marketplace
Access tools from the CrewAI AMP marketplace:
```python
# Full service with all tools
"crewai-amp:financial-data"
# Specific tool from AMP service
"crewai-amp:research-tools#pubmed_search"
# Multiple AMP services
mcps=[
"crewai-amp:weather-insights",
"crewai-amp:market-analysis",
"crewai-amp:social-media-monitoring"
]
```
## Complete Example
Here's a complete example using multiple MCP servers:
```python
from crewai import Agent, Task, Crew, Process
# Create agent with multiple MCP sources
multi_source_agent = Agent(
role="Multi-Source Research Analyst",
goal="Conduct comprehensive research using multiple data sources",
backstory="""Expert researcher with access to web search, weather data,
financial information, and academic research tools""",
mcps=[
# External MCP servers
"https://mcp.exa.ai/mcp?api_key=your_exa_key&profile=research",
"https://weather.api.com/mcp#get_current_conditions",
# CrewAI AMP marketplace
"crewai-amp:financial-insights",
"crewai-amp:academic-research#pubmed_search",
"crewai-amp:market-intelligence#competitor_analysis"
]
)
# Create comprehensive research task
research_task = Task(
description="""Research the impact of AI agents on business productivity.
Include current weather impacts on remote work, financial market trends,
and recent academic publications on AI agent frameworks.""",
expected_output="""Comprehensive report covering:
1. AI agent business impact analysis
2. Weather considerations for remote work
3. Financial market trends related to AI
4. Academic research citations and insights
5. Competitive landscape analysis""",
agent=multi_source_agent
)
# Create and execute crew
research_crew = Crew(
agents=[multi_source_agent],
tasks=[research_task],
process=Process.sequential,
verbose=True
)
result = research_crew.kickoff()
print(f"Research completed with {len(multi_source_agent.mcps)} MCP data sources")
```
## Tool Naming and Organization
CrewAI automatically handles tool naming to prevent conflicts:
```python
# Original MCP server has tools: "search", "analyze"
# CrewAI creates tools: "mcp_exa_ai_search", "mcp_exa_ai_analyze"
agent = Agent(
role="Tool Organization Demo",
goal="Show how tool naming works",
backstory="Demonstrates automatic tool organization",
mcps=[
"https://mcp.exa.ai/mcp?api_key=key", # Tools: mcp_exa_ai_*
"https://weather.service.com/mcp", # Tools: weather_service_com_*
"crewai-amp:financial-data" # Tools: financial_data_*
]
)
# Each server's tools get unique prefixes based on the server name
# This prevents naming conflicts between different MCP servers
```
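The prefixes shown in the comments above are consistent with a simple rule: take the server's host name (or the AMP service name) and replace punctuation with underscores. The sketch below reproduces those example names under that assumption. `server_prefix` and `prefixed_tool_name` are hypothetical helpers, and CrewAI's actual naming logic may differ.

```python
import re
from urllib.parse import urlparse

AMP_PREFIX = "crewai-amp:"


def server_prefix(reference: str) -> str:
    """Derive an illustrative tool-name prefix from an MCP reference (assumed rule)."""
    reference = reference.split("#", 1)[0]  # drop any "#tool" selector
    if reference.startswith(AMP_PREFIX):
        name = reference[len(AMP_PREFIX):]
    else:
        name = urlparse(reference).hostname or reference
    # Replace each run of non-alphanumeric characters with one underscore
    return re.sub(r"[^0-9a-zA-Z]+", "_", name).strip("_")


def prefixed_tool_name(reference: str, tool: str) -> str:
    """Combine the server prefix with the tool's original name."""
    return f"{server_prefix(reference)}_{tool}"


print(prefixed_tool_name("https://mcp.exa.ai/mcp?api_key=key", "search"))
print(server_prefix("crewai-amp:financial-data"))
```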
## Error Handling and Resilience
The MCP DSL keeps an agent usable when individual servers fail or respond slowly:
### Graceful Server Failures
```python
agent = Agent(
role="Resilient Researcher",
goal="Research despite server issues",
backstory="Experienced researcher who adapts to available tools",
mcps=[
"https://primary-server.com/mcp", # Primary data source
"https://backup-server.com/mcp", # Backup if primary fails
"https://unreachable-server.com/mcp", # Will be skipped with warning
"crewai-amp:reliable-service" # Reliable AMP service
]
)
# Agent will:
# 1. Successfully connect to working servers
# 2. Log warnings for failing servers
# 3. Continue with available tools
# 4. Not crash or hang on server failures
```
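The skip-and-continue behavior listed above can be pictured as a loop that catches per-server errors, logs a warning, and moves on. This is a minimal sketch of that pattern, not CrewAI's implementation; `collect_tools` and `fake_discover` are hypothetical stand-ins for the real discovery step.

```python
import logging
from typing import Callable, Dict, List

logger = logging.getLogger("mcp_demo")


def collect_tools(
    servers: List[str], discover: Callable[[str], List[str]]
) -> Dict[str, List[str]]:
    """Gather tool names per server, skipping servers whose discovery raises."""
    available: Dict[str, List[str]] = {}
    for server in servers:
        try:
            available[server] = discover(server)
        except Exception as exc:  # one bad server must not stop the others
            logger.warning("Skipping MCP server %s: %s", server, exc)
    return available


def fake_discover(server: str) -> List[str]:
    # Stand-in for real discovery; simulates one unreachable server
    if "unreachable" in server:
        raise ConnectionError("server did not respond")
    return ["search"]


tools = collect_tools(
    ["https://primary-server.com/mcp", "https://unreachable-server.com/mcp"],
    fake_discover,
)
print(tools)
```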
### Timeout Protection
All MCP operations have built-in timeouts:
- **Connection timeout**: 10 seconds
- **Tool execution timeout**: 30 seconds
- **Discovery timeout**: 15 seconds
```python
# These servers time out gracefully if unresponsive
mcps=[
    "https://slow-server.com/mcp",     # Times out after 10s if the connection hangs
    "https://overloaded-api.com/mcp"   # Times out if discovery takes > 15s
]
```
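One common way to enforce limits like these in Python is `asyncio.wait_for`. The sketch below shows the general pattern with the three values listed above as constants. It is an illustration only: `with_timeout` and `slow_discovery` are hypothetical names, and how CrewAI applies its timeouts internally is not shown in this document.

```python
import asyncio
from typing import Any, Awaitable, List, Optional

# Values taken from the list above
CONNECTION_TIMEOUT = 10
TOOL_EXECUTION_TIMEOUT = 30
DISCOVERY_TIMEOUT = 15


async def with_timeout(operation: Awaitable[Any], seconds: float) -> Optional[Any]:
    """Await operation; return None instead of raising if it exceeds the limit."""
    try:
        return await asyncio.wait_for(operation, timeout=seconds)
    except asyncio.TimeoutError:
        return None


async def slow_discovery() -> List[str]:
    # Stand-in for a server that takes 0.2s to list its tools
    await asyncio.sleep(0.2)
    return ["get_forecast"]


# Short limits keep the demo quick; a real call would pass DISCOVERY_TIMEOUT
print(asyncio.run(with_timeout(slow_discovery(), seconds=0.01)))
print(asyncio.run(with_timeout(slow_discovery(), seconds=1)))
```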
## Performance Features
### Automatic Caching
Tool schemas are cached for 5 minutes to improve performance:
```python
# First agent creation - discovers tools from server
agent1 = Agent(role="First", goal="Test", backstory="Test",
mcps=["https://api.example.com/mcp"])
# Second agent creation (within 5 minutes) - uses cached tool schemas
agent2 = Agent(role="Second", goal="Test", backstory="Test",
mcps=["https://api.example.com/mcp"]) # Much faster!
```
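A time-limited cache of this kind can be sketched as a dictionary that stores each entry with the time it was fetched. The class below is a hypothetical illustration of a 5-minute TTL cache, not CrewAI's internal cache; `SchemaCache` and the `discover` callback are assumed names.

```python
import time
from typing import Callable, Dict, List, Tuple

CACHE_TTL_SECONDS = 5 * 60  # 5 minutes, as described above


class SchemaCache:
    """Cache tool schemas per server and refetch them once the TTL has passed."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._entries: Dict[str, Tuple[float, List[str]]] = {}

    def get(self, server: str, discover: Callable[[str], List[str]]) -> List[str]:
        now = self._clock()
        entry = self._entries.get(server)
        if entry is not None and now - entry[0] < CACHE_TTL_SECONDS:
            return entry[1]  # still fresh: reuse the cached schemas
        schemas = discover(server)  # missing or expired: fetch again
        self._entries[server] = (now, schemas)
        return schemas


cache = SchemaCache()
print(cache.get("https://api.example.com/mcp", lambda server: ["search"]))
```

Passing the clock in as a parameter makes expiry easy to test without waiting five minutes.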
### On-Demand Connections
Tool connections are established only when tools are actually used:
```python
# Agent creation is fast - no MCP connections made yet
agent = Agent(
role="On-Demand Agent",
goal="Use tools efficiently",
backstory="Efficient agent that connects only when needed",
mcps=["https://api.example.com/mcp"]
)
# MCP connection is made only when a tool is actually executed
# This minimizes connection overhead and improves startup performance
```
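The on-demand pattern described above amounts to deferring the connection until the first call. A minimal sketch, assuming a hypothetical `LazyTool` wrapper and a `connect` factory that returns a callable session (neither is a CrewAI class):

```python
from typing import Callable, Optional


class LazyTool:
    """Open the connection on first use instead of at construction time."""

    def __init__(self, name: str, connect: Callable[[], Callable[[str], str]]) -> None:
        self.name = name
        self._connect = connect
        self._session: Optional[Callable[[str], str]] = None

    @property
    def connected(self) -> bool:
        return self._session is not None

    def run(self, query: str) -> str:
        if self._session is None:
            # First call: establish the connection now
            self._session = self._connect()
        return self._session(query)


tool = LazyTool("search", connect=lambda: (lambda query: f"result for {query}"))
print(tool.connected)   # no connection yet
print(tool.run("mcp"))  # connects, then runs
print(tool.connected)
```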
## Integration with Existing Features
MCP tools can be combined with custom tools and platform apps on the same agent:
```python
from crewai import Agent
from crewai.tools import BaseTool

class CustomTool(BaseTool):
    name: str = "custom_analysis"
    description: str = "Custom analysis tool"

    def _run(self, **kwargs):
        return "Custom analysis result"

agent = Agent(
    role="Full-Featured Agent",
    goal="Use all available tool types",
    backstory="Agent with comprehensive tool access",
    # All tool types work together
    tools=[CustomTool()],         # Custom tools
    apps=["gmail", "slack"],      # Platform integrations
    mcps=[                        # MCP servers
        "https://mcp.exa.ai/mcp?api_key=key",
        "crewai-amp:research-tools"
    ],
    verbose=True,
    max_iter=15
)
```
## Best Practices
### 1. Use Specific Tools When Possible
```python
# Good - only get the tools you need
mcps=["https://weather.api.com/mcp#get_forecast"]
# Less efficient - gets all tools from server
mcps=["https://weather.api.com/mcp"]
```
### 2. Handle Authentication Securely
```python
import os
# Read API keys from environment variables; os.environ[...] raises KeyError
# if a variable is unset, rather than putting the string "None" in the URL
exa_key = os.environ["EXA_API_KEY"]
exa_profile = os.environ["EXA_PROFILE"]
agent = Agent(
    role="Secure Agent",
    goal="Use MCP tools securely",
    backstory="Security-conscious agent",
    mcps=[f"https://mcp.exa.ai/mcp?api_key={exa_key}&profile={exa_profile}"]
)
```
### 3. Plan for Server Failures
```python
# Always include backup options
mcps=[
"https://primary-api.com/mcp", # Primary choice
"https://backup-api.com/mcp", # Backup option
"crewai-amp:reliable-service" # AMP fallback
]
```
### 4. Use Descriptive Agent Roles
```python
agent = Agent(
role="Weather-Enhanced Market Analyst",
goal="Analyze markets considering weather impacts",
backstory="Financial analyst with access to weather data for agricultural market insights",
mcps=[
"https://weather.service.com/mcp#get_forecast",
"crewai-amp:financial-data#stock_analysis"
]
)
```
## Troubleshooting
### Common Issues
**No tools discovered:**
```python
# Check your MCP server URL and authentication
# Verify the server is running and accessible
mcps=["https://mcp.example.com/mcp?api_key=valid_key"]
```
**Connection timeouts:**
```python
# Server may be slow or overloaded
# CrewAI will log warnings and continue with other servers
# Check server status or try backup servers
```
**Authentication failures:**
```python
# Verify API keys and credentials
# Check server documentation for required parameters
# Ensure query parameters are properly URL encoded
```
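For the URL-encoding point, Python's standard `urllib.parse.urlencode` escapes characters such as `/`, `+`, `=`, and spaces in query values. A minimal sketch follows; `build_mcp_url` is a hypothetical helper, and the parameter names are the placeholders used elsewhere on this page.

```python
from urllib.parse import urlencode


def build_mcp_url(base_url: str, **params: str) -> str:
    """Append URL-encoded query parameters to an MCP server URL."""
    return f"{base_url}?{urlencode(params)}"


url = build_mcp_url(
    "https://mcp.example.com/mcp",
    api_key="abc/123+=",
    profile="my profile",
)
print(url)
# https://mcp.example.com/mcp?api_key=abc%2F123%2B%3D&profile=my+profile
```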
## Progress Notifications
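CrewAI can relay progress notifications that MCP servers send during long-running tool executions. This gives you real-time visibility into how far a tool call has advanced, which is useful when monitoring slow or multi-step operations. Progress notifications are opt-in, so existing agents behave as before unless you enable them.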
### Enabling Progress Notifications
Enable progress tracking by setting `mcp_progress_enabled=True` on your agent:
```python
from crewai import Agent
from crewai.events import crewai_event_bus, MCPToolProgressEvent
agent = Agent(
    role="Data Processing Specialist",
    goal="Process large datasets efficiently",
    backstory="Expert at handling long-running data operations with real-time monitoring",
    mcps=["https://data-processor.example.com/mcp"],
    mcp_progress_enabled=True
)
```
### Listening to Progress Events
Progress notifications are emitted as `MCPToolProgressEvent` through the CrewAI event bus:
```python
# Register the handler with the event bus's `on` decorator
@crewai_event_bus.on(MCPToolProgressEvent)
def handle_progress(source, event: MCPToolProgressEvent):
    print(f"Tool: {event.tool_name}")
    print(f"Progress: {event.progress}/{event.total or '?'}")
    print(f"Message: {event.message}")
    print(f"Agent: {event.agent_role}")

# `crew` is a Crew that contains the progress-enabled agent defined above
result = crew.kickoff()
```
### Progress Event Fields
The `MCPToolProgressEvent` provides detailed progress information:
- `tool_name`: Name of the MCP tool being executed
- `server_name`: Name of the MCP server
- `progress`: Current progress value
- `total`: Total progress value (optional)
- `message`: Progress message from the server (optional)
- `agent_id`: ID of the agent executing the tool
- `agent_role`: Role of the agent
- `task_id`: ID of the task being executed (if available)
- `task_name`: Name of the task (if available)
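
Because `total` and `message` are optional, a handler has to cope with either being absent. The sketch below shows one way to turn these fields into a status line. `ProgressSnapshot` is a hypothetical stand-in that mirrors a few of the fields listed above so the example runs without CrewAI installed; it is not the real event class.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ProgressSnapshot:
    """Stand-in mirroring a few MCPToolProgressEvent fields (illustration only)."""

    tool_name: str
    progress: float
    total: Optional[float] = None
    message: Optional[str] = None


def describe(event: ProgressSnapshot) -> str:
    """Format a progress update, falling back gracefully when total is missing."""
    if event.total:
        status = f"{event.progress / event.total:.0%}"
    else:
        status = f"{event.progress} (total unknown)"
    suffix = f" - {event.message}" if event.message else ""
    return f"{event.tool_name}: {status}{suffix}"


print(describe(ProgressSnapshot("export", 25, 100, "Reading rows")))
print(describe(ProgressSnapshot("export", 3)))
```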
### Complete Progress Monitoring Example
```python
from crewai import Agent, Task, Crew, Process
from crewai.events import crewai_event_bus, MCPToolProgressEvent

progress_updates = []

# Record every progress update and print a one-line status for it
@crewai_event_bus.on(MCPToolProgressEvent)
def track_progress(source, event: MCPToolProgressEvent):
    progress_updates.append({
        "tool": event.tool_name,
        "progress": event.progress,
        "total": event.total,
        "message": event.message,
        "timestamp": event.timestamp
    })
    if event.total:
        percentage = (event.progress / event.total) * 100
        print(f"[{event.agent_role}] {event.tool_name}: {percentage:.1f}% - {event.message}")
    else:
        print(f"[{event.agent_role}] {event.tool_name}: {event.progress} - {event.message}")

agent = Agent(
role="Large-Scale Data Analyst",
goal="Analyze massive datasets with progress tracking",
backstory="Specialist in processing large-scale data operations with real-time monitoring",
mcps=["https://analytics.example.com/mcp"],
mcp_progress_enabled=True
)
task = Task(
description="Process and analyze the complete customer dataset",
expected_output="Comprehensive analysis report with insights",
agent=agent
)
crew = Crew(
agents=[agent],
tasks=[task],
process=Process.sequential,
verbose=True
)
result = crew.kickoff()
print(f"Total progress updates received: {len(progress_updates)}")
```
## Middleware Support with Headers
CrewAI can send custom HTTP headers with its requests to MCP servers. Use them for authentication, request tracking, and integration with server-side middleware.
### Configuring Headers
Pass custom headers to MCP servers using `mcp_server_headers`:
```python
from crewai import Agent
agent = Agent(
    role="Secure API Consumer",
    goal="Access protected MCP services securely",
    backstory="Security-conscious agent with proper authentication credentials",
    mcps=["https://secure-api.example.com/mcp"],
    mcp_server_headers={
        "Authorization": "Bearer your_access_token",
        "X-Client-ID": "crewai-client-123",
        "X-Request-Source": "production-crew"
    }
)
```
### Common Header Use Cases
#### Authentication
```python
import os
agent = Agent(
role="Authenticated Researcher",
goal="Access premium research tools",
backstory="Researcher with authenticated access to premium data sources",
mcps=["https://premium-research.example.com/mcp"],
mcp_server_headers={
"Authorization": f"Bearer {os.getenv('RESEARCH_API_TOKEN')}",
"X-API-Key": os.getenv("RESEARCH_API_KEY")
}
)
```
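Note that `os.getenv` returns `None` when a variable is unset, which would produce a header such as `Bearer None`. One way to fail early instead is to validate the variables before building the dictionary. This is a sketch under that assumption; `headers_from_env` is a hypothetical helper, not part of CrewAI.

```python
import os
from typing import Dict, Mapping


def headers_from_env(
    mapping: Mapping[str, str], env: Mapping[str, str] = os.environ
) -> Dict[str, str]:
    """Build headers from {header_name: env_var_name}, raising if any is unset."""
    missing = [var for var in mapping.values() if not env.get(var)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {header: env[var] for header, var in mapping.items()}


# A plain dict stands in for the real environment in this demo
demo_env = {"RESEARCH_API_KEY": "demo-key"}
print(headers_from_env({"X-API-Key": "RESEARCH_API_KEY"}, env=demo_env))
```

The resulting dictionary can then be passed as `mcp_server_headers`.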
#### Request Tracking
```python
import uuid
request_id = str(uuid.uuid4())
agent = Agent(
role="Tracked Operations Agent",
goal="Execute operations with full traceability",
backstory="Agent designed for auditable operations with request tracking",
mcps=["https://tracked-service.example.com/mcp"],
mcp_server_headers={
"X-Request-ID": request_id,
"X-Client-Version": "crewai-2.0",
"X-Environment": "production"
}
)
```
#### Rate Limiting and Quotas
```python
agent = Agent(
role="Quota-Managed Agent",
goal="Operate within API quotas and rate limits",
backstory="Agent configured for efficient API usage within quota constraints",
mcps=["https://rate-limited-api.example.com/mcp"],
mcp_server_headers={
"X-Client-ID": "crew-client-001",
"X-Priority": "high",
"X-Quota-Group": "premium-tier"
}
)
```
### Combining Progress and Headers
For complex use cases requiring both progress monitoring and middleware integration:
```python
import os

from crewai import Agent, Task, Crew
from crewai.events import crewai_event_bus, MCPToolProgressEvent

# Print each progress update; `total` may be missing, so fall back to "?"
@crewai_event_bus.on(MCPToolProgressEvent)
def monitor_progress(source, event: MCPToolProgressEvent):
    print(f"Progress: {event.tool_name} - {event.progress}/{event.total or '?'}")

agent = Agent(
role="Enterprise Data Processor",
goal="Process enterprise data with full monitoring and security",
backstory="Enterprise-grade agent with authenticated access and progress tracking",
mcps=["https://enterprise-api.example.com/mcp"],
mcp_progress_enabled=True,
mcp_server_headers={
"Authorization": f"Bearer {os.getenv('ENTERPRISE_TOKEN')}",
"X-Client-ID": "enterprise-crew-001",
"X-Request-Source": "production",
"X-Enable-Progress": "true"
}
)
task = Task(
description="Process quarterly financial data with real-time progress updates",
expected_output="Complete financial analysis with processing metrics",
agent=agent
)
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
```
## Advanced: MCPServerAdapter
For complex scenarios requiring manual connection management, use the `MCPServerAdapter` class from `crewai-tools`. Using a Python context manager (`with` statement) is the recommended approach as it automatically handles starting and stopping the connection to the MCP server.