Compare commits

...

3 Commits

Author SHA1 Message Date
Lucas Gomide
29be32aa8c feat: improve JSON schema handling for MCP tools
- Convert enum constraints to Literal types
- Handle format constraints (date, date-time)
- Preserve original MCP tool names for server calls
2026-02-19 15:15:50 -03:00
Lucas Gomide
ff00055e2c feat: use original tool name instead of the normalized one 2026-02-05 12:01:15 -03:00
Lucas Gomide
507aec7a48 wip 2026-02-05 11:39:55 -03:00
4 changed files with 216 additions and 89 deletions

View File

@@ -1178,6 +1178,7 @@ class Agent(BaseAgent):
tools = []
for tool_def in tools_list:
tool_name = tool_def.get("name", "")
original_tool_name = tool_def.get("original_name", tool_name)
if not tool_name:
continue
@@ -1199,6 +1200,7 @@ class Agent(BaseAgent):
tool_name=tool_name,
tool_schema=tool_schema,
server_name=server_name,
original_tool_name=original_tool_name,
)
tools.append(native_tool)
except Exception as e:
@@ -1213,26 +1215,63 @@ class Agent(BaseAgent):
raise RuntimeError(f"Failed to get native MCP tools: {e}") from e
def _get_amp_mcp_tools(self, amp_ref: str) -> list[BaseTool]:
"""Get tools from CrewAI AMP MCP marketplace."""
# Parse: "crewai-amp:mcp-name" or "crewai-amp:mcp-name#tool_name"
"""Get tools from CrewAI AMP MCP via crewai-oauth service.
Fetches MCP server configuration with tokens injected from crewai-oauth,
then uses _get_native_mcp_tools to connect and discover tools.
"""
# Parse: "crewai-amp:mcp-slug" or "crewai-amp:mcp-slug#tool_name"
amp_part = amp_ref.replace("crewai-amp:", "")
if "#" in amp_part:
mcp_name, specific_tool = amp_part.split("#", 1)
mcp_slug, specific_tool = amp_part.split("#", 1)
else:
mcp_name, specific_tool = amp_part, None
mcp_slug, specific_tool = amp_part, None
# Call AMP API to get MCP server URLs
mcp_servers = self._fetch_amp_mcp_servers(mcp_name)
# Fetch MCP config from crewai-oauth (with tokens injected)
mcp_config_dict = self._fetch_amp_mcp_config(mcp_slug)
tools = []
for server_config in mcp_servers:
server_ref = server_config["url"]
if specific_tool:
server_ref += f"#{specific_tool}"
server_tools = self._get_external_mcp_tools(server_ref)
tools.extend(server_tools)
if not mcp_config_dict:
self._logger.log(
"warning", f"Failed to fetch MCP config for '{mcp_slug}' from crewai-oauth"
)
return []
return tools
# Convert dict to MCPServerConfig (MCPServerHTTP or MCPServerSSE)
config_type = mcp_config_dict.get("type", "http")
if config_type == "sse":
mcp_config = MCPServerSSE(
url=mcp_config_dict["url"],
headers=mcp_config_dict.get("headers"),
cache_tools_list=mcp_config_dict.get("cache_tools_list", False),
)
else:
mcp_config = MCPServerHTTP(
url=mcp_config_dict["url"],
headers=mcp_config_dict.get("headers"),
streamable=mcp_config_dict.get("streamable", True),
cache_tools_list=mcp_config_dict.get("cache_tools_list", False),
)
# Apply tool filter if specific tool requested
if specific_tool:
from crewai.mcp.filters import create_static_tool_filter
mcp_config.tool_filter = create_static_tool_filter(
allowed_tool_names=[specific_tool]
)
# Use native MCP tools to connect and discover tools
try:
tools, client = self._get_native_mcp_tools(mcp_config)
if client:
self._mcp_clients.append(client)
return tools
except Exception as e:
self._logger.log(
"warning", f"Failed to get MCP tools from '{mcp_slug}': {e}"
)
return []
@staticmethod
def _extract_server_name(server_url: str) -> str:
@@ -1401,77 +1440,69 @@ class Agent(BaseAgent):
Returns:
Pydantic BaseModel class
"""
from pydantic import Field, create_model
properties = json_schema.get("properties", {})
required_fields = json_schema.get("required", [])
field_definitions: dict[str, Any] = {}
for field_name, field_schema in properties.items():
field_type = self._json_type_to_python(field_schema)
field_description = field_schema.get("description", "")
is_required = field_name in required_fields
if is_required:
field_definitions[field_name] = (
field_type,
Field(..., description=field_description),
)
else:
field_definitions[field_name] = (
field_type | None,
Field(default=None, description=field_description),
)
from crewai.utilities.pydantic_schema_utils import create_model_from_schema
model_name = f"{tool_name.replace('-', '_').replace(' ', '_')}Schema"
return create_model(model_name, **field_definitions) # type: ignore[no-any-return]
return create_model_from_schema(
json_schema,
model_name=model_name,
enrich_descriptions=True,
)
def _json_type_to_python(self, field_schema: dict[str, Any]) -> type:
"""Convert JSON Schema type to Python type.
def _fetch_amp_mcp_config(self, mcp_slug: str) -> dict[str, Any] | None:
"""Fetch MCP server configuration from crewai-oauth service.
Returns MCPServerConfig dict with tokens injected, ready for use with
_get_native_mcp_tools.
Environment variables:
CREWAI_OAUTH_URL: Base URL of crewai-oauth service
CREWAI_OAUTH_API_KEY: API key for authenticating with crewai-oauth
Args:
field_schema: JSON Schema field definition
mcp_slug: The MCP server slug (e.g., "notion-mcp-abc123")
Returns:
Python type
Dict with type, url, headers, streamable, cache_tools_list, or None if failed.
"""
import os
json_type = field_schema.get("type")
import requests
if "anyOf" in field_schema:
types: list[type] = []
for option in field_schema["anyOf"]:
if "const" in option:
types.append(str)
else:
types.append(self._json_type_to_python(option))
unique_types = list(set(types))
if len(unique_types) > 1:
result: Any = unique_types[0]
for t in unique_types[1:]:
result = result | t
return result # type: ignore[no-any-return]
return unique_types[0]
try:
endpoint = f"http://localhost:8787/mcp/me/{mcp_slug}/config"
response = requests.get(
endpoint,
headers={"Authorization": "Bearer 6b327f9ebe62726590f8de8f624cf018ad4765fecb7373f9db475a940ad546d0"},
timeout=30,
)
type_mapping: dict[str | None, type] = {
"string": str,
"number": float,
"integer": int,
"boolean": bool,
"array": list,
"object": dict,
}
if response.status_code == 200:
return response.json()
elif response.status_code == 400:
error_data = response.json()
self._logger.log(
"warning",
f"MCP '{mcp_slug}' is not connected: {error_data.get('error_description', 'Unknown error')}",
)
return None
elif response.status_code == 404:
self._logger.log(
"warning", f"MCP server '{mcp_slug}' not found in crewai-oauth"
)
return None
else:
self._logger.log(
"warning",
f"Failed to fetch MCP config from crewai-oauth: HTTP {response.status_code}",
)
return None
return type_mapping.get(json_type, Any)
@staticmethod
def _fetch_amp_mcp_servers(mcp_name: str) -> list[dict[str, Any]]:
"""Fetch MCP server configurations from CrewAI AMP API."""
# TODO: Implement AMP API call to "integrations/mcps" endpoint
# Should return list of server configs with URLs
return []
except requests.exceptions.RequestException as e:
self._logger.log(
"warning", f"Failed to connect to crewai-oauth: {e}"
)
return None
@staticmethod
def get_multimodal_tools() -> Sequence[BaseTool]:

View File

@@ -420,6 +420,7 @@ class MCPClient:
return [
{
"name": sanitize_tool_name(tool.name),
"original_name": tool.name,
"description": getattr(tool, "description", ""),
"inputSchema": getattr(tool, "inputSchema", {}),
}

View File

@@ -27,14 +27,16 @@ class MCPNativeTool(BaseTool):
tool_name: str,
tool_schema: dict[str, Any],
server_name: str,
original_tool_name: str | None = None,
) -> None:
"""Initialize native MCP tool.
Args:
mcp_client: MCPClient instance with active session.
tool_name: Original name of the tool on the MCP server.
tool_name: Name of the tool (may be prefixed).
tool_schema: Schema information for the tool.
server_name: Name of the MCP server for prefixing.
original_tool_name: Original name of the tool on the MCP server.
"""
# Create tool name with server prefix to avoid conflicts
prefixed_name = f"{server_name}_{tool_name}"
@@ -57,7 +59,7 @@ class MCPNativeTool(BaseTool):
# Set instance attributes after super().__init__
self._mcp_client = mcp_client
self._original_tool_name = tool_name
self._original_tool_name = original_tool_name or tool_name
self._server_name = server_name
# self._logger = logging.getLogger(__name__)

View File

@@ -482,10 +482,66 @@ FORMAT_TYPE_MAP: dict[str, type[Any]] = {
}
def build_rich_field_description(prop_schema: dict[str, Any]) -> str:
"""Build a comprehensive field description including constraints.
Embeds format, enum, pattern, min/max, and example constraints into the
description text so that LLMs can understand tool parameter requirements
without inspecting the raw JSON Schema.
Args:
prop_schema: Property schema with description and constraints.
Returns:
Enhanced description with format, enum, and other constraints.
"""
parts: list[str] = []
description = prop_schema.get("description", "")
if description:
parts.append(description)
format_type = prop_schema.get("format")
if format_type:
parts.append(f"Format: {format_type}")
enum_values = prop_schema.get("enum")
if enum_values:
enum_str = ", ".join(repr(v) for v in enum_values)
parts.append(f"Allowed values: [{enum_str}]")
pattern = prop_schema.get("pattern")
if pattern:
parts.append(f"Pattern: {pattern}")
minimum = prop_schema.get("minimum")
maximum = prop_schema.get("maximum")
if minimum is not None:
parts.append(f"Minimum: {minimum}")
if maximum is not None:
parts.append(f"Maximum: {maximum}")
min_length = prop_schema.get("minLength")
max_length = prop_schema.get("maxLength")
if min_length is not None:
parts.append(f"Min length: {min_length}")
if max_length is not None:
parts.append(f"Max length: {max_length}")
examples = prop_schema.get("examples")
if examples:
examples_str = ", ".join(repr(e) for e in examples[:3])
parts.append(f"Examples: {examples_str}")
return ". ".join(parts) if parts else ""
def create_model_from_schema( # type: ignore[no-any-unimported]
json_schema: dict[str, Any],
*,
root_schema: dict[str, Any] | None = None,
model_name: str | None = None,
enrich_descriptions: bool = False,
__config__: ConfigDict | None = None,
__base__: type[BaseModel] | None = None,
__module__: str = __name__,
@@ -503,6 +559,13 @@ def create_model_from_schema( # type: ignore[no-any-unimported]
json_schema: A dictionary representing the JSON schema.
root_schema: The root schema containing $defs. If not provided, the
current schema is treated as the root schema.
model_name: Override for the model name. If not provided, the schema
``title`` field is used, falling back to ``"DynamicModel"``.
enrich_descriptions: When True, augment field descriptions with
constraint info (format, enum, pattern, min/max, examples) via
:func:`build_rich_field_description`. Useful for LLM-facing tool
schemas where constraints in the description help the model
understand parameter requirements.
__config__: Pydantic configuration for the generated model.
__base__: Base class for the generated model. Defaults to BaseModel.
__module__: Module name for the generated model class.
@@ -539,10 +602,14 @@ def create_model_from_schema( # type: ignore[no-any-unimported]
if "title" not in json_schema and "title" in (root_schema or {}):
json_schema["title"] = (root_schema or {}).get("title")
model_name = json_schema.get("title") or "DynamicModel"
effective_name = model_name or json_schema.get("title") or "DynamicModel"
field_definitions = {
name: _json_schema_to_pydantic_field(
name, prop, json_schema.get("required", []), effective_root
name,
prop,
json_schema.get("required", []),
effective_root,
enrich_descriptions=enrich_descriptions,
)
for name, prop in (json_schema.get("properties", {}) or {}).items()
}
@@ -550,7 +617,7 @@ def create_model_from_schema( # type: ignore[no-any-unimported]
effective_config = __config__ or ConfigDict(extra="forbid")
return create_model_base(
model_name,
effective_name,
__config__=effective_config,
__base__=__base__,
__module__=__module__,
@@ -565,6 +632,8 @@ def _json_schema_to_pydantic_field(
json_schema: dict[str, Any],
required: list[str],
root_schema: dict[str, Any],
*,
enrich_descriptions: bool = False,
) -> Any:
"""Convert a JSON schema property to a Pydantic field definition.
@@ -573,20 +642,29 @@ def _json_schema_to_pydantic_field(
json_schema: The JSON schema for this field.
required: List of required field names.
root_schema: The root schema for resolving $ref.
enrich_descriptions: When True, embed constraints in the description.
Returns:
A tuple of (type, Field) for use with create_model.
"""
type_ = _json_schema_to_pydantic_type(json_schema, root_schema, name_=name.title())
description = json_schema.get("description")
examples = json_schema.get("examples")
type_ = _json_schema_to_pydantic_type(
json_schema, root_schema, name_=name.title(), enrich_descriptions=enrich_descriptions
)
is_required = name in required
field_params: dict[str, Any] = {}
schema_extra: dict[str, Any] = {}
if description:
field_params["description"] = description
if enrich_descriptions:
rich_desc = build_rich_field_description(json_schema)
if rich_desc:
field_params["description"] = rich_desc
else:
description = json_schema.get("description")
if description:
field_params["description"] = description
examples = json_schema.get("examples")
if examples:
schema_extra["examples"] = examples
@@ -702,6 +780,7 @@ def _json_schema_to_pydantic_type(
root_schema: dict[str, Any],
*,
name_: str | None = None,
enrich_descriptions: bool = False,
) -> Any:
"""Convert a JSON schema to a Python/Pydantic type.
@@ -709,6 +788,7 @@ def _json_schema_to_pydantic_type(
json_schema: The JSON schema to convert.
root_schema: The root schema for resolving $ref.
name_: Optional name for nested models.
enrich_descriptions: Propagated to nested model creation.
Returns:
A Python type corresponding to the JSON schema.
@@ -716,7 +796,9 @@ def _json_schema_to_pydantic_type(
ref = json_schema.get("$ref")
if ref:
ref_schema = _resolve_ref(ref, root_schema)
return _json_schema_to_pydantic_type(ref_schema, root_schema, name_=name_)
return _json_schema_to_pydantic_type(
ref_schema, root_schema, name_=name_, enrich_descriptions=enrich_descriptions
)
enum_values = json_schema.get("enum")
if enum_values:
@@ -731,7 +813,10 @@ def _json_schema_to_pydantic_type(
if any_of_schemas:
any_of_types = [
_json_schema_to_pydantic_type(
schema, root_schema, name_=f"{name_ or 'Union'}Option{i}"
schema,
root_schema,
name_=f"{name_ or 'Union'}Option{i}",
enrich_descriptions=enrich_descriptions,
)
for i, schema in enumerate(any_of_schemas)
]
@@ -741,10 +826,14 @@ def _json_schema_to_pydantic_type(
if all_of_schemas:
if len(all_of_schemas) == 1:
return _json_schema_to_pydantic_type(
all_of_schemas[0], root_schema, name_=name_
all_of_schemas[0], root_schema, name_=name_,
enrich_descriptions=enrich_descriptions,
)
merged = _merge_all_of_schemas(all_of_schemas, root_schema)
return _json_schema_to_pydantic_type(merged, root_schema, name_=name_)
return _json_schema_to_pydantic_type(
merged, root_schema, name_=name_,
enrich_descriptions=enrich_descriptions,
)
type_ = json_schema.get("type")
@@ -760,7 +849,8 @@ def _json_schema_to_pydantic_type(
items_schema = json_schema.get("items")
if items_schema:
item_type = _json_schema_to_pydantic_type(
items_schema, root_schema, name_=name_
items_schema, root_schema, name_=name_,
enrich_descriptions=enrich_descriptions,
)
return list[item_type] # type: ignore[valid-type]
return list
@@ -770,7 +860,10 @@ def _json_schema_to_pydantic_type(
json_schema_ = json_schema.copy()
if json_schema_.get("title") is None:
json_schema_["title"] = name_ or "DynamicModel"
return create_model_from_schema(json_schema_, root_schema=root_schema)
return create_model_from_schema(
json_schema_, root_schema=root_schema,
enrich_descriptions=enrich_descriptions,
)
return dict
if type_ == "null":
return None