Compare commits

..

2 Commits

Author SHA1 Message Date
github-actions[bot]
959534d506 chore: update tool specifications 2026-03-10 10:52:27 +00:00
Devin AI
3148a75684 fix: ignore extra fields (security_context) in MCP tool schemas
Fixes #4796

CrewAI's tool execution framework injects a security_context parameter
into tool arguments via _add_fingerprint_metadata(). MCP tools created
via create_model_from_schema() defaulted to ConfigDict(extra='forbid'),
causing Pydantic validation to fail with 'Extra inputs are not permitted'.

This change passes ConfigDict(extra='ignore') when creating Pydantic
models for MCP tool schemas, so that extra fields like security_context
are silently dropped during validation instead of raising errors.

Changes:
- lib/crewai-tools: MCPServerAdapter's CrewAIToolAdapter now creates
  args models with extra='ignore'
- lib/crewai: Native MCP tool resolver also uses extra='ignore'
- Added regression tests for both the adapter and schema utils

Co-Authored-By: João <joao@crewai.com>
2026-03-10 10:49:51 +00:00
18 changed files with 336 additions and 1420 deletions

View File

@@ -9,7 +9,7 @@ from typing import TYPE_CHECKING, Any
from crewai.tools import BaseTool
from crewai.utilities.pydantic_schema_utils import create_model_from_schema
from crewai.utilities.string_utils import sanitize_tool_name
from pydantic import BaseModel
from pydantic import BaseModel, ConfigDict
from crewai_tools.adapters.tool_collection import ToolCollection
@@ -51,7 +51,10 @@ try:
"""
tool_name = sanitize_tool_name(mcp_tool.name)
tool_description = mcp_tool.description or ""
args_model = create_model_from_schema(mcp_tool.inputSchema)
args_model = create_model_from_schema(
mcp_tool.inputSchema,
__config__=ConfigDict(extra="ignore"),
)
class CrewAIMCPTool(BaseTool):
name: str = tool_name

View File

@@ -221,6 +221,74 @@ def test_connect_timeout_with_filtered_tools(echo_server_script):
assert tools[0].run(text="timeout test") == "Echo: timeout test"
def test_mcp_tool_ignores_security_context(echo_server_script):
"""Test that MCP tools ignore extra fields like security_context.
This is a regression test for https://github.com/crewAIInc/crewAI/issues/4796.
CrewAI's tool execution framework injects a `security_context` parameter
(containing agent_fingerprint and metadata) into tool calls. MCP tools must
ignore this extra field instead of raising a Pydantic validation error.
"""
serverparams = StdioServerParameters(
command="uv", args=["run", "python", "-c", echo_server_script]
)
with MCPServerAdapter(serverparams) as tools:
echo_tool = tools[0]
# Simulate what CrewAI's tool_usage._add_fingerprint_metadata does:
# it adds security_context to the tool arguments before invocation.
result = echo_tool.run(
text="hello",
security_context={
"agent_fingerprint": {
"uuid_str": "test-uuid-12345",
"created_at": "2026-01-01T00:00:00",
},
"metadata": {},
},
)
assert result == "Echo: hello"
def test_mcp_tool_ignores_security_context_calc(echo_server_script):
"""Test that security_context is ignored for tools with multiple args."""
serverparams = StdioServerParameters(
command="uv", args=["run", "python", "-c", echo_server_script]
)
with MCPServerAdapter(serverparams) as tools:
calc_tool = tools[1]
result = calc_tool.run(
a=10,
b=20,
security_context={
"agent_fingerprint": {
"uuid_str": "test-uuid-67890",
"created_at": "2026-01-01T00:00:00",
},
"task_fingerprint": {
"uuid_str": "task-uuid-11111",
"created_at": "2026-01-01T00:00:00",
},
"metadata": {},
},
)
assert result == "30"
def test_mcp_tool_args_schema_allows_extra_fields(echo_server_script):
"""Test that the args_schema on MCP tools is configured to ignore extra fields."""
serverparams = StdioServerParameters(
command="uv", args=["run", "python", "-c", echo_server_script]
)
with MCPServerAdapter(serverparams) as tools:
echo_tool = tools[0]
# Validate that the schema accepts and ignores extra fields
schema = echo_tool.args_schema
validated = schema.model_validate(
{"text": "test", "security_context": {"agent_fingerprint": "abc"}}
)
assert validated.model_dump() == {"text": "test"}
@patch("crewai_tools.adapters.mcp_adapter.MCPAdapt")
def test_connect_timeout_passed_to_mcpadapt(mock_mcpadapt):
mock_adapter_instance = MagicMock()

View File

@@ -5664,10 +5664,6 @@
"title": "Bucket Name",
"type": "string"
},
"cluster": {
"description": "An instance of the Couchbase Cluster connected to the desired Couchbase server.",
"title": "Cluster"
},
"collection_name": {
"description": "The name of the Couchbase collection to search",
"title": "Collection Name",
@@ -5716,7 +5712,6 @@
}
},
"required": [
"cluster",
"collection_name",
"scope_name",
"bucket_name",
@@ -14460,13 +14455,9 @@
"properties": {
"config": {
"$ref": "#/$defs/OxylabsAmazonProductScraperConfig"
},
"oxylabs_api": {
"title": "Oxylabs Api"
}
},
"required": [
"oxylabs_api",
"config"
],
"title": "OxylabsAmazonProductScraperTool",
@@ -14689,13 +14680,9 @@
"properties": {
"config": {
"$ref": "#/$defs/OxylabsAmazonSearchScraperConfig"
},
"oxylabs_api": {
"title": "Oxylabs Api"
}
},
"required": [
"oxylabs_api",
"config"
],
"title": "OxylabsAmazonSearchScraperTool",
@@ -14931,13 +14918,9 @@
"properties": {
"config": {
"$ref": "#/$defs/OxylabsGoogleSearchScraperConfig"
},
"oxylabs_api": {
"title": "Oxylabs Api"
}
},
"required": [
"oxylabs_api",
"config"
],
"title": "OxylabsGoogleSearchScraperTool",
@@ -15121,13 +15104,9 @@
"properties": {
"config": {
"$ref": "#/$defs/OxylabsUniversalScraperConfig"
},
"oxylabs_api": {
"title": "Oxylabs Api"
}
},
"required": [
"oxylabs_api",
"config"
],
"title": "OxylabsUniversalScraperTool",
@@ -23229,26 +23208,6 @@
"description": "The Tavily API key. If not provided, it will be loaded from the environment variable TAVILY_API_KEY.",
"title": "Api Key"
},
"async_client": {
"anyOf": [
{},
{
"type": "null"
}
],
"default": null,
"title": "Async Client"
},
"client": {
"anyOf": [
{},
{
"type": "null"
}
],
"default": null,
"title": "Client"
},
"extract_depth": {
"default": "basic",
"description": "The depth of extraction. 'basic' for basic extraction, 'advanced' for advanced extraction.",
@@ -23384,26 +23343,6 @@
"description": "The Tavily API key. If not provided, it will be loaded from the environment variable TAVILY_API_KEY.",
"title": "Api Key"
},
"async_client": {
"anyOf": [
{},
{
"type": "null"
}
],
"default": null,
"title": "Async Client"
},
"client": {
"anyOf": [
{},
{
"type": "null"
}
],
"default": null,
"title": "Client"
},
"days": {
"default": 7,
"description": "The number of days to search back.",

View File

@@ -38,7 +38,7 @@ from crewai.utilities.string_utils import interpolate_only
_SLUG_RE: Final[re.Pattern[str]] = re.compile(
r"^(?:crewai-amp:)?[a-zA-Z0-9][a-zA-Z0-9_-]*(?:#[\w-]+)?$"
r"^(?:crewai-amp:)?[a-zA-Z0-9][a-zA-Z0-9_-]*(?:#\w+)?$"
)

View File

@@ -408,7 +408,7 @@ def human_feedback(
emit=list(emit) if emit else None,
default_outcome=default_outcome,
metadata=metadata or {},
llm=llm if isinstance(llm, str) else getattr(llm, "model", None),
llm=llm if isinstance(llm, str) else None,
)
# Determine effective provider:

View File

@@ -22,12 +22,7 @@ if TYPE_CHECKING:
try:
from anthropic import Anthropic, AsyncAnthropic, transform_schema
from anthropic.types import (
Message,
TextBlock,
ThinkingBlock,
ToolUseBlock,
)
from anthropic.types import Message, TextBlock, ThinkingBlock, ToolUseBlock
from anthropic.types.beta import BetaMessage, BetaTextBlock, BetaToolUseBlock
import httpx
except ImportError:
@@ -36,11 +31,6 @@ except ImportError:
) from None
TOOL_SEARCH_TOOL_TYPES: Final[tuple[str, ...]] = (
"tool_search_tool_regex_20251119",
"tool_search_tool_bm25_20251119",
)
ANTHROPIC_FILES_API_BETA: Final = "files-api-2025-04-14"
ANTHROPIC_STRUCTURED_OUTPUTS_BETA: Final = "structured-outputs-2025-11-13"
@@ -127,22 +117,6 @@ class AnthropicThinkingConfig(BaseModel):
budget_tokens: int | None = None
class AnthropicToolSearchConfig(BaseModel):
"""Configuration for Anthropic's server-side tool search.
When enabled, tools marked with defer_loading=True are not loaded into
context immediately. Instead, Claude uses the tool search tool to
dynamically discover and load relevant tools on-demand.
Attributes:
type: The tool search variant to use.
- "regex": Claude constructs regex patterns to search tool names/descriptions.
- "bm25": Claude uses natural language queries to search tools.
"""
type: Literal["regex", "bm25"] = "bm25"
class AnthropicCompletion(BaseLLM):
"""Anthropic native completion implementation.
@@ -166,7 +140,6 @@ class AnthropicCompletion(BaseLLM):
interceptor: BaseInterceptor[httpx.Request, httpx.Response] | None = None,
thinking: AnthropicThinkingConfig | None = None,
response_format: type[BaseModel] | None = None,
tool_search: AnthropicToolSearchConfig | bool | None = None,
**kwargs: Any,
):
"""Initialize Anthropic chat completion client.
@@ -186,10 +159,6 @@ class AnthropicCompletion(BaseLLM):
interceptor: HTTP interceptor for modifying requests/responses at transport level.
response_format: Pydantic model for structured output. When provided, responses
will be validated against this model schema.
tool_search: Enable Anthropic's server-side tool search. When True, uses "bm25"
variant by default. Pass an AnthropicToolSearchConfig to choose "regex" or
"bm25". When enabled, tools are automatically marked with defer_loading=True
and a tool search tool is injected into the tools list.
**kwargs: Additional parameters
"""
super().__init__(
@@ -221,13 +190,6 @@ class AnthropicCompletion(BaseLLM):
self.thinking = thinking
self.previous_thinking_blocks: list[ThinkingBlock] = []
self.response_format = response_format
# Tool search config
if tool_search is True:
self.tool_search = AnthropicToolSearchConfig()
elif isinstance(tool_search, AnthropicToolSearchConfig):
self.tool_search = tool_search
else:
self.tool_search = None
# Model-specific settings
self.is_claude_3 = "claude-3" in model.lower()
self.supports_tools = True
@@ -470,23 +432,10 @@ class AnthropicCompletion(BaseLLM):
# Handle tools for Claude 3+
if tools and self.supports_tools:
converted_tools = self._convert_tools_for_interference(tools)
# When tool_search is enabled and there are 2+ regular tools,
# inject the search tool and mark regular tools with defer_loading.
# With only 1 tool there's nothing to search — skip tool search
# entirely so the normal forced tool_choice optimisation still works.
regular_tools = [
t
for t in converted_tools
if t.get("type", "") not in TOOL_SEARCH_TOOL_TYPES
]
if self.tool_search is not None and len(regular_tools) >= 2:
converted_tools = self._apply_tool_search(converted_tools)
params["tools"] = converted_tools
if available_functions and len(regular_tools) == 1:
tool_name = regular_tools[0].get("name")
if available_functions and len(converted_tools) == 1:
tool_name = converted_tools[0].get("name")
if tool_name and tool_name in available_functions:
params["tool_choice"] = {"type": "tool", "name": tool_name}
@@ -505,12 +454,6 @@ class AnthropicCompletion(BaseLLM):
anthropic_tools = []
for tool in tools:
# Pass through tool search tool definitions unchanged
tool_type = tool.get("type", "")
if tool_type in TOOL_SEARCH_TOOL_TYPES:
anthropic_tools.append(tool)
continue
if "input_schema" in tool and "name" in tool and "description" in tool:
anthropic_tools.append(tool)
continue
@@ -523,15 +466,15 @@ class AnthropicCompletion(BaseLLM):
logging.error(f"Error converting tool to Anthropic format: {e}")
raise e
anthropic_tool: dict[str, Any] = {
anthropic_tool = {
"name": name,
"description": description,
}
if parameters and isinstance(parameters, dict):
anthropic_tool["input_schema"] = parameters
anthropic_tool["input_schema"] = parameters # type: ignore[assignment]
else:
anthropic_tool["input_schema"] = {
anthropic_tool["input_schema"] = { # type: ignore[assignment]
"type": "object",
"properties": {},
"required": [],
@@ -541,55 +484,6 @@ class AnthropicCompletion(BaseLLM):
return anthropic_tools
def _apply_tool_search(self, tools: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""Inject tool search tool and mark regular tools with defer_loading.
When tool_search is enabled, this method:
1. Adds the appropriate tool search tool definition (regex or bm25)
2. Marks all regular tools with defer_loading=True so they are only
loaded when Claude discovers them via search
Args:
tools: Converted tool definitions in Anthropic format.
Returns:
Updated tools list with tool search tool prepended and
regular tools marked as deferred.
"""
if self.tool_search is None:
return tools
# Check if a tool search tool is already present (user passed one manually)
has_search_tool = any(
t.get("type", "") in TOOL_SEARCH_TOOL_TYPES for t in tools
)
result: list[dict[str, Any]] = []
if not has_search_tool:
# Map config type to API type identifier
type_map = {
"regex": "tool_search_tool_regex_20251119",
"bm25": "tool_search_tool_bm25_20251119",
}
tool_type = type_map[self.tool_search.type]
# Tool search tool names follow the convention: tool_search_tool_{variant}
tool_name = f"tool_search_tool_{self.tool_search.type}"
result.append({"type": tool_type, "name": tool_name})
for tool in tools:
# Don't modify tool search tools
if tool.get("type", "") in TOOL_SEARCH_TOOL_TYPES:
result.append(tool)
continue
# Mark regular tools as deferred if not already set
if "defer_loading" not in tool:
tool = {**tool, "defer_loading": True}
result.append(tool)
return result
def _extract_thinking_block(
self, content_block: Any
) -> ThinkingBlock | dict[str, Any] | None:

View File

@@ -1781,7 +1781,6 @@ class BedrockCompletion(BaseLLM):
converse_messages: list[LLMMessage] = []
system_message: str | None = None
pending_tool_results: list[dict[str, Any]] = []
for message in formatted_messages:
role = message.get("role")
@@ -1795,56 +1794,53 @@ class BedrockCompletion(BaseLLM):
system_message += f"\n\n{content}"
else:
system_message = cast(str, content)
elif role == "assistant" and tool_calls:
# Convert OpenAI-style tool_calls to Bedrock toolUse format
bedrock_content = []
for tc in tool_calls:
func = tc.get("function", {})
tool_use_block = {
"toolUse": {
"toolUseId": tc.get("id", f"call_{id(tc)}"),
"name": func.get("name", ""),
"input": func.get("arguments", {})
if isinstance(func.get("arguments"), dict)
else json.loads(func.get("arguments", "{}") or "{}"),
}
}
bedrock_content.append(tool_use_block)
converse_messages.append(
{"role": "assistant", "content": bedrock_content}
)
elif role == "tool":
if not tool_call_id:
raise ValueError("Tool message missing required tool_call_id")
pending_tool_results.append(
converse_messages.append(
{
"toolResult": {
"toolUseId": tool_call_id,
"content": [{"text": str(content) if content else ""}],
}
"role": "user",
"content": [
{
"toolResult": {
"toolUseId": tool_call_id,
"content": [
{"text": str(content) if content else ""}
],
}
}
],
}
)
else:
if pending_tool_results:
converse_messages.append(
{"role": "user", "content": pending_tool_results}
)
pending_tool_results = []
if role == "assistant" and tool_calls:
# Convert OpenAI-style tool_calls to Bedrock toolUse format
bedrock_content = []
for tc in tool_calls:
func = tc.get("function", {})
tool_use_block = {
"toolUse": {
"toolUseId": tc.get("id", f"call_{id(tc)}"),
"name": func.get("name", ""),
"input": func.get("arguments", {})
if isinstance(func.get("arguments"), dict)
else json.loads(func.get("arguments", "{}") or "{}"),
}
}
bedrock_content.append(tool_use_block)
converse_messages.append(
{"role": "assistant", "content": bedrock_content}
)
# Convert to Converse API format with proper content structure
if isinstance(content, list):
# Already formatted as multimodal content blocks
converse_messages.append({"role": role, "content": content})
else:
# Convert to Converse API format with proper content structure
if isinstance(content, list):
# Already formatted as multimodal content blocks
converse_messages.append({"role": role, "content": content})
else:
# String content - wrap in text block
text_content = content if content else ""
converse_messages.append(
{"role": role, "content": [{"text": text_content}]}
)
if pending_tool_results:
converse_messages.append({"role": "user", "content": pending_tool_results})
# String content - wrap in text block
text_content = content if content else ""
converse_messages.append(
{"role": role, "content": [{"text": text_content}]}
)
# CRITICAL: Handle model-specific conversation requirements
# Cohere and some other models require conversation to end with user message

View File

@@ -22,7 +22,6 @@ from crewai.mcp.config import (
MCPServerSSE,
MCPServerStdio,
)
from crewai.utilities.string_utils import sanitize_tool_name
from crewai.mcp.transports.http import HTTPTransport
from crewai.mcp.transports.sse import SSETransport
from crewai.mcp.transports.stdio import StdioTransport
@@ -75,9 +74,10 @@ class MCPToolResolver:
elif isinstance(mcp_config, str):
amp_refs.append(self._parse_amp_ref(mcp_config))
else:
tools, clients = self._resolve_native(mcp_config)
tools, client = self._resolve_native(mcp_config)
all_tools.extend(tools)
self._clients.extend(clients)
if client:
self._clients.append(client)
if amp_refs:
tools, clients = self._resolve_amp(amp_refs)
@@ -131,7 +131,7 @@ class MCPToolResolver:
all_tools: list[BaseTool] = []
all_clients: list[Any] = []
resolved_cache: dict[str, tuple[list[BaseTool], list[Any]]] = {}
resolved_cache: dict[str, tuple[list[BaseTool], Any | None]] = {}
for slug in unique_slugs:
config_dict = amp_configs_map.get(slug)
@@ -149,9 +149,10 @@ class MCPToolResolver:
mcp_server_config = self._build_mcp_config_from_dict(config_dict)
try:
tools, clients = self._resolve_native(mcp_server_config)
resolved_cache[slug] = (tools, clients)
all_clients.extend(clients)
tools, client = self._resolve_native(mcp_server_config)
resolved_cache[slug] = (tools, client)
if client:
all_clients.append(client)
except Exception as e:
crewai_event_bus.emit(
self,
@@ -169,9 +170,8 @@ class MCPToolResolver:
slug_tools, _ = cached
if specific_tool:
sanitized = sanitize_tool_name(specific_tool)
all_tools.extend(
t for t in slug_tools if t.name.endswith(f"_{sanitized}")
t for t in slug_tools if t.name.endswith(f"_{specific_tool}")
)
else:
all_tools.extend(slug_tools)
@@ -198,6 +198,7 @@ class MCPToolResolver:
plus_api = PlusAPI(api_key=get_platform_integration_token())
response = plus_api.get_mcp_configs(slugs)
if response.status_code == 200:
configs: dict[str, dict[str, Any]] = response.json().get("configs", {})
return configs
@@ -217,7 +218,6 @@ class MCPToolResolver:
def _resolve_external(self, mcp_ref: str) -> list[BaseTool]:
"""Resolve an HTTPS MCP server URL into tools."""
from crewai.tools.base_tool import BaseTool
from crewai.tools.mcp_tool_wrapper import MCPToolWrapper
if "#" in mcp_ref:
@@ -227,7 +227,6 @@ class MCPToolResolver:
server_params = {"url": server_url}
server_name = self._extract_server_name(server_url)
sanitized_specific_tool = sanitize_tool_name(specific_tool) if specific_tool else None
try:
tool_schemas = self._get_mcp_tool_schemas(server_params)
@@ -240,7 +239,7 @@ class MCPToolResolver:
tools = []
for tool_name, schema in tool_schemas.items():
if sanitized_specific_tool and tool_name != sanitized_specific_tool:
if specific_tool and tool_name != specific_tool:
continue
try:
@@ -272,16 +271,14 @@ class MCPToolResolver:
)
return []
@staticmethod
def _create_transport(
mcp_config: MCPServerConfig,
) -> tuple[StdioTransport | HTTPTransport | SSETransport, str]:
"""Create a fresh transport instance from an MCP server config.
def _resolve_native(
self, mcp_config: MCPServerConfig
) -> tuple[list[BaseTool], Any | None]:
"""Resolve an ``MCPServerConfig`` into tools, returning the client for cleanup."""
from crewai.tools.base_tool import BaseTool
from crewai.tools.mcp_native_tool import MCPNativeTool
Returns a ``(transport, server_name)`` tuple. Each call produces an
independent transport so that parallel tool executions never share
state.
"""
transport: StdioTransport | HTTPTransport | SSETransport
if isinstance(mcp_config, MCPServerStdio):
transport = StdioTransport(
command=mcp_config.command,
@@ -295,54 +292,38 @@ class MCPToolResolver:
headers=mcp_config.headers,
streamable=mcp_config.streamable,
)
server_name = MCPToolResolver._extract_server_name(mcp_config.url)
server_name = self._extract_server_name(mcp_config.url)
elif isinstance(mcp_config, MCPServerSSE):
transport = SSETransport(
url=mcp_config.url,
headers=mcp_config.headers,
)
server_name = MCPToolResolver._extract_server_name(mcp_config.url)
server_name = self._extract_server_name(mcp_config.url)
else:
raise ValueError(f"Unsupported MCP server config type: {type(mcp_config)}")
return transport, server_name
def _resolve_native(
self, mcp_config: MCPServerConfig
) -> tuple[list[BaseTool], list[Any]]:
"""Resolve an ``MCPServerConfig`` into tools.
Returns ``(tools, clients)`` where *clients* is always empty for
native tools (clients are now created on-demand per invocation).
A ``client_factory`` closure is passed to each ``MCPNativeTool`` so
every call -- even concurrent calls to the *same* tool -- gets its
own ``MCPClient`` + transport with no shared mutable state.
"""
from crewai.tools.base_tool import BaseTool
from crewai.tools.mcp_native_tool import MCPNativeTool
discovery_transport, server_name = self._create_transport(mcp_config)
discovery_client = MCPClient(
transport=discovery_transport,
client = MCPClient(
transport=transport,
cache_tools_list=mcp_config.cache_tools_list,
)
async def _setup_client_and_list_tools() -> list[dict[str, Any]]:
try:
if not discovery_client.connected:
await discovery_client.connect()
if not client.connected:
await client.connect()
tools_list = await discovery_client.list_tools()
tools_list = await client.list_tools()
try:
await discovery_client.disconnect()
await client.disconnect()
await asyncio.sleep(0.1)
except Exception as e:
self._logger.log("error", f"Error during disconnect: {e}")
return tools_list
except Exception as e:
if discovery_client.connected:
await discovery_client.disconnect()
if client.connected:
await client.disconnect()
await asyncio.sleep(0.1)
raise RuntimeError(
f"Error during setup client and list tools: {e}"
@@ -395,13 +376,6 @@ class MCPToolResolver:
filtered_tools.append(tool)
tools_list = filtered_tools
def _client_factory() -> MCPClient:
transport, _ = self._create_transport(mcp_config)
return MCPClient(
transport=transport,
cache_tools_list=mcp_config.cache_tools_list,
)
tools = []
for tool_def in tools_list:
tool_name = tool_def.get("name", "")
@@ -422,7 +396,7 @@ class MCPToolResolver:
try:
native_tool = MCPNativeTool(
client_factory=_client_factory,
mcp_client=client,
tool_name=tool_name,
tool_schema=tool_schema,
server_name=server_name,
@@ -433,10 +407,10 @@ class MCPToolResolver:
self._logger.log("error", f"Failed to create native MCP tool: {e}")
continue
return cast(list[BaseTool], tools), []
return cast(list[BaseTool], tools), client
except Exception as e:
if discovery_client.connected:
asyncio.run(discovery_client.disconnect())
if client.connected:
asyncio.run(client.disconnect())
raise RuntimeError(f"Failed to get native MCP tools: {e}") from e
@@ -608,6 +582,8 @@ class MCPToolResolver:
@staticmethod
def _json_schema_to_pydantic(tool_name: str, json_schema: dict[str, Any]) -> type:
"""Convert JSON Schema to a Pydantic model for tool arguments."""
from pydantic import ConfigDict
from crewai.utilities.pydantic_schema_utils import create_model_from_schema
model_name = f"{tool_name.replace('-', '_').replace(' ', '_')}Schema"
@@ -615,4 +591,5 @@ class MCPToolResolver:
json_schema,
model_name=model_name,
enrich_descriptions=True,
__config__=ConfigDict(extra="ignore"),
)

View File

@@ -1,30 +1,29 @@
"""Native MCP tool wrapper for CrewAI agents.
This module provides a tool wrapper that creates a fresh MCP client for every
invocation, ensuring safe parallel execution even when the same tool is called
concurrently by the executor.
This module provides a tool wrapper that reuses existing MCP client sessions
for better performance and connection management.
"""
import asyncio
from collections.abc import Callable
from typing import Any
from crewai.tools import BaseTool
class MCPNativeTool(BaseTool):
"""Native MCP tool that creates a fresh client per invocation.
"""Native MCP tool that reuses client sessions.
A ``client_factory`` callable produces an independent ``MCPClient`` +
transport for every ``_run_async`` call. This guarantees that parallel
invocations -- whether of the *same* tool or *different* tools from the
same server -- never share mutable connection state (which would cause
anyio cancel-scope errors).
This tool wrapper is used when agents connect to MCP servers using
structured configurations. It reuses existing client sessions for
better performance and proper connection lifecycle management.
Unlike MCPToolWrapper which connects on-demand, this tool uses
a shared MCP client instance that maintains a persistent connection.
"""
def __init__(
self,
client_factory: Callable[[], Any],
mcp_client: Any,
tool_name: str,
tool_schema: dict[str, Any],
server_name: str,
@@ -33,16 +32,19 @@ class MCPNativeTool(BaseTool):
"""Initialize native MCP tool.
Args:
client_factory: Zero-arg callable that returns a new MCPClient.
mcp_client: MCPClient instance with active session.
tool_name: Name of the tool (may be prefixed).
tool_schema: Schema information for the tool.
server_name: Name of the MCP server for prefixing.
original_tool_name: Original name of the tool on the MCP server.
"""
# Create tool name with server prefix to avoid conflicts
prefixed_name = f"{server_name}_{tool_name}"
# Handle args_schema properly - BaseTool expects a BaseModel subclass
args_schema = tool_schema.get("args_schema")
# Only pass args_schema if it's provided
kwargs = {
"name": prefixed_name,
"description": tool_schema.get(
@@ -55,9 +57,16 @@ class MCPNativeTool(BaseTool):
super().__init__(**kwargs)
self._client_factory = client_factory
# Set instance attributes after super().__init__
self._mcp_client = mcp_client
self._original_tool_name = original_tool_name or tool_name
self._server_name = server_name
# self._logger = logging.getLogger(__name__)
@property
def mcp_client(self) -> Any:
"""Get the MCP client instance."""
return self._mcp_client
@property
def original_tool_name(self) -> str:
@@ -99,26 +108,51 @@ class MCPNativeTool(BaseTool):
async def _run_async(self, **kwargs) -> str:
"""Async implementation of tool execution.
A fresh ``MCPClient`` is created for every invocation so that
concurrent calls never share transport or session state.
Args:
**kwargs: Arguments to pass to the MCP tool.
Returns:
Result from the MCP tool execution.
"""
client = self._client_factory()
await client.connect()
# Note: Since we use asyncio.run() which creates a new event loop each time,
# Always reconnect on-demand because asyncio.run() creates new event loops per call
# All MCP transport context managers (stdio, streamablehttp_client, sse_client)
# use anyio.create_task_group() which can't span different event loops
if self._mcp_client.connected:
await self._mcp_client.disconnect()
await self._mcp_client.connect()
try:
result = await client.call_tool(self.original_tool_name, kwargs)
finally:
await client.disconnect()
result = await self._mcp_client.call_tool(self.original_tool_name, kwargs)
except Exception as e:
error_str = str(e).lower()
if (
"not connected" in error_str
or "connection" in error_str
or "send" in error_str
):
await self._mcp_client.disconnect()
await self._mcp_client.connect()
# Retry the call
result = await self._mcp_client.call_tool(
self.original_tool_name, kwargs
)
else:
raise
finally:
# Always disconnect after tool call to ensure clean context manager lifecycle
# This prevents "exit cancel scope in different task" errors
# All transport context managers must be exited in the same event loop they were entered
await self._mcp_client.disconnect()
# Extract result content
if isinstance(result, str):
return result
# Handle various result formats
if hasattr(result, "content") and result.content:
if isinstance(result.content, list) and len(result.content) > 0:
content_item = result.content[0]

View File

@@ -2353,68 +2353,3 @@ def test_agent_without_apps_no_platform_tools():
tools = crew._prepare_tools(agent, task, [])
assert tools == []
def test_agent_mcps_accepts_slug_with_specific_tool():
"""Agent(mcps=["notion#get_page"]) must pass validation (_SLUG_RE)."""
agent = Agent(
role="MCP Agent",
goal="Test MCP validation",
backstory="Test agent",
mcps=["notion#get_page"],
)
assert agent.mcps == ["notion#get_page"]
def test_agent_mcps_accepts_slug_with_hyphenated_tool():
agent = Agent(
role="MCP Agent",
goal="Test MCP validation",
backstory="Test agent",
mcps=["notion#get-page"],
)
assert agent.mcps == ["notion#get-page"]
def test_agent_mcps_accepts_multiple_hash_refs():
agent = Agent(
role="MCP Agent",
goal="Test MCP validation",
backstory="Test agent",
mcps=["notion#get_page", "notion#search", "github#list_repos"],
)
assert len(agent.mcps) == 3
def test_agent_mcps_accepts_mixed_ref_types():
agent = Agent(
role="MCP Agent",
goal="Test MCP validation",
backstory="Test agent",
mcps=[
"notion#get_page",
"notion",
"https://mcp.example.com/api",
],
)
assert len(agent.mcps) == 3
def test_agent_mcps_rejects_hash_without_slug():
with pytest.raises(ValueError, match="Invalid MCP reference"):
Agent(
role="MCP Agent",
goal="Test MCP validation",
backstory="Test agent",
mcps=["#get_page"],
)
def test_agent_mcps_accepts_legacy_prefix_with_tool():
agent = Agent(
role="MCP Agent",
goal="Test MCP validation",
backstory="Test agent",
mcps=["crewai-amp:notion#get_page"],
)
assert agent.mcps == ["crewai-amp:notion#get_page"]

View File

@@ -1,137 +0,0 @@
interactions:
- request:
body: '{"max_tokens":4096,"messages":[{"role":"user","content":"What is the weather
in Tokyo?"}],"model":"claude-sonnet-4-5","stream":false,"tools":[{"type":"tool_search_tool_bm25_20251119","name":"tool_search_tool_bm25"},{"name":"get_weather","description":"Get
current weather conditions for a specified location","input_schema":{"type":"object","properties":{"input":{"type":"string","description":"Input
for get_weather"}},"required":["input"]},"defer_loading":true},{"name":"search_files","description":"Search
through files in the workspace by name or content","input_schema":{"type":"object","properties":{"input":{"type":"string","description":"Input
for search_files"}},"required":["input"]},"defer_loading":true},{"name":"read_database","description":"Read
records from a database table with optional filtering","input_schema":{"type":"object","properties":{"input":{"type":"string","description":"Input
for read_database"}},"required":["input"]},"defer_loading":true},{"name":"write_database","description":"Write
or update records in a database table","input_schema":{"type":"object","properties":{"input":{"type":"string","description":"Input
for write_database"}},"required":["input"]},"defer_loading":true},{"name":"send_email","description":"Send
an email message to one or more recipients","input_schema":{"type":"object","properties":{"input":{"type":"string","description":"Input
for send_email"}},"required":["input"]},"defer_loading":true},{"name":"read_email","description":"Read
emails from inbox with filtering options","input_schema":{"type":"object","properties":{"input":{"type":"string","description":"Input
for read_email"}},"required":["input"]},"defer_loading":true},{"name":"create_ticket","description":"Create
a new support ticket in the ticketing system","input_schema":{"type":"object","properties":{"input":{"type":"string","description":"Input
for create_ticket"}},"required":["input"]},"defer_loading":true},{"name":"update_ticket","description":"Update
an existing support ticket status or description","input_schema":{"type":"object","properties":{"input":{"type":"string","description":"Input
for update_ticket"}},"required":["input"]},"defer_loading":true},{"name":"list_users","description":"List
all users in the system with optional filters","input_schema":{"type":"object","properties":{"input":{"type":"string","description":"Input
for list_users"}},"required":["input"]},"defer_loading":true},{"name":"get_user_profile","description":"Get
detailed profile information for a specific user","input_schema":{"type":"object","properties":{"input":{"type":"string","description":"Input
for get_user_profile"}},"required":["input"]},"defer_loading":true},{"name":"deploy_service","description":"Deploy
a service to the specified environment","input_schema":{"type":"object","properties":{"input":{"type":"string","description":"Input
for deploy_service"}},"required":["input"]},"defer_loading":true},{"name":"rollback_service","description":"Rollback
a service deployment to a previous version","input_schema":{"type":"object","properties":{"input":{"type":"string","description":"Input
for rollback_service"}},"required":["input"]},"defer_loading":true},{"name":"get_service_logs","description":"Get
service logs filtered by time range and severity","input_schema":{"type":"object","properties":{"input":{"type":"string","description":"Input
for get_service_logs"}},"required":["input"]},"defer_loading":true},{"name":"run_sql_query","description":"Run
a read-only SQL query against the analytics database","input_schema":{"type":"object","properties":{"input":{"type":"string","description":"Input
for run_sql_query"}},"required":["input"]},"defer_loading":true},{"name":"create_dashboard","description":"Create
a new monitoring dashboard with widgets","input_schema":{"type":"object","properties":{"input":{"type":"string","description":"Input
for create_dashboard"}},"required":["input"]},"defer_loading":true}]}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
anthropic-version:
- '2023-06-01'
connection:
- keep-alive
content-length:
- '3952'
content-type:
- application/json
host:
- api.anthropic.com
x-api-key:
- X-API-KEY-XXX
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 0.73.0
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.3
x-stainless-timeout:
- NOT_GIVEN
method: POST
uri: https://api.anthropic.com/v1/messages
response:
body:
string: '{"model":"claude-sonnet-4-5-20250929","id":"msg_01DAGCoL6C12u6yAgR1UqNAs","type":"message","role":"assistant","content":[{"type":"text","text":"I''ll
search for a weather-related tool to help you get the weather information
for Tokyo."},{"type":"server_tool_use","id":"srvtoolu_0176qgHeeBpSygYAnUzKHCfh","name":"tool_search_tool_bm25","input":{"query":"weather
Tokyo current conditions forecast"},"caller":{"type":"direct"}},{"type":"tool_search_tool_result","tool_use_id":"srvtoolu_0176qgHeeBpSygYAnUzKHCfh","content":{"type":"tool_search_tool_search_result","tool_references":[{"type":"tool_reference","tool_name":"get_weather"}]}},{"type":"text","text":"Great!
I found a weather tool. Let me get the current weather conditions for Tokyo."},{"type":"tool_use","id":"toolu_01R3FavQLuTrwNvEk9gMaViK","name":"get_weather","input":{"input":"Tokyo"},"caller":{"type":"direct"}}],"stop_reason":"tool_use","stop_sequence":null,"usage":{"input_tokens":1566,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"cache_creation":{"ephemeral_5m_input_tokens":0,"ephemeral_1h_input_tokens":0},"output_tokens":155,"service_tier":"standard","inference_geo":"not_available","server_tool_use":{"web_search_requests":0,"web_fetch_requests":0}}}'
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Security-Policy:
- CSP-FILTERED
Content-Type:
- application/json
Date:
- Sun, 08 Mar 2026 21:04:12 GMT
Server:
- cloudflare
Transfer-Encoding:
- chunked
X-Robots-Tag:
- none
anthropic-organization-id:
- ANTHROPIC-ORGANIZATION-ID-XXX
anthropic-ratelimit-input-tokens-limit:
- ANTHROPIC-RATELIMIT-INPUT-TOKENS-LIMIT-XXX
anthropic-ratelimit-input-tokens-remaining:
- ANTHROPIC-RATELIMIT-INPUT-TOKENS-REMAINING-XXX
anthropic-ratelimit-input-tokens-reset:
- ANTHROPIC-RATELIMIT-INPUT-TOKENS-RESET-XXX
anthropic-ratelimit-output-tokens-limit:
- ANTHROPIC-RATELIMIT-OUTPUT-TOKENS-LIMIT-XXX
anthropic-ratelimit-output-tokens-remaining:
- ANTHROPIC-RATELIMIT-OUTPUT-TOKENS-REMAINING-XXX
anthropic-ratelimit-output-tokens-reset:
- ANTHROPIC-RATELIMIT-OUTPUT-TOKENS-RESET-XXX
anthropic-ratelimit-requests-limit:
- '20000'
anthropic-ratelimit-requests-remaining:
- '19999'
anthropic-ratelimit-requests-reset:
- '2026-03-08T21:04:07Z'
anthropic-ratelimit-tokens-limit:
- ANTHROPIC-RATELIMIT-TOKENS-LIMIT-XXX
anthropic-ratelimit-tokens-remaining:
- ANTHROPIC-RATELIMIT-TOKENS-REMAINING-XXX
anthropic-ratelimit-tokens-reset:
- ANTHROPIC-RATELIMIT-TOKENS-RESET-XXX
cf-cache-status:
- DYNAMIC
request-id:
- REQUEST-ID-XXX
strict-transport-security:
- STS-XXX
vary:
- Accept-Encoding
x-envoy-upstream-service-time:
- '4330'
status:
code: 200
message: OK
version: 1

View File

@@ -1,112 +0,0 @@
interactions:
- request:
body: '{"max_tokens":4096,"messages":[{"role":"user","content":"What is the weather
in Tokyo?"}],"model":"claude-sonnet-4-5","stream":false,"tools":[{"name":"get_weather","description":"Get
current weather conditions for a specified location","input_schema":{"type":"object","properties":{"input":{"type":"string","description":"Input
for get_weather"}},"required":["input"]}},{"name":"search_files","description":"Search
through files in the workspace by name or content","input_schema":{"type":"object","properties":{"input":{"type":"string","description":"Input
for search_files"}},"required":["input"]}},{"name":"read_database","description":"Read
records from a database table with optional filtering","input_schema":{"type":"object","properties":{"input":{"type":"string","description":"Input
for read_database"}},"required":["input"]}},{"name":"write_database","description":"Write
or update records in a database table","input_schema":{"type":"object","properties":{"input":{"type":"string","description":"Input
for write_database"}},"required":["input"]}},{"name":"send_email","description":"Send
an email message to one or more recipients","input_schema":{"type":"object","properties":{"input":{"type":"string","description":"Input
for send_email"}},"required":["input"]}},{"name":"read_email","description":"Read
emails from inbox with filtering options","input_schema":{"type":"object","properties":{"input":{"type":"string","description":"Input
for read_email"}},"required":["input"]}},{"name":"create_ticket","description":"Create
a new support ticket in the ticketing system","input_schema":{"type":"object","properties":{"input":{"type":"string","description":"Input
for create_ticket"}},"required":["input"]}},{"name":"update_ticket","description":"Update
an existing support ticket status or description","input_schema":{"type":"object","properties":{"input":{"type":"string","description":"Input
for update_ticket"}},"required":["input"]}},{"name":"list_users","description":"List
all users in the system with optional filters","input_schema":{"type":"object","properties":{"input":{"type":"string","description":"Input
for list_users"}},"required":["input"]}},{"name":"get_user_profile","description":"Get
detailed profile information for a specific user","input_schema":{"type":"object","properties":{"input":{"type":"string","description":"Input
for get_user_profile"}},"required":["input"]}},{"name":"deploy_service","description":"Deploy
a service to the specified environment","input_schema":{"type":"object","properties":{"input":{"type":"string","description":"Input
for deploy_service"}},"required":["input"]}},{"name":"rollback_service","description":"Rollback
a service deployment to a previous version","input_schema":{"type":"object","properties":{"input":{"type":"string","description":"Input
for rollback_service"}},"required":["input"]}},{"name":"get_service_logs","description":"Get
service logs filtered by time range and severity","input_schema":{"type":"object","properties":{"input":{"type":"string","description":"Input
for get_service_logs"}},"required":["input"]}},{"name":"run_sql_query","description":"Run
a read-only SQL query against the analytics database","input_schema":{"type":"object","properties":{"input":{"type":"string","description":"Input
for run_sql_query"}},"required":["input"]}},{"name":"create_dashboard","description":"Create
a new monitoring dashboard with widgets","input_schema":{"type":"object","properties":{"input":{"type":"string","description":"Input
for create_dashboard"}},"required":["input"]}}]}'
headers:
accept:
- application/json
anthropic-version:
- '2023-06-01'
connection:
- keep-alive
content-type:
- application/json
host:
- api.anthropic.com
method: POST
uri: https://api.anthropic.com/v1/messages
response:
body:
string: '{"model":"claude-sonnet-4-5-20250929","id":"msg_01NoSearch001","type":"message","role":"assistant","content":[{"type":"tool_use","id":"toolu_01NoSearch001","name":"get_weather","input":{"input":"Tokyo"}}],"stop_reason":"tool_use","stop_sequence":null,"usage":{"input_tokens":1943,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"output_tokens":54,"service_tier":"standard"}}'
headers:
Content-Type:
- application/json
status:
code: 200
message: OK
- request:
body: '{"max_tokens":4096,"messages":[{"role":"user","content":"What is the weather
in Tokyo?"}],"model":"claude-sonnet-4-5","stream":false,"tools":[{"type":"tool_search_tool_bm25_20251119","name":"tool_search_tool_bm25"},{"name":"get_weather","description":"Get
current weather conditions for a specified location","input_schema":{"type":"object","properties":{"input":{"type":"string","description":"Input
for get_weather"}},"required":["input"]},"defer_loading":true},{"name":"search_files","description":"Search
through files in the workspace by name or content","input_schema":{"type":"object","properties":{"input":{"type":"string","description":"Input
for search_files"}},"required":["input"]},"defer_loading":true},{"name":"read_database","description":"Read
records from a database table with optional filtering","input_schema":{"type":"object","properties":{"input":{"type":"string","description":"Input
for read_database"}},"required":["input"]},"defer_loading":true},{"name":"write_database","description":"Write
or update records in a database table","input_schema":{"type":"object","properties":{"input":{"type":"string","description":"Input
for write_database"}},"required":["input"]},"defer_loading":true},{"name":"send_email","description":"Send
an email message to one or more recipients","input_schema":{"type":"object","properties":{"input":{"type":"string","description":"Input
for send_email"}},"required":["input"]},"defer_loading":true},{"name":"read_email","description":"Read
emails from inbox with filtering options","input_schema":{"type":"object","properties":{"input":{"type":"string","description":"Input
for read_email"}},"required":["input"]},"defer_loading":true},{"name":"create_ticket","description":"Create
a new support ticket in the ticketing system","input_schema":{"type":"object","properties":{"input":{"type":"string","description":"Input
for create_ticket"}},"required":["input"]},"defer_loading":true},{"name":"update_ticket","description":"Update
an existing support ticket status or description","input_schema":{"type":"object","properties":{"input":{"type":"string","description":"Input
for update_ticket"}},"required":["input"]},"defer_loading":true},{"name":"list_users","description":"List
all users in the system with optional filters","input_schema":{"type":"object","properties":{"input":{"type":"string","description":"Input
for list_users"}},"required":["input"]},"defer_loading":true},{"name":"get_user_profile","description":"Get
detailed profile information for a specific user","input_schema":{"type":"object","properties":{"input":{"type":"string","description":"Input
for get_user_profile"}},"required":["input"]},"defer_loading":true},{"name":"deploy_service","description":"Deploy
a service to the specified environment","input_schema":{"type":"object","properties":{"input":{"type":"string","description":"Input
for deploy_service"}},"required":["input"]},"defer_loading":true},{"name":"rollback_service","description":"Rollback
a service deployment to a previous version","input_schema":{"type":"object","properties":{"input":{"type":"string","description":"Input
for rollback_service"}},"required":["input"]},"defer_loading":true},{"name":"get_service_logs","description":"Get
service logs filtered by time range and severity","input_schema":{"type":"object","properties":{"input":{"type":"string","description":"Input
for get_service_logs"}},"required":["input"]},"defer_loading":true},{"name":"run_sql_query","description":"Run
a read-only SQL query against the analytics database","input_schema":{"type":"object","properties":{"input":{"type":"string","description":"Input
for run_sql_query"}},"required":["input"]},"defer_loading":true},{"name":"create_dashboard","description":"Create
a new monitoring dashboard with widgets","input_schema":{"type":"object","properties":{"input":{"type":"string","description":"Input
for create_dashboard"}},"required":["input"]},"defer_loading":true}]}'
headers:
accept:
- application/json
anthropic-version:
- '2023-06-01'
connection:
- keep-alive
content-type:
- application/json
host:
- api.anthropic.com
method: POST
uri: https://api.anthropic.com/v1/messages
response:
body:
string: '{"model":"claude-sonnet-4-5-20250929","id":"msg_01WithSearch001","type":"message","role":"assistant","content":[{"type":"text","text":"I''ll search for a weather tool."},{"type":"server_tool_use","id":"srvtoolu_01Search001","name":"tool_search_tool_bm25","input":{"query":"weather conditions"},"caller":{"type":"direct"}},{"type":"tool_search_tool_result","tool_use_id":"srvtoolu_01Search001","content":{"type":"tool_search_tool_search_result","tool_references":[{"type":"tool_reference","tool_name":"get_weather"}]}},{"type":"text","text":"Found it. Let me get the weather for Tokyo."},{"type":"tool_use","id":"toolu_01WithSearch001","name":"get_weather","input":{"input":"Tokyo"},"caller":{"type":"direct"}}],"stop_reason":"tool_use","stop_sequence":null,"usage":{"input_tokens":1566,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"output_tokens":155,"service_tier":"standard"}}'
headers:
Content-Type:
- application/json
status:
code: 200
message: OK
version: 1

View File

@@ -1121,345 +1121,3 @@ def test_anthropic_cached_prompt_tokens_with_tools():
assert usage.successful_requests == 2
# The second call should have cached prompt tokens
assert usage.cached_prompt_tokens > 0
# ---- Tool Search Tool Tests ----
def test_tool_search_true_injects_bm25_and_defer_loading():
    """tool_search=True should inject bm25 tool search and defer all tools."""
    llm = LLM(model="anthropic/claude-sonnet-4-5", tool_search=True)
    # Two ordinary function tools, built from a compact spec table.
    specs = [
        ("get_weather", "Get weather for a location", "location"),
        ("calculator", "Perform math calculations", "expression"),
    ]
    crewai_tools = [
        {
            "type": "function",
            "function": {
                "name": name,
                "description": desc,
                "parameters": {
                    "type": "object",
                    "properties": {prop: {"type": "string"}},
                    "required": [prop],
                },
            },
        }
        for name, desc, prop in specs
    ]
    messages, system_message = llm._format_messages_for_anthropic(
        [{"role": "user", "content": "Hello"}]
    )
    tools = llm._prepare_completion_params(messages, system_message, crewai_tools)[
        "tools"
    ]
    # One injected search tool plus the two regular tools.
    assert len(tools) == 3
    search_tool = tools[0]
    assert search_tool["type"] == "tool_search_tool_bm25_20251119"
    assert search_tool["name"] == "tool_search_tool_bm25"
    assert "input_schema" not in search_tool
    # Every regular tool must be deferred for lazy loading.
    for t in tools[1:]:
        assert t.get("defer_loading") is True, f"Tool {t['name']} missing defer_loading"
def test_tool_search_regex_config():
    """tool_search with regex config should use regex variant."""
    from crewai.llms.providers.anthropic.completion import AnthropicToolSearchConfig

    llm = LLM(
        model="anthropic/claude-sonnet-4-5",
        tool_search=AnthropicToolSearchConfig(type="regex"),
    )
    crewai_tools = [
        {
            "type": "function",
            "function": {
                "name": name,
                "description": desc,
                "parameters": {
                    "type": "object",
                    "properties": {"q": {"type": "string"}},
                    "required": ["q"],
                },
            },
        }
        for name, desc in [("tool_a", "First tool"), ("tool_b", "Second tool")]
    ]
    messages, system_message = llm._format_messages_for_anthropic(
        [{"role": "user", "content": "Hello"}]
    )
    params = llm._prepare_completion_params(messages, system_message, crewai_tools)
    # The injected search tool sits first and uses the regex variant.
    injected = params["tools"][0]
    assert injected["type"] == "tool_search_tool_regex_20251119"
    assert injected["name"] == "tool_search_tool_regex"
def test_tool_search_disabled_by_default():
    """tool_search=None (default) should NOT inject anything."""
    llm = LLM(model="anthropic/claude-sonnet-4-5")
    crewai_tools = [
        {
            "type": "function",
            "function": {
                "name": "test_tool",
                "description": "A test tool",
                "parameters": {
                    "type": "object",
                    "properties": {"q": {"type": "string"}},
                    "required": ["q"],
                },
            },
        },
    ]
    messages, system_message = llm._format_messages_for_anthropic(
        [{"role": "user", "content": "Hello"}]
    )
    tools = llm._prepare_completion_params(messages, system_message, crewai_tools)[
        "tools"
    ]
    # Only the user's tool survives — no search tool, no deferral.
    assert len(tools) == 1
    search_types = (
        "tool_search_tool_bm25_20251119",
        "tool_search_tool_regex_20251119",
    )
    for tool in tools:
        assert tool.get("type", "") not in search_types
        assert "defer_loading" not in tool
def test_tool_search_no_duplicate_when_manually_provided():
    """If user passes a tool search tool manually, don't inject a duplicate."""
    llm = LLM(model="anthropic/claude-sonnet-4-5", tool_search=True)
    manual_search = {
        "type": "tool_search_tool_regex_20251119",
        "name": "tool_search_tool_regex",
    }
    regular_tool = {
        "type": "function",
        "function": {
            "name": "test_tool",
            "description": "A test tool",
            "parameters": {
                "type": "object",
                "properties": {"q": {"type": "string"}},
                "required": ["q"],
            },
        },
    }
    messages, system_message = llm._format_messages_for_anthropic(
        [{"role": "user", "content": "Hello"}]
    )
    params = llm._prepare_completion_params(
        messages, system_message, [manual_search, regular_tool]
    )
    search_tools = [
        tool
        for tool in params["tools"]
        if tool.get("type", "").startswith("tool_search_tool")
    ]
    # Exactly one search tool — the one the user supplied.
    assert len(search_tools) == 1
    assert search_tools[0]["type"] == "tool_search_tool_regex_20251119"
def test_tool_search_passthrough_preserves_tool_search_type():
    """_convert_tools_for_interference should pass through tool search tools unchanged."""
    llm = LLM(model="anthropic/claude-sonnet-4-5")
    search_tool = {
        "type": "tool_search_tool_regex_20251119",
        "name": "tool_search_tool_regex",
    }
    weather_tool = {
        "name": "get_weather",
        "description": "Get weather",
        "input_schema": {
            "type": "object",
            "properties": {"location": {"type": "string"}},
            "required": ["location"],
        },
    }
    converted = llm._convert_tools_for_interference([search_tool, weather_tool])
    assert len(converted) == 2
    # The search tool must come through exactly as given.
    assert converted[0] == {
        "type": "tool_search_tool_regex_20251119",
        "name": "tool_search_tool_regex",
    }
    # The regular tool keeps its name and schema.
    assert converted[1]["name"] == "get_weather"
    assert "input_schema" in converted[1]
def test_tool_search_single_tool_skips_search_and_forces_choice():
    """With only 1 tool, tool_search is skipped (nothing to search) and the
    normal forced tool_choice optimisation still applies."""
    llm = LLM(model="anthropic/claude-sonnet-4-5", tool_search=True)
    only_tool = {
        "type": "function",
        "function": {
            "name": "test_tool",
            "description": "A test tool",
            "parameters": {
                "type": "object",
                "properties": {"q": {"type": "string"}},
                "required": ["q"],
            },
        },
    }
    messages, system_message = llm._format_messages_for_anthropic(
        [{"role": "user", "content": "Hello"}]
    )
    params = llm._prepare_completion_params(
        messages,
        system_message,
        [only_tool],
        available_functions={"test_tool": lambda q: "result"},
    )
    # Single tool — tool_search skipped, tool_choice forced as normal.
    assert "tool_choice" in params
    assert params["tool_choice"]["name"] == "test_tool"
    # No search tool variant may have been injected.
    injected_types = [tool.get("type", "") for tool in params["tools"]]
    assert "tool_search_tool_bm25_20251119" not in injected_types
    assert "tool_search_tool_regex_20251119" not in injected_types
    # And the lone tool must not be deferred.
    assert "defer_loading" not in params["tools"][0]
def test_tool_search_via_llm_class():
    """Verify tool_search param passes through LLM class correctly."""
    from crewai.llms.providers.anthropic.completion import (
        AnthropicCompletion,
        AnthropicToolSearchConfig,
    )

    # tool_search=True defaults to the bm25 variant.
    bm25_llm = LLM(model="anthropic/claude-sonnet-4-5", tool_search=True)
    assert isinstance(bm25_llm, AnthropicCompletion)
    assert bm25_llm.tool_search is not None
    assert bm25_llm.tool_search.type == "bm25"

    # An explicit config object is honoured verbatim.
    regex_llm = LLM(
        model="anthropic/claude-sonnet-4-5",
        tool_search=AnthropicToolSearchConfig(type="regex"),
    )
    assert regex_llm.tool_search is not None
    assert regex_llm.tool_search.type == "regex"

    # Omitting the parameter leaves the feature disabled.
    assert LLM(model="anthropic/claude-sonnet-4-5").tool_search is None
# Many tools shared by the VCR tests below.
# Built imperatively from (name, description) pairs; each tool gets the same
# single-string "input" schema.
_MANY_TOOLS = []
for _name, _desc in [
    ("get_weather", "Get current weather conditions for a specified location"),
    ("search_files", "Search through files in the workspace by name or content"),
    ("read_database", "Read records from a database table with optional filtering"),
    ("write_database", "Write or update records in a database table"),
    ("send_email", "Send an email message to one or more recipients"),
    ("read_email", "Read emails from inbox with filtering options"),
    ("create_ticket", "Create a new support ticket in the ticketing system"),
    ("update_ticket", "Update an existing support ticket status or description"),
    ("list_users", "List all users in the system with optional filters"),
    ("get_user_profile", "Get detailed profile information for a specific user"),
    ("deploy_service", "Deploy a service to the specified environment"),
    ("rollback_service", "Rollback a service deployment to a previous version"),
    ("get_service_logs", "Get service logs filtered by time range and severity"),
    ("run_sql_query", "Run a read-only SQL query against the analytics database"),
    ("create_dashboard", "Create a new monitoring dashboard with widgets"),
]:
    _MANY_TOOLS.append(
        {
            "name": _name,
            "description": _desc,
            "input_schema": {
                "type": "object",
                "properties": {
                    "input": {"type": "string", "description": f"Input for {_name}"}
                },
                "required": ["input"],
            },
        }
    )
# Avoid leaking the loop variables at module scope.
del _name, _desc
@pytest.mark.vcr()
def test_tool_search_discovers_and_calls_tool():
    """Tool search should discover the right tool and return a tool_use block."""
    llm = LLM(model="anthropic/claude-sonnet-4-5", tool_search=True)
    result = llm.call("What is the weather in Tokyo?", tools=_MANY_TOOLS)
    # Without available_functions the call returns raw tool_use blocks.
    assert isinstance(result, list)
    assert len(result) >= 1
    # The discovered tool should be get_weather.
    discovered_names = [getattr(block, "name", None) for block in result]
    assert "get_weather" in discovered_names
@pytest.mark.vcr()
def test_tool_search_saves_input_tokens():
    """Tool search with deferred loading should use fewer input tokens than loading all tools."""
    # Baseline: all 15 tools loaded upfront.
    baseline_llm = LLM(model="anthropic/claude-sonnet-4-5")
    baseline_llm.call("What is the weather in Tokyo?", tools=_MANY_TOOLS)
    baseline_usage = baseline_llm.get_token_usage_summary()

    # With tool search enabled the tool definitions are deferred.
    search_llm = LLM(model="anthropic/claude-sonnet-4-5", tool_search=True)
    search_llm.call("What is the weather in Tokyo?", tools=_MANY_TOOLS)
    search_usage = search_llm.get_token_usage_summary()

    # Deferred loading must translate into a smaller prompt.
    assert search_usage.prompt_tokens < baseline_usage.prompt_tokens, (
        f"Expected tool_search ({search_usage.prompt_tokens}) to use fewer input tokens "
        f"than no search ({baseline_usage.prompt_tokens})"
    )

View File

@@ -967,70 +967,3 @@ def test_bedrock_agent_kickoff_structured_output_with_tools():
assert result.pydantic.result == 42, f"Expected result 42 but got {result.pydantic.result}"
assert result.pydantic.operation, "Operation should not be empty"
assert result.pydantic.explanation, "Explanation should not be empty"
def test_bedrock_groups_three_tool_results():
    """Consecutive tool results should be grouped into one Bedrock user message."""
    llm = LLM(model="bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0")
    # (id, function name, JSON arguments) for the three assistant tool calls.
    calls = [
        ("tool-1", "lookup_weather", '{"location": "New York"}'),
        ("tool-2", "lookup_news", '{"topic": "AI"}'),
        ("tool-3", "lookup_stock", '{"ticker": "AMZN"}'),
    ]
    # (id, result text) for the three trailing tool messages.
    results = [
        ("tool-1", "72F and sunny"),
        ("tool-2", "AI news summary"),
        ("tool-3", "AMZN up 1.2%"),
    ]
    test_messages = [
        {"role": "user", "content": "Use all three tools, then continue."},
        {
            "role": "assistant",
            "content": "",
            "tool_calls": [
                {
                    "id": call_id,
                    "type": "function",
                    "function": {"name": fn_name, "arguments": fn_args},
                }
                for call_id, fn_name, fn_args in calls
            ],
        },
    ]
    test_messages.extend(
        {"role": "tool", "tool_call_id": call_id, "content": text}
        for call_id, text in results
    )
    formatted_messages, system_message = llm._format_messages_for_converse(
        test_messages
    )
    assert system_message is None
    # Three consecutive tool messages collapse into one trailing user message.
    assert [msg["role"] for msg in formatted_messages] == [
        "user",
        "assistant",
        "user",
    ]
    assert len(formatted_messages[1]["content"]) == 3
    tool_results = formatted_messages[2]["content"]
    assert len(tool_results) == 3
    assert [block["toolResult"]["toolUseId"] for block in tool_results] == [
        "tool-1",
        "tool-2",
        "tool-3",
    ]
    assert [block["toolResult"]["content"][0]["text"] for block in tool_results] == [
        "72F and sunny",
        "AI news summary",
        "AMZN up 1.2%",
    ]

View File

@@ -268,54 +268,6 @@ class TestGetMCPToolsAmpIntegration:
assert len(tools) == 1
assert tools[0].name == "mcp_notion_so_sse_search"
@patch("crewai.mcp.tool_resolver.MCPClient")
@patch.object(MCPToolResolver, "_fetch_amp_mcp_configs")
def test_tool_filter_with_hyphenated_hash_syntax(
self, mock_fetch, mock_client_class, agent
):
"""notion#get-page must match the tool whose sanitized name is get_page."""
mock_fetch.return_value = {
"notion": {
"type": "sse",
"url": "https://mcp.notion.so/sse",
"headers": {"Authorization": "Bearer token"},
},
}
hyphenated_tool_definitions = [
{
"name": "get_page",
"original_name": "get-page",
"description": "Get a page",
"inputSchema": {},
},
{
"name": "search",
"original_name": "search",
"description": "Search tool",
"inputSchema": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "Search query"}
},
"required": ["query"],
},
},
]
mock_client = AsyncMock()
mock_client.list_tools = AsyncMock(return_value=hyphenated_tool_definitions)
mock_client.connected = False
mock_client.connect = AsyncMock()
mock_client.disconnect = AsyncMock()
mock_client_class.return_value = mock_client
tools = agent.get_mcp_tools(["notion#get-page"])
mock_fetch.assert_called_once_with(["notion"])
assert len(tools) == 1
assert tools[0].name.endswith("_get_page")
@patch("crewai.mcp.tool_resolver.MCPClient")
@patch.object(MCPToolResolver, "_fetch_amp_mcp_configs")
def test_deduplicates_slugs(
@@ -419,87 +371,3 @@ class TestGetMCPToolsAmpIntegration:
mock_external.assert_called_once_with("https://external.mcp.com/api")
# 2 from notion + 1 from external + 2 from http_config
assert len(tools) == 5
class TestResolveExternalToolFilter:
    """Tests for _resolve_external with #tool-name filtering."""

    @pytest.fixture
    def agent(self):
        return Agent(
            role="Test Agent",
            goal="Test goal",
            backstory="Test backstory",
        )

    @pytest.fixture
    def resolver(self, agent):
        return MCPToolResolver(agent=agent, logger=agent._logger)

    @staticmethod
    def _schemas(*names):
        """Build a minimal schema mapping for the requested tool names."""
        descriptions = {"get_page": "Get a page", "search": "Search tool"}
        return {
            name: {"description": descriptions[name], "args_schema": None}
            for name in names
        }

    @patch.object(MCPToolResolver, "_get_mcp_tool_schemas")
    def test_filters_hyphenated_tool_name(self, mock_schemas, resolver):
        """https://...#get-page must match the sanitized key get_page in schemas."""
        mock_schemas.return_value = self._schemas("get_page", "search")
        tools = resolver._resolve_external("https://mcp.example.com/api#get-page")
        assert len(tools) == 1
        assert "get_page" in tools[0].name

    @patch.object(MCPToolResolver, "_get_mcp_tool_schemas")
    def test_filters_underscored_tool_name(self, mock_schemas, resolver):
        """https://...#get_page must also match the sanitized key get_page."""
        mock_schemas.return_value = self._schemas("get_page", "search")
        tools = resolver._resolve_external("https://mcp.example.com/api#get_page")
        assert len(tools) == 1
        assert "get_page" in tools[0].name

    @patch.object(MCPToolResolver, "_get_mcp_tool_schemas")
    def test_returns_all_tools_without_hash(self, mock_schemas, resolver):
        """Without a #fragment every advertised tool is resolved."""
        mock_schemas.return_value = self._schemas("get_page", "search")
        tools = resolver._resolve_external("https://mcp.example.com/api")
        assert len(tools) == 2

    @patch.object(MCPToolResolver, "_get_mcp_tool_schemas")
    def test_returns_empty_for_nonexistent_tool(self, mock_schemas, resolver):
        """An unknown #tool fragment filters every tool out."""
        mock_schemas.return_value = self._schemas("search")
        tools = resolver._resolve_external("https://mcp.example.com/api#nonexistent")
        assert len(tools) == 0

View File

@@ -1,5 +1,4 @@
import asyncio
import concurrent.futures
from unittest.mock import AsyncMock, patch
import pytest
@@ -31,17 +30,6 @@ def mock_tool_definitions():
]
def _make_mock_client(tool_definitions):
"""Create a mock MCPClient that returns *tool_definitions*."""
client = AsyncMock()
client.list_tools = AsyncMock(return_value=tool_definitions)
client.connected = False
client.connect = AsyncMock()
client.disconnect = AsyncMock()
client.call_tool = AsyncMock(return_value="test result")
return client
def test_agent_with_stdio_mcp_config(mock_tool_definitions):
"""Test agent setup with MCPServerStdio configuration."""
stdio_config = MCPServerStdio(
@@ -57,8 +45,14 @@ def test_agent_with_stdio_mcp_config(mock_tool_definitions):
mcps=[stdio_config],
)
with patch("crewai.mcp.tool_resolver.MCPClient") as mock_client_class:
mock_client_class.return_value = _make_mock_client(mock_tool_definitions)
mock_client = AsyncMock()
mock_client.list_tools = AsyncMock(return_value=mock_tool_definitions)
mock_client.connected = False # Will trigger connect
mock_client.connect = AsyncMock()
mock_client.disconnect = AsyncMock()
mock_client_class.return_value = mock_client
tools = agent.get_mcp_tools([stdio_config])
@@ -66,7 +60,8 @@ def test_agent_with_stdio_mcp_config(mock_tool_definitions):
assert all(isinstance(tool, BaseTool) for tool in tools)
mock_client_class.assert_called_once()
transport = mock_client_class.call_args.kwargs["transport"]
call_args = mock_client_class.call_args
transport = call_args.kwargs["transport"]
assert transport.command == "python"
assert transport.args == ["server.py"]
assert transport.env == {"API_KEY": "test_key"}
@@ -88,7 +83,12 @@ def test_agent_with_http_mcp_config(mock_tool_definitions):
)
with patch("crewai.mcp.tool_resolver.MCPClient") as mock_client_class:
mock_client_class.return_value = _make_mock_client(mock_tool_definitions)
mock_client = AsyncMock()
mock_client.list_tools = AsyncMock(return_value=mock_tool_definitions)
mock_client.connected = False # Will trigger connect
mock_client.connect = AsyncMock()
mock_client.disconnect = AsyncMock()
mock_client_class.return_value = mock_client
tools = agent.get_mcp_tools([http_config])
@@ -96,7 +96,8 @@ def test_agent_with_http_mcp_config(mock_tool_definitions):
assert all(isinstance(tool, BaseTool) for tool in tools)
mock_client_class.assert_called_once()
transport = mock_client_class.call_args.kwargs["transport"]
call_args = mock_client_class.call_args
transport = call_args.kwargs["transport"]
assert transport.url == "https://api.example.com/mcp"
assert transport.headers == {"Authorization": "Bearer test_token"}
assert transport.streamable is True
@@ -117,7 +118,12 @@ def test_agent_with_sse_mcp_config(mock_tool_definitions):
)
with patch("crewai.mcp.tool_resolver.MCPClient") as mock_client_class:
mock_client_class.return_value = _make_mock_client(mock_tool_definitions)
mock_client = AsyncMock()
mock_client.list_tools = AsyncMock(return_value=mock_tool_definitions)
mock_client.connected = False
mock_client.connect = AsyncMock()
mock_client.disconnect = AsyncMock()
mock_client_class.return_value = mock_client
tools = agent.get_mcp_tools([sse_config])
@@ -125,7 +131,8 @@ def test_agent_with_sse_mcp_config(mock_tool_definitions):
assert all(isinstance(tool, BaseTool) for tool in tools)
mock_client_class.assert_called_once()
transport = mock_client_class.call_args.kwargs["transport"]
call_args = mock_client_class.call_args
transport = call_args.kwargs["transport"]
assert transport.url == "https://api.example.com/mcp/sse"
assert transport.headers == {"Authorization": "Bearer test_token"}
@@ -135,7 +142,13 @@ def test_mcp_tool_execution_in_sync_context(mock_tool_definitions):
http_config = MCPServerHTTP(url="https://api.example.com/mcp")
with patch("crewai.mcp.tool_resolver.MCPClient") as mock_client_class:
mock_client_class.return_value = _make_mock_client(mock_tool_definitions)
mock_client = AsyncMock()
mock_client.list_tools = AsyncMock(return_value=mock_tool_definitions)
mock_client.connected = False
mock_client.connect = AsyncMock()
mock_client.disconnect = AsyncMock()
mock_client.call_tool = AsyncMock(return_value="test result")
mock_client_class.return_value = mock_client
agent = Agent(
role="Test Agent",
@@ -147,12 +160,12 @@ def test_mcp_tool_execution_in_sync_context(mock_tool_definitions):
tools = agent.get_mcp_tools([http_config])
assert len(tools) == 2
tool = tools[0]
result = tool.run(query="test query")
assert result == "test result"
# 1 discovery + 1 for the run() invocation
assert mock_client_class.call_count == 2
mock_client.call_tool.assert_called()
@pytest.mark.asyncio
@@ -161,7 +174,13 @@ async def test_mcp_tool_execution_in_async_context(mock_tool_definitions):
http_config = MCPServerHTTP(url="https://api.example.com/mcp")
with patch("crewai.mcp.tool_resolver.MCPClient") as mock_client_class:
mock_client_class.return_value = _make_mock_client(mock_tool_definitions)
mock_client = AsyncMock()
mock_client.list_tools = AsyncMock(return_value=mock_tool_definitions)
mock_client.connected = False
mock_client.connect = AsyncMock()
mock_client.disconnect = AsyncMock()
mock_client.call_tool = AsyncMock(return_value="test result")
mock_client_class.return_value = mock_client
agent = Agent(
role="Test Agent",
@@ -173,129 +192,9 @@ async def test_mcp_tool_execution_in_async_context(mock_tool_definitions):
tools = agent.get_mcp_tools([http_config])
assert len(tools) == 2
tool = tools[0]
result = tool.run(query="test query")
assert result == "test result"
assert mock_client_class.call_count == 2
def test_each_invocation_gets_fresh_client(mock_tool_definitions):
    """Every tool.run() must create its own MCPClient (no shared state)."""
    http_config = MCPServerHTTP(url="https://api.example.com/mcp")
    spawned: list = []

    def _factory(**kwargs):
        # Record every client the resolver constructs so we can count them.
        fresh = _make_mock_client(mock_tool_definitions)
        spawned.append(fresh)
        return fresh

    with patch("crewai.mcp.tool_resolver.MCPClient", side_effect=_factory):
        agent = Agent(
            role="Test Agent",
            goal="Test goal",
            backstory="Test backstory",
            mcps=[http_config],
        )
        tools = agent.get_mcp_tools([http_config])
        assert len(tools) == 2
        # Discovery should have consumed exactly one client so far.
        assert len(spawned) == 1

        # Two sequential runs of the same tool must each spawn a new client.
        tools[0].run(query="q1")
        tools[0].run(query="q2")
        assert len(spawned) == 3
        assert spawned[1] is not spawned[2]
def test_parallel_mcp_tool_execution_same_tool(mock_tool_definitions):
    """Parallel calls to the *same* tool must not interfere."""
    http_config = MCPServerHTTP(url="https://api.example.com/mcp")
    # Shared across all mock clients so the test can observe every call.
    call_log: list[str] = []

    def _make_client(**kwargs):
        # Fresh mock client per MCPClient() construction.
        client = AsyncMock()
        client.list_tools = AsyncMock(return_value=mock_tool_definitions)
        client.connected = False
        client.connect = AsyncMock()
        client.disconnect = AsyncMock()

        async def _call_tool(name, args):
            call_log.append(name)
            # Small delay widens the window for the two threads to overlap.
            await asyncio.sleep(0.05)
            return f"result-{name}"

        client.call_tool = AsyncMock(side_effect=_call_tool)
        return client

    with patch("crewai.mcp.tool_resolver.MCPClient", side_effect=_make_client):
        agent = Agent(
            role="Test Agent",
            goal="Test goal",
            backstory="Test backstory",
            mcps=[http_config],
        )
        tools = agent.get_mcp_tools([http_config])
        assert len(tools) >= 1
        tool = tools[0]

        # Call the SAME tool concurrently -- the exact scenario from the bug
        with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
            futures = [
                pool.submit(tool.run, query="q1"),
                pool.submit(tool.run, query="q2"),
            ]
            results = [f.result() for f in concurrent.futures.as_completed(futures)]

        assert len(results) == 2
        assert all("result-" in r for r in results)
        assert len(call_log) == 2
def test_parallel_mcp_tool_execution_different_tools(mock_tool_definitions):
    """Parallel calls to different tools from the same server must not interfere."""
    http_config = MCPServerHTTP(url="https://api.example.com/mcp")
    # Shared log: records the tool name of every call_tool invocation.
    invocations: list[str] = []

    async def _slow_call_tool(name, args):
        invocations.append(name)
        # Deliberate pause so the two worker threads actually overlap.
        await asyncio.sleep(0.05)
        return f"result-{name}"

    def _factory(**kwargs):
        stub = AsyncMock()
        stub.connected = False
        stub.connect = AsyncMock()
        stub.disconnect = AsyncMock()
        stub.list_tools = AsyncMock(return_value=mock_tool_definitions)
        stub.call_tool = AsyncMock(side_effect=_slow_call_tool)
        return stub

    with patch("crewai.mcp.tool_resolver.MCPClient", side_effect=_factory):
        agent = Agent(
            role="Test Agent",
            goal="Test goal",
            backstory="Test backstory",
            mcps=[http_config],
        )
        tools = agent.get_mcp_tools([http_config])
        assert len(tools) == 2

        with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
            pending = [
                pool.submit(tools[0].run, query="q1"),
                pool.submit(tools[1].run, query="q2"),
            ]
            outcomes = [f.result() for f in concurrent.futures.as_completed(pending)]

        assert len(outcomes) == 2
        assert all("result-" in r for r in outcomes)
        assert len(invocations) == 2
mock_client.call_tool.assert_called()

View File

@@ -971,128 +971,6 @@ class TestCollapseToOutcomeJsonParsing:
assert mock_llm.call.call_count == 2
class TestLLMObjectPreservedInContext:
    """Tests that BaseLLM objects have their model string preserved in PendingFeedbackContext."""

    @patch("crewai.flow.flow.crewai_event_bus.emit")
    def test_basellm_object_model_string_survives_roundtrip(self, mock_emit: MagicMock) -> None:
        """Test that when llm is a BaseLLM object, its model string is stored in context
        so that outcome collapsing works after async pause/resume.

        This is the exact bug: locally the sync path keeps the LLM object in memory,
        but in production the async path serializes the context and the LLM object was
        discarded (stored as None), causing resume to skip classification and always
        fall back to emit[0].
        """
        with tempfile.TemporaryDirectory() as tmpdir:
            db_path = os.path.join(tmpdir, "test_flows.db")
            persistence = SQLiteFlowPersistence(db_path)

            # Create a mock BaseLLM object (not a string)
            mock_llm_obj = MagicMock()
            mock_llm_obj.model = "gemini/gemini-2.0-flash"

            class PausingProvider:
                # Provider that persists the pending context and raises to
                # pause the flow, mimicking the production async path.
                def __init__(self, persistence: SQLiteFlowPersistence):
                    self.persistence = persistence
                    self.captured_context: PendingFeedbackContext | None = None

                def request_feedback(
                    self, context: PendingFeedbackContext, flow: Flow
                ) -> str:
                    self.captured_context = context
                    self.persistence.save_pending_feedback(
                        flow_uuid=context.flow_id,
                        context=context,
                        state_data=flow.state if isinstance(flow.state, dict) else flow.state.model_dump(),
                    )
                    raise HumanFeedbackPending(context=context)

            provider = PausingProvider(persistence)

            class TestFlow(Flow):
                result_path: str = ""

                @start()
                @human_feedback(
                    message="Approve?",
                    emit=["needs_changes", "approved"],
                    llm=mock_llm_obj,
                    default_outcome="approved",
                    provider=provider,
                )
                def review(self):
                    return "content for review"

                @listen("approved")
                def handle_approved(self):
                    self.result_path = "approved"
                    return "Approved!"

                @listen("needs_changes")
                def handle_changes(self):
                    self.result_path = "needs_changes"
                    return "Changes needed"

            # Phase 1: Start flow (should pause)
            flow1 = TestFlow(persistence=persistence)
            result = flow1.kickoff()
            assert isinstance(result, HumanFeedbackPending)

            # Verify the context stored the model STRING, not None
            assert provider.captured_context is not None
            assert provider.captured_context.llm == "gemini/gemini-2.0-flash"

            # Verify it survives persistence roundtrip
            flow_id = result.context.flow_id
            loaded = persistence.load_pending_feedback(flow_id)
            assert loaded is not None
            _, loaded_context = loaded
            assert loaded_context.llm == "gemini/gemini-2.0-flash"

            # Phase 2: Resume with positive feedback - should use LLM to classify
            flow2 = TestFlow.from_pending(flow_id, persistence)
            assert flow2._pending_feedback_context is not None
            assert flow2._pending_feedback_context.llm == "gemini/gemini-2.0-flash"

            # Mock _collapse_to_outcome to verify it gets called (not skipped)
            with patch.object(flow2, "_collapse_to_outcome", return_value="approved") as mock_collapse:
                flow2.resume("this looks good, proceed!")

            # The key assertion: _collapse_to_outcome was called (not skipped due to llm=None)
            mock_collapse.assert_called_once_with(
                feedback="this looks good, proceed!",
                outcomes=["needs_changes", "approved"],
                llm="gemini/gemini-2.0-flash",
            )
            assert flow2.last_human_feedback.outcome == "approved"
            assert flow2.result_path == "approved"

    def test_string_llm_still_works(self) -> None:
        """Test that passing llm as a string still works correctly."""
        context = PendingFeedbackContext(
            flow_id="str-llm-test",
            flow_class="test.Flow",
            method_name="review",
            method_output="output",
            message="Review:",
            emit=["approved", "rejected"],
            llm="gpt-4o-mini",
        )
        serialized = context.to_dict()
        restored = PendingFeedbackContext.from_dict(serialized)
        assert restored.llm == "gpt-4o-mini"

    def test_none_llm_when_no_model_attr(self) -> None:
        """Test that llm is None when object has no model attribute."""
        mock_obj = MagicMock(spec=[])  # No attributes
        # Simulate what the decorator does
        llm_value = mock_obj if isinstance(mock_obj, str) else getattr(mock_obj, "model", None)
        assert llm_value is None
class TestAsyncHumanFeedbackEdgeCases:
"""Edge case tests for async human feedback."""

View File

@@ -882,3 +882,86 @@ class TestEndToEndMCPSchema:
)
assert obj.filters.date_from == datetime.date(2025, 1, 1)
assert obj.filters.categories == ["news", "tech"]
class TestExtraFieldsConfig:
    """Tests for extra fields handling with ConfigDict.

    Regression tests for https://github.com/crewAIInc/crewAI/issues/4796.
    MCP tools need to ignore extra fields like security_context injected
    by CrewAI's tool execution framework.
    """

    # Minimal schema: one required string plus an optional nullable int.
    SIMPLE_SCHEMA: dict = {
        "type": "object",
        "properties": {
            "query": {"type": "string"},
            "top_k": {
                "anyOf": [{"type": "integer"}, {"type": "null"}],
                "default": None,
            },
        },
        "required": ["query"],
    }

    def test_default_config_forbids_extra(self) -> None:
        """By default, create_model_from_schema forbids extra fields."""
        from pydantic import ValidationError

        Model = create_model_from_schema(self.SIMPLE_SCHEMA)
        # Pin the specific failure mode (ValidationError) instead of a bare
        # Exception, so unrelated errors (e.g. TypeError from a bad call
        # signature) cannot make this test pass by accident.
        with pytest.raises(ValidationError):
            Model(query="test", security_context={"agent_fingerprint": "abc"})

    def test_ignore_config_allows_extra(self) -> None:
        """With extra='ignore', extra fields are silently dropped."""
        from pydantic import ConfigDict

        Model = create_model_from_schema(
            self.SIMPLE_SCHEMA,
            __config__=ConfigDict(extra="ignore"),
        )
        obj = Model(
            query="test",
            security_context={
                "agent_fingerprint": {
                    "uuid_str": "test-uuid",
                    "created_at": "2026-01-01T00:00:00",
                },
                "metadata": {},
            },
        )
        assert obj.query == "test"
        # security_context should not appear in the dumped model
        assert "security_context" not in obj.model_dump()

    def test_ignore_config_preserves_validation(self) -> None:
        """Extra='ignore' still validates declared fields correctly."""
        from pydantic import ConfigDict, ValidationError

        Model = create_model_from_schema(
            self.SIMPLE_SCHEMA,
            __config__=ConfigDict(extra="ignore"),
        )
        # Valid input works
        obj = Model(query="test", top_k=5)
        assert obj.query == "test"
        assert obj.top_k == 5
        # Missing required field still fails with a ValidationError.
        with pytest.raises(ValidationError):
            Model(top_k=5)

    def test_ignore_config_with_mcp_schema(self) -> None:
        """Extra='ignore' works with complex MCP-like schemas."""
        from pydantic import ConfigDict

        Model = create_model_from_schema(
            TestEndToEndMCPSchema.MCP_SCHEMA,
            __config__=ConfigDict(extra="ignore"),
        )
        obj = Model(
            query="search term",
            format="json",
            filters={"date_from": "2025-01-01"},
            security_context={"agent_fingerprint": "test"},
        )
        assert obj.query == "search term"
        assert "security_context" not in obj.model_dump()