Compare commits

...

3 Commits

Author SHA1 Message Date
Thiago Moretto
230d72e5b9 Add initial doc part of keyed mcp server params 2025-07-18 10:37:16 -04:00
Thiago Moretto
1c48e134a9 Merge branch 'main' into tm-multiple-mcp-servers-crew-base 2025-07-18 10:11:12 -03:00
Thiago Moretto
db90371c22 Adding support to multiple mcp servers on @CrewBase 2025-07-18 09:01:13 -04:00
3 changed files with 124 additions and 18 deletions

View File

@@ -172,6 +172,60 @@ def another_agent(self):
)
```
### Using Multiple MCP Servers with CrewBase
You can configure multiple MCP servers and assign different servers and tools to different agents. Use a dictionary to pass multiple named MCP servers.
```python
@CrewBase
class CrewWithMultipleMCP:
# ... define your agents and tasks config file ...
# MCP servers keyed by server name
mcp_server_params = {
"web_tools": {
"url": "http://localhost:8000/mcp",
"transport": "streamable-http"
},
"data_tools": {
"url":
"http://localhost:8001/sse",
"transport": "sse"
},
"local_tools": StdioServerParameters(
command="python3",
args=["servers/local_server.py"],
env={"UV_PYTHON": "3.12", **os.environ},
)
}
@agent
def web_researcher(self):
# Use tools from specific server
return Agent(
config=self.agents_config["web_researcher"],
tools=self.get_mcp_tools(server="web_tools")
)
@agent
def data_analyst(self):
# Use specific tools from specific server
return Agent(
config=self.agents_config["data_analyst"],
tools=self.get_mcp_tools("analyze_csv", "create_chart", server="data_tools")
)
@agent
def multi_tool_agent(self):
# Use tools from all servers
return Agent(
config=self.agents_config["multi_tool_agent"],
tools=self.get_mcp_tools() # No server specified = all tools
)
# ... rest of your crew setup ...
```
## Explore MCP Integrations
<CardGroup cols={2}>

View File

@@ -1,7 +1,7 @@
import inspect
import logging
from pathlib import Path
from typing import Any, Callable, Dict, TypeVar, cast, List
from typing import Any, Callable, Dict, TypeVar, cast, List, Union
from crewai.tools import BaseTool
import yaml
@@ -28,7 +28,8 @@ def CrewBase(cls: T) -> T:
)
original_tasks_config_path = getattr(cls, "tasks_config", "config/tasks.yaml")
mcp_server_params: Any = getattr(cls, "mcp_server_params", None)
mcp_server_params: Union[list[str | dict[str, str]], dict[str, str], None] = getattr(cls, "mcp_server_params", None)
_mcp_server_adapter: Union[dict[str, Any], Any, None] = None
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@@ -67,36 +68,57 @@ def CrewBase(cls: T) -> T:
self._original_functions, "is_kickoff"
)
# Add close mcp server method to after kickoff
bound_method = self._create_close_mcp_server_method()
self._after_kickoff['_close_mcp_server'] = bound_method
# Add close mcp servers method to after kickoff
bound_method = self._create_close_mcp_servers_method()
self._after_kickoff['_close_mcp_servers'] = bound_method
def _create_close_mcp_server_method(self):
def _close_mcp_server(self, instance, outputs):
adapter = getattr(self, '_mcp_server_adapter', None)
if adapter is not None:
def _create_close_mcp_servers_method(self):
def _close_mcp_servers(self, instance, outputs):
if self._mcp_server_adapter is None:
return outputs
for adapter in self._mcp_server_adapter.values():
try:
adapter.stop()
except Exception as e:
logging.warning(f"Error stopping MCP server: {e}")
return outputs
_close_mcp_server.is_after_kickoff = True
_close_mcp_servers.is_after_kickoff = True
import types
return types.MethodType(_close_mcp_server, self)
return types.MethodType(_close_mcp_servers, self)
def get_mcp_tools(self, *tool_names: list[str]) -> List[BaseTool]:
def get_mcp_tools(self, *tool_names: list[str], server: str | None = None) -> List[BaseTool]:
if not self.mcp_server_params:
return []
from crewai_tools import MCPServerAdapter
adapter = getattr(self, '_mcp_server_adapter', None)
if not adapter:
self._mcp_server_adapter = MCPServerAdapter(self.mcp_server_params)
if isinstance(self.mcp_server_params, list):
if self._mcp_server_adapter is None:
self._mcp_server_adapter = MCPServerAdapter(self.mcp_server_params)
if server is not None and len(self.mcp_server_params) > 1:
logging.warning("Using list of MCP server parameters. To use server parameter, please use a dictionary of MCP server parameters.")
# Type assertion: when mcp_server_params is a list, _mcp_server_adapter is a single MCPServerAdapter
adapter = cast(Any, self._mcp_server_adapter)
return adapter.tools.filter_by_names(tool_names or None)
return self._mcp_server_adapter.tools.filter_by_names(tool_names or None)
# Separated MCP adapters for each server.
elif isinstance(self.mcp_server_params, dict):
if self._mcp_server_adapter is None:
self._mcp_server_adapter = {}
aggregated_tools = []
for server_name, params in self.mcp_server_params.items():
if server is not None and server_name != server:
continue
adapter = self._mcp_server_adapter.get(server_name, None)
if not adapter:
self._mcp_server_adapter[server_name] = MCPServerAdapter(params)
aggregated_tools.extend(
self._mcp_server_adapter[server_name].tools.filter_by_names(tool_names or None))
return aggregated_tools
def load_configurations(self):

View File

@@ -87,7 +87,7 @@ class InternalCrew:
@CrewBase
class InternalCrewWithMCP(InternalCrew):
mcp_server_params = {"host": "localhost", "port": 8000}
mcp_server_params = [{"url": "localhost", "port": 8000}]
@agent
def reporting_analyst(self):
@@ -97,6 +97,19 @@ class InternalCrewWithMCP(InternalCrew):
def researcher(self):
return Agent(config=self.agents_config["researcher"], tools=self.get_mcp_tools("simple_tool")) # type: ignore[index]
@CrewBase
class InternalCrewWithMultipleMCP(InternalCrew):
mcp_server_params = {"mcp1": {"url": "localhost", "port": 8000}, "mcp2": {"url": "localhost", "port": 8001}}
@agent
def reporting_analyst(self):
return Agent(config=self.agents_config["reporting_analyst"], tools=self.get_mcp_tools(server="mcp1")) # type: ignore[index]
@agent
def researcher(self):
return Agent(config=self.agents_config["researcher"], tools=self.get_mcp_tools("simple_tool", server="mcp2")) # type: ignore[index]
def test_agent_memoization():
crew = SimpleCrew()
first_call_result = crew.simple_agent()
@@ -270,4 +283,21 @@ def test_internal_crew_with_mcp():
assert crew.reporting_analyst().tools == [simple_tool, another_simple_tool]
assert crew.researcher().tools == [simple_tool]
adapter_mock.assert_called_once_with({"host": "localhost", "port": 8000})
adapter_mock.assert_called_once_with([{"url": "localhost", "port": 8000}])
def test_internal_crew_with_multiple_mcp():
from crewai_tools import MCPServerAdapter
from crewai_tools.adapters.mcp_adapter import ToolCollection
from unittest.mock import call
mock = Mock(spec=MCPServerAdapter)
mock.tools = ToolCollection([simple_tool, another_simple_tool])
with patch("crewai_tools.MCPServerAdapter", return_value=mock) as adapter_mock:
crew = InternalCrewWithMultipleMCP()
assert crew.reporting_analyst().tools == [simple_tool, another_simple_tool]
assert crew.researcher().tools == [simple_tool]
adapter_mock.assert_has_calls([
call({"url": "localhost", "port": 8000}),
call({"url": "localhost", "port": 8001})
], any_order=True)