diff --git a/lib/crewai/pyproject.toml b/lib/crewai/pyproject.toml index bd01dbf9a..4a21e1fa6 100644 --- a/lib/crewai/pyproject.toml +++ b/lib/crewai/pyproject.toml @@ -90,7 +90,7 @@ azure-ai-inference = [ "azure-ai-inference~=1.0.0b9", ] anthropic = [ - "anthropic~=0.71.0", + "anthropic~=0.73.0", ] a2a = [ "a2a-sdk~=0.3.10", diff --git a/lib/crewai/src/crewai/llms/providers/anthropic/completion.py b/lib/crewai/src/crewai/llms/providers/anthropic/completion.py index 1d626a7f4..e98249567 100644 --- a/lib/crewai/src/crewai/llms/providers/anthropic/completion.py +++ b/lib/crewai/src/crewai/llms/providers/anthropic/completion.py @@ -3,9 +3,8 @@ from __future__ import annotations import json import logging import os -from typing import TYPE_CHECKING, Any, Literal, cast +from typing import TYPE_CHECKING, Any, Final, Literal, TypeGuard, cast -from anthropic.types import ThinkingBlock from pydantic import BaseModel from crewai.events.types.llm_events import LLMCallType @@ -22,8 +21,9 @@ if TYPE_CHECKING: from crewai.llms.hooks.base import BaseInterceptor try: - from anthropic import Anthropic, AsyncAnthropic + from anthropic import Anthropic, AsyncAnthropic, transform_schema from anthropic.types import Message, TextBlock, ThinkingBlock, ToolUseBlock + from anthropic.types.beta import BetaMessage import httpx except ImportError: raise ImportError( @@ -31,7 +31,62 @@ except ImportError: ) from None -ANTHROPIC_FILES_API_BETA = "files-api-2025-04-14" +ANTHROPIC_FILES_API_BETA: Final = "files-api-2025-04-14" +ANTHROPIC_STRUCTURED_OUTPUTS_BETA: Final = "structured-outputs-2025-11-13" + +NATIVE_STRUCTURED_OUTPUT_MODELS: Final[ + tuple[ + Literal["claude-sonnet-4-5"], + Literal["claude-sonnet-4.5"], + Literal["claude-opus-4-5"], + Literal["claude-opus-4.5"], + Literal["claude-opus-4-1"], + Literal["claude-opus-4.1"], + Literal["claude-haiku-4-5"], + Literal["claude-haiku-4.5"], + ] +] = ( + "claude-sonnet-4-5", + "claude-sonnet-4.5", + "claude-opus-4-5", + "claude-opus-4.5", + "claude-opus-4-1", + "claude-opus-4.1", + "claude-haiku-4-5", + "claude-haiku-4.5", + ) + + +def _supports_native_structured_outputs(model: str) -> bool: + """Check if the model supports native structured outputs. + + Native structured outputs are only available on newer Claude models + (Sonnet 4.5, Opus 4.5, Opus 4.1, Haiku 4.5). + Older models require the tool-based fallback approach. + + Args: + model: The model name/identifier. + + Returns: + True if the model supports native structured outputs. + """ + model_lower = model.lower() + return any(prefix in model_lower for prefix in NATIVE_STRUCTURED_OUTPUT_MODELS) + + +def _is_pydantic_model_class(obj: Any) -> TypeGuard[type[BaseModel]]: + """Check if an object is a Pydantic model class. + + This distinguishes between Pydantic model classes that support structured + outputs (have model_json_schema) and plain dicts like {"type": "json_object"}. + + Args: + obj: The object to check. + + Returns: + True if obj is a Pydantic model class. + """ + return isinstance(obj, type) and issubclass(obj, BaseModel) def _contains_file_id_reference(messages: list[dict[str, Any]]) -> bool: @@ -84,6 +139,7 @@ class AnthropicCompletion(BaseLLM): client_params: dict[str, Any] | None = None, interceptor: BaseInterceptor[httpx.Request, httpx.Response] | None = None, thinking: AnthropicThinkingConfig | None = None, + response_format: type[BaseModel] | None = None, **kwargs: Any, ): """Initialize Anthropic chat completion client.
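A quick behavioral sketch of the two helpers above (illustrative only: the dated model ID is hypothetical, and the helpers are assumed importable from this module despite the leading underscore):

```python
from pydantic import BaseModel

from crewai.llms.providers.anthropic.completion import (
    _is_pydantic_model_class,
    _supports_native_structured_outputs,
)

# Matching is by substring, so fully dated model IDs still qualify:
assert _supports_native_structured_outputs("claude-sonnet-4-5-20250929")
# Older models take the tool-based fallback path:
assert not _supports_native_structured_outputs("claude-3-5-sonnet-latest")


class Greeting(BaseModel):
    greeting: str


# The TypeGuard separates Pydantic model classes from OpenAI-style dicts:
assert _is_pydantic_model_class(Greeting)
assert not _is_pydantic_model_class({"type": "json_object"})
assert not _is_pydantic_model_class(Greeting())  # instance, not class
```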
@@ -101,6 +157,8 @@ class AnthropicCompletion(BaseLLM): stream: Enable streaming responses client_params: Additional parameters for the Anthropic client interceptor: HTTP interceptor for modifying requests/responses at transport level. + response_format: Pydantic model for structured output. When provided, responses + will be validated against this model schema. **kwargs: Additional parameters """ super().__init__( @@ -131,6 +189,7 @@ class AnthropicCompletion(BaseLLM): self.stop_sequences = stop_sequences or [] self.thinking = thinking self.previous_thinking_blocks: list[ThinkingBlock] = [] + self.response_format = response_format # Model-specific settings self.is_claude_3 = "claude-3" in model.lower() self.supports_tools = True @@ -231,6 +290,8 @@ class AnthropicCompletion(BaseLLM): formatted_messages, system_message, tools ) + effective_response_model = response_model or self.response_format + # Handle streaming vs non-streaming if self.stream: return self._handle_streaming_completion( @@ -238,7 +299,7 @@ class AnthropicCompletion(BaseLLM): available_functions, from_task, from_agent, - response_model, + effective_response_model, ) return self._handle_completion( @@ -246,7 +307,7 @@ class AnthropicCompletion(BaseLLM): available_functions, from_task, from_agent, - response_model, + effective_response_model, ) except Exception as e: @@ -298,13 +359,15 @@ class AnthropicCompletion(BaseLLM): formatted_messages, system_message, tools ) + effective_response_model = response_model or self.response_format + if self.stream: return await self._ahandle_streaming_completion( completion_params, available_functions, from_task, from_agent, - response_model, + effective_response_model, ) return await self._ahandle_completion( @@ -312,7 +375,7 @@ class AnthropicCompletion(BaseLLM): available_functions, from_task, from_agent, - response_model, + effective_response_model, ) except Exception as e: @@ -565,22 +628,40 @@ class AnthropicCompletion(BaseLLM): response_model: type[BaseModel] | None = None, ) -> str | Any: """Handle non-streaming message completion.""" - if response_model: - structured_tool = { - "name": "structured_output", - "description": "Returns structured data according to the schema", - "input_schema": response_model.model_json_schema(), - } - - params["tools"] = [structured_tool] - params["tool_choice"] = {"type": "tool", "name": "structured_output"} - uses_file_api = _contains_file_id_reference(params.get("messages", [])) + betas: list[str] = [] + use_native_structured_output = False + + if uses_file_api: + betas.append(ANTHROPIC_FILES_API_BETA) + + extra_body: dict[str, Any] | None = None + if _is_pydantic_model_class(response_model): + schema = transform_schema(response_model.model_json_schema()) + if _supports_native_structured_outputs(self.model): + use_native_structured_output = True + betas.append(ANTHROPIC_STRUCTURED_OUTPUTS_BETA) + extra_body = { + "output_format": { + "type": "json_schema", + "schema": schema, + } + } + else: + structured_tool = { + "name": "structured_output", + "description": "Output the structured response", + "input_schema": schema, + } + params["tools"] = [structured_tool] + params["tool_choice"] = {"type": "tool", "name": "structured_output"} try: - if uses_file_api: - params["betas"] = [ANTHROPIC_FILES_API_BETA] - response = self.client.beta.messages.create(**params) + if betas: + params["betas"] = betas + response = self.client.beta.messages.create( + **params, extra_body=extra_body + ) else: response = self.client.messages.create(**params) @@ -593,22 
+674,34 @@ class AnthropicCompletion(BaseLLM): usage = self._extract_anthropic_token_usage(response) self._track_token_usage_internal(usage) - if response_model and response.content: - tool_uses = [ - block for block in response.content if isinstance(block, ToolUseBlock) - ] - if tool_uses and tool_uses[0].name == "structured_output": - structured_data = tool_uses[0].input - structured_json = json.dumps(structured_data) - self._emit_call_completed_event( - response=structured_json, - call_type=LLMCallType.LLM_CALL, - from_task=from_task, - from_agent=from_agent, - messages=params["messages"], - ) - - return structured_json + if _is_pydantic_model_class(response_model) and response.content: + if use_native_structured_output: + for block in response.content: + if isinstance(block, TextBlock): + structured_json = block.text + self._emit_call_completed_event( + response=structured_json, + call_type=LLMCallType.LLM_CALL, + from_task=from_task, + from_agent=from_agent, + messages=params["messages"], + ) + return structured_json + else: + for block in response.content: + if ( + isinstance(block, ToolUseBlock) + and block.name == "structured_output" + ): + structured_json = json.dumps(block.input) + self._emit_call_completed_event( + response=structured_json, + call_type=LLMCallType.LLM_CALL, + from_task=from_task, + from_agent=from_agent, + messages=params["messages"], + ) + return structured_json # Check if Claude wants to use tools if response.content: @@ -678,17 +771,31 @@ class AnthropicCompletion(BaseLLM): from_task: Any | None = None, from_agent: Any | None = None, response_model: type[BaseModel] | None = None, - ) -> str: + ) -> str | Any: """Handle streaming message completion.""" - if response_model: - structured_tool = { - "name": "structured_output", - "description": "Returns structured data according to the schema", - "input_schema": response_model.model_json_schema(), - } + betas: list[str] = [] + use_native_structured_output = False - params["tools"] = [structured_tool] - params["tool_choice"] = {"type": "tool", "name": "structured_output"} + extra_body: dict[str, Any] | None = None + if _is_pydantic_model_class(response_model): + schema = transform_schema(response_model.model_json_schema()) + if _supports_native_structured_outputs(self.model): + use_native_structured_output = True + betas.append(ANTHROPIC_STRUCTURED_OUTPUTS_BETA) + extra_body = { + "output_format": { + "type": "json_schema", + "schema": schema, + } + } + else: + structured_tool = { + "name": "structured_output", + "description": "Output the structured response", + "input_schema": schema, + } + params["tools"] = [structured_tool] + params["tool_choice"] = {"type": "tool", "name": "structured_output"} full_response = "" @@ -696,15 +803,22 @@ class AnthropicCompletion(BaseLLM): # (the SDK sets it internally) stream_params = {k: v for k, v in params.items() if k != "stream"} + if betas: + stream_params["betas"] = betas + current_tool_calls: dict[int, dict[str, Any]] = {} - # Make streaming API call - with self.client.messages.stream(**stream_params) as stream: + stream_context = ( + self.client.beta.messages.stream(**stream_params, extra_body=extra_body) + if betas + else self.client.messages.stream(**stream_params) + ) + with stream_context as stream: response_id = None for event in stream: if hasattr(event, "message") and hasattr(event.message, "id"): response_id = event.message.id - + if hasattr(event, "delta") and hasattr(event.delta, "text"): text_delta = event.delta.text full_response += text_delta @@ -712,7 
+826,7 @@ class AnthropicCompletion(BaseLLM): chunk=text_delta, from_task=from_task, from_agent=from_agent, - response_id=response_id + response_id=response_id, ) if event.type == "content_block_start": @@ -739,7 +853,7 @@ class AnthropicCompletion(BaseLLM): "index": block_index, }, call_type=LLMCallType.TOOL_CALL, - response_id=response_id + response_id=response_id, ) elif event.type == "content_block_delta": if event.delta.type == "input_json_delta": @@ -763,10 +877,10 @@ class AnthropicCompletion(BaseLLM): "index": block_index, }, call_type=LLMCallType.TOOL_CALL, - response_id=response_id + response_id=response_id, ) - final_message: Message = stream.get_final_message() + final_message = stream.get_final_message() thinking_blocks: list[ThinkingBlock] = [] if final_message.content: @@ -781,25 +895,30 @@ class AnthropicCompletion(BaseLLM): usage = self._extract_anthropic_token_usage(final_message) self._track_token_usage_internal(usage) - if response_model and final_message.content: - tool_uses = [ - block - for block in final_message.content - if isinstance(block, ToolUseBlock) - ] - if tool_uses and tool_uses[0].name == "structured_output": - structured_data = tool_uses[0].input - structured_json = json.dumps(structured_data) - + if _is_pydantic_model_class(response_model): + if use_native_structured_output: self._emit_call_completed_event( - response=structured_json, + response=full_response, call_type=LLMCallType.LLM_CALL, from_task=from_task, from_agent=from_agent, messages=params["messages"], ) - - return structured_json + return full_response + for block in final_message.content: + if ( + isinstance(block, ToolUseBlock) + and block.name == "structured_output" + ): + structured_json = json.dumps(block.input) + self._emit_call_completed_event( + response=structured_json, + call_type=LLMCallType.LLM_CALL, + from_task=from_task, + from_agent=from_agent, + messages=params["messages"], + ) + return structured_json if final_message.content: tool_uses = [ @@ -809,11 +928,9 @@ class AnthropicCompletion(BaseLLM): ] if tool_uses: - # If no available_functions, return tool calls for executor to handle if not available_functions: return list(tool_uses) - # Handle tool use conversation flow internally return self._handle_tool_use_conversation( final_message, tool_uses, @@ -823,10 +940,8 @@ class AnthropicCompletion(BaseLLM): from_agent, ) - # Apply stop words to full response full_response = self._apply_stop_words(full_response) - # Emit completion event and return full response self._emit_call_completed_event( response=full_response, call_type=LLMCallType.LLM_CALL, @@ -884,7 +999,7 @@ class AnthropicCompletion(BaseLLM): def _handle_tool_use_conversation( self, - initial_response: Message, + initial_response: Message | BetaMessage, tool_uses: list[ToolUseBlock], params: dict[str, Any], available_functions: dict[str, Any], @@ -1002,22 +1117,40 @@ class AnthropicCompletion(BaseLLM): response_model: type[BaseModel] | None = None, ) -> str | Any: """Handle non-streaming async message completion.""" - if response_model: - structured_tool = { - "name": "structured_output", - "description": "Returns structured data according to the schema", - "input_schema": response_model.model_json_schema(), - } - - params["tools"] = [structured_tool] - params["tool_choice"] = {"type": "tool", "name": "structured_output"} - uses_file_api = _contains_file_id_reference(params.get("messages", [])) + betas: list[str] = [] + use_native_structured_output = False + + if uses_file_api: + 
betas.append(ANTHROPIC_FILES_API_BETA) + + extra_body: dict[str, Any] | None = None + if _is_pydantic_model_class(response_model): + schema = transform_schema(response_model.model_json_schema()) + if _supports_native_structured_outputs(self.model): + use_native_structured_output = True + betas.append(ANTHROPIC_STRUCTURED_OUTPUTS_BETA) + extra_body = { + "output_format": { + "type": "json_schema", + "schema": schema, + } + } + else: + structured_tool = { + "name": "structured_output", + "description": "Output the structured response", + "input_schema": schema, + } + params["tools"] = [structured_tool] + params["tool_choice"] = {"type": "tool", "name": "structured_output"} try: - if uses_file_api: - params["betas"] = [ANTHROPIC_FILES_API_BETA] - response = await self.async_client.beta.messages.create(**params) + if betas: + params["betas"] = betas + response = await self.async_client.beta.messages.create( + **params, extra_body=extra_body + ) else: response = await self.async_client.messages.create(**params) @@ -1030,23 +1163,34 @@ class AnthropicCompletion(BaseLLM): usage = self._extract_anthropic_token_usage(response) self._track_token_usage_internal(usage) - if response_model and response.content: - tool_uses = [ - block for block in response.content if isinstance(block, ToolUseBlock) - ] - if tool_uses and tool_uses[0].name == "structured_output": - structured_data = tool_uses[0].input - structured_json = json.dumps(structured_data) - - self._emit_call_completed_event( - response=structured_json, - call_type=LLMCallType.LLM_CALL, - from_task=from_task, - from_agent=from_agent, - messages=params["messages"], - ) - - return structured_json + if _is_pydantic_model_class(response_model) and response.content: + if use_native_structured_output: + for block in response.content: + if isinstance(block, TextBlock): + structured_json = block.text + self._emit_call_completed_event( + response=structured_json, + call_type=LLMCallType.LLM_CALL, + from_task=from_task, + from_agent=from_agent, + messages=params["messages"], + ) + return structured_json + else: + for block in response.content: + if ( + isinstance(block, ToolUseBlock) + and block.name == "structured_output" + ): + structured_json = json.dumps(block.input) + self._emit_call_completed_event( + response=structured_json, + call_type=LLMCallType.LLM_CALL, + from_task=from_task, + from_agent=from_agent, + messages=params["messages"], + ) + return structured_json if response.content: tool_uses = [ @@ -1102,25 +1246,49 @@ class AnthropicCompletion(BaseLLM): from_task: Any | None = None, from_agent: Any | None = None, response_model: type[BaseModel] | None = None, - ) -> str: + ) -> str | Any: """Handle async streaming message completion.""" - if response_model: - structured_tool = { - "name": "structured_output", - "description": "Returns structured data according to the schema", - "input_schema": response_model.model_json_schema(), - } + betas: list[str] = [] + use_native_structured_output = False - params["tools"] = [structured_tool] - params["tool_choice"] = {"type": "tool", "name": "structured_output"} + extra_body: dict[str, Any] | None = None + if _is_pydantic_model_class(response_model): + schema = transform_schema(response_model.model_json_schema()) + if _supports_native_structured_outputs(self.model): + use_native_structured_output = True + betas.append(ANTHROPIC_STRUCTURED_OUTPUTS_BETA) + extra_body = { + "output_format": { + "type": "json_schema", + "schema": schema, + } + } + else: + structured_tool = { + "name": "structured_output", 
+ "description": "Output the structured response", + "input_schema": schema, + } + params["tools"] = [structured_tool] + params["tool_choice"] = {"type": "tool", "name": "structured_output"} full_response = "" stream_params = {k: v for k, v in params.items() if k != "stream"} + if betas: + stream_params["betas"] = betas + current_tool_calls: dict[int, dict[str, Any]] = {} - async with self.async_client.messages.stream(**stream_params) as stream: + stream_context = ( + self.async_client.beta.messages.stream( + **stream_params, extra_body=extra_body + ) + if betas + else self.async_client.messages.stream(**stream_params) + ) + async with stream_context as stream: response_id = None async for event in stream: if hasattr(event, "message") and hasattr(event.message, "id"): @@ -1133,7 +1301,7 @@ class AnthropicCompletion(BaseLLM): chunk=text_delta, from_task=from_task, from_agent=from_agent, - response_id=response_id + response_id=response_id, ) if event.type == "content_block_start": @@ -1160,7 +1328,7 @@ class AnthropicCompletion(BaseLLM): "index": block_index, }, call_type=LLMCallType.TOOL_CALL, - response_id=response_id + response_id=response_id, ) elif event.type == "content_block_delta": if event.delta.type == "input_json_delta": @@ -1184,33 +1352,38 @@ class AnthropicCompletion(BaseLLM): "index": block_index, }, call_type=LLMCallType.TOOL_CALL, - response_id=response_id + response_id=response_id, ) - final_message: Message = await stream.get_final_message() + final_message = await stream.get_final_message() usage = self._extract_anthropic_token_usage(final_message) self._track_token_usage_internal(usage) - if response_model and final_message.content: - tool_uses = [ - block - for block in final_message.content - if isinstance(block, ToolUseBlock) - ] - if tool_uses and tool_uses[0].name == "structured_output": - structured_data = tool_uses[0].input - structured_json = json.dumps(structured_data) - + if _is_pydantic_model_class(response_model): + if use_native_structured_output: self._emit_call_completed_event( - response=structured_json, + response=full_response, call_type=LLMCallType.LLM_CALL, from_task=from_task, from_agent=from_agent, messages=params["messages"], ) - - return structured_json + return full_response + for block in final_message.content: + if ( + isinstance(block, ToolUseBlock) + and block.name == "structured_output" + ): + structured_json = json.dumps(block.input) + self._emit_call_completed_event( + response=structured_json, + call_type=LLMCallType.LLM_CALL, + from_task=from_task, + from_agent=from_agent, + messages=params["messages"], + ) + return structured_json if final_message.content: tool_uses = [ @@ -1220,7 +1393,6 @@ class AnthropicCompletion(BaseLLM): ] if tool_uses: - # If no available_functions, return tool calls for executor to handle if not available_functions: return list(tool_uses) @@ -1247,7 +1419,7 @@ class AnthropicCompletion(BaseLLM): async def _ahandle_tool_use_conversation( self, - initial_response: Message, + initial_response: Message | BetaMessage, tool_uses: list[ToolUseBlock], params: dict[str, Any], available_functions: dict[str, Any], @@ -1356,7 +1528,9 @@ class AnthropicCompletion(BaseLLM): return int(200000 * CONTEXT_WINDOW_USAGE_RATIO) @staticmethod - def _extract_anthropic_token_usage(response: Message) -> dict[str, Any]: + def _extract_anthropic_token_usage( + response: Message | BetaMessage, + ) -> dict[str, Any]: """Extract token usage from Anthropic response.""" if hasattr(response, "usage") and response.usage: usage = response.usage 
diff --git a/lib/crewai/src/crewai/llms/providers/azure/completion.py b/lib/crewai/src/crewai/llms/providers/azure/completion.py index a3aed7f4b..1de18d984 100644 --- a/lib/crewai/src/crewai/llms/providers/azure/completion.py +++ b/lib/crewai/src/crewai/llms/providers/azure/completion.py @@ -92,6 +92,7 @@ class AzureCompletion(BaseLLM): stop: list[str] | None = None, stream: bool = False, interceptor: BaseInterceptor[Any, Any] | None = None, + response_format: type[BaseModel] | None = None, **kwargs: Any, ): """Initialize Azure AI Inference chat completion client. @@ -111,6 +112,9 @@ class AzureCompletion(BaseLLM): stop: Stop sequences stream: Enable streaming responses interceptor: HTTP interceptor (not yet supported for Azure). + response_format: Pydantic model for structured output. Used as default when + response_model is not passed to call()/acall() methods. + Only works with OpenAI models deployed on Azure. **kwargs: Additional parameters """ if interceptor is not None: @@ -165,6 +169,7 @@ class AzureCompletion(BaseLLM): self.presence_penalty = presence_penalty self.max_tokens = max_tokens self.stream = stream + self.response_format = response_format self.is_openai_model = any( prefix in model.lower() for prefix in ["gpt-", "o1-", "text-"] @@ -298,6 +303,7 @@ class AzureCompletion(BaseLLM): from_task=from_task, from_agent=from_agent, ) + effective_response_model = response_model or self.response_format # Format messages for Azure formatted_messages = self._format_messages_for_azure(messages) @@ -307,7 +313,7 @@ class AzureCompletion(BaseLLM): # Prepare completion parameters completion_params = self._prepare_completion_params( - formatted_messages, tools, response_model + formatted_messages, tools, effective_response_model ) # Handle streaming vs non-streaming @@ -317,7 +323,7 @@ class AzureCompletion(BaseLLM): available_functions, from_task, from_agent, - response_model, + effective_response_model, ) return self._handle_completion( @@ -325,7 +331,7 @@ class AzureCompletion(BaseLLM): available_functions, from_task, from_agent, - response_model, + effective_response_model, ) except Exception as e: @@ -364,11 +370,12 @@ class AzureCompletion(BaseLLM): from_task=from_task, from_agent=from_agent, ) + effective_response_model = response_model or self.response_format formatted_messages = self._format_messages_for_azure(messages) completion_params = self._prepare_completion_params( - formatted_messages, tools, response_model + formatted_messages, tools, effective_response_model ) if self.stream: @@ -377,7 +384,7 @@ class AzureCompletion(BaseLLM): available_functions, from_task, from_agent, - response_model, + effective_response_model, ) return await self._ahandle_completion( @@ -385,7 +392,7 @@ class AzureCompletion(BaseLLM): available_functions, from_task, from_agent, - response_model, + effective_response_model, ) except Exception as e: @@ -726,7 +733,7 @@ class AzureCompletion(BaseLLM): """ if update.choices: choice = update.choices[0] - response_id = update.id if hasattr(update,"id") else None + response_id = update.id if hasattr(update, "id") else None if choice.delta and choice.delta.content: content_delta = choice.delta.content full_response += content_delta @@ -734,7 +741,7 @@ class AzureCompletion(BaseLLM): chunk=content_delta, from_task=from_task, from_agent=from_agent, - response_id=response_id + response_id=response_id, ) if choice.delta and choice.delta.tool_calls: @@ -769,7 +776,7 @@ class AzureCompletion(BaseLLM): "index": idx, }, call_type=LLMCallType.TOOL_CALL, - 
response_id=response_id + response_id=response_id, ) return full_response diff --git a/lib/crewai/src/crewai/llms/providers/bedrock/completion.py b/lib/crewai/src/crewai/llms/providers/bedrock/completion.py index 73fefc478..2ab638f11 100644 --- a/lib/crewai/src/crewai/llms/providers/bedrock/completion.py +++ b/lib/crewai/src/crewai/llms/providers/bedrock/completion.py @@ -172,6 +172,7 @@ class BedrockCompletion(BaseLLM): additional_model_request_fields: dict[str, Any] | None = None, additional_model_response_field_paths: list[str] | None = None, interceptor: BaseInterceptor[Any, Any] | None = None, + response_format: type[BaseModel] | None = None, **kwargs: Any, ) -> None: """Initialize AWS Bedrock completion client. @@ -192,6 +193,8 @@ class BedrockCompletion(BaseLLM): additional_model_request_fields: Model-specific request parameters additional_model_response_field_paths: Custom response field paths interceptor: HTTP interceptor (not yet supported for Bedrock). + response_format: Pydantic model for structured output. Used as default when + response_model is not passed to call()/acall() methods. **kwargs: Additional parameters """ if interceptor is not None: @@ -248,6 +251,7 @@ class BedrockCompletion(BaseLLM): self.top_k = top_k self.stream = stream self.stop_sequences = stop_sequences + self.response_format = response_format # Store advanced features (optional) self.guardrail_config = guardrail_config @@ -299,6 +303,8 @@ class BedrockCompletion(BaseLLM): response_model: type[BaseModel] | None = None, ) -> str | Any: """Call AWS Bedrock Converse API.""" + effective_response_model = response_model or self.response_format + try: # Emit call started event self._emit_call_started_event( @@ -375,6 +381,7 @@ class BedrockCompletion(BaseLLM): available_functions, from_task, from_agent, + effective_response_model, ) return self._handle_converse( @@ -383,6 +390,7 @@ class BedrockCompletion(BaseLLM): available_functions, from_task, from_agent, + effective_response_model, ) except Exception as e: @@ -425,6 +433,8 @@ class BedrockCompletion(BaseLLM): NotImplementedError: If aiobotocore is not installed. LLMContextLengthExceededError: If context window is exceeded. """ + effective_response_model = response_model or self.response_format + if not AIOBOTOCORE_AVAILABLE: raise NotImplementedError( "Async support for AWS Bedrock requires aiobotocore. 
" @@ -494,11 +504,21 @@ class BedrockCompletion(BaseLLM): if self.stream: return await self._ahandle_streaming_converse( - formatted_messages, body, available_functions, from_task, from_agent + formatted_messages, + body, + available_functions, + from_task, + from_agent, + effective_response_model, ) return await self._ahandle_converse( - formatted_messages, body, available_functions, from_task, from_agent + formatted_messages, + body, + available_functions, + from_task, + from_agent, + effective_response_model, ) except Exception as e: @@ -520,10 +540,29 @@ class BedrockCompletion(BaseLLM): available_functions: Mapping[str, Any] | None = None, from_task: Any | None = None, from_agent: Any | None = None, - ) -> str: + response_model: type[BaseModel] | None = None, + ) -> str | Any: """Handle non-streaming converse API call following AWS best practices.""" + if response_model: + structured_tool: ConverseToolTypeDef = { + "toolSpec": { + "name": "structured_output", + "description": "Returns structured data according to the schema", + "inputSchema": {"json": response_model.model_json_schema()}, + } + } + body["toolConfig"] = cast( + "ToolConfigurationTypeDef", + cast( + object, + { + "tools": [structured_tool], + "toolChoice": {"tool": {"name": "structured_output"}}, + }, + ), + ) + try: - # Validate messages format before API call if not messages: raise ValueError("Messages cannot be empty") @@ -571,6 +610,21 @@ class BedrockCompletion(BaseLLM): # If there are tool uses but no available_functions, return them for the executor to handle tool_uses = [block["toolUse"] for block in content if "toolUse" in block] + + if response_model and tool_uses: + for tool_use in tool_uses: + if tool_use.get("name") == "structured_output": + structured_data = tool_use.get("input", {}) + result = response_model.model_validate(structured_data) + self._emit_call_completed_event( + response=result.model_dump_json(), + call_type=LLMCallType.LLM_CALL, + from_task=from_task, + from_agent=from_agent, + messages=messages, + ) + return result + if tool_uses and not available_functions: self._emit_call_completed_event( response=tool_uses, @@ -717,8 +771,28 @@ class BedrockCompletion(BaseLLM): available_functions: dict[str, Any] | None = None, from_task: Any | None = None, from_agent: Any | None = None, + response_model: type[BaseModel] | None = None, ) -> str: """Handle streaming converse API call with comprehensive event handling.""" + if response_model: + structured_tool: ConverseToolTypeDef = { + "toolSpec": { + "name": "structured_output", + "description": "Returns structured data according to the schema", + "inputSchema": {"json": response_model.model_json_schema()}, + } + } + body["toolConfig"] = cast( + "ToolConfigurationTypeDef", + cast( + object, + { + "tools": [structured_tool], + "toolChoice": {"tool": {"name": "structured_output"}}, + }, + ), + ) + full_response = "" current_tool_use: dict[str, Any] | None = None tool_use_id: str | None = None @@ -805,7 +879,7 @@ class BedrockCompletion(BaseLLM): "index": tool_use_index, }, call_type=LLMCallType.TOOL_CALL, - response_id=response_id + response_id=response_id, ) elif "contentBlockStop" in event: logging.debug("Content block stopped in stream") @@ -929,8 +1003,28 @@ class BedrockCompletion(BaseLLM): available_functions: Mapping[str, Any] | None = None, from_task: Any | None = None, from_agent: Any | None = None, - ) -> str: + response_model: type[BaseModel] | None = None, + ) -> str | Any: """Handle async non-streaming converse API call.""" + if 
response_model: + structured_tool: ConverseToolTypeDef = { + "toolSpec": { + "name": "structured_output", + "description": "Returns structured data according to the schema", + "inputSchema": {"json": response_model.model_json_schema()}, + } + } + body["toolConfig"] = cast( + "ToolConfigurationTypeDef", + cast( + object, + { + "tools": [structured_tool], + "toolChoice": {"tool": {"name": "structured_output"}}, + }, + ), + ) + try: if not messages: raise ValueError("Messages cannot be empty") @@ -976,6 +1070,21 @@ class BedrockCompletion(BaseLLM): # If there are tool uses but no available_functions, return them for the executor to handle tool_uses = [block["toolUse"] for block in content if "toolUse" in block] + + if response_model and tool_uses: + for tool_use in tool_uses: + if tool_use.get("name") == "structured_output": + structured_data = tool_use.get("input", {}) + result = response_model.model_validate(structured_data) + self._emit_call_completed_event( + response=result.model_dump_json(), + call_type=LLMCallType.LLM_CALL, + from_task=from_task, + from_agent=from_agent, + messages=messages, + ) + return result + if tool_uses and not available_functions: self._emit_call_completed_event( response=tool_uses, @@ -1106,8 +1215,28 @@ class BedrockCompletion(BaseLLM): available_functions: dict[str, Any] | None = None, from_task: Any | None = None, from_agent: Any | None = None, + response_model: type[BaseModel] | None = None, ) -> str: """Handle async streaming converse API call.""" + if response_model: + structured_tool: ConverseToolTypeDef = { + "toolSpec": { + "name": "structured_output", + "description": "Returns structured data according to the schema", + "inputSchema": {"json": response_model.model_json_schema()}, + } + } + body["toolConfig"] = cast( + "ToolConfigurationTypeDef", + cast( + object, + { + "tools": [structured_tool], + "toolChoice": {"tool": {"name": "structured_output"}}, + }, + ), + ) + full_response = "" current_tool_use: dict[str, Any] | None = None tool_use_id: str | None = None @@ -1174,7 +1303,7 @@ class BedrockCompletion(BaseLLM): chunk=text_chunk, from_task=from_task, from_agent=from_agent, - response_id=response_id + response_id=response_id, ) elif "toolUse" in delta and current_tool_use: tool_input = delta["toolUse"].get("input", "") diff --git a/lib/crewai/src/crewai/llms/providers/gemini/completion.py b/lib/crewai/src/crewai/llms/providers/gemini/completion.py index 9687f3d4f..d101ad0be 100644 --- a/lib/crewai/src/crewai/llms/providers/gemini/completion.py +++ b/lib/crewai/src/crewai/llms/providers/gemini/completion.py @@ -56,6 +56,7 @@ class GeminiCompletion(BaseLLM): client_params: dict[str, Any] | None = None, interceptor: BaseInterceptor[Any, Any] | None = None, use_vertexai: bool | None = None, + response_format: type[BaseModel] | None = None, **kwargs: Any, ): """Initialize Google Gemini chat completion client. @@ -86,6 +87,8 @@ class GeminiCompletion(BaseLLM): - None (default): Check GOOGLE_GENAI_USE_VERTEXAI env var When using Vertex AI with API key (Express mode), http_options with api_version="v1" is automatically configured. + response_format: Pydantic model for structured output. Used as default when + response_model is not passed to call()/acall() methods. 
**kwargs: Additional parameters """ if interceptor is not None: @@ -121,6 +124,7 @@ class GeminiCompletion(BaseLLM): self.safety_settings = safety_settings or {} self.stop_sequences = stop_sequences or [] self.tools: list[dict[str, Any]] | None = None + self.response_format = response_format # Model-specific settings version_match = re.search(r"gemini-(\d+(?:\.\d+)?)", model.lower()) @@ -292,6 +296,7 @@ class GeminiCompletion(BaseLLM): from_agent=from_agent, ) self.tools = tools + effective_response_model = response_model or self.response_format formatted_content, system_instruction = self._format_messages_for_gemini( messages @@ -303,7 +308,7 @@ class GeminiCompletion(BaseLLM): raise ValueError("LLM call blocked by before_llm_call hook") config = self._prepare_generation_config( - system_instruction, tools, response_model + system_instruction, tools, effective_response_model ) if self.stream: @@ -313,7 +318,7 @@ class GeminiCompletion(BaseLLM): available_functions, from_task, from_agent, - response_model, + effective_response_model, ) return self._handle_completion( @@ -322,7 +327,7 @@ class GeminiCompletion(BaseLLM): available_functions, from_task, from_agent, - response_model, + effective_response_model, ) except APIError as e: @@ -374,13 +379,14 @@ class GeminiCompletion(BaseLLM): from_agent=from_agent, ) self.tools = tools + effective_response_model = response_model or self.response_format formatted_content, system_instruction = self._format_messages_for_gemini( messages ) config = self._prepare_generation_config( - system_instruction, tools, response_model + system_instruction, tools, effective_response_model ) if self.stream: @@ -390,7 +396,7 @@ class GeminiCompletion(BaseLLM): available_functions, from_task, from_agent, - response_model, + effective_response_model, ) return await self._ahandle_completion( @@ -399,7 +405,7 @@ class GeminiCompletion(BaseLLM): available_functions, from_task, from_agent, - response_model, + effective_response_model, ) except APIError as e: @@ -570,10 +576,10 @@ class GeminiCompletion(BaseLLM): types.Content(role="user", parts=[function_response_part]) ) elif role == "assistant" and message.get("tool_calls"): - parts: list[types.Part] = [] + tool_parts: list[types.Part] = [] if text_content: - parts.append(types.Part.from_text(text=text_content)) + tool_parts.append(types.Part.from_text(text=text_content)) tool_calls: list[dict[str, Any]] = message.get("tool_calls") or [] for tool_call in tool_calls: @@ -592,11 +598,11 @@ class GeminiCompletion(BaseLLM): else: func_args = func_args_raw - parts.append( + tool_parts.append( types.Part.from_function_call(name=func_name, args=func_args) ) - contents.append(types.Content(role="model", parts=parts)) + contents.append(types.Content(role="model", parts=tool_parts)) else: # Convert role for Gemini (assistant -> model) gemini_role = "model" if role == "assistant" else "user" @@ -790,7 +796,7 @@ class GeminiCompletion(BaseLLM): Returns: Tuple of (updated full_response, updated function_calls, updated usage_data) """ - response_id=chunk.response_id if hasattr(chunk,"response_id") else None + response_id = chunk.response_id if hasattr(chunk, "response_id") else None if chunk.usage_metadata: usage_data = self._extract_token_usage(chunk) @@ -800,7 +806,7 @@ class GeminiCompletion(BaseLLM): chunk=chunk.text, from_task=from_task, from_agent=from_agent, - response_id=response_id + response_id=response_id, ) if chunk.candidates: @@ -837,7 +843,7 @@ class GeminiCompletion(BaseLLM): "index": call_index, }, 
call_type=LLMCallType.TOOL_CALL, - response_id=response_id + response_id=response_id, ) return full_response, function_calls, usage_data @@ -972,7 +978,7 @@ class GeminiCompletion(BaseLLM): from_task: Any | None = None, from_agent: Any | None = None, response_model: type[BaseModel] | None = None, - ) -> str: + ) -> str | Any: """Handle streaming content generation.""" full_response = "" function_calls: dict[int, dict[str, Any]] = {} @@ -1050,7 +1056,7 @@ class GeminiCompletion(BaseLLM): from_task: Any | None = None, from_agent: Any | None = None, response_model: type[BaseModel] | None = None, - ) -> str: + ) -> str | Any: """Handle async streaming content generation.""" full_response = "" function_calls: dict[int, dict[str, Any]] = {} diff --git a/lib/crewai/tests/cassettes/llms/anthropic/test_anthropic_async_with_response_model.yaml b/lib/crewai/tests/cassettes/llms/anthropic/test_anthropic_async_with_response_model.yaml index 34af261c2..03e805b8a 100644 --- a/lib/crewai/tests/cassettes/llms/anthropic/test_anthropic_async_with_response_model.yaml +++ b/lib/crewai/tests/cassettes/llms/anthropic/test_anthropic_async_with_response_model.yaml @@ -1,6 +1,8 @@ interactions: - request: - body: '{"max_tokens":4096,"messages":[{"role":"user","content":"Say hello in French"}],"model":"claude-sonnet-4-0","stream":false,"tool_choice":{"type":"tool","name":"structured_output"},"tools":[{"name":"structured_output","description":"Returns structured data according to the schema","input_schema":{"description":"Response model for greeting test.","properties":{"greeting":{"title":"Greeting","type":"string"},"language":{"title":"Language","type":"string"}},"required":["greeting","language"],"title":"GreetingResponse","type":"object"}}]}' + body: '{"max_tokens":4096,"messages":[{"role":"user","content":"Say hello in French"}],"model":"claude-sonnet-4-0","stream":false,"tool_choice":{"type":"tool","name":"structured_output"},"tools":[{"name":"structured_output","description":"Output + the structured response","input_schema":{"type":"object","description":"Response + model for greeting test.","title":"GreetingResponse","properties":{"greeting":{"type":"string","title":"Greeting"},"language":{"type":"string","title":"Language"}},"additionalProperties":false,"required":["greeting","language"]}}]}' headers: User-Agent: - X-USER-AGENT-XXX @@ -13,7 +15,7 @@ interactions: connection: - keep-alive content-length: - - '539' + - '551' content-type: - application/json host: @@ -29,7 +31,7 @@ interactions: x-stainless-os: - X-STAINLESS-OS-XXX x-stainless-package-version: - - 0.75.0 + - 0.76.0 x-stainless-retry-count: - '0' x-stainless-runtime: @@ -42,7 +44,7 @@ interactions: uri: https://api.anthropic.com/v1/messages response: body: - string: '{"model":"claude-sonnet-4-20250514","id":"msg_01XjvX2nCho1knuucbwwgCpw","type":"message","role":"assistant","content":[{"type":"tool_use","id":"toolu_019rfPRSDmBb7CyCTdGMv5rK","name":"structured_output","input":{"greeting":"Bonjour","language":"French"}}],"stop_reason":"tool_use","stop_sequence":null,"usage":{"input_tokens":432,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"cache_creation":{"ephemeral_5m_input_tokens":0,"ephemeral_1h_input_tokens":0},"output_tokens":53,"service_tier":"standard"}}' + string: 
'{"model":"claude-sonnet-4-20250514","id":"msg_01CKTyVmak15L5oQ36mv4sL9","type":"message","role":"assistant","content":[{"type":"tool_use","id":"toolu_0174BYmn6xiSnUwVhFD8S7EW","name":"structured_output","input":{"greeting":"Bonjour","language":"French"}}],"stop_reason":"tool_use","stop_sequence":null,"usage":{"input_tokens":436,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"cache_creation":{"ephemeral_5m_input_tokens":0,"ephemeral_1h_input_tokens":0},"output_tokens":53,"service_tier":"standard"}}' headers: CF-RAY: - CF-RAY-XXX @@ -51,7 +53,7 @@ interactions: Content-Type: - application/json Date: - - Mon, 01 Dec 2025 11:19:38 GMT + - Mon, 26 Jan 2026 14:59:34 GMT Server: - cloudflare Transfer-Encoding: @@ -82,12 +84,10 @@ interactions: - DYNAMIC request-id: - REQUEST-ID-XXX - retry-after: - - '24' strict-transport-security: - STS-XXX x-envoy-upstream-service-time: - - '2101' + - '968' status: code: 200 message: OK diff --git a/uv.lock b/uv.lock index 36c4c60ba..499d4bd2d 100644 --- a/uv.lock +++ b/uv.lock @@ -310,7 +310,7 @@ wheels = [ [[package]] name = "anthropic" -version = "0.71.1" +version = "0.73.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "anyio" }, @@ -322,9 +322,9 @@ dependencies = [ { name = "sniffio" }, { name = "typing-extensions" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/05/4b/19620875841f692fdc35eb58bf0201c8ad8c47b8443fecbf1b225312175b/anthropic-0.71.1.tar.gz", hash = "sha256:a77d156d3e7d318b84681b59823b2dee48a8ac508a3e54e49f0ab0d074e4b0da", size = 493294, upload-time = "2025-10-28T17:28:42.213Z" } +sdist = { url = "https://files.pythonhosted.org/packages/f0/07/f550112c3f5299d02f06580577f602e8a112b1988ad7c98ac1a8f7292d7e/anthropic-0.73.0.tar.gz", hash = "sha256:30f0d7d86390165f86af6ca7c3041f8720bb2e1b0e12a44525c8edfdbd2c5239", size = 425168, upload-time = "2025-11-14T18:47:52.635Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/4b/68/b2f988b13325f9ac9921b1e87f0b7994468014e1b5bd3bdbd2472f5baf45/anthropic-0.71.1-py3-none-any.whl", hash = "sha256:6ca6c579f0899a445faeeed9c0eb97aa4bdb751196262f9ccc96edfc0bb12679", size = 355020, upload-time = "2025-10-28T17:28:40.653Z" }, + { url = "https://files.pythonhosted.org/packages/15/b1/5d4d3f649e151e58dc938cf19c4d0cd19fca9a986879f30fea08a7b17138/anthropic-0.73.0-py3-none-any.whl", hash = "sha256:0d56cd8b3ca3fea9c9b5162868bdfd053fbc189b8b56d4290bd2d427b56db769", size = 367839, upload-time = "2025-11-14T18:47:51.195Z" }, ] [[package]] @@ -1276,7 +1276,7 @@ requires-dist = [ { name = "aiobotocore", marker = "extra == 'aws'", specifier = "~=2.25.2" }, { name = "aiocache", extras = ["memcached", "redis"], marker = "extra == 'a2a'", specifier = "~=0.12.3" }, { name = "aiosqlite", specifier = "~=0.21.0" }, - { name = "anthropic", marker = "extra == 'anthropic'", specifier = "~=0.71.0" }, + { name = "anthropic", marker = "extra == 'anthropic'", specifier = "~=0.73.0" }, { name = "appdirs", specifier = "~=1.4.4" }, { name = "azure-ai-inference", marker = "extra == 'azure-ai-inference'", specifier = "~=1.0.0b9" }, { name = "boto3", marker = "extra == 'aws'", specifier = "~=1.40.38" },