Compare commits

..

3 Commits

Author SHA1 Message Date
Joao Moura
d509dc74b0 fix: improve error handling for HumanFeedbackPending in flow execution
Refined the flow error management to emit a paused event for HumanFeedbackPending exceptions instead of treating them as failures. This enhancement allows the flow to better manage human feedback scenarios, ensuring that the execution state is preserved and appropriately handled without signaling an error. Regular failure events are still emitted for other exceptions, maintaining robust error reporting.
2026-01-07 22:23:14 -08:00
Joao Moura
4640c2c67c fix: handle HumanFeedbackPending in flow error management
Updated the flow error handling to treat HumanFeedbackPending as expected control flow rather than an error. This change ensures that the flow can appropriately manage human feedback scenarios without signaling an error, improving the robustness of the flow execution.
2026-01-07 19:39:31 -08:00
Lorenze Jay
9a212b8e29 feat: bump versions to 1.8.0 (#4199)
* feat: bump versions to 1.8.0

* bump 1.8.0
2026-01-07 15:36:46 -08:00
11 changed files with 58 additions and 1093 deletions

View File

@@ -12,7 +12,7 @@ dependencies = [
"pytube~=15.0.0",
"requests~=2.32.5",
"docker~=7.1.0",
"crewai==1.7.2",
"crewai==1.8.0",
"lancedb~=0.5.4",
"tiktoken~=0.8.0",
"beautifulsoup4~=4.13.4",

View File

@@ -291,4 +291,4 @@ __all__ = [
"ZapierActionTools",
]
__version__ = "1.7.2"
__version__ = "1.8.0"

View File

@@ -49,7 +49,7 @@ Repository = "https://github.com/crewAIInc/crewAI"
[project.optional-dependencies]
tools = [
"crewai-tools==1.7.2",
"crewai-tools==1.8.0",
]
embeddings = [
"tiktoken~=0.8.0"

View File

@@ -40,7 +40,7 @@ def _suppress_pydantic_deprecation_warnings() -> None:
_suppress_pydantic_deprecation_warnings()
__version__ = "1.7.2"
__version__ = "1.8.0"
_telemetry_submitted = False

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.7.2"
"crewai[tools]==1.8.0"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.7.2"
"crewai[tools]==1.8.0"
]
[project.scripts]

View File

@@ -1203,7 +1203,13 @@ class Flow(Generic[T], metaclass=FlowMeta):
result = self.kickoff(inputs=inputs)
result_holder.append(result)
except Exception as e:
signal_error(state, e)
# HumanFeedbackPending is expected control flow, not an error
from crewai.flow.async_feedback.types import HumanFeedbackPending
if isinstance(e, HumanFeedbackPending):
result_holder.append(e)
else:
signal_error(state, e)
finally:
self.stream = True
signal_end(state)
@@ -1258,7 +1264,13 @@ class Flow(Generic[T], metaclass=FlowMeta):
result = await self.kickoff_async(inputs=inputs)
result_holder.append(result)
except Exception as e:
signal_error(state, e, is_async=True)
# HumanFeedbackPending is expected control flow, not an error
from crewai.flow.async_feedback.types import HumanFeedbackPending
if isinstance(e, HumanFeedbackPending):
result_holder.append(e)
else:
signal_error(state, e, is_async=True)
finally:
self.stream = True
signal_end(state, is_async=True)
@@ -1590,29 +1602,45 @@ class Flow(Generic[T], metaclass=FlowMeta):
return result
except Exception as e:
if not self.suppress_flow_events:
# Check if this is a HumanFeedbackPending exception (paused, not failed)
from crewai.flow.async_feedback.types import HumanFeedbackPending
# Check if this is a HumanFeedbackPending exception (paused, not failed)
from crewai.flow.async_feedback.types import HumanFeedbackPending
if isinstance(e, HumanFeedbackPending):
# Auto-save pending feedback (create default persistence if needed)
if self._persistence is None:
from crewai.flow.persistence import SQLiteFlowPersistence
if isinstance(e, HumanFeedbackPending):
# Auto-save pending feedback (create default persistence if needed)
if self._persistence is None:
from crewai.flow.persistence import SQLiteFlowPersistence
self._persistence = SQLiteFlowPersistence()
self._persistence = SQLiteFlowPersistence()
# Regular failure
future = crewai_event_bus.emit(
self,
MethodExecutionFailedEvent(
type="method_execution_failed",
method_name=method_name,
flow_name=self.name or self.__class__.__name__,
error=e,
),
)
if future:
self._event_futures.append(future)
# Emit paused event (not failed)
if not self.suppress_flow_events:
future = crewai_event_bus.emit(
self,
MethodExecutionPausedEvent(
type="method_execution_paused",
method_name=method_name,
flow_name=self.name or self.__class__.__name__,
state=self._copy_and_serialize_state(),
flow_id=e.context.flow_id,
message=e.context.message,
emit=e.context.emit,
),
)
if future:
self._event_futures.append(future)
elif not self.suppress_flow_events:
# Regular failure - emit failed event
future = crewai_event_bus.emit(
self,
MethodExecutionFailedEvent(
type="method_execution_failed",
method_name=method_name,
flow_name=self.name or self.__class__.__name__,
error=e,
),
)
if future:
self._event_futures.append(future)
raise e
def _copy_and_serialize_state(self) -> dict[str, Any]:

View File

@@ -1,10 +1,9 @@
from crewai.tools.base_tool import BaseTool, EnvVar, tool
from crewai.tools.mcp_tool import MCPTool
__all__ = [
"BaseTool",
"EnvVar",
"MCPTool",
"tool",
]

View File

@@ -1,619 +0,0 @@
"""MCPTool - User-friendly interface for MCP server tool integration.
This module provides a simple API for connecting to MCP servers and
getting tools that can be used with CrewAI agents.
Example:
```python
from crewai.tools import MCPTool
# Connect to MCP servers by name (runs via npx)
cloud_tools = MCPTool.from_server("@anthropic/mcp-server-filesystem")
# Connect to MCP servers by URL
api_tools = MCPTool.from_server("https://api.example.com/mcp")
# Use tools with an agent
agent = Agent(
role='Cloud Engineer',
tools=[*cloud_tools, *api_tools]
)
```
"""
from __future__ import annotations
import asyncio
import shutil
from typing import TYPE_CHECKING, Any
from pydantic import BaseModel, Field, create_model
from crewai.tools.base_tool import BaseTool
if TYPE_CHECKING:
from crewai.mcp import (
MCPServerConfig,
MCPServerStdio,
ToolFilter,
)
MCP_CONNECTION_TIMEOUT = 30
MCP_TOOL_EXECUTION_TIMEOUT = 30
MCP_DISCOVERY_TIMEOUT = 30
class MCPTool:
"""Factory class for creating tools from MCP servers.
This class provides a simple interface for connecting to MCP servers
and retrieving tools that can be used with CrewAI agents.
Example:
```python
from crewai.tools import MCPTool
# Get tools from an MCP server by name
tools = MCPTool.from_server("@anthropic/mcp-server-filesystem")
# Get tools from an MCP server by URL
tools = MCPTool.from_server("https://api.example.com/mcp")
# Get tools with custom configuration
tools = MCPTool.from_server(
MCPServerStdio(
command="npx",
args=["-y", "@anthropic/mcp-server-filesystem"],
env={"HOME": "/home/user"}
)
)
```
"""
@classmethod
def from_server(
cls,
server: str | MCPServerConfig,
*,
env: dict[str, str] | None = None,
headers: dict[str, str] | None = None,
tool_filter: ToolFilter | None = None,
cache_tools_list: bool = False,
) -> list[BaseTool]:
"""Get tools from an MCP server.
This method connects to an MCP server, discovers available tools,
and returns them as CrewAI BaseTool instances.
Args:
server: MCP server specification. Can be:
- A server name/package (e.g., "@anthropic/mcp-server-filesystem")
which will be run via npx or uvx
- A URL (e.g., "https://api.example.com/mcp") for HTTP/SSE servers
- An MCPServerConfig object (MCPServerStdio, MCPServerHTTP, MCPServerSSE)
env: Environment variables for stdio servers (ignored for HTTP/SSE).
headers: HTTP headers for HTTP/SSE servers (ignored for stdio).
tool_filter: Optional filter for available tools.
cache_tools_list: Whether to cache the tool list for faster subsequent access.
Returns:
List of BaseTool instances from the MCP server.
Raises:
ValueError: If the server specification is invalid.
ConnectionError: If connection to the MCP server fails.
ImportError: If the MCP library is not installed.
Example:
```python
# By server name (runs via npx)
tools = MCPTool.from_server("@anthropic/mcp-server-filesystem")
# By URL
tools = MCPTool.from_server("https://api.example.com/mcp")
# With environment variables
tools = MCPTool.from_server(
"@anthropic/mcp-server-filesystem",
env={"HOME": "/home/user"}
)
# With HTTP headers
tools = MCPTool.from_server(
"https://api.example.com/mcp",
headers={"Authorization": "Bearer token"}
)
# With tool filter
tools = MCPTool.from_server(
"@anthropic/mcp-server-filesystem",
tool_filter=lambda tools: [t for t in tools if "read" in t["name"]]
)
```
"""
config = cls._resolve_server_config(
server,
env=env,
headers=headers,
tool_filter=tool_filter,
cache_tools_list=cache_tools_list,
)
return cls._get_tools_from_config(config)
@classmethod
def _resolve_server_config(
cls,
server: str | MCPServerConfig,
*,
env: dict[str, str] | None = None,
headers: dict[str, str] | None = None,
tool_filter: ToolFilter | None = None,
cache_tools_list: bool = False,
) -> MCPServerConfig:
"""Resolve server specification to MCPServerConfig.
Args:
server: Server name, URL, or MCPServerConfig.
env: Environment variables for stdio servers.
headers: HTTP headers for HTTP/SSE servers.
tool_filter: Optional tool filter.
cache_tools_list: Whether to cache tools list.
Returns:
MCPServerConfig instance.
"""
from crewai.mcp import MCPServerHTTP, MCPServerSSE, MCPServerStdio
if isinstance(server, (MCPServerStdio, MCPServerHTTP, MCPServerSSE)):
return server
if not isinstance(server, str):
raise ValueError(
f"Invalid server type: {type(server)}. "
"Must be a string (server name or URL) or MCPServerConfig."
)
if server.startswith(("http://", "https://")):
return MCPServerHTTP(
url=server,
headers=headers,
tool_filter=tool_filter,
cache_tools_list=cache_tools_list,
)
return cls._create_stdio_config(
server,
env=env,
tool_filter=tool_filter,
cache_tools_list=cache_tools_list,
)
@classmethod
def _create_stdio_config(
cls,
server_name: str,
*,
env: dict[str, str] | None = None,
tool_filter: ToolFilter | None = None,
cache_tools_list: bool = False,
) -> MCPServerStdio:
"""Create stdio config for a server name.
Determines whether to use npx or uvx based on the server name
and available executables.
Args:
server_name: MCP server package name.
env: Environment variables.
tool_filter: Optional tool filter.
cache_tools_list: Whether to cache tools list.
Returns:
MCPServerStdio configuration.
"""
from crewai.mcp import MCPServerStdio
if server_name.startswith(("@", "mcp-server-")):
command, args = cls._get_npx_command(server_name)
elif server_name.endswith("-mcp-server") or "-mcp" in server_name:
command, args = cls._get_uvx_command(server_name)
else:
command, args = cls._detect_runner(server_name)
return MCPServerStdio(
command=command,
args=args,
env=env,
tool_filter=tool_filter,
cache_tools_list=cache_tools_list,
)
@classmethod
def _get_npx_command(cls, server_name: str) -> tuple[str, list[str]]:
"""Get npx command for running an npm package.
Args:
server_name: NPM package name.
Returns:
Tuple of (command, args).
"""
npx_path = shutil.which("npx")
if npx_path is None:
raise ValueError(
f"npx not found. Please install Node.js to use MCP server: {server_name}"
)
return "npx", ["-y", server_name]
@classmethod
def _get_uvx_command(cls, server_name: str) -> tuple[str, list[str]]:
"""Get uvx command for running a Python package.
Args:
server_name: Python package name.
Returns:
Tuple of (command, args).
"""
uvx_path = shutil.which("uvx")
if uvx_path is None:
raise ValueError(
f"uvx not found. Please install uv to use MCP server: {server_name}"
)
return "uvx", [server_name]
@classmethod
def _detect_runner(cls, server_name: str) -> tuple[str, list[str]]:
"""Detect the appropriate runner for a server name.
Tries npx first, then uvx.
Args:
server_name: Server package name.
Returns:
Tuple of (command, args).
"""
npx_path = shutil.which("npx")
if npx_path is not None:
return "npx", ["-y", server_name]
uvx_path = shutil.which("uvx")
if uvx_path is not None:
return "uvx", [server_name]
raise ValueError(
f"Neither npx nor uvx found. Please install Node.js or uv to use MCP server: {server_name}"
)
@classmethod
def _get_tools_from_config(cls, config: MCPServerConfig) -> list[BaseTool]:
"""Get tools from an MCP server configuration.
Args:
config: MCP server configuration.
Returns:
List of BaseTool instances.
"""
try:
asyncio.get_running_loop()
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(
asyncio.run, cls._get_tools_from_config_async(config)
)
return future.result()
except RuntimeError:
return asyncio.run(cls._get_tools_from_config_async(config))
@classmethod
async def _get_tools_from_config_async(
cls, config: MCPServerConfig
) -> list[BaseTool]:
"""Async implementation of getting tools from config.
Args:
config: MCP server configuration.
Returns:
List of BaseTool instances.
"""
from crewai.mcp import MCPClient
transport = cls._create_transport(config)
client = MCPClient(
transport,
connect_timeout=MCP_CONNECTION_TIMEOUT,
execution_timeout=MCP_TOOL_EXECUTION_TIMEOUT,
discovery_timeout=MCP_DISCOVERY_TIMEOUT,
cache_tools_list=config.cache_tools_list,
)
try:
await client.connect()
tool_schemas = await client.list_tools()
if config.tool_filter is not None:
tool_schemas = config.tool_filter(tool_schemas)
server_name = cls._extract_server_name(config)
tools = []
for schema in tool_schemas:
tool = cls._create_tool_from_schema(
schema, config, server_name, client.transport
)
tools.append(tool)
return tools
finally:
await client.disconnect()
@classmethod
def _create_transport(cls, config: MCPServerConfig) -> Any:
"""Create transport from config.
Args:
config: MCP server configuration.
Returns:
Transport instance.
"""
from crewai.mcp import MCPServerHTTP, MCPServerSSE, MCPServerStdio
from crewai.mcp.transports.http import HTTPTransport
from crewai.mcp.transports.sse import SSETransport
from crewai.mcp.transports.stdio import StdioTransport
if isinstance(config, MCPServerStdio):
return StdioTransport(
command=config.command,
args=config.args,
env=config.env,
)
if isinstance(config, MCPServerHTTP):
return HTTPTransport(
url=config.url,
headers=config.headers,
)
if isinstance(config, MCPServerSSE):
return SSETransport(
url=config.url,
headers=config.headers,
)
raise ValueError(f"Unsupported MCP server config type: {type(config)}")
@classmethod
def _extract_server_name(cls, config: MCPServerConfig) -> str:
"""Extract a human-readable server name from config.
Args:
config: MCP server configuration.
Returns:
Server name string.
"""
from crewai.mcp import MCPServerHTTP, MCPServerSSE, MCPServerStdio
if isinstance(config, MCPServerStdio):
if config.args:
last_arg = config.args[-1]
if last_arg.startswith("@"):
parts = last_arg.split("/")
return parts[-1] if len(parts) > 1 else last_arg.replace("@", "")
return last_arg.replace("-", "_").replace(".", "_")
return config.command
if isinstance(config, (MCPServerHTTP, MCPServerSSE)):
from urllib.parse import urlparse
parsed = urlparse(config.url)
return parsed.netloc.replace(".", "_").replace("-", "_").replace(":", "_")
return "mcp_server"
@classmethod
def _create_tool_from_schema(
cls,
schema: dict[str, Any],
config: MCPServerConfig,
server_name: str,
transport: Any,
) -> BaseTool:
"""Create a BaseTool from an MCP tool schema.
Args:
schema: Tool schema from MCP server.
config: MCP server configuration.
server_name: Server name for prefixing.
transport: Transport instance for tool execution.
Returns:
BaseTool instance.
"""
tool_name = schema.get("name", "unknown_tool")
description = schema.get("description", f"Tool {tool_name} from {server_name}")
input_schema = schema.get("inputSchema", {})
prefixed_name = f"{server_name}_{tool_name}"
args_schema = cls._json_schema_to_pydantic(tool_name, input_schema)
return _MCPToolInstance(
name=prefixed_name,
description=description,
args_schema=args_schema,
config=config,
original_tool_name=tool_name,
server_name=server_name,
)
@classmethod
def _json_schema_to_pydantic(
cls, tool_name: str, json_schema: dict[str, Any]
) -> type[BaseModel]:
"""Convert JSON schema to Pydantic model.
Args:
tool_name: Tool name for the model.
json_schema: JSON schema from MCP server.
Returns:
Pydantic model class.
"""
properties = json_schema.get("properties", {})
required = set(json_schema.get("required", []))
fields: dict[str, Any] = {}
for prop_name, prop_schema in properties.items():
python_type = cls._json_type_to_python(prop_schema)
description = prop_schema.get("description", "")
if prop_name in required:
fields[prop_name] = (python_type, Field(description=description))
else:
fields[prop_name] = (
python_type | None,
Field(default=None, description=description),
)
model_name = f"{tool_name.title().replace('_', '')}Args"
return create_model(model_name, **fields)
@classmethod
def _json_type_to_python(cls, prop_schema: dict[str, Any]) -> type:
"""Convert JSON schema type to Python type.
Args:
prop_schema: Property schema.
Returns:
Python type.
"""
json_type = prop_schema.get("type", "string")
type_mapping: dict[str, type] = {
"string": str,
"integer": int,
"number": float,
"boolean": bool,
"array": list,
"object": dict,
}
return type_mapping.get(json_type, str)
class _MCPToolInstance(BaseTool):
"""Internal tool instance for MCP tools.
This class wraps MCP tool execution with proper connection management.
"""
def __init__(
self,
name: str,
description: str,
args_schema: type[BaseModel],
config: MCPServerConfig,
original_tool_name: str,
server_name: str,
) -> None:
"""Initialize MCP tool instance.
Args:
name: Prefixed tool name.
description: Tool description.
args_schema: Pydantic model for arguments.
config: MCP server configuration.
original_tool_name: Original tool name on MCP server.
server_name: Server name.
"""
super().__init__(
name=name,
description=description,
args_schema=args_schema,
)
self._config = config
self._original_tool_name = original_tool_name
self._server_name = server_name
@property
def config(self) -> MCPServerConfig:
"""Get the MCP server configuration."""
return self._config
@property
def original_tool_name(self) -> str:
"""Get the original tool name."""
return self._original_tool_name
@property
def server_name(self) -> str:
"""Get the server name."""
return self._server_name
def _run(self, **kwargs: Any) -> str:
"""Execute the MCP tool.
Args:
**kwargs: Tool arguments.
Returns:
Tool execution result.
"""
try:
try:
asyncio.get_running_loop()
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(
asyncio.run, self._run_async(**kwargs)
)
return future.result()
except RuntimeError:
return asyncio.run(self._run_async(**kwargs))
except Exception as e:
raise RuntimeError(
f"Error executing MCP tool {self.original_tool_name}: {e!s}"
) from e
async def _run_async(self, **kwargs: Any) -> str:
"""Async implementation of tool execution.
Args:
**kwargs: Tool arguments.
Returns:
Tool execution result.
"""
from crewai.mcp import MCPClient
transport = MCPTool._create_transport(self._config)
client = MCPClient(
transport,
connect_timeout=MCP_CONNECTION_TIMEOUT,
execution_timeout=MCP_TOOL_EXECUTION_TIMEOUT,
discovery_timeout=MCP_DISCOVERY_TIMEOUT,
)
try:
await client.connect()
result = await client.call_tool(self._original_tool_name, kwargs)
if isinstance(result, str):
return result
if hasattr(result, "content") and result.content:
if isinstance(result.content, list) and len(result.content) > 0:
content_item = result.content[0]
if hasattr(content_item, "text"):
return str(content_item.text)
return str(content_item)
return str(result.content)
return str(result)
finally:
await client.disconnect()

View File

@@ -1,443 +0,0 @@
"""Tests for MCPTool class."""
import asyncio
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from crewai.mcp.config import MCPServerHTTP, MCPServerSSE, MCPServerStdio
from crewai.tools import BaseTool, MCPTool
@pytest.fixture
def mock_tool_definitions():
"""Create mock MCP tool definitions (as returned by list_tools)."""
return [
{
"name": "search",
"description": "Search for information",
"inputSchema": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "Search query"},
"limit": {"type": "integer", "description": "Max results"},
},
"required": ["query"],
},
},
{
"name": "read_file",
"description": "Read a file from the filesystem",
"inputSchema": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "File path"},
},
"required": ["path"],
},
},
]
class TestMCPToolFromServer:
"""Tests for MCPTool.from_server() method."""
def test_from_server_with_url(self, mock_tool_definitions):
"""Test from_server with an HTTP URL."""
with patch("crewai.mcp.MCPClient") as mock_client_class:
mock_client = AsyncMock()
mock_client.list_tools = AsyncMock(return_value=mock_tool_definitions)
mock_client.connected = False
mock_client.connect = AsyncMock()
mock_client.disconnect = AsyncMock()
mock_client_class.return_value = mock_client
tools = MCPTool.from_server("https://api.example.com/mcp")
assert len(tools) == 2
assert all(isinstance(tool, BaseTool) for tool in tools)
tool_names = [tool.name for tool in tools]
assert "api_example_com_search" in tool_names
assert "api_example_com_read_file" in tool_names
def test_from_server_with_headers(self, mock_tool_definitions):
"""Test from_server with HTTP headers."""
with patch("crewai.mcp.MCPClient") as mock_client_class:
mock_client = AsyncMock()
mock_client.list_tools = AsyncMock(return_value=mock_tool_definitions)
mock_client.connected = False
mock_client.connect = AsyncMock()
mock_client.disconnect = AsyncMock()
mock_client_class.return_value = mock_client
tools = MCPTool.from_server(
"https://api.example.com/mcp",
headers={"Authorization": "Bearer token"},
)
assert len(tools) == 2
mock_client_class.assert_called_once()
def test_from_server_with_mcp_server_http_config(self, mock_tool_definitions):
"""Test from_server with MCPServerHTTP configuration."""
with patch("crewai.mcp.MCPClient") as mock_client_class:
mock_client = AsyncMock()
mock_client.list_tools = AsyncMock(return_value=mock_tool_definitions)
mock_client.connected = False
mock_client.connect = AsyncMock()
mock_client.disconnect = AsyncMock()
mock_client_class.return_value = mock_client
config = MCPServerHTTP(
url="https://api.example.com/mcp",
headers={"Authorization": "Bearer token"},
)
tools = MCPTool.from_server(config)
assert len(tools) == 2
assert all(isinstance(tool, BaseTool) for tool in tools)
def test_from_server_with_mcp_server_sse_config(self, mock_tool_definitions):
"""Test from_server with MCPServerSSE configuration."""
with patch("crewai.mcp.MCPClient") as mock_client_class:
mock_client = AsyncMock()
mock_client.list_tools = AsyncMock(return_value=mock_tool_definitions)
mock_client.connected = False
mock_client.connect = AsyncMock()
mock_client.disconnect = AsyncMock()
mock_client_class.return_value = mock_client
config = MCPServerSSE(
url="https://api.example.com/mcp/sse",
headers={"Authorization": "Bearer token"},
)
tools = MCPTool.from_server(config)
assert len(tools) == 2
assert all(isinstance(tool, BaseTool) for tool in tools)
def test_from_server_with_mcp_server_stdio_config(self, mock_tool_definitions):
"""Test from_server with MCPServerStdio configuration."""
with patch("crewai.mcp.MCPClient") as mock_client_class:
mock_client = AsyncMock()
mock_client.list_tools = AsyncMock(return_value=mock_tool_definitions)
mock_client.connected = False
mock_client.connect = AsyncMock()
mock_client.disconnect = AsyncMock()
mock_client_class.return_value = mock_client
config = MCPServerStdio(
command="python",
args=["server.py"],
env={"API_KEY": "test_key"},
)
tools = MCPTool.from_server(config)
assert len(tools) == 2
assert all(isinstance(tool, BaseTool) for tool in tools)
def test_from_server_with_tool_filter(self, mock_tool_definitions):
"""Test from_server with tool filter."""
with patch("crewai.mcp.MCPClient") as mock_client_class:
mock_client = AsyncMock()
mock_client.list_tools = AsyncMock(return_value=mock_tool_definitions)
mock_client.connected = False
mock_client.connect = AsyncMock()
mock_client.disconnect = AsyncMock()
mock_client_class.return_value = mock_client
def filter_search_only(tools):
return [t for t in tools if "search" in t["name"]]
config = MCPServerHTTP(
url="https://api.example.com/mcp",
tool_filter=filter_search_only,
)
tools = MCPTool.from_server(config)
assert len(tools) == 1
assert "search" in tools[0].name
def test_from_server_with_npx_server_name(self, mock_tool_definitions):
"""Test from_server with npm package name (uses npx)."""
with (
patch("crewai.mcp.MCPClient") as mock_client_class,
patch("shutil.which") as mock_which,
):
mock_which.return_value = "/usr/bin/npx"
mock_client = AsyncMock()
mock_client.list_tools = AsyncMock(return_value=mock_tool_definitions)
mock_client.connected = False
mock_client.connect = AsyncMock()
mock_client.disconnect = AsyncMock()
mock_client_class.return_value = mock_client
tools = MCPTool.from_server("@anthropic/mcp-server-filesystem")
assert len(tools) == 2
assert all(isinstance(tool, BaseTool) for tool in tools)
def test_from_server_with_uvx_server_name(self, mock_tool_definitions):
"""Test from_server with Python package name (uses uvx)."""
with (
patch("crewai.mcp.MCPClient") as mock_client_class,
patch("shutil.which") as mock_which,
):
def which_side_effect(cmd):
if cmd == "uvx":
return "/usr/bin/uvx"
return None
mock_which.side_effect = which_side_effect
mock_client = AsyncMock()
mock_client.list_tools = AsyncMock(return_value=mock_tool_definitions)
mock_client.connected = False
mock_client.connect = AsyncMock()
mock_client.disconnect = AsyncMock()
mock_client_class.return_value = mock_client
tools = MCPTool.from_server("ibmcloud-mcp-server")
assert len(tools) == 2
assert all(isinstance(tool, BaseTool) for tool in tools)
class TestMCPToolResolveServerConfig:
"""Tests for MCPTool._resolve_server_config() method."""
def test_resolve_http_url(self):
"""Test resolving HTTP URL to MCPServerHTTP config."""
config = MCPTool._resolve_server_config("https://api.example.com/mcp")
assert isinstance(config, MCPServerHTTP)
assert config.url == "https://api.example.com/mcp"
def test_resolve_http_url_with_headers(self):
"""Test resolving HTTP URL with headers."""
config = MCPTool._resolve_server_config(
"https://api.example.com/mcp",
headers={"Authorization": "Bearer token"},
)
assert isinstance(config, MCPServerHTTP)
assert config.headers == {"Authorization": "Bearer token"}
def test_resolve_mcp_server_config_passthrough(self):
"""Test that MCPServerConfig objects are passed through."""
original_config = MCPServerHTTP(url="https://api.example.com/mcp")
config = MCPTool._resolve_server_config(original_config)
assert config is original_config
def test_resolve_invalid_type_raises_error(self):
"""Test that invalid server type raises ValueError."""
with pytest.raises(ValueError, match="Invalid server type"):
MCPTool._resolve_server_config(123)
class TestMCPToolCreateStdioConfig:
"""Tests for MCPTool._create_stdio_config() method."""
def test_create_stdio_config_npm_package(self):
"""Test creating stdio config for npm package."""
with patch("shutil.which") as mock_which:
mock_which.return_value = "/usr/bin/npx"
config = MCPTool._create_stdio_config("@anthropic/mcp-server-filesystem")
assert isinstance(config, MCPServerStdio)
assert config.command == "npx"
assert config.args == ["-y", "@anthropic/mcp-server-filesystem"]
def test_create_stdio_config_python_package(self):
"""Test creating stdio config for Python package."""
with patch("shutil.which") as mock_which:
def which_side_effect(cmd):
if cmd == "uvx":
return "/usr/bin/uvx"
return None
mock_which.side_effect = which_side_effect
config = MCPTool._create_stdio_config("ibmcloud-mcp-server")
assert isinstance(config, MCPServerStdio)
assert config.command == "uvx"
assert config.args == ["ibmcloud-mcp-server"]
def test_create_stdio_config_with_env(self):
"""Test creating stdio config with environment variables."""
with patch("shutil.which") as mock_which:
mock_which.return_value = "/usr/bin/npx"
config = MCPTool._create_stdio_config(
"@anthropic/mcp-server-filesystem",
env={"HOME": "/home/user"},
)
assert config.env == {"HOME": "/home/user"}
def test_create_stdio_config_no_runner_raises_error(self):
"""Test that missing npx/uvx raises ValueError."""
with patch("shutil.which") as mock_which:
mock_which.return_value = None
with pytest.raises(ValueError, match="Neither npx nor uvx found"):
MCPTool._create_stdio_config("some-server")
class TestMCPToolExtractServerName:
"""Tests for MCPTool._extract_server_name() method."""
def test_extract_server_name_from_stdio_npm(self):
"""Test extracting server name from npm package."""
config = MCPServerStdio(
command="npx",
args=["-y", "@anthropic/mcp-server-filesystem"],
)
name = MCPTool._extract_server_name(config)
assert name == "mcp-server-filesystem"
def test_extract_server_name_from_stdio_simple(self):
"""Test extracting server name from simple package."""
config = MCPServerStdio(
command="uvx",
args=["ibmcloud-mcp-server"],
)
name = MCPTool._extract_server_name(config)
assert name == "ibmcloud_mcp_server"
def test_extract_server_name_from_http(self):
"""Test extracting server name from HTTP URL."""
config = MCPServerHTTP(url="https://api.example.com/mcp")
name = MCPTool._extract_server_name(config)
assert name == "api_example_com"
def test_extract_server_name_from_sse(self):
"""Test extracting server name from SSE URL."""
config = MCPServerSSE(url="https://api.example.com/mcp/sse")
name = MCPTool._extract_server_name(config)
assert name == "api_example_com"
class TestMCPToolJsonSchemaConversion:
"""Tests for MCPTool JSON schema to Pydantic conversion."""
def test_json_schema_to_pydantic_basic(self):
"""Test converting basic JSON schema to Pydantic model."""
schema = {
"type": "object",
"properties": {
"query": {"type": "string", "description": "Search query"},
},
"required": ["query"],
}
model = MCPTool._json_schema_to_pydantic("search", schema)
assert model.__name__ == "SearchArgs"
assert "query" in model.model_fields
def test_json_schema_to_pydantic_with_optional(self):
"""Test converting JSON schema with optional fields."""
schema = {
"type": "object",
"properties": {
"query": {"type": "string", "description": "Search query"},
"limit": {"type": "integer", "description": "Max results"},
},
"required": ["query"],
}
model = MCPTool._json_schema_to_pydantic("search", schema)
assert "query" in model.model_fields
assert "limit" in model.model_fields
def test_json_type_to_python_string(self):
"""Test converting JSON string type to Python."""
assert MCPTool._json_type_to_python({"type": "string"}) == str
def test_json_type_to_python_integer(self):
"""Test converting JSON integer type to Python."""
assert MCPTool._json_type_to_python({"type": "integer"}) == int
def test_json_type_to_python_number(self):
"""Test converting JSON number type to Python."""
assert MCPTool._json_type_to_python({"type": "number"}) == float
def test_json_type_to_python_boolean(self):
"""Test converting JSON boolean type to Python."""
assert MCPTool._json_type_to_python({"type": "boolean"}) == bool
def test_json_type_to_python_array(self):
"""Test converting JSON array type to Python."""
assert MCPTool._json_type_to_python({"type": "array"}) == list
def test_json_type_to_python_object(self):
"""Test converting JSON object type to Python."""
assert MCPTool._json_type_to_python({"type": "object"}) == dict
def test_json_type_to_python_unknown(self):
"""Test converting unknown JSON type defaults to str."""
assert MCPTool._json_type_to_python({"type": "unknown"}) == str
class TestMCPToolExecution:
"""Tests for MCP tool execution."""
def test_tool_execution_sync(self, mock_tool_definitions):
"""Test tool execution in synchronous context."""
with patch("crewai.mcp.MCPClient") as mock_client_class:
mock_client = AsyncMock()
mock_client.list_tools = AsyncMock(return_value=mock_tool_definitions)
mock_client.connected = False
mock_client.connect = AsyncMock()
mock_client.disconnect = AsyncMock()
mock_client.call_tool = AsyncMock(return_value="search result")
mock_client_class.return_value = mock_client
tools = MCPTool.from_server("https://api.example.com/mcp")
assert len(tools) == 2
search_tool = next(t for t in tools if "search" in t.name)
result = search_tool.run(query="test query")
assert result == "search result"
@pytest.mark.asyncio
async def test_tool_execution_async(self, mock_tool_definitions):
"""Test tool execution in async context."""
with patch("crewai.mcp.MCPClient") as mock_client_class:
mock_client = AsyncMock()
mock_client.list_tools = AsyncMock(return_value=mock_tool_definitions)
mock_client.connected = False
mock_client.connect = AsyncMock()
mock_client.disconnect = AsyncMock()
mock_client.call_tool = AsyncMock(return_value="search result")
mock_client_class.return_value = mock_client
tools = MCPTool.from_server("https://api.example.com/mcp")
assert len(tools) == 2
search_tool = next(t for t in tools if "search" in t.name)
result = search_tool.run(query="test query")
assert result == "search result"
class TestMCPToolIntegrationWithAgent:
"""Tests for MCPTool integration with Agent."""
def test_mcp_tool_with_agent(self, mock_tool_definitions):
"""Test using MCPTool.from_server() tools with an Agent."""
from crewai.agent.core import Agent
with patch("crewai.mcp.MCPClient") as mock_client_class:
mock_client = AsyncMock()
mock_client.list_tools = AsyncMock(return_value=mock_tool_definitions)
mock_client.connected = False
mock_client.connect = AsyncMock()
mock_client.disconnect = AsyncMock()
mock_client_class.return_value = mock_client
tools = MCPTool.from_server("https://api.example.com/mcp")
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
tools=tools,
)
assert len(agent.tools) == 2
assert all(isinstance(tool, BaseTool) for tool in agent.tools)

View File

@@ -1,3 +1,3 @@
"""CrewAI development tools."""
__version__ = "1.7.2"
__version__ = "1.8.0"