Fix #6149: Parse accumulated tool input in Bedrock streaming

In _handle_streaming_converse and _ahandle_streaming_converse, the
accumulated_tool_input string was never folded back into
current_tool_use['input'] at contentBlockStop, causing tool calls to
receive empty arguments ({}).

Parse the accumulated JSON string and assign it to
current_tool_use['input'] before reading function_args.

Co-Authored-By: João <joao@crewai.com>
This commit is contained in:
Devin AI
2026-06-13 13:45:55 +00:00
parent d80719df81
commit 4f3243520a
2 changed files with 283 additions and 0 deletions

View File

@@ -1033,6 +1033,18 @@ class BedrockCompletion(BaseLLM):
elif "contentBlockStop" in event:
logging.debug("Content block stopped in stream")
if current_tool_use:
# Fold accumulated streaming deltas back into
# current_tool_use so downstream code (and the
# toolUse block appended to messages) carries
# the parsed arguments.
if accumulated_tool_input:
try:
current_tool_use["input"] = json.loads(
accumulated_tool_input
)
except json.JSONDecodeError:
current_tool_use["input"] = {}
function_name = current_tool_use["name"]
function_args = cast(
dict[str, Any], current_tool_use.get("input", {})
@@ -1631,6 +1643,18 @@ class BedrockCompletion(BaseLLM):
elif "contentBlockStop" in event:
logging.debug("Content block stopped in stream")
if current_tool_use:
# Fold accumulated streaming deltas back into
# current_tool_use so downstream code (and the
# toolUse block appended to messages) carries
# the parsed arguments.
if accumulated_tool_input:
try:
current_tool_use["input"] = json.loads(
accumulated_tool_input
)
except json.JSONDecodeError:
current_tool_use["input"] = {}
function_name = current_tool_use["name"]
function_args = cast(
dict[str, Any], current_tool_use.get("input", {})

View File

@@ -1185,3 +1185,262 @@ def test_bedrock_no_cache_tokens_defaults_to_zero():
llm.call("Hello")
assert llm._token_usage['cached_prompt_tokens'] == 0
# ---------------------------------------------------------------------------
# Streaming tool-call argument accumulation tests (issue #6149)
# ---------------------------------------------------------------------------
def _make_streaming_tool_events(
tool_name: str,
tool_use_id: str,
input_deltas: list[str],
) -> list[dict]:
"""Build a synthetic Converse stream that exercises the tool-call path.
The sequence mirrors what Bedrock actually sends:
messageStart → contentBlockStart (toolUse) → contentBlockDelta * N
→ contentBlockStop → messageStop → metadata
"""
events: list[dict] = [
{"messageStart": {"role": "assistant"}},
{
"contentBlockStart": {
"contentBlockIndex": 0,
"start": {
"toolUse": {
"toolUseId": tool_use_id,
"name": tool_name,
}
},
}
},
]
for chunk in input_deltas:
events.append(
{
"contentBlockDelta": {
"contentBlockIndex": 0,
"delta": {"toolUse": {"input": chunk}},
}
}
)
events.append({"contentBlockStop": {"contentBlockIndex": 0}})
events.append({"messageStop": {"stopReason": "tool_use"}})
events.append(
{
"metadata": {
"usage": {
"inputTokens": 10,
"outputTokens": 5,
"totalTokens": 15,
}
}
}
)
return events
def test_streaming_tool_call_receives_parsed_arguments():
"""Streaming tool calls must receive the fully-parsed arguments dict.
Before the fix for #6149, ``accumulated_tool_input`` was never folded
back into ``current_tool_use["input"]``, so the tool executor received
``{}`` instead of the real arguments.
"""
llm = LLM(
model="bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0",
stream=True,
)
captured_args: list[dict] = []
def mock_tool(city: str) -> str:
captured_args.append({"city": city})
return f"Weather in {city}: sunny"
available_functions = {"get_weather": mock_tool}
stream_events = _make_streaming_tool_events(
tool_name="get_weather",
tool_use_id="tool-abc-123",
input_deltas=['{"city":', ' "Paris"}'],
)
final_response = {
"output": {
"message": {
"role": "assistant",
"content": [{"text": "It is sunny in Paris."}],
}
},
"usage": {"inputTokens": 20, "outputTokens": 10, "totalTokens": 30},
}
with patch.object(llm._client, "converse_stream") as mock_stream, \
patch.object(llm._client, "converse") as mock_converse:
mock_stream.return_value = {"stream": iter(stream_events)}
mock_converse.return_value = final_response
messages = [{"role": "user", "content": "Weather in Paris?"}]
result = llm.call(messages=messages, available_functions=available_functions)
assert len(captured_args) == 1
assert captured_args[0] == {"city": "Paris"}
assert "sunny" in result.lower() or "Paris" in result
def test_streaming_tool_call_with_multi_chunk_json():
"""Verify argument accumulation works when JSON is split across many chunks."""
llm = LLM(
model="bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0",
stream=True,
)
captured_args: list[dict] = []
def mock_tool(city: str, units: str) -> str:
captured_args.append({"city": city, "units": units})
return "25°C"
available_functions = {"get_temperature": mock_tool}
stream_events = _make_streaming_tool_events(
tool_name="get_temperature",
tool_use_id="tool-multi-chunk",
input_deltas=['{"ci', 'ty": "Tok', 'yo", "un', 'its": "celsius"}'],
)
final_response = {
"output": {
"message": {
"role": "assistant",
"content": [{"text": "25°C in Tokyo."}],
}
},
"usage": {"inputTokens": 20, "outputTokens": 10, "totalTokens": 30},
}
with patch.object(llm._client, "converse_stream") as mock_stream, \
patch.object(llm._client, "converse") as mock_converse:
mock_stream.return_value = {"stream": iter(stream_events)}
mock_converse.return_value = final_response
result = llm.call(
messages=[{"role": "user", "content": "Temp in Tokyo?"}],
available_functions=available_functions,
)
assert len(captured_args) == 1
assert captured_args[0] == {"city": "Tokyo", "units": "celsius"}
def test_streaming_tool_call_single_chunk():
"""Tool input arriving as a single delta should also work."""
llm = LLM(
model="bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0",
stream=True,
)
captured_args: list[dict] = []
def mock_tool(query: str) -> str:
captured_args.append({"query": query})
return "42"
available_functions = {"search": mock_tool}
stream_events = _make_streaming_tool_events(
tool_name="search",
tool_use_id="tool-single",
input_deltas=['{"query": "meaning of life"}'],
)
final_response = {
"output": {
"message": {
"role": "assistant",
"content": [{"text": "The answer is 42."}],
}
},
"usage": {"inputTokens": 15, "outputTokens": 8, "totalTokens": 23},
}
with patch.object(llm._client, "converse_stream") as mock_stream, \
patch.object(llm._client, "converse") as mock_converse:
mock_stream.return_value = {"stream": iter(stream_events)}
mock_converse.return_value = final_response
result = llm.call(
messages=[{"role": "user", "content": "What is the meaning of life?"}],
available_functions=available_functions,
)
assert len(captured_args) == 1
assert captured_args[0] == {"query": "meaning of life"}
@pytest.mark.asyncio
async def test_async_streaming_tool_call_receives_parsed_arguments():
"""Async streaming variant of the #6149 fix.
Mirrors ``test_streaming_tool_call_receives_parsed_arguments`` but
exercises ``_ahandle_streaming_converse``.
"""
llm = LLM(
model="bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0",
stream=True,
)
captured_args: list[dict] = []
def mock_tool(city: str) -> str:
captured_args.append({"city": city})
return f"Weather in {city}: rainy"
available_functions = {"get_weather": mock_tool}
stream_events = _make_streaming_tool_events(
tool_name="get_weather",
tool_use_id="tool-async-456",
input_deltas=['{"city":', ' "London"}'],
)
final_response = {
"output": {
"message": {
"role": "assistant",
"content": [{"text": "It is rainy in London."}],
}
},
"usage": {"inputTokens": 20, "outputTokens": 10, "totalTokens": 30},
}
async def async_stream_iter(events):
for e in events:
yield e
from unittest.mock import AsyncMock
mock_async_client = MagicMock()
mock_async_client.converse_stream = AsyncMock(
return_value={"stream": async_stream_iter(stream_events)}
)
mock_async_client.converse = AsyncMock(return_value=final_response)
async def fake_ensure():
return mock_async_client
async def fake_ahandle_converse(*args, **kwargs):
return "It is rainy in London."
with patch.object(llm, "_ensure_async_client", side_effect=fake_ensure), \
patch.object(llm, "_ahandle_converse", side_effect=fake_ahandle_converse):
messages = [{"role": "user", "content": "Weather in London?"}]
result = await llm.acall(
messages=messages,
available_functions=available_functions,
)
assert len(captured_args) == 1
assert captured_args[0] == {"city": "London"}