diff --git a/lib/crewai/src/crewai/llms/providers/bedrock/completion.py b/lib/crewai/src/crewai/llms/providers/bedrock/completion.py
index 850835ff1..004b076f8 100644
--- a/lib/crewai/src/crewai/llms/providers/bedrock/completion.py
+++ b/lib/crewai/src/crewai/llms/providers/bedrock/completion.py
@@ -45,6 +45,78 @@ except ImportError:
         'AWS Bedrock native provider not available, to install: uv add "crewai[bedrock]"'
     ) from None
+
+STRUCTURED_OUTPUT_TOOL_NAME = "structured_output"
+
+
+def _preprocess_structured_data(
+    data: dict[str, Any], response_model: type[BaseModel]
+) -> dict[str, Any]:
+    """Preprocess structured data to handle common LLM output format issues.
+
+    Some models (especially Claude on Bedrock) may return array fields as
+    markdown-formatted strings instead of proper JSON arrays. This function
+    attempts to convert such strings to arrays before validation.
+
+    Args:
+        data: The raw structured data from the tool response
+        response_model: The Pydantic model class to validate against
+
+    Returns:
+        Preprocessed data with string-to-array conversions where needed
+    """
+    import re
+    from typing import get_origin
+
+    # Get model field annotations
+    model_fields = response_model.model_fields
+
+    processed_data = dict(data)
+
+    for field_name, field_info in model_fields.items():
+        if field_name not in processed_data:
+            continue
+
+        value = processed_data[field_name]
+
+        # Check if the field expects a list type
+        annotation = field_info.annotation
+        origin = get_origin(annotation)
+
+        # Handle list[X] or List[X] types
+        is_list_type = origin is list or (
+            origin is not None and str(origin).startswith("list")
+        )
+
+        if is_list_type and isinstance(value, str):
+            # Try to parse markdown-style bullet points or numbered lists
+            lines = value.strip().split("\n")
+            parsed_items = []
+
+            for line in lines:
+                line = line.strip()
+                if not line:
+                    continue
+
+                # Remove common bullet point prefixes
+                # Matches: "- item", "* item", "• item", "1. item", "1) item"
+                cleaned = re.sub(r"^[-*•]\s*", "", line)
+                cleaned = re.sub(r"^\d+[.)]\s*", "", cleaned)
+                cleaned = cleaned.strip()
+
+                if cleaned:
+                    parsed_items.append(cleaned)
+
+            if parsed_items:
+                processed_data[field_name] = parsed_items
+                logging.debug(
+                    f"Converted markdown-formatted string to list for field '{field_name}': "
+                    f"{len(parsed_items)} items"
+                )
+
+    return processed_data
+
+
 
 try:
     from aiobotocore.session import (  # type: ignore[import-untyped]
         get_session as get_aiobotocore_session,
@@ -545,27 +617,56 @@ class BedrockCompletion(BaseLLM):
     ) -> str | Any:
         """Handle non-streaming converse API call following AWS best practices."""
         if response_model:
-            structured_tool: ConverseToolTypeDef = {
-                "toolSpec": {
-                    "name": "structured_output",
-                    "description": "Returns structured data according to the schema",
-                    "inputSchema": {
-                        "json": generate_model_description(response_model)
-                        .get("json_schema", {})
-                        .get("schema", {})
-                    },
+            # Check if structured_output tool already exists (from a previous recursive call)
+            existing_tool_config = body.get("toolConfig")
+            existing_tools: list[Any] = []
+            structured_output_already_exists = False
+
+            if existing_tool_config:
+                existing_tools = list(existing_tool_config.get("tools", []))
+                for tool in existing_tools:
+                    tool_spec = tool.get("toolSpec", {})
+                    if tool_spec.get("name") == STRUCTURED_OUTPUT_TOOL_NAME:
+                        structured_output_already_exists = True
+                        break
+
+            if not structured_output_already_exists:
+                structured_tool: ConverseToolTypeDef = {
+                    "toolSpec": {
+                        "name": STRUCTURED_OUTPUT_TOOL_NAME,
+                        "description": (
+                            "Use this tool to provide your final structured response. "
+                            "Call this tool when you have gathered all necessary information "
+                            "and are ready to provide the final answer in the required format."
+                        ),
+                        "inputSchema": {
+                            "json": generate_model_description(response_model)
+                            .get("json_schema", {})
+                            .get("schema", {})
+                        },
+                    }
                 }
-            }
-            body["toolConfig"] = cast(
-                "ToolConfigurationTypeDef",
-                cast(
-                    object,
-                    {
-                        "tools": [structured_tool],
-                        "toolChoice": {"tool": {"name": "structured_output"}},
-                    },
-                ),
-            )
+
+                if existing_tools:
+                    existing_tools.append(structured_tool)
+                    body["toolConfig"] = cast(
+                        "ToolConfigurationTypeDef",
+                        cast(object, {"tools": existing_tools}),
+                    )
+                else:
+                    # No existing tools, use only structured_output with forced toolChoice
+                    body["toolConfig"] = cast(
+                        "ToolConfigurationTypeDef",
+                        cast(
+                            object,
+                            {
+                                "tools": [structured_tool],
+                                "toolChoice": {
+                                    "tool": {"name": STRUCTURED_OUTPUT_TOOL_NAME}
+                                },
+                            },
+                        ),
+                    )
 
         try:
             if not messages:
@@ -616,29 +717,46 @@ class BedrockCompletion(BaseLLM):
         # If there are tool uses but no available_functions, return them for the executor to handle
         tool_uses = [block["toolUse"] for block in content if "toolUse" in block]
 
+        # Check for structured_output tool call first
         if response_model and tool_uses:
             for tool_use in tool_uses:
-                if tool_use.get("name") == "structured_output":
+                if tool_use.get("name") == STRUCTURED_OUTPUT_TOOL_NAME:
                     structured_data = tool_use.get("input", {})
-                    result = response_model.model_validate(structured_data)
-                    self._emit_call_completed_event(
-                        response=result.model_dump_json(),
-                        call_type=LLMCallType.LLM_CALL,
-                        from_task=from_task,
-                        from_agent=from_agent,
-                        messages=messages,
+                    structured_data = _preprocess_structured_data(
+                        structured_data, response_model
                     )
-                    return result
+                    try:
+                        result = response_model.model_validate(structured_data)
+                        self._emit_call_completed_event(
+                            response=result.model_dump_json(),
+                            call_type=LLMCallType.LLM_CALL,
+                            from_task=from_task,
+                            from_agent=from_agent,
+                            messages=messages,
+                        )
+                        return result
+                    except Exception as e:
+                        error_msg = (
+                            f"Failed to validate {STRUCTURED_OUTPUT_TOOL_NAME} tool response "
+                            f"with model {response_model.__name__}: {e}"
+                        )
+                        logging.error(error_msg)
+                        raise ValueError(error_msg) from e
 
-        if tool_uses and not available_functions:
+        # Filter out structured_output from tool_uses returned to executor
+        non_structured_output_tool_uses = [
+            tu for tu in tool_uses if tu.get("name") != STRUCTURED_OUTPUT_TOOL_NAME
+        ]
+
+        if non_structured_output_tool_uses and not available_functions:
             self._emit_call_completed_event(
-                response=tool_uses,
+                response=non_structured_output_tool_uses,
                 call_type=LLMCallType.TOOL_CALL,
                 from_task=from_task,
                 from_agent=from_agent,
                 messages=messages,
             )
-            return tool_uses
+            return non_structured_output_tool_uses
 
         # Process content blocks and handle tool use correctly
         text_content = ""
@@ -655,6 +773,9 @@ class BedrockCompletion(BaseLLM):
                 function_name = tool_use_block["name"]
                 function_args = tool_use_block.get("input", {})
 
+                if function_name == STRUCTURED_OUTPUT_TOOL_NAME:
+                    continue
+
                 logging.debug(
                     f"Tool use requested: {function_name} with ID {tool_use_id}"
                 )
@@ -691,7 +812,12 @@ class BedrockCompletion(BaseLLM):
             )
 
             return self._handle_converse(
-                messages, body, available_functions, from_task, from_agent
+                messages,
+                body,
+                available_functions,
+                from_task,
+                from_agent,
+                response_model,
             )
 
         # Apply stop sequences if configured
@@ -780,27 +906,58 @@ class BedrockCompletion(BaseLLM):
     ) -> str:
         """Handle streaming converse API call with comprehensive event handling."""
         if response_model:
-            structured_tool: ConverseToolTypeDef = {
-                "toolSpec": {
-                    "name": "structured_output",
-                    "description": "Returns structured data according to the schema",
-                    "inputSchema": {
-                        "json": generate_model_description(response_model)
-                        .get("json_schema", {})
-                        .get("schema", {})
-                    },
+            # Check if structured_output tool already exists (from a previous recursive call)
+            existing_tool_config = body.get("toolConfig")
+            existing_tools: list[Any] = []
+            structured_output_already_exists = False
+
+            if existing_tool_config:
+                existing_tools = list(existing_tool_config.get("tools", []))
+                # Check if structured_output tool is already in the tools list
+                for tool in existing_tools:
+                    tool_spec = tool.get("toolSpec", {})
+                    if tool_spec.get("name") == STRUCTURED_OUTPUT_TOOL_NAME:
+                        structured_output_already_exists = True
+                        break
+
+            if not structured_output_already_exists:
+                structured_tool: ConverseToolTypeDef = {
+                    "toolSpec": {
+                        "name": STRUCTURED_OUTPUT_TOOL_NAME,
+                        "description": (
+                            "Use this tool to provide your final structured response. "
+                            "Call this tool when you have gathered all necessary information "
+                            "and are ready to provide the final answer in the required format."
+                        ),
+                        "inputSchema": {
+                            "json": generate_model_description(response_model)
+                            .get("json_schema", {})
+                            .get("schema", {})
+                        },
+                    }
                 }
-            }
-            body["toolConfig"] = cast(
-                "ToolConfigurationTypeDef",
-                cast(
-                    object,
-                    {
-                        "tools": [structured_tool],
-                        "toolChoice": {"tool": {"name": "structured_output"}},
-                    },
-                ),
-            )
+
+                if existing_tools:
+                    # Append structured_output to existing tools, don't force toolChoice
+                    existing_tools.append(structured_tool)
+                    body["toolConfig"] = cast(
+                        "ToolConfigurationTypeDef",
+                        cast(object, {"tools": existing_tools}),
+                    )
+                else:
+                    # No existing tools, use only structured_output with forced toolChoice
+                    body["toolConfig"] = cast(
+                        "ToolConfigurationTypeDef",
+                        cast(
+                            object,
+                            {
+                                "tools": [structured_tool],
+                                "toolChoice": {
+                                    "tool": {"name": STRUCTURED_OUTPUT_TOOL_NAME}
+                                },
+                            },
+                        ),
+                    )
 
         full_response = ""
         current_tool_use: dict[str, Any] | None = None
@@ -892,47 +1049,79 @@ class BedrockCompletion(BaseLLM):
                     )
                 elif "contentBlockStop" in event:
                     logging.debug("Content block stopped in stream")
-                    if current_tool_use and available_functions:
+                    if current_tool_use:
                         function_name = current_tool_use["name"]
                         function_args = cast(
                             dict[str, Any], current_tool_use.get("input", {})
                         )
-                        tool_result = self._handle_tool_execution(
-                            function_name=function_name,
-                            function_args=function_args,
-                            available_functions=available_functions,
-                            from_task=from_task,
-                            from_agent=from_agent,
-                        )
-                        if tool_result is not None and tool_use_id:
-                            messages.append(
-                                {
-                                    "role": "assistant",
-                                    "content": [{"toolUse": current_tool_use}],
-                                }
+
+                        # Check if this is the structured_output tool
+                        if (
+                            function_name == STRUCTURED_OUTPUT_TOOL_NAME
+                            and response_model
+                        ):
+                            function_args = _preprocess_structured_data(
+                                function_args, response_model
                            )
-                            messages.append(
-                                {
-                                    "role": "user",
-                                    "content": [
-                                        {
-                                            "toolResult": {
-                                                "toolUseId": tool_use_id,
-                                                "content": [
-                                                    {"text": str(tool_result)}
-                                                ],
+                            try:
+                                result = response_model.model_validate(
+                                    function_args
+                                )
+                                self._emit_call_completed_event(
+                                    response=result.model_dump_json(),
+                                    call_type=LLMCallType.LLM_CALL,
+                                    from_task=from_task,
+                                    from_agent=from_agent,
+                                    messages=messages,
+                                )
+                                return result  # type: ignore[return-value]
+                            except Exception as e:
+                                error_msg = (
+                                    f"Failed to validate {STRUCTURED_OUTPUT_TOOL_NAME} tool response "
+                                    f"with model {response_model.__name__}: {e}"
+                                )
+                                logging.error(error_msg)
+                                raise ValueError(error_msg) from e
+
+                        # Handle regular tool execution
+                        if available_functions:
+                            tool_result = self._handle_tool_execution(
+                                function_name=function_name,
+                                function_args=function_args,
+                                available_functions=available_functions,
+                                from_task=from_task,
+                                from_agent=from_agent,
+                            )
+                            if tool_result is not None and tool_use_id:
+                                messages.append(
+                                    {
+                                        "role": "assistant",
+                                        "content": [{"toolUse": current_tool_use}],
+                                    }
+                                )
+                                messages.append(
+                                    {
+                                        "role": "user",
+                                        "content": [
+                                            {
+                                                "toolResult": {
+                                                    "toolUseId": tool_use_id,
+                                                    "content": [
+                                                        {"text": str(tool_result)}
+                                                    ],
+                                                }
                                             }
-                                        }
-                                    ],
-                                }
-                            )
-                            return self._handle_converse(
-                                messages,
-                                body,
-                                available_functions,
-                                from_task,
-                                from_agent,
-                            )
+                                        ],
+                                    }
+                                )
+                                return self._handle_converse(
+                                    messages,
+                                    body,
+                                    available_functions,
+                                    from_task,
+                                    from_agent,
+                                    response_model,
+                                )
                         current_tool_use = None
                         tool_use_id = None
                 elif "messageStop" in event:
@@ -1016,27 +1205,58 @@ class BedrockCompletion(BaseLLM):
     ) -> str | Any:
         """Handle async non-streaming converse API call."""
         if response_model:
-            structured_tool: ConverseToolTypeDef = {
-                "toolSpec": {
-                    "name": "structured_output",
-                    "description": "Returns structured data according to the schema",
-                    "inputSchema": {
-                        "json": generate_model_description(response_model)
-                        .get("json_schema", {})
-                        .get("schema", {})
-                    },
+            # Check if structured_output tool already exists (from a previous recursive call)
+            existing_tool_config = body.get("toolConfig")
+            existing_tools: list[Any] = []
+            structured_output_already_exists = False
+
+            if existing_tool_config:
+                existing_tools = list(existing_tool_config.get("tools", []))
+                # Check if structured_output tool is already in the tools list
+                for tool in existing_tools:
+                    tool_spec = tool.get("toolSpec", {})
+                    if tool_spec.get("name") == STRUCTURED_OUTPUT_TOOL_NAME:
+                        structured_output_already_exists = True
+                        break
+
+            if not structured_output_already_exists:
+                structured_tool: ConverseToolTypeDef = {
+                    "toolSpec": {
+                        "name": STRUCTURED_OUTPUT_TOOL_NAME,
+                        "description": (
+                            "Use this tool to provide your final structured response. "
+                            "Call this tool when you have gathered all necessary information "
+                            "and are ready to provide the final answer in the required format."
+                        ),
+                        "inputSchema": {
+                            "json": generate_model_description(response_model)
+                            .get("json_schema", {})
+                            .get("schema", {})
+                        },
+                    }
                 }
-            }
-            body["toolConfig"] = cast(
-                "ToolConfigurationTypeDef",
-                cast(
-                    object,
-                    {
-                        "tools": [structured_tool],
-                        "toolChoice": {"tool": {"name": "structured_output"}},
-                    },
-                ),
-            )
+
+                if existing_tools:
+                    # Append structured_output to existing tools, don't force toolChoice
+                    existing_tools.append(structured_tool)
+                    body["toolConfig"] = cast(
+                        "ToolConfigurationTypeDef",
+                        cast(object, {"tools": existing_tools}),
+                    )
+                else:
+                    # No existing tools, use only structured_output with forced toolChoice
+                    body["toolConfig"] = cast(
+                        "ToolConfigurationTypeDef",
+                        cast(
+                            object,
+                            {
+                                "tools": [structured_tool],
+                                "toolChoice": {
+                                    "tool": {"name": STRUCTURED_OUTPUT_TOOL_NAME}
+                                },
+                            },
+                        ),
+                    )
 
         try:
             if not messages:
@@ -1084,29 +1304,46 @@ class BedrockCompletion(BaseLLM):
 
         # If there are tool uses but no available_functions, return them for the executor to handle
         tool_uses = [block["toolUse"] for block in content if "toolUse" in block]
 
+        # Check for structured_output tool call first
         if response_model and tool_uses:
             for tool_use in tool_uses:
-                if tool_use.get("name") == "structured_output":
+                if tool_use.get("name") == STRUCTURED_OUTPUT_TOOL_NAME:
                     structured_data = tool_use.get("input", {})
-                    result = response_model.model_validate(structured_data)
-                    self._emit_call_completed_event(
-                        response=result.model_dump_json(),
-                        call_type=LLMCallType.LLM_CALL,
-                        from_task=from_task,
-                        from_agent=from_agent,
-                        messages=messages,
+                    structured_data = _preprocess_structured_data(
+                        structured_data, response_model
                     )
-                    return result
+                    try:
+                        result = response_model.model_validate(structured_data)
+                        self._emit_call_completed_event(
+                            response=result.model_dump_json(),
+                            call_type=LLMCallType.LLM_CALL,
+                            from_task=from_task,
+                            from_agent=from_agent,
+                            messages=messages,
+                        )
+                        return result
+                    except Exception as e:
+                        error_msg = (
+                            f"Failed to validate {STRUCTURED_OUTPUT_TOOL_NAME} tool response "
+                            f"with model {response_model.__name__}: {e}"
+                        )
+                        logging.error(error_msg)
+                        raise ValueError(error_msg) from e
 
-        if tool_uses and not available_functions:
+        # Filter out structured_output from tool_uses returned to executor
+        non_structured_output_tool_uses = [
+            tu for tu in tool_uses if tu.get("name") != STRUCTURED_OUTPUT_TOOL_NAME
+        ]
+
+        if non_structured_output_tool_uses and not available_functions:
             self._emit_call_completed_event(
-                response=tool_uses,
+                response=non_structured_output_tool_uses,
                 call_type=LLMCallType.TOOL_CALL,
                 from_task=from_task,
                 from_agent=from_agent,
                 messages=messages,
             )
-            return tool_uses
+            return non_structured_output_tool_uses
 
         text_content = ""
@@ -1120,6 +1357,10 @@ class BedrockCompletion(BaseLLM):
                 function_name = tool_use_block["name"]
                 function_args = tool_use_block.get("input", {})
 
+                # Skip structured_output - it's handled above
+                if function_name == STRUCTURED_OUTPUT_TOOL_NAME:
+                    continue
+
                 logging.debug(
                     f"Tool use requested: {function_name} with ID {tool_use_id}"
                 )
@@ -1155,7 +1396,12 @@ class BedrockCompletion(BaseLLM):
             )
 
             return await self._ahandle_converse(
-                messages, body, available_functions, from_task, from_agent
+                messages,
+                body,
+                available_functions,
+                from_task,
+                from_agent,
+                response_model,
             )
 
         text_content = self._apply_stop_words(text_content)
@@ -1232,27 +1478,58 @@ class BedrockCompletion(BaseLLM):
     ) -> str:
         """Handle async streaming converse API call."""
         if response_model:
-            structured_tool: ConverseToolTypeDef = {
-                "toolSpec": {
-                    "name": "structured_output",
-                    "description": "Returns structured data according to the schema",
-                    "inputSchema": {
-                        "json": generate_model_description(response_model)
-                        .get("json_schema", {})
-                        .get("schema", {})
-                    },
+            # Check if structured_output tool already exists (from a previous recursive call)
+            existing_tool_config = body.get("toolConfig")
+            existing_tools: list[Any] = []
+            structured_output_already_exists = False
+
+            if existing_tool_config:
+                existing_tools = list(existing_tool_config.get("tools", []))
+                # Check if structured_output tool is already in the tools list
+                for tool in existing_tools:
+                    tool_spec = tool.get("toolSpec", {})
+                    if tool_spec.get("name") == STRUCTURED_OUTPUT_TOOL_NAME:
+                        structured_output_already_exists = True
+                        break
+
+            if not structured_output_already_exists:
+                structured_tool: ConverseToolTypeDef = {
+                    "toolSpec": {
+                        "name": STRUCTURED_OUTPUT_TOOL_NAME,
+                        "description": (
+                            "Use this tool to provide your final structured response. "
+                            "Call this tool when you have gathered all necessary information "
+                            "and are ready to provide the final answer in the required format."
+                        ),
+                        "inputSchema": {
+                            "json": generate_model_description(response_model)
+                            .get("json_schema", {})
+                            .get("schema", {})
+                        },
+                    }
                 }
-            }
-            body["toolConfig"] = cast(
-                "ToolConfigurationTypeDef",
-                cast(
-                    object,
-                    {
-                        "tools": [structured_tool],
-                        "toolChoice": {"tool": {"name": "structured_output"}},
-                    },
-                ),
-            )
+
+                if existing_tools:
+                    # Append structured_output to existing tools, don't force toolChoice
+                    existing_tools.append(structured_tool)
+                    body["toolConfig"] = cast(
+                        "ToolConfigurationTypeDef",
+                        cast(object, {"tools": existing_tools}),
+                    )
+                else:
+                    # No existing tools, use only structured_output with forced toolChoice
+                    body["toolConfig"] = cast(
+                        "ToolConfigurationTypeDef",
+                        cast(
+                            object,
+                            {
+                                "tools": [structured_tool],
+                                "toolChoice": {
+                                    "tool": {"name": STRUCTURED_OUTPUT_TOOL_NAME}
+                                },
+                            },
+                        ),
+                    )
 
         full_response = ""
         current_tool_use: dict[str, Any] | None = None
@@ -1346,54 +1623,84 @@ class BedrockCompletion(BaseLLM):
 
                 elif "contentBlockStop" in event:
                     logging.debug("Content block stopped in stream")
-                    if current_tool_use and available_functions:
+                    if current_tool_use:
                         function_name = current_tool_use["name"]
                         function_args = cast(
                             dict[str, Any], current_tool_use.get("input", {})
                        )
-                        tool_result = self._handle_tool_execution(
-                            function_name=function_name,
-                            function_args=function_args,
-                            available_functions=available_functions,
-                            from_task=from_task,
-                            from_agent=from_agent,
-                        )
+                        # Check if this is the structured_output tool
+                        if (
+                            function_name == STRUCTURED_OUTPUT_TOOL_NAME
+                            and response_model
+                        ):
+                            function_args = _preprocess_structured_data(
+                                function_args, response_model
+                            )
+                            try:
+                                result = response_model.model_validate(
+                                    function_args
+                                )
+                                self._emit_call_completed_event(
+                                    response=result.model_dump_json(),
+                                    call_type=LLMCallType.LLM_CALL,
+                                    from_task=from_task,
+                                    from_agent=from_agent,
+                                    messages=messages,
+                                )
+                                return result  # type: ignore[return-value]
+                            except Exception as e:
+                                error_msg = (
+                                    f"Failed to validate {STRUCTURED_OUTPUT_TOOL_NAME} tool response "
+                                    f"with model {response_model.__name__}: {e}"
+                                )
+                                logging.error(error_msg)
+                                raise ValueError(error_msg) from e
 
-                        if tool_result is not None and tool_use_id:
-                            messages.append(
-                                {
-                                    "role": "assistant",
-                                    "content": [{"toolUse": current_tool_use}],
-                                }
+                        # Handle regular tool execution
+                        if available_functions:
+                            tool_result = self._handle_tool_execution(
+                                function_name=function_name,
+                                function_args=function_args,
+                                available_functions=available_functions,
+                                from_task=from_task,
+                                from_agent=from_agent,
                            )
-                            messages.append(
-                                {
-                                    "role": "user",
-                                    "content": [
-                                        {
-                                            "toolResult": {
-                                                "toolUseId": tool_use_id,
-                                                "content": [
-                                                    {"text": str(tool_result)}
-                                                ],
+                            if tool_result is not None and tool_use_id:
+                                messages.append(
+                                    {
+                                        "role": "assistant",
+                                        "content": [{"toolUse": current_tool_use}],
+                                    }
+                                )
+
+                                messages.append(
+                                    {
+                                        "role": "user",
+                                        "content": [
+                                            {
+                                                "toolResult": {
+                                                    "toolUseId": tool_use_id,
+                                                    "content": [
+                                                        {"text": str(tool_result)}
+                                                    ],
+                                                }
                                             }
-                                        }
-                                    ],
-                                }
-                            )
+                                        ],
+                                    }
+                                )
 
-                            return await self._ahandle_converse(
-                                messages,
-                                body,
-                                available_functions,
-                                from_task,
-                                from_agent,
-                            )
-
-                        current_tool_use = None
-                        tool_use_id = None
+                                return await self._ahandle_converse(
+                                    messages,
+                                    body,
+                                    available_functions,
+                                    from_task,
+                                    from_agent,
+                                    response_model,
+                                )
+                        current_tool_use = None
+                        tool_use_id = None
                 elif "messageStop" in event:
                     stop_reason = event["messageStop"].get("stopReason")
diff --git a/lib/crewai/src/crewai/utilities/agent_utils.py b/lib/crewai/src/crewai/utilities/agent_utils.py
index 0c0ffbe89..1ede7f07d 100644
--- a/lib/crewai/src/crewai/utilities/agent_utils.py
+++ b/lib/crewai/src/crewai/utilities/agent_utils.py
@@ -925,7 +925,7 @@ def extract_tool_call_info(
         )
         func_info = tool_call.get("function", {})
         func_name = func_info.get("name", "") or tool_call.get("name", "")
-        func_args = func_info.get("arguments", "{}") or tool_call.get("input", {})
+        func_args = func_info.get("arguments") or tool_call.get("input") or {}
        return call_id, sanitize_tool_name(func_name), func_args
    return None
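
Illustrative sketch (not part of the patch): assuming the patched module is importable, _preprocess_structured_data coerces a markdown-formatted string into a proper list before Pydantic validation. The Summary model below is hypothetical, introduced only for this example:

    from pydantic import BaseModel

    from crewai.llms.providers.bedrock.completion import _preprocess_structured_data

    class Summary(BaseModel):  # hypothetical model, for illustration only
        points: list[str]

    # Claude on Bedrock may return a bullet list where a JSON array was
    # expected; the helper splits on newlines and strips bullet prefixes.
    raw = {"points": "- first finding\n- second finding"}
    cleaned = _preprocess_structured_data(raw, Summary)
    assert cleaned == {"points": ["first finding", "second finding"]}
    Summary.model_validate(cleaned)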