feat: use json schema for tool argument serialization
Some checks failed
Check Documentation Broken Links / Check broken links (push) Has been cancelled
Notify Downstream / notify-downstream (push) Has been cancelled
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled

- Replace Python representation with JsonSchema for tool arguments
  - Remove deprecated PydanticSchemaParser in favor of direct schema generation
  - Add handling for VAR_POSITIONAL and VAR_KEYWORD parameters
  - Improve tool argument schema collection
This commit is contained in:
Greyson LaLonde
2025-12-11 15:50:19 -05:00
committed by GitHub
parent 9bd8ad51f7
commit 38b0b125d3
15 changed files with 442 additions and 602 deletions

View File

@@ -16,7 +16,7 @@ from crewai.events.types.knowledge_events import (
KnowledgeSearchQueryFailedEvent,
)
from crewai.knowledge.utils.knowledge_utils import extract_knowledge_context
from crewai.utilities.converter import generate_model_description
from crewai.utilities.pydantic_schema_utils import generate_model_description
if TYPE_CHECKING:

View File

@@ -5,10 +5,9 @@ from __future__ import annotations
from abc import ABC, abstractmethod
import json
import re
from typing import TYPE_CHECKING, Final, Literal
from crewai.utilities.converter import generate_model_description
from typing import TYPE_CHECKING, Any, Final, Literal
from crewai.utilities.pydantic_schema_utils import generate_model_description
if TYPE_CHECKING:
@@ -42,7 +41,7 @@ class BaseConverterAdapter(ABC):
"""
self.agent_adapter = agent_adapter
self._output_format: Literal["json", "pydantic"] | None = None
self._schema: str | None = None
self._schema: dict[str, Any] | None = None
@abstractmethod
def configure_structured_output(self, task: Task) -> None:
@@ -129,7 +128,7 @@ class BaseConverterAdapter(ABC):
@staticmethod
def _configure_format_from_task(
task: Task,
) -> tuple[Literal["json", "pydantic"] | None, str | None]:
) -> tuple[Literal["json", "pydantic"] | None, dict[str, Any] | None]:
"""Determine output format and schema from task requirements.
This is a helper method that examines the task's output requirements

View File

@@ -4,6 +4,7 @@ This module contains the OpenAIConverterAdapter class that handles structured
output conversion for OpenAI agents, supporting JSON and Pydantic model formats.
"""
import json
from typing import Any
from crewai.agents.agent_adapters.base_converter_adapter import BaseConverterAdapter
@@ -61,7 +62,7 @@ class OpenAIConverterAdapter(BaseConverterAdapter):
output_schema: str = (
get_i18n()
.slice("formatted_task_instructions")
.format(output_format=self._schema)
.format(output_format=json.dumps(self._schema, indent=2))
)
return f"{base_prompt}\n\n{output_schema}"

View File

@@ -9,10 +9,10 @@ from pydantic import BaseModel
from typing_extensions import Self
from crewai.utilities.agent_utils import is_context_length_exceeded
from crewai.utilities.converter import generate_model_description
from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededError,
)
from crewai.utilities.pydantic_schema_utils import generate_model_description
from crewai.utilities.types import LLMMessage

View File

@@ -18,10 +18,10 @@ from crewai.events.types.llm_events import LLMCallType
from crewai.llms.base_llm import BaseLLM
from crewai.llms.hooks.transport import AsyncHTTPTransport, HTTPTransport
from crewai.utilities.agent_utils import is_context_length_exceeded
from crewai.utilities.converter import generate_model_description
from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededError,
)
from crewai.utilities.pydantic_schema_utils import generate_model_description
from crewai.utilities.types import LLMMessage

View File

@@ -3,15 +3,13 @@ from __future__ import annotations
from abc import ABC, abstractmethod
import asyncio
from collections.abc import Awaitable, Callable
from inspect import signature
from inspect import Parameter, signature
import json
from typing import (
Any,
Generic,
ParamSpec,
TypeVar,
cast,
get_args,
get_origin,
overload,
)
@@ -27,6 +25,7 @@ from typing_extensions import TypeIs
from crewai.tools.structured_tool import CrewStructuredTool
from crewai.utilities.printer import Printer
from crewai.utilities.pydantic_schema_utils import generate_model_description
_printer = Printer()
@@ -103,20 +102,40 @@ class BaseTool(BaseModel, ABC):
if v != cls._ArgsSchemaPlaceholder:
return v
return cast(
type[PydanticBaseModel],
type(
f"{cls.__name__}Schema",
(PydanticBaseModel,),
{
"__annotations__": {
k: v
for k, v in cls._run.__annotations__.items()
if k != "return"
},
},
),
)
run_sig = signature(cls._run)
fields: dict[str, Any] = {}
for param_name, param in run_sig.parameters.items():
if param_name in ("self", "return"):
continue
if param.kind in (Parameter.VAR_POSITIONAL, Parameter.VAR_KEYWORD):
continue
annotation = param.annotation if param.annotation != param.empty else Any
if param.default is param.empty:
fields[param_name] = (annotation, ...)
else:
fields[param_name] = (annotation, param.default)
if not fields:
arun_sig = signature(cls._arun)
for param_name, param in arun_sig.parameters.items():
if param_name in ("self", "return"):
continue
if param.kind in (Parameter.VAR_POSITIONAL, Parameter.VAR_KEYWORD):
continue
annotation = (
param.annotation if param.annotation != param.empty else Any
)
if param.default is param.empty:
fields[param_name] = (annotation, ...)
else:
fields[param_name] = (annotation, param.default)
return create_model(f"{cls.__name__}Schema", **fields)
@field_validator("max_usage_count", mode="before")
@classmethod
@@ -226,24 +245,23 @@ class BaseTool(BaseModel, ABC):
args_schema = getattr(tool, "args_schema", None)
if args_schema is None:
# Infer args_schema from the function signature if not provided
func_signature = signature(tool.func)
annotations = func_signature.parameters
args_fields: dict[str, Any] = {}
for name, param in annotations.items():
if name != "self":
param_annotation = (
param.annotation if param.annotation != param.empty else Any
)
field_info = Field(
default=...,
description="",
)
args_fields[name] = (param_annotation, field_info)
if args_fields:
args_schema = create_model(f"{tool.name}Input", **args_fields)
fields: dict[str, Any] = {}
for name, param in func_signature.parameters.items():
if name == "self":
continue
if param.kind in (Parameter.VAR_POSITIONAL, Parameter.VAR_KEYWORD):
continue
param_annotation = (
param.annotation if param.annotation != param.empty else Any
)
if param.default is param.empty:
fields[name] = (param_annotation, ...)
else:
fields[name] = (param_annotation, param.default)
if fields:
args_schema = create_model(f"{tool.name}Input", **fields)
else:
# Create a default schema with no fields if no parameters are found
args_schema = create_model(
f"{tool.name}Input", __base__=PydanticBaseModel
)
@@ -257,53 +275,37 @@ class BaseTool(BaseModel, ABC):
def _set_args_schema(self) -> None:
if self.args_schema is None:
class_name = f"{self.__class__.__name__}Schema"
self.args_schema = cast(
type[PydanticBaseModel],
type(
class_name,
(PydanticBaseModel,),
{
"__annotations__": {
k: v
for k, v in self._run.__annotations__.items()
if k != "return"
},
},
),
run_sig = signature(self._run)
fields: dict[str, Any] = {}
for param_name, param in run_sig.parameters.items():
if param_name in ("self", "return"):
continue
if param.kind in (Parameter.VAR_POSITIONAL, Parameter.VAR_KEYWORD):
continue
annotation = (
param.annotation if param.annotation != param.empty else Any
)
if param.default is param.empty:
fields[param_name] = (annotation, ...)
else:
fields[param_name] = (annotation, param.default)
self.args_schema = create_model(
f"{self.__class__.__name__}Schema", **fields
)
def _generate_description(self) -> None:
args_schema = {
name: {
"description": field.description,
"type": BaseTool._get_arg_annotations(field.annotation),
}
for name, field in self.args_schema.model_fields.items()
}
self.description = f"Tool Name: {self.name}\nTool Arguments: {args_schema}\nTool Description: {self.description}"
@staticmethod
def _get_arg_annotations(annotation: type[Any] | None) -> str:
if annotation is None:
return "None"
origin = get_origin(annotation)
args = get_args(annotation)
if origin is None:
return (
annotation.__name__
if hasattr(annotation, "__name__")
else str(annotation)
)
if args:
args_str = ", ".join(BaseTool._get_arg_annotations(arg) for arg in args)
return str(f"{origin.__name__}[{args_str}]")
return str(origin.__name__)
"""Generate the tool description with a JSON schema for arguments."""
schema = generate_model_description(self.args_schema)
args_json = json.dumps(schema["json_schema"]["schema"], indent=2)
self.description = (
f"Tool Name: {self.name}\n"
f"Tool Arguments: {args_json}\n"
f"Tool Description: {self.description}"
)
class Tool(BaseTool, Generic[P, R]):
@@ -406,24 +408,23 @@ class Tool(BaseTool, Generic[P, R]):
args_schema = getattr(tool, "args_schema", None)
if args_schema is None:
# Infer args_schema from the function signature if not provided
func_signature = signature(tool.func)
annotations = func_signature.parameters
args_fields: dict[str, Any] = {}
for name, param in annotations.items():
if name != "self":
param_annotation = (
param.annotation if param.annotation != param.empty else Any
)
field_info = Field(
default=...,
description="",
)
args_fields[name] = (param_annotation, field_info)
if args_fields:
args_schema = create_model(f"{tool.name}Input", **args_fields)
fields: dict[str, Any] = {}
for name, param in func_signature.parameters.items():
if name == "self":
continue
if param.kind in (Parameter.VAR_POSITIONAL, Parameter.VAR_KEYWORD):
continue
param_annotation = (
param.annotation if param.annotation != param.empty else Any
)
if param.default is param.empty:
fields[name] = (param_annotation, ...)
else:
fields[name] = (param_annotation, param.default)
if fields:
args_schema = create_model(f"{tool.name}Input", **fields)
else:
# Create a default schema with no fields if no parameters are found
args_schema = create_model(
f"{tool.name}Input", __base__=PydanticBaseModel
)
@@ -502,32 +503,38 @@ def tool(
def _make_tool(f: Callable[P2, R2]) -> Tool[P2, R2]:
if f.__doc__ is None:
raise ValueError("Function must have a docstring")
func_annotations = getattr(f, "__annotations__", None)
if func_annotations is None:
if f.__annotations__ is None:
raise ValueError("Function must have type annotations")
func_sig = signature(f)
fields: dict[str, Any] = {}
for param_name, param in func_sig.parameters.items():
if param_name == "return":
continue
if param.kind in (Parameter.VAR_POSITIONAL, Parameter.VAR_KEYWORD):
continue
annotation = (
param.annotation if param.annotation != param.empty else Any
)
if param.default is param.empty:
fields[param_name] = (annotation, ...)
else:
fields[param_name] = (annotation, param.default)
class_name = "".join(tool_name.split()).title()
tool_args_schema = cast(
type[PydanticBaseModel],
type(
class_name,
(PydanticBaseModel,),
{
"__annotations__": {
k: v for k, v in func_annotations.items() if k != "return"
},
},
),
)
args_schema = create_model(class_name, **fields)
return Tool(
name=tool_name,
description=f.__doc__,
func=f,
args_schema=tool_args_schema,
args_schema=args_schema,
result_as_answer=result_as_answer,
max_usage_count=max_usage_count,
current_usage_count=0,
)
return _make_tool

View File

@@ -1,7 +1,5 @@
from __future__ import annotations
from collections.abc import Callable
from copy import deepcopy
import json
import re
from typing import TYPE_CHECKING, Any, Final, TypedDict
@@ -13,6 +11,7 @@ from crewai.agents.agent_builder.utilities.base_output_converter import OutputCo
from crewai.utilities.i18n import get_i18n
from crewai.utilities.internal_instructor import InternalInstructor
from crewai.utilities.printer import Printer
from crewai.utilities.pydantic_schema_utils import generate_model_description
if TYPE_CHECKING:
@@ -421,221 +420,3 @@ def create_converter(
raise Exception("No output converter found or set.")
return converter # type: ignore[no-any-return]
def resolve_refs(schema: dict[str, Any]) -> dict[str, Any]:
"""Recursively resolve all local $refs in the given JSON Schema using $defs as the source.
This is needed because Pydantic generates $ref-based schemas that
some consumers (e.g. LLMs, tool frameworks) don't handle well.
Args:
schema: JSON Schema dict that may contain "$refs" and "$defs".
Returns:
A new schema dictionary with all local $refs replaced by their definitions.
"""
defs = schema.get("$defs", {})
schema_copy = deepcopy(schema)
def _resolve(node: Any) -> Any:
if isinstance(node, dict):
ref = node.get("$ref")
if isinstance(ref, str) and ref.startswith("#/$defs/"):
def_name = ref.replace("#/$defs/", "")
if def_name in defs:
return _resolve(deepcopy(defs[def_name]))
raise KeyError(f"Definition '{def_name}' not found in $defs.")
return {k: _resolve(v) for k, v in node.items()}
if isinstance(node, list):
return [_resolve(i) for i in node]
return node
return _resolve(schema_copy) # type: ignore[no-any-return]
def add_key_in_dict_recursively(
d: dict[str, Any], key: str, value: Any, criteria: Callable[[dict[str, Any]], bool]
) -> dict[str, Any]:
"""Recursively adds a key/value pair to all nested dicts matching `criteria`."""
if isinstance(d, dict):
if criteria(d) and key not in d:
d[key] = value
for v in d.values():
add_key_in_dict_recursively(v, key, value, criteria)
elif isinstance(d, list):
for i in d:
add_key_in_dict_recursively(i, key, value, criteria)
return d
def fix_discriminator_mappings(schema: dict[str, Any]) -> dict[str, Any]:
"""Replace '#/$defs/...' references in discriminator.mapping with just the model name."""
output = schema.get("properties", {}).get("output")
if not output:
return schema
disc = output.get("discriminator")
if not disc or "mapping" not in disc:
return schema
disc["mapping"] = {k: v.split("/")[-1] for k, v in disc["mapping"].items()}
return schema
def add_const_to_oneof_variants(schema: dict[str, Any]) -> dict[str, Any]:
"""Add const fields to oneOf variants for discriminated unions.
The json_schema_to_pydantic library requires each oneOf variant to have
a const field for the discriminator property. This function adds those
const fields based on the discriminator mapping.
Args:
schema: JSON Schema dict that may contain discriminated unions
Returns:
Modified schema with const fields added to oneOf variants
"""
def _process_oneof(node: dict[str, Any]) -> dict[str, Any]:
"""Process a single node that might contain a oneOf with discriminator."""
if not isinstance(node, dict):
return node
if "oneOf" in node and "discriminator" in node:
discriminator = node["discriminator"]
property_name = discriminator.get("propertyName")
mapping = discriminator.get("mapping", {})
if property_name and mapping:
one_of_variants = node.get("oneOf", [])
for variant in one_of_variants:
if isinstance(variant, dict) and "properties" in variant:
variant_title = variant.get("title", "")
matched_disc_value = None
for disc_value, schema_name in mapping.items():
if variant_title == schema_name or variant_title.endswith(
schema_name
):
matched_disc_value = disc_value
break
if matched_disc_value is not None:
props = variant["properties"]
if property_name in props:
props[property_name]["const"] = matched_disc_value
for key, value in node.items():
if isinstance(value, dict):
node[key] = _process_oneof(value)
elif isinstance(value, list):
node[key] = [
_process_oneof(item) if isinstance(item, dict) else item
for item in value
]
return node
return _process_oneof(deepcopy(schema))
def convert_oneof_to_anyof(schema: dict[str, Any]) -> dict[str, Any]:
"""Convert oneOf to anyOf for OpenAI compatibility.
OpenAI's Structured Outputs support anyOf better than oneOf.
This recursively converts all oneOf occurrences to anyOf.
Args:
schema: JSON schema dictionary.
Returns:
Modified schema with anyOf instead of oneOf.
"""
if isinstance(schema, dict):
if "oneOf" in schema:
schema["anyOf"] = schema.pop("oneOf")
for value in schema.values():
if isinstance(value, dict):
convert_oneof_to_anyof(value)
elif isinstance(value, list):
for item in value:
if isinstance(item, dict):
convert_oneof_to_anyof(item)
return schema
def ensure_all_properties_required(schema: dict[str, Any]) -> dict[str, Any]:
"""Ensure all properties are in the required array for OpenAI strict mode.
OpenAI's strict structured outputs require all properties to be listed
in the required array. This recursively updates all objects to include
all their properties in required.
Args:
schema: JSON schema dictionary.
Returns:
Modified schema with all properties marked as required.
"""
if isinstance(schema, dict):
if schema.get("type") == "object" and "properties" in schema:
properties = schema["properties"]
if properties:
schema["required"] = list(properties.keys())
for value in schema.values():
if isinstance(value, dict):
ensure_all_properties_required(value)
elif isinstance(value, list):
for item in value:
if isinstance(item, dict):
ensure_all_properties_required(item)
return schema
def generate_model_description(model: type[BaseModel]) -> dict[str, Any]:
"""Generate JSON schema description of a Pydantic model.
This function takes a Pydantic model class and returns its JSON schema,
which includes full type information, discriminators, and all metadata.
The schema is dereferenced to inline all $ref references for better LLM understanding.
Args:
model: A Pydantic model class.
Returns:
A JSON schema dictionary representation of the model.
"""
json_schema = model.model_json_schema(ref_template="#/$defs/{model}")
json_schema = add_key_in_dict_recursively(
json_schema,
key="additionalProperties",
value=False,
criteria=lambda d: d.get("type") == "object"
and "additionalProperties" not in d,
)
json_schema = resolve_refs(json_schema)
json_schema.pop("$defs", None)
json_schema = fix_discriminator_mappings(json_schema)
json_schema = convert_oneof_to_anyof(json_schema)
json_schema = ensure_all_properties_required(json_schema)
return {
"type": "json_schema",
"json_schema": {
"name": model.__name__,
"strict": True,
"schema": json_schema,
},
}

View File

@@ -1,14 +1,15 @@
from __future__ import annotations
from typing import TYPE_CHECKING, cast
import json
from typing import TYPE_CHECKING, Any, cast
from pydantic import BaseModel, Field
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.task_events import TaskEvaluationEvent
from crewai.llm import LLM
from crewai.utilities.converter import Converter
from crewai.utilities.pydantic_schema_parser import PydanticSchemaParser
from crewai.utilities.i18n import get_i18n
from crewai.utilities.pydantic_schema_utils import generate_model_description
from crewai.utilities.training_converter import TrainingConverter
@@ -62,7 +63,7 @@ class TaskEvaluator:
Args:
original_agent: The agent to evaluate.
"""
self.llm = cast(LLM, original_agent.llm)
self.llm = original_agent.llm
self.original_agent = original_agent
def evaluate(self, task: Task, output: str) -> TaskEvaluation:
@@ -79,7 +80,8 @@ class TaskEvaluator:
- Investigate the Converter.to_pydantic signature, returns BaseModel strictly?
"""
crewai_event_bus.emit(
self, TaskEvaluationEvent(evaluation_type="task_evaluation", task=task)
self,
TaskEvaluationEvent(evaluation_type="task_evaluation", task=task), # type: ignore[no-untyped-call]
)
evaluation_query = (
f"Assess the quality of the task completed based on the description, expected output, and actual results.\n\n"
@@ -94,9 +96,14 @@ class TaskEvaluator:
instructions = "Convert all responses into valid JSON output."
if not self.llm.supports_function_calling():
model_schema = PydanticSchemaParser(model=TaskEvaluation).get_schema()
instructions = f"{instructions}\n\nReturn only valid JSON with the following schema:\n```json\n{model_schema}\n```"
if not self.llm.supports_function_calling(): # type: ignore[union-attr]
schema_dict = generate_model_description(TaskEvaluation)
output_schema: str = (
get_i18n()
.slice("formatted_task_instructions")
.format(output_format=json.dumps(schema_dict, indent=2))
)
instructions = f"{instructions}\n\n{output_schema}"
converter = Converter(
llm=self.llm,
@@ -108,7 +115,7 @@ class TaskEvaluator:
return cast(TaskEvaluation, converter.to_pydantic())
def evaluate_training_data(
self, training_data: dict, agent_id: str
self, training_data: dict[str, Any], agent_id: str
) -> TrainingTaskEvaluation:
"""
Evaluate the training data based on the llm output, human feedback, and improved output.
@@ -121,7 +128,8 @@ class TaskEvaluator:
- Investigate the Converter.to_pydantic signature, returns BaseModel strictly?
"""
crewai_event_bus.emit(
self, TaskEvaluationEvent(evaluation_type="training_data_evaluation")
self,
TaskEvaluationEvent(evaluation_type="training_data_evaluation"), # type: ignore[no-untyped-call]
)
output_training_data = training_data[agent_id]
@@ -164,11 +172,14 @@ class TaskEvaluator:
)
instructions = "I'm gonna convert this raw text into valid JSON."
if not self.llm.supports_function_calling():
model_schema = PydanticSchemaParser(
model=TrainingTaskEvaluation
).get_schema()
instructions = f"{instructions}\n\nThe json should have the following structure, with the following keys:\n{model_schema}"
if not self.llm.supports_function_calling(): # type: ignore[union-attr]
schema_dict = generate_model_description(TrainingTaskEvaluation)
output_schema: str = (
get_i18n()
.slice("formatted_task_instructions")
.format(output_format=json.dumps(schema_dict, indent=2))
)
instructions = f"{instructions}\n\n{output_schema}"
converter = TrainingConverter(
llm=self.llm,

View File

@@ -1,103 +0,0 @@
from typing import Any, Union, get_args, get_origin
from pydantic import BaseModel, Field
class PydanticSchemaParser(BaseModel):
model: type[BaseModel] = Field(..., description="The Pydantic model to parse.")
def get_schema(self) -> str:
"""Public method to get the schema of a Pydantic model.
Returns:
String representation of the model schema.
"""
return "{\n" + self._get_model_schema(self.model) + "\n}"
def _get_model_schema(self, model: type[BaseModel], depth: int = 0) -> str:
"""Recursively get the schema of a Pydantic model, handling nested models and lists.
Args:
model: The Pydantic model to process.
depth: The current depth of recursion for indentation purposes.
Returns:
A string representation of the model schema.
"""
indent: str = " " * 4 * depth
lines: list[str] = [
f"{indent} {field_name}: {self._get_field_type_for_annotation(field.annotation, depth + 1)}"
for field_name, field in model.model_fields.items()
]
return ",\n".join(lines)
def _format_list_type(self, list_item_type: Any, depth: int) -> str:
"""Format a List type, handling nested models if necessary.
Args:
list_item_type: The type of items in the list.
depth: The current depth of recursion for indentation purposes.
Returns:
A string representation of the List type.
"""
if isinstance(list_item_type, type) and issubclass(list_item_type, BaseModel):
nested_schema = self._get_model_schema(list_item_type, depth + 1)
nested_indent = " " * 4 * depth
return f"List[\n{nested_indent}{{\n{nested_schema}\n{nested_indent}}}\n{nested_indent}]"
return f"List[{list_item_type.__name__}]"
def _format_union_type(self, field_type: Any, depth: int) -> str:
"""Format a Union type, handling Optional and nested types.
Args:
field_type: The Union type to format.
depth: The current depth of recursion for indentation purposes.
Returns:
A string representation of the Union type.
"""
args = get_args(field_type)
if type(None) in args:
# It's an Optional type
non_none_args = [arg for arg in args if arg is not type(None)]
if len(non_none_args) == 1:
inner_type = self._get_field_type_for_annotation(
non_none_args[0], depth
)
return f"Optional[{inner_type}]"
# Union with None and multiple other types
inner_types = ", ".join(
self._get_field_type_for_annotation(arg, depth) for arg in non_none_args
)
return f"Optional[Union[{inner_types}]]"
# General Union type
inner_types = ", ".join(
self._get_field_type_for_annotation(arg, depth) for arg in args
)
return f"Union[{inner_types}]"
def _get_field_type_for_annotation(self, annotation: Any, depth: int) -> str:
"""Recursively get the string representation of a field's type annotation.
Args:
annotation: The type annotation to process.
depth: The current depth of recursion for indentation purposes.
Returns:
A string representation of the type annotation.
"""
origin: Any = get_origin(annotation)
if origin is list:
list_item_type = get_args(annotation)[0]
return self._format_list_type(list_item_type, depth)
if origin is dict:
key_type, value_type = get_args(annotation)
return f"Dict[{key_type.__name__}, {value_type.__name__}]"
if origin is Union:
return self._format_union_type(annotation, depth)
if isinstance(annotation, type) and issubclass(annotation, BaseModel):
nested_schema = self._get_model_schema(annotation, depth)
nested_indent = " " * 4 * depth
return f"{annotation.__name__}\n{nested_indent}{{\n{nested_schema}\n{nested_indent}}}"
return annotation.__name__

View File

@@ -0,0 +1,245 @@
"""Utilities for generating JSON schemas from Pydantic models.
This module provides functions for converting Pydantic models to JSON schemas
suitable for use with LLMs and tool definitions.
"""
from collections.abc import Callable
from copy import deepcopy
from typing import Any
from pydantic import BaseModel
def resolve_refs(schema: dict[str, Any]) -> dict[str, Any]:
"""Recursively resolve all local $refs in the given JSON Schema using $defs as the source.
This is needed because Pydantic generates $ref-based schemas that
some consumers (e.g. LLMs, tool frameworks) don't handle well.
Args:
schema: JSON Schema dict that may contain "$refs" and "$defs".
Returns:
A new schema dictionary with all local $refs replaced by their definitions.
"""
defs = schema.get("$defs", {})
schema_copy = deepcopy(schema)
def _resolve(node: Any) -> Any:
if isinstance(node, dict):
ref = node.get("$ref")
if isinstance(ref, str) and ref.startswith("#/$defs/"):
def_name = ref.replace("#/$defs/", "")
if def_name in defs:
return _resolve(deepcopy(defs[def_name]))
raise KeyError(f"Definition '{def_name}' not found in $defs.")
return {k: _resolve(v) for k, v in node.items()}
if isinstance(node, list):
return [_resolve(i) for i in node]
return node
return _resolve(schema_copy) # type: ignore[no-any-return]
def add_key_in_dict_recursively(
d: dict[str, Any], key: str, value: Any, criteria: Callable[[dict[str, Any]], bool]
) -> dict[str, Any]:
"""Recursively adds a key/value pair to all nested dicts matching `criteria`.
Args:
d: The dictionary to modify.
key: The key to add.
value: The value to add.
criteria: A function that returns True for dicts that should receive the key.
Returns:
The modified dictionary.
"""
if isinstance(d, dict):
if criteria(d) and key not in d:
d[key] = value
for v in d.values():
add_key_in_dict_recursively(v, key, value, criteria)
elif isinstance(d, list):
for i in d:
add_key_in_dict_recursively(i, key, value, criteria)
return d
def fix_discriminator_mappings(schema: dict[str, Any]) -> dict[str, Any]:
"""Replace '#/$defs/...' references in discriminator.mapping with just the model name.
Args:
schema: JSON schema dictionary.
Returns:
Modified schema with fixed discriminator mappings.
"""
output = schema.get("properties", {}).get("output")
if not output:
return schema
disc = output.get("discriminator")
if not disc or "mapping" not in disc:
return schema
disc["mapping"] = {k: v.split("/")[-1] for k, v in disc["mapping"].items()}
return schema
def add_const_to_oneof_variants(schema: dict[str, Any]) -> dict[str, Any]:
"""Add const fields to oneOf variants for discriminated unions.
The json_schema_to_pydantic library requires each oneOf variant to have
a const field for the discriminator property. This function adds those
const fields based on the discriminator mapping.
Args:
schema: JSON Schema dict that may contain discriminated unions
Returns:
Modified schema with const fields added to oneOf variants
"""
def _process_oneof(node: dict[str, Any]) -> dict[str, Any]:
"""Process a single node that might contain a oneOf with discriminator."""
if not isinstance(node, dict):
return node
if "oneOf" in node and "discriminator" in node:
discriminator = node["discriminator"]
property_name = discriminator.get("propertyName")
mapping = discriminator.get("mapping", {})
if property_name and mapping:
one_of_variants = node.get("oneOf", [])
for variant in one_of_variants:
if isinstance(variant, dict) and "properties" in variant:
variant_title = variant.get("title", "")
matched_disc_value = None
for disc_value, schema_name in mapping.items():
if variant_title == schema_name or variant_title.endswith(
schema_name
):
matched_disc_value = disc_value
break
if matched_disc_value is not None:
props = variant["properties"]
if property_name in props:
props[property_name]["const"] = matched_disc_value
for key, value in node.items():
if isinstance(value, dict):
node[key] = _process_oneof(value)
elif isinstance(value, list):
node[key] = [
_process_oneof(item) if isinstance(item, dict) else item
for item in value
]
return node
return _process_oneof(deepcopy(schema))
def convert_oneof_to_anyof(schema: dict[str, Any]) -> dict[str, Any]:
"""Convert oneOf to anyOf for OpenAI compatibility.
OpenAI's Structured Outputs support anyOf better than oneOf.
This recursively converts all oneOf occurrences to anyOf.
Args:
schema: JSON schema dictionary.
Returns:
Modified schema with anyOf instead of oneOf.
"""
if isinstance(schema, dict):
if "oneOf" in schema:
schema["anyOf"] = schema.pop("oneOf")
for value in schema.values():
if isinstance(value, dict):
convert_oneof_to_anyof(value)
elif isinstance(value, list):
for item in value:
if isinstance(item, dict):
convert_oneof_to_anyof(item)
return schema
def ensure_all_properties_required(schema: dict[str, Any]) -> dict[str, Any]:
"""Ensure all properties are in the required array for OpenAI strict mode.
OpenAI's strict structured outputs require all properties to be listed
in the required array. This recursively updates all objects to include
all their properties in required.
Args:
schema: JSON schema dictionary.
Returns:
Modified schema with all properties marked as required.
"""
if isinstance(schema, dict):
if schema.get("type") == "object" and "properties" in schema:
properties = schema["properties"]
if properties:
schema["required"] = list(properties.keys())
for value in schema.values():
if isinstance(value, dict):
ensure_all_properties_required(value)
elif isinstance(value, list):
for item in value:
if isinstance(item, dict):
ensure_all_properties_required(item)
return schema
def generate_model_description(model: type[BaseModel]) -> dict[str, Any]:
"""Generate JSON schema description of a Pydantic model.
This function takes a Pydantic model class and returns its JSON schema,
which includes full type information, discriminators, and all metadata.
The schema is dereferenced to inline all $ref references for better LLM understanding.
Args:
model: A Pydantic model class.
Returns:
A JSON schema dictionary representation of the model.
"""
json_schema = model.model_json_schema(ref_template="#/$defs/{model}")
json_schema = add_key_in_dict_recursively(
json_schema,
key="additionalProperties",
value=False,
criteria=lambda d: d.get("type") == "object"
and "additionalProperties" not in d,
)
json_schema = resolve_refs(json_schema)
json_schema.pop("$defs", None)
json_schema = fix_discriminator_mappings(json_schema)
json_schema = convert_oneof_to_anyof(json_schema)
json_schema = ensure_all_properties_required(json_schema)
return {
"type": "json_schema",
"json_schema": {
"name": model.__name__,
"strict": True,
"schema": json_schema,
},
}

View File

@@ -17,10 +17,11 @@ def test_creating_a_tool_using_annotation():
# Assert all the right attributes were defined
assert my_tool.name == "Name of my tool"
assert (
my_tool.description
== "Tool Name: Name of my tool\nTool Arguments: {'question': {'description': None, 'type': 'str'}}\nTool Description: Clear description for what this tool is useful for, your agent will need this information to use it."
)
assert "Tool Name: Name of my tool" in my_tool.description
assert "Tool Arguments:" in my_tool.description
assert '"question"' in my_tool.description
assert '"type": "string"' in my_tool.description
assert "Tool Description: Clear description for what this tool is useful for" in my_tool.description
assert my_tool.args_schema.model_json_schema()["properties"] == {
"question": {"title": "Question", "type": "string"}
}
@@ -31,10 +32,9 @@ def test_creating_a_tool_using_annotation():
converted_tool = my_tool.to_structured_tool()
assert converted_tool.name == "Name of my tool"
assert (
converted_tool.description
== "Tool Name: Name of my tool\nTool Arguments: {'question': {'description': None, 'type': 'str'}}\nTool Description: Clear description for what this tool is useful for, your agent will need this information to use it."
)
assert "Tool Name: Name of my tool" in converted_tool.description
assert "Tool Arguments:" in converted_tool.description
assert '"question"' in converted_tool.description
assert converted_tool.args_schema.model_json_schema()["properties"] == {
"question": {"title": "Question", "type": "string"}
}
@@ -56,10 +56,11 @@ def test_creating_a_tool_using_baseclass():
# Assert all the right attributes were defined
assert my_tool.name == "Name of my tool"
assert (
my_tool.description
== "Tool Name: Name of my tool\nTool Arguments: {'question': {'description': None, 'type': 'str'}}\nTool Description: Clear description for what this tool is useful for, your agent will need this information to use it."
)
assert "Tool Name: Name of my tool" in my_tool.description
assert "Tool Arguments:" in my_tool.description
assert '"question"' in my_tool.description
assert '"type": "string"' in my_tool.description
assert "Tool Description: Clear description for what this tool is useful for" in my_tool.description
assert my_tool.args_schema.model_json_schema()["properties"] == {
"question": {"title": "Question", "type": "string"}
}
@@ -68,10 +69,9 @@ def test_creating_a_tool_using_baseclass():
converted_tool = my_tool.to_structured_tool()
assert converted_tool.name == "Name of my tool"
assert (
converted_tool.description
== "Tool Name: Name of my tool\nTool Arguments: {'question': {'description': None, 'type': 'str'}}\nTool Description: Clear description for what this tool is useful for, your agent will need this information to use it."
)
assert "Tool Name: Name of my tool" in converted_tool.description
assert "Tool Arguments:" in converted_tool.description
assert '"question"' in converted_tool.description
assert converted_tool.args_schema.model_json_schema()["properties"] == {
"question": {"title": "Question", "type": "string"}
}

View File

@@ -107,25 +107,20 @@ def test_tool_usage_render():
rendered = tool_usage._render()
# Updated checks to match the actual output
# Check that the rendered output contains the expected tool information
assert "Tool Name: Random Number Generator" in rendered
assert "Tool Arguments:" in rendered
assert (
"'min_value': {'description': 'The minimum value of the range (inclusive)', 'type': 'int'}"
in rendered
)
assert (
"'max_value': {'description': 'The maximum value of the range (inclusive)', 'type': 'int'}"
in rendered
)
assert (
"Tool Description: Generates a random number within a specified range"
in rendered
)
assert (
"Tool Name: Random Number Generator\nTool Arguments: {'min_value': {'description': 'The minimum value of the range (inclusive)', 'type': 'int'}, 'max_value': {'description': 'The maximum value of the range (inclusive)', 'type': 'int'}}\nTool Description: Generates a random number within a specified range"
in rendered
)
# Check that the JSON schema format is used (proper JSON schema types)
assert '"min_value"' in rendered
assert '"max_value"' in rendered
assert '"type": "integer"' in rendered
assert '"description": "The minimum value of the range (inclusive)"' in rendered
assert '"description": "The maximum value of the range (inclusive)"' in rendered
def test_validate_tool_input_booleans_and_none():

View File

@@ -1,4 +1,3 @@
from unittest import mock
from unittest.mock import MagicMock, patch
from crewai.utilities.converter import ConverterError
@@ -44,26 +43,26 @@ def test_evaluate_training_data(converter_mock):
)
assert result == function_return_value
converter_mock.assert_has_calls(
[
mock.call(
llm=original_agent.llm,
text="Assess the quality of the training data based on the llm output, human feedback , and llm "
"output improved result.\n\nIteration: data1\nInitial Output:\nInitial output 1\n\nHuman Feedback:\nHuman feedback "
"1\n\nImproved Output:\nImproved output 1\n\n------------------------------------------------\n\nIteration: data2\nInitial Output:\nInitial output 2\n\nHuman "
"Feedback:\nHuman feedback 2\n\nImproved Output:\nImproved output 2\n\n------------------------------------------------\n\nPlease provide:\n- Provide "
"a list of clear, actionable instructions derived from the Human Feedbacks to enhance the Agent's "
"performance. Analyze the differences between Initial Outputs and Improved Outputs to generate specific "
"action items for future tasks. Ensure all key and specificpoints from the human feedback are "
"incorporated into these instructions.\n- A score from 0 to 10 evaluating on completion, quality, and "
"overall performance from the improved output to the initial output based on the human feedback\n",
model=TrainingTaskEvaluation,
instructions="I'm gonna convert this raw text into valid JSON.\n\nThe json should have the "
"following structure, with the following keys:\n{\n suggestions: List[str],\n quality: float,\n final_summary: str\n}",
),
mock.call().to_pydantic(),
]
)
# Verify the converter was called with correct arguments
converter_mock.assert_called_once()
call_kwargs = converter_mock.call_args.kwargs
assert call_kwargs["llm"] == original_agent.llm
assert call_kwargs["model"] == TrainingTaskEvaluation
assert "Iteration: data1" in call_kwargs["text"]
assert "Iteration: data2" in call_kwargs["text"]
instructions = call_kwargs["instructions"]
assert "I'm gonna convert this raw text into valid JSON." in instructions
assert "OpenAPI schema" in instructions
assert '"type": "json_schema"' in instructions
assert '"name": "TrainingTaskEvaluation"' in instructions
assert '"suggestions"' in instructions
assert '"quality"' in instructions
assert '"final_summary"' in instructions
converter_mock.return_value.to_pydantic.assert_called_once()
@patch("crewai.utilities.converter.Converter.to_pydantic")

View File

@@ -16,7 +16,6 @@ from crewai.utilities.converter import (
handle_partial_json,
validate_model,
)
from crewai.utilities.pydantic_schema_parser import PydanticSchemaParser
from pydantic import BaseModel
import pytest

View File

@@ -1,94 +0,0 @@
from typing import Any, Dict, List, Optional, Set, Tuple, Union
import pytest
from pydantic import BaseModel, Field
from crewai.utilities.pydantic_schema_parser import PydanticSchemaParser
def test_simple_model():
class SimpleModel(BaseModel):
field1: int
field2: str
parser = PydanticSchemaParser(model=SimpleModel)
schema = parser.get_schema()
expected_schema = """{
field1: int,
field2: str
}"""
assert schema.strip() == expected_schema.strip()
def test_nested_model():
class NestedModel(BaseModel):
nested_field: int
class ParentModel(BaseModel):
parent_field: str
nested: NestedModel
parser = PydanticSchemaParser(model=ParentModel)
schema = parser.get_schema()
expected_schema = """{
parent_field: str,
nested: NestedModel
{
nested_field: int
}
}"""
assert schema.strip() == expected_schema.strip()
def test_model_with_list():
class ListModel(BaseModel):
list_field: List[int]
parser = PydanticSchemaParser(model=ListModel)
schema = parser.get_schema()
expected_schema = """{
list_field: List[int]
}"""
assert schema.strip() == expected_schema.strip()
def test_model_with_optional_field():
class OptionalModel(BaseModel):
optional_field: Optional[str]
parser = PydanticSchemaParser(model=OptionalModel)
schema = parser.get_schema()
expected_schema = """{
optional_field: Optional[str]
}"""
assert schema.strip() == expected_schema.strip()
def test_model_with_union():
class UnionModel(BaseModel):
union_field: Union[int, str]
parser = PydanticSchemaParser(model=UnionModel)
schema = parser.get_schema()
expected_schema = """{
union_field: Union[int, str]
}"""
assert schema.strip() == expected_schema.strip()
def test_model_with_dict():
class DictModel(BaseModel):
dict_field: Dict[str, int]
parser = PydanticSchemaParser(model=DictModel)
schema = parser.get_schema()
expected_schema = """{
dict_field: Dict[str, int]
}"""
assert schema.strip() == expected_schema.strip()