---
title: SSE Transport
description: Learn how to connect CrewAI to remote MCP servers using Server-Sent Events (SSE) for real-time communication.
icon: wifi
---

## Overview

Server-Sent Events (SSE) provide a standard way for a web server to push updates to a client over a single, long-lived HTTP connection. In the context of MCP, remote servers use SSE to stream data (such as tool responses) to your CrewAI application in real time.

## Key Concepts

- **Remote Servers**: SSE is suitable for MCP servers that are hosted remotely and reached over HTTP.
- **Unidirectional Stream**: An SSE stream is a one-way communication channel from server to client.
- **`MCPServerAdapter` Configuration**: For SSE, you provide the server's URL and set the transport type to `"sse"`.

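To make the server-to-client stream more concrete, the following is a minimal sketch of how SSE frames events as plain text: `field: value` lines, with a blank line ending each event. The helper name `parse_sse`, the `update` event name, and the sample payload are all invented for illustration. `MCPServerAdapter` handles this framing for you, so you would not normally write such code yourself.

```python
def parse_sse(stream_text):
    """Split raw SSE text into a list of {"event", "data"} dicts."""
    events = []
    event_type, data_lines = "message", []  # "message" is the default event type
    for line in stream_text.splitlines():
        if line == "":
            # A blank line terminates the current event.
            if data_lines:
                events.append({"event": event_type, "data": "\n".join(data_lines)})
            event_type, data_lines = "message", []
        elif line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        else:
            field, _, value = line.partition(":")
            if value.startswith(" "):
                value = value[1:]  # a single leading space is not part of the value
            if field == "event":
                event_type = value
            elif field == "data":
                data_lines.append(value)
    return events


# Hypothetical payload: one named event, a keep-alive comment, one default event.
sample = (
    "event: update\n"
    "data: first chunk\n"
    "\n"
    ": keep-alive\n"
    'data: {"status": "done"}\n'
    "\n"
)
events = parse_sse(sample)
print(events)
```

Note that data only flows one way in this format: nothing in the stream lets the client send a message back over the same connection.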
## Connecting via SSE

You can connect to an SSE-based MCP server in two ways, which differ in how the connection lifecycle is managed:

### 1. Fully Managed Connection (Recommended)

Using a Python context manager (`with` statement) is the recommended approach. It automatically establishes the connection to the SSE MCP server and closes it when the block exits.

```python
from crewai import Agent, Task, Crew, Process
from crewai_tools import MCPServerAdapter

server_params = {
    "url": "http://localhost:8000/sse",  # Replace with your actual SSE server URL
    "transport": "sse"
}

# Using MCPServerAdapter with a context manager
try:
    with MCPServerAdapter(server_params) as tools:
        print(f"Available tools from SSE MCP server: {[tool.name for tool in tools]}")

        # Example: Using a tool from the SSE MCP server
        sse_agent = Agent(
            role="Remote Service User",
            goal="Utilize a tool provided by a remote SSE MCP server.",
            backstory="An AI agent that connects to external services via SSE.",
            tools=tools,
            reasoning=True,
            verbose=True,
        )

        sse_task = Task(
            description="Fetch real-time stock updates for 'AAPL' using an SSE tool.",
            expected_output="The latest stock price for AAPL.",
            agent=sse_agent,
            markdown=True
        )

        sse_crew = Crew(
            agents=[sse_agent],
            tasks=[sse_task],
            verbose=True,
            process=Process.sequential
        )

        if tools:  # Only kickoff if tools were loaded
            result = sse_crew.kickoff()  # Add inputs={'stock_symbol': 'AAPL'} if tool requires it
            print("\nCrew Task Result (SSE - Managed):\n", result)
        else:
            print("Skipping crew kickoff as tools were not loaded (check server connection).")

except Exception as e:
    print(f"Error connecting to or using SSE MCP server (Managed): {e}")
    print("Ensure the SSE MCP server is running and accessible at the specified URL.")
```

<Note>
Replace `"http://localhost:8000/sse"` with the actual URL of your SSE MCP server.
</Note>

### 2. Manual Connection Lifecycle

If you need finer-grained control, you can manage the `MCPServerAdapter` connection lifecycle manually.

<Info>
You **MUST** call `mcp_server_adapter.stop()` to ensure the connection is closed and resources are released. Using a `try...finally` block is highly recommended.
</Info>

```python
from crewai import Agent, Task, Crew, Process
from crewai_tools import MCPServerAdapter

server_params = {
    "url": "http://localhost:8000/sse",  # Replace with your actual SSE server URL
    "transport": "sse"
}

mcp_server_adapter = None
try:
    mcp_server_adapter = MCPServerAdapter(server_params)
    mcp_server_adapter.start()
    tools = mcp_server_adapter.tools
    print(f"Available tools (manual SSE): {[tool.name for tool in tools]}")

    manual_sse_agent = Agent(
        role="Remote Data Analyst",
        goal="Analyze data fetched from a remote SSE MCP server using manual connection management.",
        backstory="An AI skilled in handling SSE connections explicitly.",
        tools=tools,
        verbose=True
    )

    analysis_task = Task(
        description="Fetch and analyze the latest user activity trends from the SSE server.",
        expected_output="A summary report of user activity trends.",
        agent=manual_sse_agent
    )

    analysis_crew = Crew(
        agents=[manual_sse_agent],
        tasks=[analysis_task],
        verbose=True,
        process=Process.sequential
    )

    result = analysis_crew.kickoff()
    print("\nCrew Task Result (SSE - Manual):\n", result)

except Exception as e:
    print(f"An error occurred during manual SSE MCP integration: {e}")
    print("Ensure the SSE MCP server is running and accessible.")
finally:
    if mcp_server_adapter and mcp_server_adapter.is_connected:
        print("Stopping SSE MCP server connection (manual)...")
        mcp_server_adapter.stop()  # **Crucial: Ensure stop is called**
    elif mcp_server_adapter:
        print("SSE MCP server adapter was not connected. No stop needed or start failed.")
```

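If you manage the lifecycle manually in several places, the `try...finally` pattern can be wrapped in a small reusable helper. The sketch below shows the idea with a stand-in class so that it runs without a server. `FakeAdapter` and `managed_adapter` are hypothetical names, not part of CrewAI. The stand-in assumes only the `start()`, `stop()`, `tools`, and `is_connected` members used in the manual example.

```python
from contextlib import contextmanager


class FakeAdapter:
    """Stand-in exposing start()/stop(); this is NOT the real MCPServerAdapter."""

    def __init__(self):
        self.is_connected = False
        self.tools = []

    def start(self):
        self.is_connected = True
        self.tools = ["echo"]  # placeholder tool list for the demo

    def stop(self):
        self.is_connected = False


@contextmanager
def managed_adapter(adapter):
    """Start the adapter, yield its tools, and always stop it afterwards."""
    adapter.start()
    try:
        yield adapter.tools
    finally:
        # Runs on normal exit and when the body raises.
        if adapter.is_connected:
            adapter.stop()


adapter = FakeAdapter()
try:
    with managed_adapter(adapter) as tools:
        loaded = list(tools)
        raise RuntimeError("simulated failure while the crew is running")
except RuntimeError:
    pass

still_connected = adapter.is_connected
print(loaded, still_connected)
```

Even though the body raises, the `finally` clause stops the adapter. That is the same guarantee the `try...finally` block in the manual example provides.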
## Security Considerations for SSE

<Warning>
**DNS Rebinding Attacks**: SSE transports can be vulnerable to DNS rebinding attacks if the MCP server is not properly secured. This could allow malicious websites to interact with local or intranet-based MCP servers.
</Warning>

To mitigate this risk:
- MCP server implementations should **validate `Origin` headers** on incoming SSE connections.
- When running local SSE MCP servers for development, **bind only to `localhost` (`127.0.0.1`)** rather than all network interfaces (`0.0.0.0`).
- Implement **proper authentication** for all SSE connections if they expose sensitive tools or data.

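As an illustration of the first mitigation, here is a minimal sketch of a server-side `Origin` check. The names `ALLOWED_ORIGINS` and `is_origin_allowed` and the allow-list entries are assumptions made for this example. They do not come from CrewAI or any MCP SDK. The sketch rejects requests that carry no `Origin` header, which is a deliberately strict policy choice. Non-browser clients often omit that header, so a real server may need to accept such requests and rely on authentication instead.

```python
from urllib.parse import urlsplit

# Assumed allow-list for a local development server.
ALLOWED_ORIGINS = {"http://localhost:8000", "http://127.0.0.1:8000"}


def is_origin_allowed(origin, allowed=ALLOWED_ORIGINS):
    """Return True only if the Origin header matches an allowed scheme://host[:port]."""
    if not origin:
        return False  # strict policy in this sketch: a missing Origin is rejected
    parts = urlsplit(origin)
    if parts.scheme not in ("http", "https") or not parts.hostname:
        return False  # covers opaque origins such as "null"
    normalized = f"{parts.scheme}://{parts.netloc}".lower()
    return normalized in allowed


print(is_origin_allowed("http://localhost:8000"))
print(is_origin_allowed("http://evil.example"))
```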
For a comprehensive overview of security best practices, please refer to our [Security Considerations](./security.mdx) page and the official [MCP Transport Security documentation](https://modelcontextprotocol.io/docs/concepts/transports#security-considerations).