bedrock works

This commit is contained in:
lorenzejay
2026-01-29 16:00:51 -08:00
parent 50660d0dc8
commit 383fcaab9d
2 changed files with 487 additions and 180 deletions

View File

@@ -45,6 +45,78 @@ except ImportError:
'AWS Bedrock native provider not available, to install: uv add "crewai[bedrock]"'
) from None
# Name of the synthetic tool registered with the Converse API to carry
# structured (response_model) output; also used to recognise that tool's
# calls in model responses.
STRUCTURED_OUTPUT_TOOL_NAME = "structured_output"
def _annotation_expects_list(annotation: Any) -> bool:
    """Return True when a field annotation denotes a list (optionally nullable).

    Recognises bare ``list``, ``list[X]`` / ``List[X]`` and unions whose
    non-``None`` members are all list types (e.g. ``Optional[list[X]]`` or
    ``list[X] | None``). Unions that also admit non-list members are not
    treated as lists, because a plain string may be a legitimate value there.
    """
    import types
    from typing import Union, get_args, get_origin

    origin = get_origin(annotation)
    if annotation is list or origin is list:
        return True

    # ``X | Y`` syntax yields types.UnionType on Python 3.10+; fall back to
    # typing.Union on interpreters that lack it.
    union_origins = (Union, getattr(types, "UnionType", Union))
    if origin in union_origins:
        members = [arg for arg in get_args(annotation) if arg is not type(None)]
        return bool(members) and all(
            _annotation_expects_list(member) for member in members
        )
    return False


def _parse_list_string(value: str) -> "list[Any] | None":
    """Convert a string that encodes a list into an actual list.

    A JSON array literal is decoded first. Otherwise the string is treated as
    a markdown bullet/numbered list with one item per line.

    Returns:
        The decoded list, or ``None`` when nothing could be extracted.
    """
    import json
    import re

    text = value.strip()

    # Models sometimes double-encode arrays as JSON text; decode those as-is.
    if text.startswith("["):
        try:
            decoded = json.loads(text)
        except ValueError:
            decoded = None
        if isinstance(decoded, list):
            return decoded

    items: list[Any] = []
    for raw_line in text.split("\n"):
        line = raw_line.strip()
        if not line:
            continue
        # Strip list markers ("- item", "* item", "• item", "1. item",
        # "1) item"). The marker must be followed by whitespace (or be the
        # whole line) so content such as "-5 degrees", "3.14 is pi" or
        # "**bold**" is not mangled.
        cleaned = re.sub(r"^[-*•](?:\s+|$)", "", line)
        cleaned = re.sub(r"^\d+[.)](?:\s+|$)", "", cleaned)
        cleaned = cleaned.strip()
        if cleaned:
            items.append(cleaned)
    return items or None


def _preprocess_structured_data(
    data: dict[str, Any], response_model: type[BaseModel]
) -> dict[str, Any]:
    """Preprocess structured data to handle common LLM output format issues.

    Some models (especially Claude on Bedrock) may return array fields as
    strings instead of proper JSON arrays - either markdown-formatted lists
    or JSON-encoded array text. This function converts such strings to lists
    before validation. Only fields whose annotation is a list type (including
    ``Optional[list[...]]``) and whose value is a ``str`` are touched.

    Args:
        data: The raw structured data from the tool response. Not mutated.
        response_model: The Pydantic model class to validate against; its
            ``model_fields`` annotations decide which fields expect a list.

    Returns:
        A shallow copy of ``data`` with string-to-list conversions applied
        where needed. Values that cannot be converted are left unchanged so
        that validation reports the original problem.
    """
    processed_data = dict(data)

    for field_name, field_info in response_model.model_fields.items():
        if field_name not in processed_data:
            continue

        value = processed_data[field_name]
        if not isinstance(value, str):
            continue
        if not _annotation_expects_list(field_info.annotation):
            continue

        parsed_items = _parse_list_string(value)
        if parsed_items is None:
            continue

        processed_data[field_name] = parsed_items
        logging.debug(
            f"Converted markdown-formatted string to list for field '{field_name}': "
            f"{len(parsed_items)} items"
        )

    return processed_data
try:
from aiobotocore.session import ( # type: ignore[import-untyped]
get_session as get_aiobotocore_session,
@@ -545,27 +617,56 @@ class BedrockCompletion(BaseLLM):
) -> str | Any:
"""Handle non-streaming converse API call following AWS best practices."""
if response_model:
structured_tool: ConverseToolTypeDef = {
"toolSpec": {
"name": "structured_output",
"description": "Returns structured data according to the schema",
"inputSchema": {
"json": generate_model_description(response_model)
.get("json_schema", {})
.get("schema", {})
},
# Check if structured_output tool already exists (from a previous recursive call)
existing_tool_config = body.get("toolConfig")
existing_tools: list[Any] = []
structured_output_already_exists = False
if existing_tool_config:
existing_tools = list(existing_tool_config.get("tools", []))
for tool in existing_tools:
tool_spec = tool.get("toolSpec", {})
if tool_spec.get("name") == STRUCTURED_OUTPUT_TOOL_NAME:
structured_output_already_exists = True
break
if not structured_output_already_exists:
structured_tool: ConverseToolTypeDef = {
"toolSpec": {
"name": STRUCTURED_OUTPUT_TOOL_NAME,
"description": (
"Use this tool to provide your final structured response. "
"Call this tool when you have gathered all necessary information "
"and are ready to provide the final answer in the required format."
),
"inputSchema": {
"json": generate_model_description(response_model)
.get("json_schema", {})
.get("schema", {})
},
}
}
}
body["toolConfig"] = cast(
"ToolConfigurationTypeDef",
cast(
object,
{
"tools": [structured_tool],
"toolChoice": {"tool": {"name": "structured_output"}},
},
),
)
if existing_tools:
existing_tools.append(structured_tool)
body["toolConfig"] = cast(
"ToolConfigurationTypeDef",
cast(object, {"tools": existing_tools}),
)
else:
# No existing tools, use only structured_output with forced toolChoice
body["toolConfig"] = cast(
"ToolConfigurationTypeDef",
cast(
object,
{
"tools": [structured_tool],
"toolChoice": {
"tool": {"name": STRUCTURED_OUTPUT_TOOL_NAME}
},
},
),
)
try:
if not messages:
@@ -616,29 +717,46 @@ class BedrockCompletion(BaseLLM):
# If there are tool uses but no available_functions, return them for the executor to handle
tool_uses = [block["toolUse"] for block in content if "toolUse" in block]
# Check for structured_output tool call first
if response_model and tool_uses:
for tool_use in tool_uses:
if tool_use.get("name") == "structured_output":
if tool_use.get("name") == STRUCTURED_OUTPUT_TOOL_NAME:
structured_data = tool_use.get("input", {})
result = response_model.model_validate(structured_data)
self._emit_call_completed_event(
response=result.model_dump_json(),
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=messages,
structured_data = _preprocess_structured_data(
structured_data, response_model
)
return result
try:
result = response_model.model_validate(structured_data)
self._emit_call_completed_event(
response=result.model_dump_json(),
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=messages,
)
return result
except Exception as e:
error_msg = (
f"Failed to validate {STRUCTURED_OUTPUT_TOOL_NAME} tool response "
f"with model {response_model.__name__}: {e}"
)
logging.error(error_msg)
raise ValueError(error_msg) from e
if tool_uses and not available_functions:
# Filter out structured_output from tool_uses returned to executor
non_structured_output_tool_uses = [
tu for tu in tool_uses if tu.get("name") != STRUCTURED_OUTPUT_TOOL_NAME
]
if non_structured_output_tool_uses and not available_functions:
self._emit_call_completed_event(
response=tool_uses,
response=non_structured_output_tool_uses,
call_type=LLMCallType.TOOL_CALL,
from_task=from_task,
from_agent=from_agent,
messages=messages,
)
return tool_uses
return non_structured_output_tool_uses
# Process content blocks and handle tool use correctly
text_content = ""
@@ -655,6 +773,9 @@ class BedrockCompletion(BaseLLM):
function_name = tool_use_block["name"]
function_args = tool_use_block.get("input", {})
if function_name == STRUCTURED_OUTPUT_TOOL_NAME:
continue
logging.debug(
f"Tool use requested: {function_name} with ID {tool_use_id}"
)
@@ -691,7 +812,12 @@ class BedrockCompletion(BaseLLM):
)
return self._handle_converse(
messages, body, available_functions, from_task, from_agent
messages,
body,
available_functions,
from_task,
from_agent,
response_model,
)
# Apply stop sequences if configured
@@ -780,27 +906,58 @@ class BedrockCompletion(BaseLLM):
) -> str:
"""Handle streaming converse API call with comprehensive event handling."""
if response_model:
structured_tool: ConverseToolTypeDef = {
"toolSpec": {
"name": "structured_output",
"description": "Returns structured data according to the schema",
"inputSchema": {
"json": generate_model_description(response_model)
.get("json_schema", {})
.get("schema", {})
},
# Check if structured_output tool already exists (from a previous recursive call)
existing_tool_config = body.get("toolConfig")
existing_tools: list[Any] = []
structured_output_already_exists = False
if existing_tool_config:
existing_tools = list(existing_tool_config.get("tools", []))
# Check if structured_output tool is already in the tools list
for tool in existing_tools:
tool_spec = tool.get("toolSpec", {})
if tool_spec.get("name") == STRUCTURED_OUTPUT_TOOL_NAME:
structured_output_already_exists = True
break
if not structured_output_already_exists:
structured_tool: ConverseToolTypeDef = {
"toolSpec": {
"name": STRUCTURED_OUTPUT_TOOL_NAME,
"description": (
"Use this tool to provide your final structured response. "
"Call this tool when you have gathered all necessary information "
"and are ready to provide the final answer in the required format."
),
"inputSchema": {
"json": generate_model_description(response_model)
.get("json_schema", {})
.get("schema", {})
},
}
}
}
body["toolConfig"] = cast(
"ToolConfigurationTypeDef",
cast(
object,
{
"tools": [structured_tool],
"toolChoice": {"tool": {"name": "structured_output"}},
},
),
)
if existing_tools:
# Append structured_output to existing tools, don't force toolChoice
existing_tools.append(structured_tool)
body["toolConfig"] = cast(
"ToolConfigurationTypeDef",
cast(object, {"tools": existing_tools}),
)
else:
# No existing tools, use only structured_output with forced toolChoice
body["toolConfig"] = cast(
"ToolConfigurationTypeDef",
cast(
object,
{
"tools": [structured_tool],
"toolChoice": {
"tool": {"name": STRUCTURED_OUTPUT_TOOL_NAME}
},
},
),
)
full_response = ""
current_tool_use: dict[str, Any] | None = None
@@ -892,47 +1049,79 @@ class BedrockCompletion(BaseLLM):
)
elif "contentBlockStop" in event:
logging.debug("Content block stopped in stream")
if current_tool_use and available_functions:
if current_tool_use:
function_name = current_tool_use["name"]
function_args = cast(
dict[str, Any], current_tool_use.get("input", {})
)
tool_result = self._handle_tool_execution(
function_name=function_name,
function_args=function_args,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
if tool_result is not None and tool_use_id:
messages.append(
{
"role": "assistant",
"content": [{"toolUse": current_tool_use}],
}
# Check if this is the structured_output tool
if (
function_name == STRUCTURED_OUTPUT_TOOL_NAME
and response_model
):
function_args = _preprocess_structured_data(
function_args, response_model
)
messages.append(
{
"role": "user",
"content": [
{
"toolResult": {
"toolUseId": tool_use_id,
"content": [
{"text": str(tool_result)}
],
try:
result = response_model.model_validate(
function_args
)
self._emit_call_completed_event(
response=result.model_dump_json(),
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=messages,
)
return result # type: ignore[return-value]
except Exception as e:
error_msg = (
f"Failed to validate {STRUCTURED_OUTPUT_TOOL_NAME} tool response "
f"with model {response_model.__name__}: {e}"
)
logging.error(error_msg)
raise ValueError(error_msg) from e
# Handle regular tool execution
if available_functions:
tool_result = self._handle_tool_execution(
function_name=function_name,
function_args=function_args,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
if tool_result is not None and tool_use_id:
messages.append(
{
"role": "assistant",
"content": [{"toolUse": current_tool_use}],
}
)
messages.append(
{
"role": "user",
"content": [
{
"toolResult": {
"toolUseId": tool_use_id,
"content": [
{"text": str(tool_result)}
],
}
}
}
],
}
)
return self._handle_converse(
messages,
body,
available_functions,
from_task,
from_agent,
)
],
}
)
return self._handle_converse(
messages,
body,
available_functions,
from_task,
from_agent,
response_model,
)
current_tool_use = None
tool_use_id = None
elif "messageStop" in event:
@@ -1016,27 +1205,58 @@ class BedrockCompletion(BaseLLM):
) -> str | Any:
"""Handle async non-streaming converse API call."""
if response_model:
structured_tool: ConverseToolTypeDef = {
"toolSpec": {
"name": "structured_output",
"description": "Returns structured data according to the schema",
"inputSchema": {
"json": generate_model_description(response_model)
.get("json_schema", {})
.get("schema", {})
},
# Check if structured_output tool already exists (from a previous recursive call)
existing_tool_config = body.get("toolConfig")
existing_tools: list[Any] = []
structured_output_already_exists = False
if existing_tool_config:
existing_tools = list(existing_tool_config.get("tools", []))
# Check if structured_output tool is already in the tools list
for tool in existing_tools:
tool_spec = tool.get("toolSpec", {})
if tool_spec.get("name") == STRUCTURED_OUTPUT_TOOL_NAME:
structured_output_already_exists = True
break
if not structured_output_already_exists:
structured_tool: ConverseToolTypeDef = {
"toolSpec": {
"name": STRUCTURED_OUTPUT_TOOL_NAME,
"description": (
"Use this tool to provide your final structured response. "
"Call this tool when you have gathered all necessary information "
"and are ready to provide the final answer in the required format."
),
"inputSchema": {
"json": generate_model_description(response_model)
.get("json_schema", {})
.get("schema", {})
},
}
}
}
body["toolConfig"] = cast(
"ToolConfigurationTypeDef",
cast(
object,
{
"tools": [structured_tool],
"toolChoice": {"tool": {"name": "structured_output"}},
},
),
)
if existing_tools:
# Append structured_output to existing tools, don't force toolChoice
existing_tools.append(structured_tool)
body["toolConfig"] = cast(
"ToolConfigurationTypeDef",
cast(object, {"tools": existing_tools}),
)
else:
# No existing tools, use only structured_output with forced toolChoice
body["toolConfig"] = cast(
"ToolConfigurationTypeDef",
cast(
object,
{
"tools": [structured_tool],
"toolChoice": {
"tool": {"name": STRUCTURED_OUTPUT_TOOL_NAME}
},
},
),
)
try:
if not messages:
@@ -1084,29 +1304,46 @@ class BedrockCompletion(BaseLLM):
# If there are tool uses but no available_functions, return them for the executor to handle
tool_uses = [block["toolUse"] for block in content if "toolUse" in block]
# Check for structured_output tool call first
if response_model and tool_uses:
for tool_use in tool_uses:
if tool_use.get("name") == "structured_output":
if tool_use.get("name") == STRUCTURED_OUTPUT_TOOL_NAME:
structured_data = tool_use.get("input", {})
result = response_model.model_validate(structured_data)
self._emit_call_completed_event(
response=result.model_dump_json(),
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=messages,
structured_data = _preprocess_structured_data(
structured_data, response_model
)
return result
try:
result = response_model.model_validate(structured_data)
self._emit_call_completed_event(
response=result.model_dump_json(),
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=messages,
)
return result
except Exception as e:
error_msg = (
f"Failed to validate {STRUCTURED_OUTPUT_TOOL_NAME} tool response "
f"with model {response_model.__name__}: {e}"
)
logging.error(error_msg)
raise ValueError(error_msg) from e
if tool_uses and not available_functions:
# Filter out structured_output from tool_uses returned to executor
non_structured_output_tool_uses = [
tu for tu in tool_uses if tu.get("name") != STRUCTURED_OUTPUT_TOOL_NAME
]
if non_structured_output_tool_uses and not available_functions:
self._emit_call_completed_event(
response=tool_uses,
response=non_structured_output_tool_uses,
call_type=LLMCallType.TOOL_CALL,
from_task=from_task,
from_agent=from_agent,
messages=messages,
)
return tool_uses
return non_structured_output_tool_uses
text_content = ""
@@ -1120,6 +1357,10 @@ class BedrockCompletion(BaseLLM):
function_name = tool_use_block["name"]
function_args = tool_use_block.get("input", {})
# Skip structured_output - it's handled above
if function_name == STRUCTURED_OUTPUT_TOOL_NAME:
continue
logging.debug(
f"Tool use requested: {function_name} with ID {tool_use_id}"
)
@@ -1155,7 +1396,12 @@ class BedrockCompletion(BaseLLM):
)
return await self._ahandle_converse(
messages, body, available_functions, from_task, from_agent
messages,
body,
available_functions,
from_task,
from_agent,
response_model,
)
text_content = self._apply_stop_words(text_content)
@@ -1232,27 +1478,58 @@ class BedrockCompletion(BaseLLM):
) -> str:
"""Handle async streaming converse API call."""
if response_model:
structured_tool: ConverseToolTypeDef = {
"toolSpec": {
"name": "structured_output",
"description": "Returns structured data according to the schema",
"inputSchema": {
"json": generate_model_description(response_model)
.get("json_schema", {})
.get("schema", {})
},
# Check if structured_output tool already exists (from a previous recursive call)
existing_tool_config = body.get("toolConfig")
existing_tools: list[Any] = []
structured_output_already_exists = False
if existing_tool_config:
existing_tools = list(existing_tool_config.get("tools", []))
# Check if structured_output tool is already in the tools list
for tool in existing_tools:
tool_spec = tool.get("toolSpec", {})
if tool_spec.get("name") == STRUCTURED_OUTPUT_TOOL_NAME:
structured_output_already_exists = True
break
if not structured_output_already_exists:
structured_tool: ConverseToolTypeDef = {
"toolSpec": {
"name": STRUCTURED_OUTPUT_TOOL_NAME,
"description": (
"Use this tool to provide your final structured response. "
"Call this tool when you have gathered all necessary information "
"and are ready to provide the final answer in the required format."
),
"inputSchema": {
"json": generate_model_description(response_model)
.get("json_schema", {})
.get("schema", {})
},
}
}
}
body["toolConfig"] = cast(
"ToolConfigurationTypeDef",
cast(
object,
{
"tools": [structured_tool],
"toolChoice": {"tool": {"name": "structured_output"}},
},
),
)
if existing_tools:
# Append structured_output to existing tools, don't force toolChoice
existing_tools.append(structured_tool)
body["toolConfig"] = cast(
"ToolConfigurationTypeDef",
cast(object, {"tools": existing_tools}),
)
else:
# No existing tools, use only structured_output with forced toolChoice
body["toolConfig"] = cast(
"ToolConfigurationTypeDef",
cast(
object,
{
"tools": [structured_tool],
"toolChoice": {
"tool": {"name": STRUCTURED_OUTPUT_TOOL_NAME}
},
},
),
)
full_response = ""
current_tool_use: dict[str, Any] | None = None
@@ -1346,54 +1623,84 @@ class BedrockCompletion(BaseLLM):
elif "contentBlockStop" in event:
logging.debug("Content block stopped in stream")
if current_tool_use and available_functions:
if current_tool_use:
function_name = current_tool_use["name"]
function_args = cast(
dict[str, Any], current_tool_use.get("input", {})
)
tool_result = self._handle_tool_execution(
function_name=function_name,
function_args=function_args,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
# Check if this is the structured_output tool
if (
function_name == STRUCTURED_OUTPUT_TOOL_NAME
and response_model
):
function_args = _preprocess_structured_data(
function_args, response_model
)
try:
result = response_model.model_validate(
function_args
)
self._emit_call_completed_event(
response=result.model_dump_json(),
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=messages,
)
return result # type: ignore[return-value]
except Exception as e:
error_msg = (
f"Failed to validate {STRUCTURED_OUTPUT_TOOL_NAME} tool response "
f"with model {response_model.__name__}: {e}"
)
logging.error(error_msg)
raise ValueError(error_msg) from e
if tool_result is not None and tool_use_id:
messages.append(
{
"role": "assistant",
"content": [{"toolUse": current_tool_use}],
}
# Handle regular tool execution
if available_functions:
tool_result = self._handle_tool_execution(
function_name=function_name,
function_args=function_args,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
messages.append(
{
"role": "user",
"content": [
{
"toolResult": {
"toolUseId": tool_use_id,
"content": [
{"text": str(tool_result)}
],
if tool_result is not None and tool_use_id:
messages.append(
{
"role": "assistant",
"content": [{"toolUse": current_tool_use}],
}
)
messages.append(
{
"role": "user",
"content": [
{
"toolResult": {
"toolUseId": tool_use_id,
"content": [
{"text": str(tool_result)}
],
}
}
}
],
}
)
],
}
)
return await self._ahandle_converse(
messages,
body,
available_functions,
from_task,
from_agent,
)
current_tool_use = None
tool_use_id = None
return await self._ahandle_converse(
messages,
body,
available_functions,
from_task,
from_agent,
response_model,
)
current_tool_use = None
tool_use_id = None
elif "messageStop" in event:
stop_reason = event["messageStop"].get("stopReason")

View File

@@ -925,7 +925,7 @@ def extract_tool_call_info(
)
func_info = tool_call.get("function", {})
func_name = func_info.get("name", "") or tool_call.get("name", "")
func_args = func_info.get("arguments", "{}") or tool_call.get("input", {})
func_args = func_info.get("arguments") or tool_call.get("input") or {}
return call_id, sanitize_tool_name(func_name), func_args
return None