Files
crewAI/lib/crewai/tests/llms/bedrock/test_bedrock.py
Lorenze Jay a1033e4bfe Fix structured output leaks in tool-calling loops (#5897)
* Fix structured output leaks in tool-calling loops

* addressing comments

* drop scripts

* Update Gemini agent tests to include structured output with thoughts and bump model version to 2.5-flash

* merge

* Update Anthropic test cases to use new model and tool structure

- Changed the model from "claude-3-5-haiku-20241022" to "claude-sonnet-4-6" in the test setup.
- Updated the request and response formats in the YAML test cassette to reflect the new tool structure and improved content formatting.
- Adjusted the expected response body to match the new output format from the assistant, including changes in tool usage and response details.
- Increased rate limit values in the response headers for better testing scenarios.

* adjusted bedrock cassettes

* adjusting cassettes for bedrock

* fix test

* Update VCR configuration to use 'host' instead of 'bedrock_host' for request matching
2026-05-27 13:20:53 -07:00

1188 lines
39 KiB
Python

import os
import sys
import types
from unittest.mock import patch, MagicMock
import pytest
from crewai.llm import LLM
from crewai.crew import Crew
from crewai.agent import Agent
from crewai.task import Task
def _create_bedrock_mocks():
"""Helper to create Bedrock mocks."""
mock_session_class = MagicMock()
mock_session_instance = MagicMock()
mock_client = MagicMock()
# Set up default mock responses to prevent hanging
default_response = {
'output': {
'message': {
'role': 'assistant',
'content': [
{'text': 'Test response'}
]
}
},
'usage': {
'inputTokens': 10,
'outputTokens': 5,
'totalTokens': 15
}
}
mock_client.converse.return_value = default_response
mock_client.converse_stream.return_value = {'stream': []}
mock_session_instance.client.return_value = mock_client
mock_session_class.return_value = mock_session_instance
return mock_session_class, mock_client
@pytest.fixture(autouse=True)
def mock_aws_credentials():
    """Mock AWS credentials and the boto3 Session unless real credentials exist.

    Applied automatically to every test in this module.

    Yields:
        ``(None, None)`` when both ``AWS_ACCESS_KEY_ID`` and
        ``AWS_SECRET_ACCESS_KEY`` are already present in the environment (no
        patching is done), otherwise ``(mock_session_class, mock_client)``
        as produced by ``_create_bedrock_mocks``.
    """
    if "AWS_ACCESS_KEY_ID" in os.environ and "AWS_SECRET_ACCESS_KEY" in os.environ:
        yield None, None
        return

    fake_env = {
        "AWS_ACCESS_KEY_ID": "test-access-key",
        "AWS_SECRET_ACCESS_KEY": "test-secret-key",
        "AWS_DEFAULT_REGION": "us-east-1",
    }
    # Reuse the shared helper instead of duplicating the mock wiring here.
    mock_session_class, mock_client = _create_bedrock_mocks()
    with patch.dict(os.environ, fake_env):
        # Replace the Session name used by the Bedrock provider so that no
        # real AWS connection is attempted.
        with patch(
            'crewai.llms.providers.bedrock.completion.Session',
            mock_session_class,
        ):
            yield mock_session_class, mock_client
@pytest.fixture
def bedrock_mocks():
    """Always provide Bedrock mocks, regardless of real credentials.

    Use this fixture for tests that explicitly need to inspect or configure
    the mocked Session class or client.

    Yields:
        ``(mock_session_class, mock_client)`` as produced by
        ``_create_bedrock_mocks``, active while fake AWS environment
        variables and the Session patch are in place.
    """
    fake_env = {
        "AWS_ACCESS_KEY_ID": "test-access-key",
        "AWS_SECRET_ACCESS_KEY": "test-secret-key",
        "AWS_DEFAULT_REGION": "us-east-1",
    }
    # Reuse the shared helper instead of duplicating the mock wiring here.
    mock_session_class, mock_client = _create_bedrock_mocks()
    with patch.dict(os.environ, fake_env):
        with patch(
            'crewai.llms.providers.bedrock.completion.Session',
            mock_session_class,
        ):
            yield mock_session_class, mock_client
def test_bedrock_completion_is_used_when_bedrock_provider():
    """A ``bedrock/``-prefixed model string must yield a BedrockCompletion."""
    model_id = "anthropic.claude-3-5-sonnet-20241022-v2:0"

    llm = LLM(model=f"bedrock/{model_id}")

    assert llm.__class__.__name__ == "BedrockCompletion"
    assert llm.provider == "bedrock"
    # The provider prefix is expected to be stripped from the stored model id.
    assert llm.model == model_id
def test_bedrock_completion_module_is_imported():
    """Creating a Bedrock LLM must leave the completion module in ``sys.modules``."""
    module_name = "crewai.llms.providers.bedrock.completion"

    # Drop any cached copy so the assertion below reflects this construction.
    sys.modules.pop(module_name, None)

    LLM(model="bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0")

    assert module_name in sys.modules
    loaded = sys.modules[module_name]
    assert isinstance(loaded, types.ModuleType)
    assert hasattr(loaded, 'BedrockCompletion')
def test_native_bedrock_raises_error_when_initialization_fails():
    """A failing native provider constructor must surface as ImportError.

    ``LLM._get_native_provider`` is patched to return a class whose
    ``__init__`` always raises. Constructing the LLM is then expected to
    raise ``ImportError`` whose message carries both the generic
    "Error importing native provider" text and the underlying failure text,
    rather than silently falling back.
    """

    class FailingCompletion:
        def __init__(self, *args, **kwargs):
            raise Exception("Native AWS Bedrock SDK failed")

    with patch(
        'crewai.llm.LLM._get_native_provider',
        return_value=FailingCompletion,
    ):
        with pytest.raises(ImportError) as excinfo:
            LLM(model="bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0")

    message = str(excinfo.value)
    assert "Error importing native provider" in message
    assert "Native AWS Bedrock SDK failed" in message
def test_bedrock_completion_initialization_parameters():
    """Constructor keyword arguments must be stored on the BedrockCompletion."""
    from crewai.llms.providers.bedrock.completion import BedrockCompletion

    model_id = "anthropic.claude-3-5-sonnet-20241022-v2:0"
    init_kwargs = {
        "temperature": 0.7,
        "max_tokens": 2000,
        "top_p": 0.9,
        "top_k": 40,
        "region_name": "us-west-2",
    }

    llm = LLM(model=f"bedrock/{model_id}", **init_kwargs)

    assert isinstance(llm, BedrockCompletion)
    assert llm.model == model_id
    # Each keyword passed in should be readable back under the same name.
    for attr_name, expected in init_kwargs.items():
        assert getattr(llm, attr_name) == expected
def test_bedrock_specific_parameters():
    """Bedrock-specific options (stop sequences, streaming, region) are stored."""
    from crewai.llms.providers.bedrock.completion import BedrockCompletion

    llm = LLM(
        model="bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0",
        stop_sequences=["Human:", "Assistant:"],
        stream=True,
        region_name="us-east-1",
    )

    assert isinstance(llm, BedrockCompletion)
    assert llm.stop_sequences == ["Human:", "Assistant:"]
    # Identity check instead of ``== True`` (PEP 8): the flag must be the
    # boolean True, not merely something equal to it.
    assert llm.stream is True
    assert llm.region_name == "us-east-1"
def test_bedrock_completion_call():
    """Exercise ``call`` on a Bedrock LLM with the method itself patched.

    NOTE(review): ``llm.call`` is replaced by a mock before it is invoked, so
    this only checks the mock's return value and recorded arguments, not the
    real Bedrock call path.
    """
    canned_reply = "Hello! I'm Claude on Bedrock, ready to help."
    llm = LLM(model="bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0")

    with patch.object(llm, 'call', return_value=canned_reply) as mock_call:
        reply = llm.call("Hello, how are you?")

        assert reply == "Hello! I'm Claude on Bedrock, ready to help."
        mock_call.assert_called_once_with("Hello, how are you?")
def test_bedrock_completion_called_during_crew_execution():
    """Running a crew must invoke the (patched) ``call`` of the Bedrock LLM."""
    bedrock_llm = LLM(model="bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0")

    with patch.object(
        bedrock_llm, 'call', return_value="Tokyo has 14 million people."
    ) as mock_call:
        researcher = Agent(
            role="Research Assistant",
            goal="Find population info",
            backstory="You research populations.",
            llm=bedrock_llm,
        )
        population_task = Task(
            description="Find Tokyo population",
            expected_output="Population number",
            agent=researcher,
        )

        outcome = Crew(agents=[researcher], tasks=[population_task]).kickoff()

        assert mock_call.called
        # The canned LLM reply should be reflected in the crew's output.
        assert "14 million" in str(outcome)
@pytest.mark.skip(reason="Crew execution test - may hang, needs investigation")
def test_bedrock_completion_call_arguments():
    """The task text must reach ``call`` as its first positional argument.

    Currently skipped (see the decorator's reason).
    """
    bedrock_llm = LLM(model="bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0")

    with patch.object(
        bedrock_llm, 'call', return_value="Task completed successfully."
    ) as mock_call:
        worker = Agent(
            role="Test Agent",
            goal="Complete a simple task",
            backstory="You are a test agent.",
            llm=bedrock_llm,
        )
        hello_task = Task(
            description="Say hello world",
            expected_output="Hello world",
            agent=worker,
        )
        Crew(agents=[worker], tasks=[hello_task]).kickoff()

        assert mock_call.called
        last_call = mock_call.call_args
        assert last_call is not None

        messages = last_call.args[0]
        assert isinstance(messages, (str, list))

        # A plain string is searched directly; a message list is searched
        # through its string representation.
        searchable = messages if isinstance(messages, str) else str(messages)
        assert "hello world" in searchable.lower()
def test_multiple_bedrock_calls_in_crew():
    """A crew with two tasks must call the Bedrock LLM at least twice."""
    bedrock_llm = LLM(model="bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0")

    with patch.object(bedrock_llm, 'call', return_value="Task completed.") as mock_call:
        worker = Agent(
            role="Multi-task Agent",
            goal="Complete multiple tasks",
            backstory="You can handle multiple tasks.",
            llm=bedrock_llm,
        )
        tasks = [
            Task(
                description="First task",
                expected_output="First result",
                agent=worker,
            ),
            Task(
                description="Second task",
                expected_output="Second result",
                agent=worker,
            ),
        ]
        Crew(agents=[worker], tasks=tasks).kickoff()

        assert mock_call.call_count >= 2  # At least one call per task

        # Every recorded call must carry a non-None first positional argument.
        for recorded in mock_call.call_args_list:
            positional = recorded.args
            assert len(positional) > 0
            assert positional[0] is not None
def test_bedrock_completion_with_tools():
    """An agent equipped with a tool must still drive the Bedrock ``call``.

    NOTE(review): the ``tools`` assertions only run when ``tools`` was passed
    as a keyword argument to ``call``; otherwise only ``called`` is checked.
    """
    from crewai.tools import tool

    @tool
    def sample_tool(query: str) -> str:
        """A sample tool for testing"""
        return f"Tool result for: {query}"

    bedrock_llm = LLM(model="bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0")

    with patch.object(
        bedrock_llm, 'call', return_value="Task completed with tools."
    ) as mock_call:
        tool_user = Agent(
            role="Tool User",
            goal="Use tools to complete tasks",
            backstory="You can use tools.",
            llm=bedrock_llm,
            tools=[sample_tool],
        )
        tool_task = Task(
            description="Use the sample tool",
            expected_output="Tool usage result",
            agent=tool_user,
        )
        Crew(agents=[tool_user], tasks=[tool_task]).kickoff()

        assert mock_call.called

        passed_kwargs = mock_call.call_args.kwargs
        if 'tools' in passed_kwargs:
            assert passed_kwargs['tools'] is not None
            assert len(passed_kwargs['tools']) > 0
def test_bedrock_raises_error_when_model_not_found(bedrock_mocks):
    """A ``ResourceNotFoundException`` from ``converse`` must propagate as an error."""
    from botocore.exceptions import ClientError

    mock_client = bedrock_mocks[1]
    not_found = ClientError(
        {
            'Error': {
                'Code': 'ResourceNotFoundException',
                'Message': 'Could not resolve the foundation model from the model identifier',
            }
        },
        'converse',
    )
    mock_client.converse.side_effect = not_found

    llm = LLM(model="bedrock/model-doesnt-exist")

    # Should raise some error for unsupported model
    with pytest.raises(Exception):
        llm.call("Hello")
def test_bedrock_aws_credentials_configuration():
    """AWS credentials and region are picked up from env vars or explicit args."""
    from crewai.llms.providers.bedrock.completion import BedrockCompletion

    model = "bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0"
    aws_access_key_id = "test-access-key"
    aws_secret_access_key = "test-secret-key"
    aws_region_name = "us-east-1"

    # Standard AWS environment variables.
    standard_env = {
        "AWS_ACCESS_KEY_ID": aws_access_key_id,
        "AWS_SECRET_ACCESS_KEY": aws_secret_access_key,
        "AWS_DEFAULT_REGION": aws_region_name,
    }
    with patch.dict(os.environ, standard_env):
        llm = LLM(model=model)

        assert isinstance(llm, BedrockCompletion)
        assert llm.region_name == aws_region_name
        assert llm.aws_access_key_id == aws_access_key_id
        assert llm.aws_secret_access_key == aws_secret_access_key

    # Test with litellm environment variables
    litellm_env = {
        "AWS_ACCESS_KEY_ID": aws_access_key_id,
        "AWS_SECRET_ACCESS_KEY": aws_secret_access_key,
        "AWS_REGION_NAME": aws_region_name,
    }
    with patch.dict(os.environ, litellm_env):
        llm = LLM(model=model)

        assert isinstance(llm, BedrockCompletion)
        assert llm.region_name == aws_region_name

        # Explicit constructor arguments.
        llm_explicit = LLM(
            model=model,
            aws_access_key_id="explicit-key",
            aws_secret_access_key="explicit-secret",
            region_name="us-west-2",
        )
        assert isinstance(llm_explicit, BedrockCompletion)
        assert llm_explicit.region_name == "us-west-2"
def test_bedrock_model_capabilities():
    """Capability flags are set for both a Claude and a non-Claude Bedrock model."""
    from crewai.llms.providers.bedrock.completion import BedrockCompletion

    llm_claude = LLM(model="bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0")
    assert isinstance(llm_claude, BedrockCompletion)
    # Identity checks instead of ``== True`` (PEP 8): the flags must be the
    # boolean True, not merely something equal to it.
    assert llm_claude.is_claude_model is True
    assert llm_claude.supports_tools is True

    # Test other Bedrock model
    llm_titan = LLM(model="bedrock/amazon.titan-text-express-v1")
    assert isinstance(llm_titan, BedrockCompletion)
    assert llm_titan.supports_tools is True
def test_bedrock_inference_config():
    """``_get_inference_config`` must expose sampling options under Converse keys."""
    from crewai.llms.providers.bedrock.completion import BedrockCompletion

    llm = LLM(
        model="bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0",
        temperature=0.7,
        top_p=0.9,
        top_k=40,
        max_tokens=1000,
    )
    assert isinstance(llm, BedrockCompletion)

    config = llm._get_inference_config()

    # Expected camelCase key -> value pairs in the prepared config.
    expected_entries = {
        'temperature': 0.7,
        'topP': 0.9,
        'maxTokens': 1000,
        'topK': 40,
    }
    for key, value in expected_entries.items():
        assert key in config
        assert config[key] == value
def test_bedrock_model_detection():
    """Several ``bedrock/`` model name patterns must all map to BedrockCompletion."""
    from crewai.llms.providers.bedrock.completion import BedrockCompletion

    # Test Bedrock model naming patterns
    candidate_models = (
        "bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0",
        "bedrock/anthropic.claude-3-haiku-20240307-v1:0",
        "bedrock/amazon.titan-text-express-v1",
        "bedrock/meta.llama3-70b-instruct-v1:0",
    )

    for candidate in candidate_models:
        llm = LLM(model=candidate)
        assert isinstance(llm, BedrockCompletion), f"Failed for model: {candidate}"
def test_bedrock_supports_stop_words():
    """``supports_stop_words()`` must report True for a Bedrock model."""
    llm = LLM(model="bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0")

    # Identity check instead of ``== True`` (PEP 8).
    assert llm.supports_stop_words() is True
def test_bedrock_context_window_size():
    """Context window sizes must clear a per-model minimum threshold."""
    claude = LLM(model="bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0")
    # Should be substantial (200K tokens with ratio)
    assert claude.get_context_window_size() > 150000

    titan = LLM(model="bedrock/amazon.titan-text-express-v1")
    assert titan.get_context_window_size() > 5000
def test_bedrock_message_formatting():
    """Messages must be reshaped for Converse with the system prompt split out."""
    llm = LLM(model="bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0")
    conversation = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Hello"},
        {"role": "assistant", "content": "Hi there!"},
        {"role": "user", "content": "How are you?"},
    ]

    formatted, system_prompt = llm._format_messages_for_converse(conversation)

    # System message should be extracted
    assert system_prompt == "You are a helpful assistant."

    # Remaining messages should be in Converse format
    assert len(formatted) >= 3
    first, second = formatted[0], formatted[1]
    assert first["role"] == "user"
    assert second["role"] == "assistant"

    # Messages should have content array with text
    assert isinstance(first["content"], list)
    assert "text" in first["content"][0]
def test_bedrock_streaming_parameter():
    """The ``stream`` constructor flag must be stored as given."""
    model = "bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0"

    # Identity checks instead of ``== False`` / ``== True`` (PEP 8): this
    # also rejects falsy non-bool values such as None or 0.
    llm_no_stream = LLM(model=model, stream=False)
    assert llm_no_stream.stream is False

    llm_stream = LLM(model=model, stream=True)
    assert llm_stream.stream is True
def test_bedrock_tool_conversion():
    """Function-style tool definitions must convert to Converse ``toolSpec`` entries."""
    llm = LLM(model="bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0")

    parameters_schema = {
        "type": "object",
        "properties": {
            "query": {"type": "string", "description": "Search query"}
        },
        "required": ["query"],
    }
    crewai_tools = [
        {
            "type": "function",
            "function": {
                "name": "test_tool",
                "description": "A test tool",
                "parameters": parameters_schema,
            },
        }
    ]

    bedrock_tools = llm._format_tools_for_converse(crewai_tools)

    assert len(bedrock_tools) == 1

    # Bedrock tools should have toolSpec structure
    converted = bedrock_tools[0]
    assert "toolSpec" in converted
    spec = converted["toolSpec"]
    assert spec["name"] == "test_tool"
    assert spec["description"] == "A test tool"
    assert "inputSchema" in spec
def test_bedrock_environment_variable_credentials(bedrock_mocks):
    """Credentials from the environment must be forwarded to the Session constructor."""
    mock_session_class = bedrock_mocks[0]
    # Forget any Session calls recorded before this test's own construction.
    mock_session_class.reset_mock()

    credentials_env = {
        "AWS_ACCESS_KEY_ID": "test-access-key-123",
        "AWS_SECRET_ACCESS_KEY": "test-secret-key-456",
    }
    with patch.dict(os.environ, credentials_env):
        LLM(model="bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0")

        assert mock_session_class.called

        recorded = mock_session_class.call_args
        session_kwargs = recorded[1] if recorded else {}
        assert session_kwargs.get('aws_access_key_id') == "test-access-key-123"
        assert session_kwargs.get('aws_secret_access_key') == "test-secret-key-456"
def test_bedrock_token_usage_tracking():
    """Usage counters from a Converse response must land in ``_token_usage``."""
    llm = LLM(model="bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0")

    # Mock the Bedrock response with usage information
    canned_response = {
        'output': {
            'message': {
                'role': 'assistant',
                'content': [{'text': 'test response'}],
            }
        },
        'usage': {
            'inputTokens': 50,
            'outputTokens': 25,
            'totalTokens': 75,
        },
    }

    with patch.object(llm._client, 'converse', return_value=canned_response):
        reply = llm.call("Hello")

        assert reply == "test response"
        usage = llm._token_usage
        assert usage['prompt_tokens'] == 50
        assert usage['completion_tokens'] == 25
        assert usage['total_tokens'] == 75
def test_bedrock_tool_use_conversation_flow():
    """A tool-use reply followed by a text reply must produce the final answer.

    ``converse`` is patched to return a ``toolUse`` block first and a plain
    text message second; the call is expected to run the matching function
    from ``available_functions`` and query the API a second time.
    """
    llm = LLM(model="bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0")

    def mock_weather_tool(location: str) -> str:
        return f"The weather in {location} is sunny and 75°F"

    available_functions = {"get_weather": mock_weather_tool}

    # First reply: the model asks for the weather tool.
    tool_use_response = {
        'output': {
            'message': {
                'role': 'assistant',
                'content': [
                    {
                        'toolUse': {
                            'toolUseId': 'tool-123',
                            'name': 'get_weather',
                            'input': {'location': 'San Francisco'}
                        }
                    }
                ]
            }
        },
        'usage': {
            'inputTokens': 100,
            'outputTokens': 50,
            'totalTokens': 150
        }
    }
    # Second reply: the model answers in plain text.
    final_response = {
        'output': {
            'message': {
                'role': 'assistant',
                'content': [
                    {'text': 'Based on the weather data, it is sunny and 75°F in San Francisco.'}
                ]
            }
        },
        'usage': {
            'inputTokens': 120,
            'outputTokens': 30,
            'totalTokens': 150
        }
    }

    # Mock the Bedrock client responses
    with patch.object(llm._client, 'converse') as mock_converse:
        mock_converse.side_effect = [tool_use_response, final_response]

        messages = [{"role": "user", "content": "What's the weather like in San Francisco?"}]
        result = llm.call(
            messages=messages,
            available_functions=available_functions
        )

        assert "sunny" in result.lower() or "75" in result

        # Verify that the API was called twice (once for tool use, once for final answer)
        assert mock_converse.call_count == 2
def test_bedrock_handles_cohere_conversation_requirements():
    """For a Cohere model, formatting must end the conversation on a user turn."""
    llm = LLM(model="bedrock/cohere.command-r-plus-v1:0")
    # Conversation deliberately ends on an assistant message.
    conversation = [
        {"role": "user", "content": "Hello"},
        {"role": "assistant", "content": "Hi there!"},
    ]

    formatted, _ = llm._format_messages_for_converse(conversation)

    # For Cohere models, should add a user message at the end
    trailing = formatted[-1]
    assert trailing["role"] == "user"
    assert "continue" in trailing["content"][0]["text"].lower()
def test_bedrock_client_error_handling():
    """AWS ``ClientError`` codes must be translated to specific exception types."""
    from botocore.exceptions import ClientError

    llm = LLM(model="bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0")

    # (error code, error message, expected exception, expected text fragment)
    scenarios = [
        ('ValidationException', 'Invalid request format', ValueError, "validation"),
        ('ThrottlingException', 'Rate limit exceeded', RuntimeError, "throttled"),
    ]

    for code, message, expected_exc, fragment in scenarios:
        failure = ClientError(
            {'Error': {'Code': code, 'Message': message}},
            'converse',
        )
        with patch.object(llm._client, 'converse', side_effect=failure):
            with pytest.raises(expected_exc) as exc_info:
                llm.call("Hello")

        assert fragment in str(exc_info.value).lower()
def test_bedrock_stop_sequences_sync():
    """Assigning ``stop`` must keep ``stop`` and ``stop_sequences`` in step."""
    llm = LLM(model="bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0")

    # (value assigned to ``stop``, list both attributes should then expose)
    cases = [
        (["\nObservation:", "\nThought:"], ["\nObservation:", "\nThought:"]),
        ("\nFinal Answer:", ["\nFinal Answer:"]),
        (None, []),
    ]

    for assigned, expected in cases:
        llm.stop = assigned
        assert list(llm.stop_sequences) == expected
        assert llm.stop == expected
def test_bedrock_stop_sequences_sent_to_api():
    """Stop sequences set via ``stop`` must appear in the ``converse`` request."""
    llm = LLM(model="bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0")

    # Set stop sequences via the stop attribute (simulating CrewAgentExecutor)
    llm.stop = ["\nObservation:", "\nThought:"]

    canned_response = {
        'output': {
            'message': {
                'role': 'assistant',
                'content': [{'text': 'Hello'}],
            }
        },
        'usage': {
            'inputTokens': 10,
            'outputTokens': 5,
            'totalTokens': 15,
        },
    }

    with patch.object(
        llm._client, 'converse', return_value=canned_response
    ) as mock_converse:
        llm.call("Say hello in one word")

        request_kwargs = mock_converse.call_args[1]
        assert "inferenceConfig" in request_kwargs
        inference_config = request_kwargs["inferenceConfig"]
        assert "stopSequences" in inference_config
        assert inference_config["stopSequences"] == ["\nObservation:", "\nThought:"]
# Agent Kickoff Structured Output Tests
@pytest.mark.vcr()
def test_bedrock_agent_kickoff_structured_output_without_tools():
    """Agent kickoff without tools must return a populated pydantic result.

    Covers native structured output handling for Bedrock models; marked with
    ``pytest.mark.vcr``.
    """
    from pydantic import BaseModel, Field

    class AnalysisResult(BaseModel):
        """Structured output for analysis results."""
        topic: str = Field(description="The topic analyzed")
        key_points: list[str] = Field(description="Key insights from the analysis")
        summary: str = Field(description="Brief summary of findings")

    bedrock_llm = LLM(model="bedrock/us.anthropic.claude-sonnet-4-6")
    analyst = Agent(
        role="Analyst",
        goal="Provide structured analysis on topics",
        backstory="You are an expert analyst who provides clear, structured insights.",
        llm=bedrock_llm,
        tools=[],
        verbose=True,
    )

    outcome = analyst.kickoff(
        messages="Analyze the benefits of remote work briefly. Keep it concise.",
        response_format=AnalysisResult,
    )

    parsed = outcome.pydantic
    assert parsed is not None, "Expected pydantic output but got None"
    assert isinstance(parsed, AnalysisResult), f"Expected AnalysisResult but got {type(parsed)}"
    assert parsed.topic, "Topic should not be empty"
    assert len(parsed.key_points) > 0, "Should have at least one key point"
    assert parsed.summary, "Summary should not be empty"
@pytest.mark.vcr()
def test_bedrock_agent_kickoff_structured_output_with_tools():
    """Agent kickoff with a tool must still return a populated pydantic result.

    Covers post-tool-call structured output handling for Bedrock models;
    marked with ``pytest.mark.vcr``.
    """
    from crewai.tools import tool
    from pydantic import BaseModel, Field

    class CalculationResult(BaseModel):
        """Structured output for calculation results."""
        operation: str = Field(description="The mathematical operation performed")
        result: int = Field(description="The result of the calculation")
        explanation: str = Field(description="Brief explanation of the calculation")

    @tool
    def add_numbers(a: int, b: int) -> int:
        """Add two numbers together and return the sum."""
        return a + b

    bedrock_llm = LLM(model="bedrock/us.anthropic.claude-sonnet-4-6")
    calculator = Agent(
        role="Calculator",
        goal="Perform calculations using available tools",
        backstory="You are a calculator assistant that uses tools to compute results.",
        llm=bedrock_llm,
        tools=[add_numbers],
        verbose=True,
    )

    outcome = calculator.kickoff(
        messages="Calculate 15 + 27 using your add_numbers tool. Report the result.",
        response_format=CalculationResult,
    )

    parsed = outcome.pydantic
    assert parsed is not None, "Expected pydantic output but got None"
    assert isinstance(parsed, CalculationResult), f"Expected CalculationResult but got {type(parsed)}"
    assert parsed.result == 42, f"Expected result 42 but got {parsed.result}"
    assert parsed.operation, "Operation should not be empty"
    assert parsed.explanation, "Explanation should not be empty"
def test_bedrock_groups_three_tool_results():
    """Three consecutive tool results must collapse into one Bedrock user message."""
    llm = LLM(model="bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0")

    def tool_call(call_id, name, arguments):
        # One function-style tool call entry as carried by the assistant turn.
        return {
            "id": call_id,
            "type": "function",
            "function": {
                "name": name,
                "arguments": arguments,
            },
        }

    expected_ids = ["tool-1", "tool-2", "tool-3"]
    expected_texts = ["72F and sunny", "AI news summary", "AMZN up 1.2%"]

    conversation = [
        {"role": "user", "content": "Use all three tools, then continue."},
        {
            "role": "assistant",
            "content": "",
            "tool_calls": [
                tool_call("tool-1", "lookup_weather", '{"location": "New York"}'),
                tool_call("tool-2", "lookup_news", '{"topic": "AI"}'),
                tool_call("tool-3", "lookup_stock", '{"ticker": "AMZN"}'),
            ],
        },
    ]
    # One tool-role message per call, in call order.
    for call_id, text in zip(expected_ids, expected_texts):
        conversation.append(
            {"role": "tool", "tool_call_id": call_id, "content": text}
        )

    formatted, system_prompt = llm._format_messages_for_converse(conversation)

    assert system_prompt is None
    roles = [entry["role"] for entry in formatted]
    assert roles == [
        "user",
        "assistant",
        "user",
    ]
    assert len(formatted[1]["content"]) == 3

    result_blocks = formatted[2]["content"]
    assert len(result_blocks) == 3
    assert [blk["toolResult"]["toolUseId"] for blk in result_blocks] == [
        "tool-1",
        "tool-2",
        "tool-3",
    ]
    assert [blk["toolResult"]["content"][0]["text"] for blk in result_blocks] == [
        "72F and sunny",
        "AI news summary",
        "AMZN up 1.2%",
    ]
def test_bedrock_parallel_tool_results_grouped():
    """Regression test for issue #4749.

    An assistant message with several parallel tool calls needs all matching
    tool results grouped into a single user message for Bedrock. Emitting
    each result as its own user message previously caused:
        ValidationException: Expected toolResult blocks at messages.2.content
    """
    llm = LLM(model="bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0")

    parallel_calls = [
        {
            "id": "call_add",
            "type": "function",
            "function": {"name": "add_tool", "arguments": '{"a": 25, "b": 17}'},
        },
        {
            "id": "call_mul",
            "type": "function",
            "function": {"name": "multiply_tool", "arguments": '{"a": 10, "b": 5}'},
        },
    ]
    conversation = [
        {"role": "user", "content": "Calculate 25 + 17 AND 10 * 5"},
        {"role": "assistant", "content": "", "tool_calls": parallel_calls},
        {"role": "tool", "tool_call_id": "call_add", "content": "42"},
        {"role": "tool", "tool_call_id": "call_mul", "content": "50"},
    ]

    converse_msgs, _ = llm._format_messages_for_converse(conversation)

    # Collect the user messages that carry at least one toolResult block.
    tool_result_messages = []
    for msg in converse_msgs:
        if msg.get("role") != "user":
            continue
        if any("toolResult" in block for block in msg.get("content", [])):
            tool_result_messages.append(msg)

    # There must be exactly ONE user message with tool results (not two)
    assert len(tool_result_messages) == 1, (
        f"Expected 1 grouped tool-result message, got {len(tool_result_messages)}. "
        "Bedrock requires all parallel tool results in a single user message."
    )

    # That single message must contain both tool results
    tool_results = tool_result_messages[0]["content"]
    assert len(tool_results) == 2, (
        f"Expected 2 toolResult blocks in grouped message, got {len(tool_results)}"
    )

    seen_ids = {block["toolResult"]["toolUseId"] for block in tool_results}
    assert seen_ids == {"call_add", "call_mul"}
def test_bedrock_single_tool_result_still_works():
    """A lone tool call must still yield one user message with one result block."""
    llm = LLM(model="bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0")

    single_call = {
        "id": "call_single",
        "type": "function",
        "function": {"name": "add_tool", "arguments": '{"a": 1, "b": 2}'},
    }
    conversation = [
        {"role": "user", "content": "Add 1 + 2"},
        {"role": "assistant", "content": "", "tool_calls": [single_call]},
        {"role": "tool", "tool_call_id": "call_single", "content": "3"},
    ]

    converse_msgs, _ = llm._format_messages_for_converse(conversation)

    # Collect the user messages that carry at least one toolResult block.
    tool_result_messages = []
    for msg in converse_msgs:
        if msg.get("role") != "user":
            continue
        if any("toolResult" in block for block in msg.get("content", [])):
            tool_result_messages.append(msg)

    assert len(tool_result_messages) == 1
    blocks = tool_result_messages[0]["content"]
    assert len(blocks) == 1
    assert blocks[0]["toolResult"]["toolUseId"] == "call_single"
def test_bedrock_tool_results_not_merged_across_assistant_messages():
    """Tool results belonging to different assistant turns must stay separate."""
    llm = LLM(model="bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0")

    def assistant_tool_turn(call_id, tool_name):
        # Assistant message carrying a single argument-less tool call.
        return {
            "role": "assistant",
            "content": "",
            "tool_calls": [
                {
                    "id": call_id,
                    "type": "function",
                    "function": {"name": tool_name, "arguments": "{}"},
                },
            ],
        }

    conversation = [
        {"role": "user", "content": "First task"},
        assistant_tool_turn("call_a", "tool_a"),
        {"role": "tool", "tool_call_id": "call_a", "content": "result_a"},
        {"role": "assistant", "content": "Now doing second task"},
        {"role": "user", "content": "Second task"},
        assistant_tool_turn("call_b", "tool_b"),
        {"role": "tool", "tool_call_id": "call_b", "content": "result_b"},
    ]

    converse_msgs, _ = llm._format_messages_for_converse(conversation)

    # Collect the user messages that carry at least one toolResult block.
    tool_result_messages = []
    for msg in converse_msgs:
        if msg.get("role") != "user":
            continue
        if any("toolResult" in block for block in msg.get("content", [])):
            tool_result_messages.append(msg)

    # Two separate tool-result messages (one per assistant turn)
    assert len(tool_result_messages) == 2, (
        "Tool results from different assistant turns must remain separate"
    )
    first_group, second_group = tool_result_messages
    assert first_group["content"][0]["toolResult"]["toolUseId"] == "call_a"
    assert second_group["content"][0]["toolResult"]["toolUseId"] == "call_b"
def test_bedrock_cached_token_tracking():
    """``cacheReadInputTokenCount`` must be recorded as ``cached_prompt_tokens``."""
    llm = LLM(model="bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0")

    canned_response = {
        'output': {
            'message': {
                'role': 'assistant',
                'content': [{'text': 'test response'}],
            }
        },
        'usage': {
            'inputTokens': 100,
            'outputTokens': 50,
            'totalTokens': 150,
            'cacheReadInputTokenCount': 30,
        },
    }

    with patch.object(llm._client, 'converse', return_value=canned_response):
        reply = llm.call("Hello")

        assert reply == "test response"
        usage = llm._token_usage
        assert usage['prompt_tokens'] == 100
        assert usage['completion_tokens'] == 50
        assert usage['total_tokens'] == 150
        assert usage['cached_prompt_tokens'] == 30
def test_bedrock_cached_token_alternate_key():
    """The alternate ``cacheReadInputTokens`` usage key must be honoured too."""
    llm = LLM(model="bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0")

    canned_response = {
        'output': {
            'message': {
                'role': 'assistant',
                'content': [{'text': 'test response'}],
            }
        },
        'usage': {
            'inputTokens': 80,
            'outputTokens': 40,
            'totalTokens': 120,
            'cacheReadInputTokens': 25,
        },
    }

    with patch.object(llm._client, 'converse', return_value=canned_response):
        llm.call("Hello")

        assert llm._token_usage['cached_prompt_tokens'] == 25
def test_bedrock_no_cache_tokens_defaults_to_zero():
    """With no cache key in the usage block, ``cached_prompt_tokens`` must be 0."""
    llm = LLM(model="bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0")

    # Usage block deliberately omits any cache-read counter.
    canned_response = {
        'output': {
            'message': {
                'role': 'assistant',
                'content': [{'text': 'test response'}],
            }
        },
        'usage': {
            'inputTokens': 60,
            'outputTokens': 30,
            'totalTokens': 90,
        },
    }

    with patch.object(llm._client, 'converse', return_value=canned_response):
        llm.call("Hello")

        assert llm._token_usage['cached_prompt_tokens'] == 0