feat: Add MCP progress notifications and middleware headers support

Implements progress reporting and HTTP headers support for MCP tool integration
to address issue #3797.

Changes:
- Add MCPToolProgressEvent to event system for real-time progress tracking
- Extend MCPToolWrapper to support progress callbacks and event emission
- Add mcp_progress_enabled flag to Agent for opt-in progress notifications
- Add mcp_server_headers to Agent for middleware authentication/tracking
- Thread progress and headers configuration through Agent._get_external_mcp_tools
- Add comprehensive test coverage for progress and headers features
- Update MCP DSL documentation with progress and headers examples

Features:
- Progress notifications emitted as MCPToolProgressEvent via event bus
- Optional progress callback for custom progress handling
- HTTP headers passthrough for authentication and middleware integration
- Agent and task context included in progress events
- Opt-in design ensures backward compatibility

Tests:
- Unit tests for MCPToolWrapper progress and headers functionality
- Integration tests for Agent MCP configuration
- Mock-based tests to avoid network dependencies

Documentation:
- Added Progress Notifications section with examples
- Added Middleware Support with Headers section
- Included complete examples for common use cases

Fixes #3797

Co-Authored-By: João <joao@crewai.com>
This commit is contained in:
Devin AI
2025-10-26 09:45:50 +00:00
parent 494ed7e671
commit 873d501401
8 changed files with 996 additions and 3 deletions

View File

@@ -339,6 +339,225 @@ mcps=["https://mcp.example.com/mcp?api_key=valid_key"]
# Ensure query parameters are properly URL encoded
```
## Progress Notifications
CrewAI supports progress notifications from MCP servers during long-running tool executions. This provides real-time visibility into tool execution status and enables precise monitoring of complex operations.
### Enabling Progress Notifications
Enable progress tracking by setting `mcp_progress_enabled=True` on your agent:
```python
from crewai import Agent
from crewai.events import crewai_event_bus, MCPToolProgressEvent
agent = Agent(
role="Data Processing Specialist",
goal="Process large datasets efficiently",
backstory="Expert at handling long-running data operations with real-time monitoring",
mcps=["https://data-processor.example.com/mcp"],
mcp_progress_enabled=True
)
```
### Listening to Progress Events
Progress notifications are emitted as `MCPToolProgressEvent` through the CrewAI event bus:
```python
def handle_progress(source, event: MCPToolProgressEvent):
print(f"Tool: {event.tool_name}")
print(f"Progress: {event.progress}/{event.total or '?'}")
print(f"Message: {event.message}")
print(f"Agent: {event.agent_role}")
crewai_event_bus.register(MCPToolProgressEvent, handle_progress)
result = crew.kickoff()
```
### Progress Event Fields
The `MCPToolProgressEvent` provides detailed progress information:
- `tool_name`: Name of the MCP tool being executed
- `server_name`: Name of the MCP server
- `progress`: Current progress value
- `total`: Total progress value (optional)
- `message`: Progress message from the server (optional)
- `agent_id`: ID of the agent executing the tool
- `agent_role`: Role of the agent
- `task_id`: ID of the task being executed (if available)
- `task_name`: Name of the task (if available)
### Complete Progress Monitoring Example
```python
from crewai import Agent, Task, Crew, Process
from crewai.events import crewai_event_bus, MCPToolProgressEvent
progress_updates = []
def track_progress(source, event: MCPToolProgressEvent):
progress_updates.append({
"tool": event.tool_name,
"progress": event.progress,
"total": event.total,
"message": event.message,
"timestamp": event.timestamp
})
if event.total:
percentage = (event.progress / event.total) * 100
print(f"[{event.agent_role}] {event.tool_name}: {percentage:.1f}% - {event.message}")
else:
print(f"[{event.agent_role}] {event.tool_name}: {event.progress} - {event.message}")
crewai_event_bus.register(MCPToolProgressEvent, track_progress)
agent = Agent(
role="Large-Scale Data Analyst",
goal="Analyze massive datasets with progress tracking",
backstory="Specialist in processing large-scale data operations with real-time monitoring",
mcps=["https://analytics.example.com/mcp"],
mcp_progress_enabled=True
)
task = Task(
description="Process and analyze the complete customer dataset",
expected_output="Comprehensive analysis report with insights",
agent=agent
)
crew = Crew(
agents=[agent],
tasks=[task],
process=Process.sequential,
verbose=True
)
result = crew.kickoff()
print(f"Total progress updates received: {len(progress_updates)}")
```
## Middleware Support with Headers
CrewAI provides precise control over MCP server communication through custom HTTP headers. This enables authentication, request tracking, and integration with server-side middleware for enhanced security and monitoring.
### Configuring Headers
Pass custom headers to MCP servers using `mcp_server_headers`:
```python
from crewai import Agent
agent = Agent(
role="Secure API Consumer",
goal="Access protected MCP services securely",
backstory="Security-conscious agent with proper authentication credentials",
mcps=["https://secure-api.example.com/mcp"],
mcp_server_headers={
"Authorization": "Bearer your_access_token",
"X-Client-ID": "crewai-client-123",
"X-Request-Source": "production-crew"
}
)
```
### Common Header Use Cases
#### Authentication
```python
import os
agent = Agent(
role="Authenticated Researcher",
goal="Access premium research tools",
backstory="Researcher with authenticated access to premium data sources",
mcps=["https://premium-research.example.com/mcp"],
mcp_server_headers={
"Authorization": f"Bearer {os.getenv('RESEARCH_API_TOKEN')}",
"X-API-Key": os.getenv("RESEARCH_API_KEY")
}
)
```
#### Request Tracking
```python
import uuid
request_id = str(uuid.uuid4())
agent = Agent(
role="Tracked Operations Agent",
goal="Execute operations with full traceability",
backstory="Agent designed for auditable operations with request tracking",
mcps=["https://tracked-service.example.com/mcp"],
mcp_server_headers={
"X-Request-ID": request_id,
"X-Client-Version": "crewai-2.0",
"X-Environment": "production"
}
)
```
#### Rate Limiting and Quotas
```python
agent = Agent(
role="Quota-Managed Agent",
goal="Operate within API quotas and rate limits",
backstory="Agent configured for efficient API usage within quota constraints",
mcps=["https://rate-limited-api.example.com/mcp"],
mcp_server_headers={
"X-Client-ID": "crew-client-001",
"X-Priority": "high",
"X-Quota-Group": "premium-tier"
}
)
```
### Combining Progress and Headers
For complex use cases requiring both progress monitoring and middleware integration:
```python
from crewai import Agent, Task, Crew
from crewai.events import crewai_event_bus, MCPToolProgressEvent
import os
def monitor_progress(source, event: MCPToolProgressEvent):
print(f"Progress: {event.tool_name} - {event.progress}/{event.total}")
crewai_event_bus.register(MCPToolProgressEvent, monitor_progress)
agent = Agent(
role="Enterprise Data Processor",
goal="Process enterprise data with full monitoring and security",
backstory="Enterprise-grade agent with authenticated access and progress tracking",
mcps=["https://enterprise-api.example.com/mcp"],
mcp_progress_enabled=True,
mcp_server_headers={
"Authorization": f"Bearer {os.getenv('ENTERPRISE_TOKEN')}",
"X-Client-ID": "enterprise-crew-001",
"X-Request-Source": "production",
"X-Enable-Progress": "true"
}
)
task = Task(
description="Process quarterly financial data with real-time progress updates",
expected_output="Complete financial analysis with processing metrics",
agent=agent
)
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
```
## Advanced: MCPServerAdapter
For complex scenarios requiring manual connection management, use the `MCPServerAdapter` class from `crewai-tools`. Using a Python context manager (`with` statement) is the recommended approach as it automatically handles starting and stopping the connection to the MCP server.