feat: add structured outputs and response_format support across providers (#4280)

* feat: add response_format parameter to Azure and Gemini providers

* feat: add structured outputs support to Bedrock and Anthropic providers

* chore: bump anthropic dep

* fix: use beta structured output for new models
Author: Greyson LaLonde
Date: 2026-01-26 14:03:33 -05:00
Committed by: GitHub
Parent: a32de6bdac
Commit: 9797567342
7 changed files with 491 additions and 175 deletions
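
As a quick orientation, the new parameter is wired identically in all four providers: a Pydantic model passed at construction time becomes the default schema, and a response_model passed to call()/acall() overrides it per call. A minimal sketch (import paths assumed, not part of this diff):

from pydantic import BaseModel

class GreetingResponse(BaseModel):
    greeting: str
    language: str

# AnthropicCompletion, AzureCompletion, BedrockCompletion, and GeminiCompletion
# all accept the same keyword after this change.
llm = AnthropicCompletion(
    model="claude-sonnet-4-5",
    response_format=GreetingResponse,  # default for every call
)
# A response_model passed to call()/acall() still takes precedence:
#   effective_response_model = response_model or self.response_format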


@@ -90,7 +90,7 @@ azure-ai-inference = [
"azure-ai-inference~=1.0.0b9",
]
anthropic = [
"anthropic~=0.71.0",
"anthropic~=0.73.0",
]
a2a = [
"a2a-sdk~=0.3.10",


@@ -3,9 +3,8 @@ from __future__ import annotations
import json
import logging
import os
from typing import TYPE_CHECKING, Any, Literal, cast
from typing import TYPE_CHECKING, Any, Final, Literal, TypeGuard, cast
from anthropic.types import ThinkingBlock
from pydantic import BaseModel
from crewai.events.types.llm_events import LLMCallType
@@ -22,8 +21,9 @@ if TYPE_CHECKING:
from crewai.llms.hooks.base import BaseInterceptor
try:
from anthropic import Anthropic, AsyncAnthropic
from anthropic import Anthropic, AsyncAnthropic, transform_schema
from anthropic.types import Message, TextBlock, ThinkingBlock, ToolUseBlock
from anthropic.types.beta import BetaMessage
import httpx
except ImportError:
raise ImportError(
@@ -31,7 +31,62 @@ except ImportError:
) from None
ANTHROPIC_FILES_API_BETA = "files-api-2025-04-14"
ANTHROPIC_FILES_API_BETA: Final = "files-api-2025-04-14"
ANTHROPIC_STRUCTURED_OUTPUTS_BETA: Final = "structured-outputs-2025-11-13"
NATIVE_STRUCTURED_OUTPUT_MODELS: Final[
tuple[
Literal["claude-sonnet-4-5"],
Literal["claude-sonnet-4.5"],
Literal["claude-opus-4-5"],
Literal["claude-opus-4.5"],
Literal["claude-opus-4-1"],
Literal["claude-opus-4.1"],
Literal["claude-haiku-4-5"],
Literal["claude-haiku-4.5"],
]
] = (
"claude-sonnet-4-5",
"claude-sonnet-4.5",
"claude-opus-4-5",
"claude-opus-4.5",
"claude-opus-4-1",
"claude-opus-4.1",
"claude-haiku-4-5",
"claude-haiku-4.5",
)
def _supports_native_structured_outputs(model: str) -> bool:
"""Check if the model supports native structured outputs.
Native structured outputs are only available for newer Claude models
(Sonnet 4.5, Opus 4.5, Opus 4.1, and Haiku 4.5).
Other models require the tool-based fallback approach.
Args:
model: The model name/identifier.
Returns:
True if the model supports native structured outputs.
"""
model_lower = model.lower()
return any(prefix in model_lower for prefix in NATIVE_STRUCTURED_OUTPUT_MODELS)
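
Because matching is by substring, versioned or fully qualified model IDs also take the native path; for illustration (model strings assumed):

_supports_native_structured_outputs("claude-sonnet-4-5-20250929")  # True
_supports_native_structured_outputs("anthropic/claude-opus-4.5")   # True
_supports_native_structured_outputs("claude-3-7-sonnet-latest")    # False -> tool fallback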
def _is_pydantic_model_class(obj: Any) -> TypeGuard[type[BaseModel]]:
"""Check if an object is a Pydantic model class.
This distinguishes between Pydantic model classes that support structured
outputs (have model_json_schema) and plain dicts like {"type": "json_object"}.
Args:
obj: The object to check.
Returns:
True if obj is a Pydantic model class.
"""
return isinstance(obj, type) and issubclass(obj, BaseModel)
def _contains_file_id_reference(messages: list[dict[str, Any]]) -> bool:
@@ -84,6 +139,7 @@ class AnthropicCompletion(BaseLLM):
client_params: dict[str, Any] | None = None,
interceptor: BaseInterceptor[httpx.Request, httpx.Response] | None = None,
thinking: AnthropicThinkingConfig | None = None,
response_format: type[BaseModel] | None = None,
**kwargs: Any,
):
"""Initialize Anthropic chat completion client.
@@ -101,6 +157,8 @@ class AnthropicCompletion(BaseLLM):
stream: Enable streaming responses
client_params: Additional parameters for the Anthropic client
interceptor: HTTP interceptor for modifying requests/responses at transport level.
response_format: Pydantic model for structured output. When provided, responses
will be validated against this model schema.
**kwargs: Additional parameters
"""
super().__init__(
@@ -131,6 +189,7 @@ class AnthropicCompletion(BaseLLM):
self.stop_sequences = stop_sequences or []
self.thinking = thinking
self.previous_thinking_blocks: list[ThinkingBlock] = []
self.response_format = response_format
# Model-specific settings
self.is_claude_3 = "claude-3" in model.lower()
self.supports_tools = True
@@ -231,6 +290,8 @@ class AnthropicCompletion(BaseLLM):
formatted_messages, system_message, tools
)
effective_response_model = response_model or self.response_format
# Handle streaming vs non-streaming
if self.stream:
return self._handle_streaming_completion(
@@ -238,7 +299,7 @@ class AnthropicCompletion(BaseLLM):
available_functions,
from_task,
from_agent,
response_model,
effective_response_model,
)
return self._handle_completion(
@@ -246,7 +307,7 @@ class AnthropicCompletion(BaseLLM):
available_functions,
from_task,
from_agent,
response_model,
effective_response_model,
)
except Exception as e:
@@ -298,13 +359,15 @@ class AnthropicCompletion(BaseLLM):
formatted_messages, system_message, tools
)
effective_response_model = response_model or self.response_format
if self.stream:
return await self._ahandle_streaming_completion(
completion_params,
available_functions,
from_task,
from_agent,
response_model,
effective_response_model,
)
return await self._ahandle_completion(
@@ -312,7 +375,7 @@ class AnthropicCompletion(BaseLLM):
available_functions,
from_task,
from_agent,
response_model,
effective_response_model,
)
except Exception as e:
@@ -565,22 +628,40 @@ class AnthropicCompletion(BaseLLM):
response_model: type[BaseModel] | None = None,
) -> str | Any:
"""Handle non-streaming message completion."""
if response_model:
structured_tool = {
"name": "structured_output",
"description": "Returns structured data according to the schema",
"input_schema": response_model.model_json_schema(),
}
params["tools"] = [structured_tool]
params["tool_choice"] = {"type": "tool", "name": "structured_output"}
uses_file_api = _contains_file_id_reference(params.get("messages", []))
betas: list[str] = []
use_native_structured_output = False
if uses_file_api:
betas.append(ANTHROPIC_FILES_API_BETA)
extra_body: dict[str, Any] | None = None
if _is_pydantic_model_class(response_model):
schema = transform_schema(response_model.model_json_schema())
if _supports_native_structured_outputs(self.model):
use_native_structured_output = True
betas.append(ANTHROPIC_STRUCTURED_OUTPUTS_BETA)
extra_body = {
"output_format": {
"type": "json_schema",
"schema": schema,
}
}
else:
structured_tool = {
"name": "structured_output",
"description": "Output the structured response",
"input_schema": schema,
}
params["tools"] = [structured_tool]
params["tool_choice"] = {"type": "tool", "name": "structured_output"}
try:
if uses_file_api:
params["betas"] = [ANTHROPIC_FILES_API_BETA]
response = self.client.beta.messages.create(**params)
if betas:
params["betas"] = betas
response = self.client.beta.messages.create(
**params, extra_body=extra_body
)
else:
response = self.client.messages.create(**params)
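
As the response-handling hunk below shows, either branch ultimately hands the caller a JSON string (text emitted natively, or the forced tool call's input re-serialized), so consumers can validate it themselves. A hedged sketch reusing GreetingResponse from the top of this commit:

raw = llm.call("Say hello in French", response_model=GreetingResponse)
parsed = GreetingResponse.model_validate_json(raw)  # raw is a JSON string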
@@ -593,22 +674,34 @@ class AnthropicCompletion(BaseLLM):
usage = self._extract_anthropic_token_usage(response)
self._track_token_usage_internal(usage)
if response_model and response.content:
tool_uses = [
block for block in response.content if isinstance(block, ToolUseBlock)
]
if tool_uses and tool_uses[0].name == "structured_output":
structured_data = tool_uses[0].input
structured_json = json.dumps(structured_data)
self._emit_call_completed_event(
response=structured_json,
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=params["messages"],
)
return structured_json
if _is_pydantic_model_class(response_model) and response.content:
if use_native_structured_output:
for block in response.content:
if isinstance(block, TextBlock):
structured_json = block.text
self._emit_call_completed_event(
response=structured_json,
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=params["messages"],
)
return structured_json
else:
for block in response.content:
if (
isinstance(block, ToolUseBlock)
and block.name == "structured_output"
):
structured_json = json.dumps(block.input)
self._emit_call_completed_event(
response=structured_json,
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=params["messages"],
)
return structured_json
# Check if Claude wants to use tools
if response.content:
@@ -678,17 +771,31 @@ class AnthropicCompletion(BaseLLM):
from_task: Any | None = None,
from_agent: Any | None = None,
response_model: type[BaseModel] | None = None,
) -> str:
) -> str | Any:
"""Handle streaming message completion."""
if response_model:
structured_tool = {
"name": "structured_output",
"description": "Returns structured data according to the schema",
"input_schema": response_model.model_json_schema(),
}
betas: list[str] = []
use_native_structured_output = False
params["tools"] = [structured_tool]
params["tool_choice"] = {"type": "tool", "name": "structured_output"}
extra_body: dict[str, Any] | None = None
if _is_pydantic_model_class(response_model):
schema = transform_schema(response_model.model_json_schema())
if _supports_native_structured_outputs(self.model):
use_native_structured_output = True
betas.append(ANTHROPIC_STRUCTURED_OUTPUTS_BETA)
extra_body = {
"output_format": {
"type": "json_schema",
"schema": schema,
}
}
else:
structured_tool = {
"name": "structured_output",
"description": "Output the structured response",
"input_schema": schema,
}
params["tools"] = [structured_tool]
params["tool_choice"] = {"type": "tool", "name": "structured_output"}
full_response = ""
@@ -696,15 +803,22 @@ class AnthropicCompletion(BaseLLM):
# (the SDK sets it internally)
stream_params = {k: v for k, v in params.items() if k != "stream"}
if betas:
stream_params["betas"] = betas
current_tool_calls: dict[int, dict[str, Any]] = {}
# Make streaming API call
with self.client.messages.stream(**stream_params) as stream:
stream_context = (
self.client.beta.messages.stream(**stream_params, extra_body=extra_body)
if betas
else self.client.messages.stream(**stream_params)
)
with stream_context as stream:
response_id = None
for event in stream:
if hasattr(event, "message") and hasattr(event.message, "id"):
response_id = event.message.id
if hasattr(event, "delta") and hasattr(event.delta, "text"):
text_delta = event.delta.text
full_response += text_delta
@@ -712,7 +826,7 @@ class AnthropicCompletion(BaseLLM):
chunk=text_delta,
from_task=from_task,
from_agent=from_agent,
response_id=response_id
response_id=response_id,
)
if event.type == "content_block_start":
@@ -739,7 +853,7 @@ class AnthropicCompletion(BaseLLM):
"index": block_index,
},
call_type=LLMCallType.TOOL_CALL,
response_id=response_id
response_id=response_id,
)
elif event.type == "content_block_delta":
if event.delta.type == "input_json_delta":
@@ -763,10 +877,10 @@ class AnthropicCompletion(BaseLLM):
"index": block_index,
},
call_type=LLMCallType.TOOL_CALL,
response_id=response_id
response_id=response_id,
)
final_message: Message = stream.get_final_message()
final_message = stream.get_final_message()
thinking_blocks: list[ThinkingBlock] = []
if final_message.content:
@@ -781,25 +895,30 @@ class AnthropicCompletion(BaseLLM):
usage = self._extract_anthropic_token_usage(final_message)
self._track_token_usage_internal(usage)
if response_model and final_message.content:
tool_uses = [
block
for block in final_message.content
if isinstance(block, ToolUseBlock)
]
if tool_uses and tool_uses[0].name == "structured_output":
structured_data = tool_uses[0].input
structured_json = json.dumps(structured_data)
if _is_pydantic_model_class(response_model):
if use_native_structured_output:
self._emit_call_completed_event(
response=structured_json,
response=full_response,
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=params["messages"],
)
return structured_json
return full_response
for block in final_message.content:
if (
isinstance(block, ToolUseBlock)
and block.name == "structured_output"
):
structured_json = json.dumps(block.input)
self._emit_call_completed_event(
response=structured_json,
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=params["messages"],
)
return structured_json
if final_message.content:
tool_uses = [
@@ -809,11 +928,9 @@ class AnthropicCompletion(BaseLLM):
]
if tool_uses:
# If no available_functions, return tool calls for executor to handle
if not available_functions:
return list(tool_uses)
# Handle tool use conversation flow internally
return self._handle_tool_use_conversation(
final_message,
tool_uses,
@@ -823,10 +940,8 @@ class AnthropicCompletion(BaseLLM):
from_agent,
)
# Apply stop words to full response
full_response = self._apply_stop_words(full_response)
# Emit completion event and return full response
self._emit_call_completed_event(
response=full_response,
call_type=LLMCallType.LLM_CALL,
@@ -884,7 +999,7 @@ class AnthropicCompletion(BaseLLM):
def _handle_tool_use_conversation(
self,
initial_response: Message,
initial_response: Message | BetaMessage,
tool_uses: list[ToolUseBlock],
params: dict[str, Any],
available_functions: dict[str, Any],
@@ -1002,22 +1117,40 @@ class AnthropicCompletion(BaseLLM):
response_model: type[BaseModel] | None = None,
) -> str | Any:
"""Handle non-streaming async message completion."""
if response_model:
structured_tool = {
"name": "structured_output",
"description": "Returns structured data according to the schema",
"input_schema": response_model.model_json_schema(),
}
params["tools"] = [structured_tool]
params["tool_choice"] = {"type": "tool", "name": "structured_output"}
uses_file_api = _contains_file_id_reference(params.get("messages", []))
betas: list[str] = []
use_native_structured_output = False
if uses_file_api:
betas.append(ANTHROPIC_FILES_API_BETA)
extra_body: dict[str, Any] | None = None
if _is_pydantic_model_class(response_model):
schema = transform_schema(response_model.model_json_schema())
if _supports_native_structured_outputs(self.model):
use_native_structured_output = True
betas.append(ANTHROPIC_STRUCTURED_OUTPUTS_BETA)
extra_body = {
"output_format": {
"type": "json_schema",
"schema": schema,
}
}
else:
structured_tool = {
"name": "structured_output",
"description": "Output the structured response",
"input_schema": schema,
}
params["tools"] = [structured_tool]
params["tool_choice"] = {"type": "tool", "name": "structured_output"}
try:
if uses_file_api:
params["betas"] = [ANTHROPIC_FILES_API_BETA]
response = await self.async_client.beta.messages.create(**params)
if betas:
params["betas"] = betas
response = await self.async_client.beta.messages.create(
**params, extra_body=extra_body
)
else:
response = await self.async_client.messages.create(**params)
@@ -1030,23 +1163,34 @@ class AnthropicCompletion(BaseLLM):
usage = self._extract_anthropic_token_usage(response)
self._track_token_usage_internal(usage)
if response_model and response.content:
tool_uses = [
block for block in response.content if isinstance(block, ToolUseBlock)
]
if tool_uses and tool_uses[0].name == "structured_output":
structured_data = tool_uses[0].input
structured_json = json.dumps(structured_data)
self._emit_call_completed_event(
response=structured_json,
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=params["messages"],
)
return structured_json
if _is_pydantic_model_class(response_model) and response.content:
if use_native_structured_output:
for block in response.content:
if isinstance(block, TextBlock):
structured_json = block.text
self._emit_call_completed_event(
response=structured_json,
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=params["messages"],
)
return structured_json
else:
for block in response.content:
if (
isinstance(block, ToolUseBlock)
and block.name == "structured_output"
):
structured_json = json.dumps(block.input)
self._emit_call_completed_event(
response=structured_json,
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=params["messages"],
)
return structured_json
if response.content:
tool_uses = [
@@ -1102,25 +1246,49 @@ class AnthropicCompletion(BaseLLM):
from_task: Any | None = None,
from_agent: Any | None = None,
response_model: type[BaseModel] | None = None,
) -> str:
) -> str | Any:
"""Handle async streaming message completion."""
if response_model:
structured_tool = {
"name": "structured_output",
"description": "Returns structured data according to the schema",
"input_schema": response_model.model_json_schema(),
}
betas: list[str] = []
use_native_structured_output = False
params["tools"] = [structured_tool]
params["tool_choice"] = {"type": "tool", "name": "structured_output"}
extra_body: dict[str, Any] | None = None
if _is_pydantic_model_class(response_model):
schema = transform_schema(response_model.model_json_schema())
if _supports_native_structured_outputs(self.model):
use_native_structured_output = True
betas.append(ANTHROPIC_STRUCTURED_OUTPUTS_BETA)
extra_body = {
"output_format": {
"type": "json_schema",
"schema": schema,
}
}
else:
structured_tool = {
"name": "structured_output",
"description": "Output the structured response",
"input_schema": schema,
}
params["tools"] = [structured_tool]
params["tool_choice"] = {"type": "tool", "name": "structured_output"}
full_response = ""
stream_params = {k: v for k, v in params.items() if k != "stream"}
if betas:
stream_params["betas"] = betas
current_tool_calls: dict[int, dict[str, Any]] = {}
async with self.async_client.messages.stream(**stream_params) as stream:
stream_context = (
self.async_client.beta.messages.stream(
**stream_params, extra_body=extra_body
)
if betas
else self.async_client.messages.stream(**stream_params)
)
async with stream_context as stream:
response_id = None
async for event in stream:
if hasattr(event, "message") and hasattr(event.message, "id"):
@@ -1133,7 +1301,7 @@ class AnthropicCompletion(BaseLLM):
chunk=text_delta,
from_task=from_task,
from_agent=from_agent,
response_id=response_id
response_id=response_id,
)
if event.type == "content_block_start":
@@ -1160,7 +1328,7 @@ class AnthropicCompletion(BaseLLM):
"index": block_index,
},
call_type=LLMCallType.TOOL_CALL,
response_id=response_id
response_id=response_id,
)
elif event.type == "content_block_delta":
if event.delta.type == "input_json_delta":
@@ -1184,33 +1352,38 @@ class AnthropicCompletion(BaseLLM):
"index": block_index,
},
call_type=LLMCallType.TOOL_CALL,
response_id=response_id
response_id=response_id,
)
final_message: Message = await stream.get_final_message()
final_message = await stream.get_final_message()
usage = self._extract_anthropic_token_usage(final_message)
self._track_token_usage_internal(usage)
if response_model and final_message.content:
tool_uses = [
block
for block in final_message.content
if isinstance(block, ToolUseBlock)
]
if tool_uses and tool_uses[0].name == "structured_output":
structured_data = tool_uses[0].input
structured_json = json.dumps(structured_data)
if _is_pydantic_model_class(response_model):
if use_native_structured_output:
self._emit_call_completed_event(
response=structured_json,
response=full_response,
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=params["messages"],
)
return structured_json
return full_response
for block in final_message.content:
if (
isinstance(block, ToolUseBlock)
and block.name == "structured_output"
):
structured_json = json.dumps(block.input)
self._emit_call_completed_event(
response=structured_json,
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=params["messages"],
)
return structured_json
if final_message.content:
tool_uses = [
@@ -1220,7 +1393,6 @@ class AnthropicCompletion(BaseLLM):
]
if tool_uses:
# If no available_functions, return tool calls for executor to handle
if not available_functions:
return list(tool_uses)
@@ -1247,7 +1419,7 @@ class AnthropicCompletion(BaseLLM):
async def _ahandle_tool_use_conversation(
self,
initial_response: Message,
initial_response: Message | BetaMessage,
tool_uses: list[ToolUseBlock],
params: dict[str, Any],
available_functions: dict[str, Any],
@@ -1356,7 +1528,9 @@ class AnthropicCompletion(BaseLLM):
return int(200000 * CONTEXT_WINDOW_USAGE_RATIO)
@staticmethod
def _extract_anthropic_token_usage(response: Message) -> dict[str, Any]:
def _extract_anthropic_token_usage(
response: Message | BetaMessage,
) -> dict[str, Any]:
"""Extract token usage from Anthropic response."""
if hasattr(response, "usage") and response.usage:
usage = response.usage
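
To summarize the two wire formats this provider now emits, a condensed sketch with the shapes copied from the hunks above (payload values illustrative, GreetingResponse as defined earlier):

from anthropic import transform_schema

schema = transform_schema(GreetingResponse.model_json_schema())

# Claude 4.5-family models: native structured outputs behind the beta flag
native = {
    "betas": ["structured-outputs-2025-11-13"],
    "extra_body": {"output_format": {"type": "json_schema", "schema": schema}},
}

# Older models: a forced tool call that the response parser unwraps
fallback = {
    "tools": [{
        "name": "structured_output",
        "description": "Output the structured response",
        "input_schema": schema,
    }],
    "tool_choice": {"type": "tool", "name": "structured_output"},
}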


@@ -92,6 +92,7 @@ class AzureCompletion(BaseLLM):
stop: list[str] | None = None,
stream: bool = False,
interceptor: BaseInterceptor[Any, Any] | None = None,
response_format: type[BaseModel] | None = None,
**kwargs: Any,
):
"""Initialize Azure AI Inference chat completion client.
@@ -111,6 +112,9 @@ class AzureCompletion(BaseLLM):
stop: Stop sequences
stream: Enable streaming responses
interceptor: HTTP interceptor (not yet supported for Azure).
response_format: Pydantic model for structured output. Used as default when
response_model is not passed to call()/acall() methods.
Only works with OpenAI models deployed on Azure.
**kwargs: Additional parameters
"""
if interceptor is not None:
@@ -165,6 +169,7 @@ class AzureCompletion(BaseLLM):
self.presence_penalty = presence_penalty
self.max_tokens = max_tokens
self.stream = stream
self.response_format = response_format
self.is_openai_model = any(
prefix in model.lower() for prefix in ["gpt-", "o1-", "text-"]
@@ -298,6 +303,7 @@ class AzureCompletion(BaseLLM):
from_task=from_task,
from_agent=from_agent,
)
effective_response_model = response_model or self.response_format
# Format messages for Azure
formatted_messages = self._format_messages_for_azure(messages)
@@ -307,7 +313,7 @@ class AzureCompletion(BaseLLM):
# Prepare completion parameters
completion_params = self._prepare_completion_params(
formatted_messages, tools, response_model
formatted_messages, tools, effective_response_model
)
# Handle streaming vs non-streaming
@@ -317,7 +323,7 @@ class AzureCompletion(BaseLLM):
available_functions,
from_task,
from_agent,
response_model,
effective_response_model,
)
return self._handle_completion(
@@ -325,7 +331,7 @@ class AzureCompletion(BaseLLM):
available_functions,
from_task,
from_agent,
response_model,
effective_response_model,
)
except Exception as e:
@@ -364,11 +370,12 @@ class AzureCompletion(BaseLLM):
from_task=from_task,
from_agent=from_agent,
)
effective_response_model = response_model or self.response_format
formatted_messages = self._format_messages_for_azure(messages)
completion_params = self._prepare_completion_params(
formatted_messages, tools, response_model
formatted_messages, tools, effective_response_model
)
if self.stream:
@@ -377,7 +384,7 @@ class AzureCompletion(BaseLLM):
available_functions,
from_task,
from_agent,
response_model,
effective_response_model,
)
return await self._ahandle_completion(
@@ -385,7 +392,7 @@ class AzureCompletion(BaseLLM):
available_functions,
from_task,
from_agent,
response_model,
effective_response_model,
)
except Exception as e:
@@ -726,7 +733,7 @@ class AzureCompletion(BaseLLM):
"""
if update.choices:
choice = update.choices[0]
response_id = update.id if hasattr(update,"id") else None
response_id = update.id if hasattr(update, "id") else None
if choice.delta and choice.delta.content:
content_delta = choice.delta.content
full_response += content_delta
@@ -734,7 +741,7 @@ class AzureCompletion(BaseLLM):
chunk=content_delta,
from_task=from_task,
from_agent=from_agent,
response_id=response_id
response_id=response_id,
)
if choice.delta and choice.delta.tool_calls:
@@ -769,7 +776,7 @@ class AzureCompletion(BaseLLM):
"index": idx,
},
call_type=LLMCallType.TOOL_CALL,
response_id=response_id
response_id=response_id,
)
return full_response
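
Usage on the Azure side follows the same pattern; per the docstring above it only takes effect for OpenAI models deployed on Azure. A sketch (deployment name illustrative, import path assumed):

llm = AzureCompletion(
    model="gpt-4o",                    # an OpenAI model deployed on Azure
    response_format=GreetingResponse,  # no effect on non-OpenAI deployments
)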


@@ -172,6 +172,7 @@ class BedrockCompletion(BaseLLM):
additional_model_request_fields: dict[str, Any] | None = None,
additional_model_response_field_paths: list[str] | None = None,
interceptor: BaseInterceptor[Any, Any] | None = None,
response_format: type[BaseModel] | None = None,
**kwargs: Any,
) -> None:
"""Initialize AWS Bedrock completion client.
@@ -192,6 +193,8 @@ class BedrockCompletion(BaseLLM):
additional_model_request_fields: Model-specific request parameters
additional_model_response_field_paths: Custom response field paths
interceptor: HTTP interceptor (not yet supported for Bedrock).
response_format: Pydantic model for structured output. Used as default when
response_model is not passed to call()/acall() methods.
**kwargs: Additional parameters
"""
if interceptor is not None:
@@ -248,6 +251,7 @@ class BedrockCompletion(BaseLLM):
self.top_k = top_k
self.stream = stream
self.stop_sequences = stop_sequences
self.response_format = response_format
# Store advanced features (optional)
self.guardrail_config = guardrail_config
@@ -299,6 +303,8 @@ class BedrockCompletion(BaseLLM):
response_model: type[BaseModel] | None = None,
) -> str | Any:
"""Call AWS Bedrock Converse API."""
effective_response_model = response_model or self.response_format
try:
# Emit call started event
self._emit_call_started_event(
@@ -375,6 +381,7 @@ class BedrockCompletion(BaseLLM):
available_functions,
from_task,
from_agent,
effective_response_model,
)
return self._handle_converse(
@@ -383,6 +390,7 @@ class BedrockCompletion(BaseLLM):
available_functions,
from_task,
from_agent,
effective_response_model,
)
except Exception as e:
@@ -425,6 +433,8 @@ class BedrockCompletion(BaseLLM):
NotImplementedError: If aiobotocore is not installed.
LLMContextLengthExceededError: If context window is exceeded.
"""
effective_response_model = response_model or self.response_format
if not AIOBOTOCORE_AVAILABLE:
raise NotImplementedError(
"Async support for AWS Bedrock requires aiobotocore. "
@@ -494,11 +504,21 @@ class BedrockCompletion(BaseLLM):
if self.stream:
return await self._ahandle_streaming_converse(
formatted_messages, body, available_functions, from_task, from_agent
formatted_messages,
body,
available_functions,
from_task,
from_agent,
effective_response_model,
)
return await self._ahandle_converse(
formatted_messages, body, available_functions, from_task, from_agent
formatted_messages,
body,
available_functions,
from_task,
from_agent,
effective_response_model,
)
except Exception as e:
@@ -520,10 +540,29 @@ class BedrockCompletion(BaseLLM):
available_functions: Mapping[str, Any] | None = None,
from_task: Any | None = None,
from_agent: Any | None = None,
) -> str:
response_model: type[BaseModel] | None = None,
) -> str | Any:
"""Handle non-streaming converse API call following AWS best practices."""
if response_model:
structured_tool: ConverseToolTypeDef = {
"toolSpec": {
"name": "structured_output",
"description": "Returns structured data according to the schema",
"inputSchema": {"json": response_model.model_json_schema()},
}
}
body["toolConfig"] = cast(
"ToolConfigurationTypeDef",
cast(
object,
{
"tools": [structured_tool],
"toolChoice": {"tool": {"name": "structured_output"}},
},
),
)
try:
# Validate messages format before API call
if not messages:
raise ValueError("Messages cannot be empty")
@@ -571,6 +610,21 @@ class BedrockCompletion(BaseLLM):
# If there are tool uses but no available_functions, return them for the executor to handle
tool_uses = [block["toolUse"] for block in content if "toolUse" in block]
if response_model and tool_uses:
for tool_use in tool_uses:
if tool_use.get("name") == "structured_output":
structured_data = tool_use.get("input", {})
result = response_model.model_validate(structured_data)
self._emit_call_completed_event(
response=result.model_dump_json(),
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=messages,
)
return result
if tool_uses and not available_functions:
self._emit_call_completed_event(
response=tool_uses,
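
Note the asymmetry with the Anthropic provider: Bedrock validates the forced tool call's input and returns the Pydantic instance itself rather than a JSON string. The caller's view, sketched (model ID illustrative):

llm = BedrockCompletion(
    model="anthropic.claude-sonnet-4-5",  # illustrative Bedrock model ID
    response_format=GreetingResponse,
)
result = llm.call("Say hello in French")
assert isinstance(result, GreetingResponse)  # a model instance, not a JSON string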
@@ -717,8 +771,28 @@ class BedrockCompletion(BaseLLM):
available_functions: dict[str, Any] | None = None,
from_task: Any | None = None,
from_agent: Any | None = None,
response_model: type[BaseModel] | None = None,
) -> str:
"""Handle streaming converse API call with comprehensive event handling."""
if response_model:
structured_tool: ConverseToolTypeDef = {
"toolSpec": {
"name": "structured_output",
"description": "Returns structured data according to the schema",
"inputSchema": {"json": response_model.model_json_schema()},
}
}
body["toolConfig"] = cast(
"ToolConfigurationTypeDef",
cast(
object,
{
"tools": [structured_tool],
"toolChoice": {"tool": {"name": "structured_output"}},
},
),
)
full_response = ""
current_tool_use: dict[str, Any] | None = None
tool_use_id: str | None = None
@@ -805,7 +879,7 @@ class BedrockCompletion(BaseLLM):
"index": tool_use_index,
},
call_type=LLMCallType.TOOL_CALL,
response_id=response_id
response_id=response_id,
)
elif "contentBlockStop" in event:
logging.debug("Content block stopped in stream")
@@ -929,8 +1003,28 @@ class BedrockCompletion(BaseLLM):
available_functions: Mapping[str, Any] | None = None,
from_task: Any | None = None,
from_agent: Any | None = None,
) -> str:
response_model: type[BaseModel] | None = None,
) -> str | Any:
"""Handle async non-streaming converse API call."""
if response_model:
structured_tool: ConverseToolTypeDef = {
"toolSpec": {
"name": "structured_output",
"description": "Returns structured data according to the schema",
"inputSchema": {"json": response_model.model_json_schema()},
}
}
body["toolConfig"] = cast(
"ToolConfigurationTypeDef",
cast(
object,
{
"tools": [structured_tool],
"toolChoice": {"tool": {"name": "structured_output"}},
},
),
)
try:
if not messages:
raise ValueError("Messages cannot be empty")
@@ -976,6 +1070,21 @@ class BedrockCompletion(BaseLLM):
# If there are tool uses but no available_functions, return them for the executor to handle
tool_uses = [block["toolUse"] for block in content if "toolUse" in block]
if response_model and tool_uses:
for tool_use in tool_uses:
if tool_use.get("name") == "structured_output":
structured_data = tool_use.get("input", {})
result = response_model.model_validate(structured_data)
self._emit_call_completed_event(
response=result.model_dump_json(),
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=messages,
)
return result
if tool_uses and not available_functions:
self._emit_call_completed_event(
response=tool_uses,
@@ -1106,8 +1215,28 @@ class BedrockCompletion(BaseLLM):
available_functions: dict[str, Any] | None = None,
from_task: Any | None = None,
from_agent: Any | None = None,
response_model: type[BaseModel] | None = None,
) -> str:
"""Handle async streaming converse API call."""
if response_model:
structured_tool: ConverseToolTypeDef = {
"toolSpec": {
"name": "structured_output",
"description": "Returns structured data according to the schema",
"inputSchema": {"json": response_model.model_json_schema()},
}
}
body["toolConfig"] = cast(
"ToolConfigurationTypeDef",
cast(
object,
{
"tools": [structured_tool],
"toolChoice": {"tool": {"name": "structured_output"}},
},
),
)
full_response = ""
current_tool_use: dict[str, Any] | None = None
tool_use_id: str | None = None
@@ -1174,7 +1303,7 @@ class BedrockCompletion(BaseLLM):
chunk=text_chunk,
from_task=from_task,
from_agent=from_agent,
response_id=response_id
response_id=response_id,
)
elif "toolUse" in delta and current_tool_use:
tool_input = delta["toolUse"].get("input", "")


@@ -56,6 +56,7 @@ class GeminiCompletion(BaseLLM):
client_params: dict[str, Any] | None = None,
interceptor: BaseInterceptor[Any, Any] | None = None,
use_vertexai: bool | None = None,
response_format: type[BaseModel] | None = None,
**kwargs: Any,
):
"""Initialize Google Gemini chat completion client.
@@ -86,6 +87,8 @@ class GeminiCompletion(BaseLLM):
- None (default): Check GOOGLE_GENAI_USE_VERTEXAI env var
When using Vertex AI with API key (Express mode), http_options with
api_version="v1" is automatically configured.
response_format: Pydantic model for structured output. Used as default when
response_model is not passed to call()/acall() methods.
**kwargs: Additional parameters
"""
if interceptor is not None:
@@ -121,6 +124,7 @@ class GeminiCompletion(BaseLLM):
self.safety_settings = safety_settings or {}
self.stop_sequences = stop_sequences or []
self.tools: list[dict[str, Any]] | None = None
self.response_format = response_format
# Model-specific settings
version_match = re.search(r"gemini-(\d+(?:\.\d+)?)", model.lower())
@@ -292,6 +296,7 @@ class GeminiCompletion(BaseLLM):
from_agent=from_agent,
)
self.tools = tools
effective_response_model = response_model or self.response_format
formatted_content, system_instruction = self._format_messages_for_gemini(
messages
@@ -303,7 +308,7 @@ class GeminiCompletion(BaseLLM):
raise ValueError("LLM call blocked by before_llm_call hook")
config = self._prepare_generation_config(
system_instruction, tools, response_model
system_instruction, tools, effective_response_model
)
if self.stream:
@@ -313,7 +318,7 @@ class GeminiCompletion(BaseLLM):
available_functions,
from_task,
from_agent,
response_model,
effective_response_model,
)
return self._handle_completion(
@@ -322,7 +327,7 @@ class GeminiCompletion(BaseLLM):
available_functions,
from_task,
from_agent,
response_model,
effective_response_model,
)
except APIError as e:
@@ -374,13 +379,14 @@ class GeminiCompletion(BaseLLM):
from_agent=from_agent,
)
self.tools = tools
effective_response_model = response_model or self.response_format
formatted_content, system_instruction = self._format_messages_for_gemini(
messages
)
config = self._prepare_generation_config(
system_instruction, tools, response_model
system_instruction, tools, effective_response_model
)
if self.stream:
@@ -390,7 +396,7 @@ class GeminiCompletion(BaseLLM):
available_functions,
from_task,
from_agent,
response_model,
effective_response_model,
)
return await self._ahandle_completion(
@@ -399,7 +405,7 @@ class GeminiCompletion(BaseLLM):
available_functions,
from_task,
from_agent,
response_model,
effective_response_model,
)
except APIError as e:
@@ -570,10 +576,10 @@ class GeminiCompletion(BaseLLM):
types.Content(role="user", parts=[function_response_part])
)
elif role == "assistant" and message.get("tool_calls"):
parts: list[types.Part] = []
tool_parts: list[types.Part] = []
if text_content:
parts.append(types.Part.from_text(text=text_content))
tool_parts.append(types.Part.from_text(text=text_content))
tool_calls: list[dict[str, Any]] = message.get("tool_calls") or []
for tool_call in tool_calls:
@@ -592,11 +598,11 @@ class GeminiCompletion(BaseLLM):
else:
func_args = func_args_raw
parts.append(
tool_parts.append(
types.Part.from_function_call(name=func_name, args=func_args)
)
contents.append(types.Content(role="model", parts=parts))
contents.append(types.Content(role="model", parts=tool_parts))
else:
# Convert role for Gemini (assistant -> model)
gemini_role = "model" if role == "assistant" else "user"
@@ -790,7 +796,7 @@ class GeminiCompletion(BaseLLM):
Returns:
Tuple of (updated full_response, updated function_calls, updated usage_data)
"""
response_id=chunk.response_id if hasattr(chunk,"response_id") else None
response_id = chunk.response_id if hasattr(chunk, "response_id") else None
if chunk.usage_metadata:
usage_data = self._extract_token_usage(chunk)
@@ -800,7 +806,7 @@ class GeminiCompletion(BaseLLM):
chunk=chunk.text,
from_task=from_task,
from_agent=from_agent,
response_id=response_id
response_id=response_id,
)
if chunk.candidates:
@@ -837,7 +843,7 @@ class GeminiCompletion(BaseLLM):
"index": call_index,
},
call_type=LLMCallType.TOOL_CALL,
response_id=response_id
response_id=response_id,
)
return full_response, function_calls, usage_data
@@ -972,7 +978,7 @@ class GeminiCompletion(BaseLLM):
from_task: Any | None = None,
from_agent: Any | None = None,
response_model: type[BaseModel] | None = None,
) -> str:
) -> str | Any:
"""Handle streaming content generation."""
full_response = ""
function_calls: dict[int, dict[str, Any]] = {}
@@ -1050,7 +1056,7 @@ class GeminiCompletion(BaseLLM):
from_task: Any | None = None,
from_agent: Any | None = None,
response_model: type[BaseModel] | None = None,
) -> str:
) -> str | Any:
"""Handle async streaming content generation."""
full_response = ""
function_calls: dict[int, dict[str, Any]] = {}
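
Gemini follows the same precedence rule, with the effective model handed to _prepare_generation_config. A usage sketch (model name illustrative, import path assumed):

llm = GeminiCompletion(
    model="gemini-2.5-flash",
    response_format=GreetingResponse,
)
raw = llm.call("Say hello in French")  # falls back to GreetingResponse by default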


@@ -1,6 +1,8 @@
interactions:
- request:
body: '{"max_tokens":4096,"messages":[{"role":"user","content":"Say hello in French"}],"model":"claude-sonnet-4-0","stream":false,"tool_choice":{"type":"tool","name":"structured_output"},"tools":[{"name":"structured_output","description":"Returns structured data according to the schema","input_schema":{"description":"Response model for greeting test.","properties":{"greeting":{"title":"Greeting","type":"string"},"language":{"title":"Language","type":"string"}},"required":["greeting","language"],"title":"GreetingResponse","type":"object"}}]}'
body: '{"max_tokens":4096,"messages":[{"role":"user","content":"Say hello in French"}],"model":"claude-sonnet-4-0","stream":false,"tool_choice":{"type":"tool","name":"structured_output"},"tools":[{"name":"structured_output","description":"Output
the structured response","input_schema":{"type":"object","description":"Response
model for greeting test.","title":"GreetingResponse","properties":{"greeting":{"type":"string","title":"Greeting"},"language":{"type":"string","title":"Language"}},"additionalProperties":false,"required":["greeting","language"]}}]}'
headers:
User-Agent:
- X-USER-AGENT-XXX
@@ -13,7 +15,7 @@ interactions:
connection:
- keep-alive
content-length:
- '539'
- '551'
content-type:
- application/json
host:
@@ -29,7 +31,7 @@ interactions:
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 0.75.0
- 0.76.0
x-stainless-retry-count:
- '0'
x-stainless-runtime:
@@ -42,7 +44,7 @@ interactions:
uri: https://api.anthropic.com/v1/messages
response:
body:
string: '{"model":"claude-sonnet-4-20250514","id":"msg_01XjvX2nCho1knuucbwwgCpw","type":"message","role":"assistant","content":[{"type":"tool_use","id":"toolu_019rfPRSDmBb7CyCTdGMv5rK","name":"structured_output","input":{"greeting":"Bonjour","language":"French"}}],"stop_reason":"tool_use","stop_sequence":null,"usage":{"input_tokens":432,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"cache_creation":{"ephemeral_5m_input_tokens":0,"ephemeral_1h_input_tokens":0},"output_tokens":53,"service_tier":"standard"}}'
string: '{"model":"claude-sonnet-4-20250514","id":"msg_01CKTyVmak15L5oQ36mv4sL9","type":"message","role":"assistant","content":[{"type":"tool_use","id":"toolu_0174BYmn6xiSnUwVhFD8S7EW","name":"structured_output","input":{"greeting":"Bonjour","language":"French"}}],"stop_reason":"tool_use","stop_sequence":null,"usage":{"input_tokens":436,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"cache_creation":{"ephemeral_5m_input_tokens":0,"ephemeral_1h_input_tokens":0},"output_tokens":53,"service_tier":"standard"}}'
headers:
CF-RAY:
- CF-RAY-XXX
@@ -51,7 +53,7 @@ interactions:
Content-Type:
- application/json
Date:
- Mon, 01 Dec 2025 11:19:38 GMT
- Mon, 26 Jan 2026 14:59:34 GMT
Server:
- cloudflare
Transfer-Encoding:
@@ -82,12 +84,10 @@ interactions:
- DYNAMIC
request-id:
- REQUEST-ID-XXX
retry-after:
- '24'
strict-transport-security:
- STS-XXX
x-envoy-upstream-service-time:
- '2101'
- '968'
status:
code: 200
message: OK
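
For reference, this cassette exercises the tool-based fallback path: claude-sonnet-4-0 is not in NATIVE_STRUCTURED_OUTPUT_MODELS, so the recorded request carries the forced structured_output tool rather than an output_format block:

_supports_native_structured_outputs("claude-sonnet-4-0")  # False -> tool fallback recorded above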

uv.lock (generated)

@@ -310,7 +310,7 @@ wheels = [
[[package]]
name = "anthropic"
version = "0.71.1"
version = "0.73.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "anyio" },
@@ -322,9 +322,9 @@ dependencies = [
{ name = "sniffio" },
{ name = "typing-extensions" },
]
sdist = { url = "https://files.pythonhosted.org/packages/05/4b/19620875841f692fdc35eb58bf0201c8ad8c47b8443fecbf1b225312175b/anthropic-0.71.1.tar.gz", hash = "sha256:a77d156d3e7d318b84681b59823b2dee48a8ac508a3e54e49f0ab0d074e4b0da", size = 493294, upload-time = "2025-10-28T17:28:42.213Z" }
sdist = { url = "https://files.pythonhosted.org/packages/f0/07/f550112c3f5299d02f06580577f602e8a112b1988ad7c98ac1a8f7292d7e/anthropic-0.73.0.tar.gz", hash = "sha256:30f0d7d86390165f86af6ca7c3041f8720bb2e1b0e12a44525c8edfdbd2c5239", size = 425168, upload-time = "2025-11-14T18:47:52.635Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/4b/68/b2f988b13325f9ac9921b1e87f0b7994468014e1b5bd3bdbd2472f5baf45/anthropic-0.71.1-py3-none-any.whl", hash = "sha256:6ca6c579f0899a445faeeed9c0eb97aa4bdb751196262f9ccc96edfc0bb12679", size = 355020, upload-time = "2025-10-28T17:28:40.653Z" },
{ url = "https://files.pythonhosted.org/packages/15/b1/5d4d3f649e151e58dc938cf19c4d0cd19fca9a986879f30fea08a7b17138/anthropic-0.73.0-py3-none-any.whl", hash = "sha256:0d56cd8b3ca3fea9c9b5162868bdfd053fbc189b8b56d4290bd2d427b56db769", size = 367839, upload-time = "2025-11-14T18:47:51.195Z" },
]
[[package]]
@@ -1276,7 +1276,7 @@ requires-dist = [
{ name = "aiobotocore", marker = "extra == 'aws'", specifier = "~=2.25.2" },
{ name = "aiocache", extras = ["memcached", "redis"], marker = "extra == 'a2a'", specifier = "~=0.12.3" },
{ name = "aiosqlite", specifier = "~=0.21.0" },
{ name = "anthropic", marker = "extra == 'anthropic'", specifier = "~=0.71.0" },
{ name = "anthropic", marker = "extra == 'anthropic'", specifier = "~=0.73.0" },
{ name = "appdirs", specifier = "~=1.4.4" },
{ name = "azure-ai-inference", marker = "extra == 'azure-ai-inference'", specifier = "~=1.0.0b9" },
{ name = "boto3", marker = "extra == 'aws'", specifier = "~=1.40.38" },