From 4f3243520a212fd2918a8b17e655e70b68777342 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sat, 13 Jun 2026 13:45:55 +0000 Subject: [PATCH] Fix #6149: Parse accumulated tool input in Bedrock streaming MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In _handle_streaming_converse and _ahandle_streaming_converse, the accumulated_tool_input string was never folded back into current_tool_use['input'] at contentBlockStop, causing tool calls to receive empty arguments ({}). Parse the accumulated JSON string and assign it to current_tool_use['input'] before reading function_args. Co-Authored-By: João --- .../llms/providers/bedrock/completion.py | 24 ++ lib/crewai/tests/llms/bedrock/test_bedrock.py | 259 ++++++++++++++++++ 2 files changed, 283 insertions(+) diff --git a/lib/crewai/src/crewai/llms/providers/bedrock/completion.py b/lib/crewai/src/crewai/llms/providers/bedrock/completion.py index 0f34b6723..4db9b816c 100644 --- a/lib/crewai/src/crewai/llms/providers/bedrock/completion.py +++ b/lib/crewai/src/crewai/llms/providers/bedrock/completion.py @@ -1033,6 +1033,18 @@ class BedrockCompletion(BaseLLM): elif "contentBlockStop" in event: logging.debug("Content block stopped in stream") if current_tool_use: + # Fold accumulated streaming deltas back into + # current_tool_use so downstream code (and the + # toolUse block appended to messages) carries + # the parsed arguments. + if accumulated_tool_input: + try: + current_tool_use["input"] = json.loads( + accumulated_tool_input + ) + except json.JSONDecodeError: + current_tool_use["input"] = {} + function_name = current_tool_use["name"] function_args = cast( dict[str, Any], current_tool_use.get("input", {}) @@ -1631,6 +1643,18 @@ class BedrockCompletion(BaseLLM): elif "contentBlockStop" in event: logging.debug("Content block stopped in stream") if current_tool_use: + # Fold accumulated streaming deltas back into + # current_tool_use so downstream code (and the + # toolUse block appended to messages) carries + # the parsed arguments. + if accumulated_tool_input: + try: + current_tool_use["input"] = json.loads( + accumulated_tool_input + ) + except json.JSONDecodeError: + current_tool_use["input"] = {} + function_name = current_tool_use["name"] function_args = cast( dict[str, Any], current_tool_use.get("input", {}) diff --git a/lib/crewai/tests/llms/bedrock/test_bedrock.py b/lib/crewai/tests/llms/bedrock/test_bedrock.py index d7421e852..8acb93d6c 100644 --- a/lib/crewai/tests/llms/bedrock/test_bedrock.py +++ b/lib/crewai/tests/llms/bedrock/test_bedrock.py @@ -1185,3 +1185,262 @@ def test_bedrock_no_cache_tokens_defaults_to_zero(): llm.call("Hello") assert llm._token_usage['cached_prompt_tokens'] == 0 + + +# --------------------------------------------------------------------------- +# Streaming tool-call argument accumulation tests (issue #6149) +# --------------------------------------------------------------------------- + +def _make_streaming_tool_events( + tool_name: str, + tool_use_id: str, + input_deltas: list[str], +) -> list[dict]: + """Build a synthetic Converse stream that exercises the tool-call path. + + The sequence mirrors what Bedrock actually sends: + messageStart → contentBlockStart (toolUse) → contentBlockDelta * N + → contentBlockStop → messageStop → metadata + """ + events: list[dict] = [ + {"messageStart": {"role": "assistant"}}, + { + "contentBlockStart": { + "contentBlockIndex": 0, + "start": { + "toolUse": { + "toolUseId": tool_use_id, + "name": tool_name, + } + }, + } + }, + ] + for chunk in input_deltas: + events.append( + { + "contentBlockDelta": { + "contentBlockIndex": 0, + "delta": {"toolUse": {"input": chunk}}, + } + } + ) + events.append({"contentBlockStop": {"contentBlockIndex": 0}}) + events.append({"messageStop": {"stopReason": "tool_use"}}) + events.append( + { + "metadata": { + "usage": { + "inputTokens": 10, + "outputTokens": 5, + "totalTokens": 15, + } + } + } + ) + return events + + +def test_streaming_tool_call_receives_parsed_arguments(): + """Streaming tool calls must receive the fully-parsed arguments dict. + + Before the fix for #6149, ``accumulated_tool_input`` was never folded + back into ``current_tool_use["input"]``, so the tool executor received + ``{}`` instead of the real arguments. + """ + llm = LLM( + model="bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0", + stream=True, + ) + + captured_args: list[dict] = [] + + def mock_tool(city: str) -> str: + captured_args.append({"city": city}) + return f"Weather in {city}: sunny" + + available_functions = {"get_weather": mock_tool} + + stream_events = _make_streaming_tool_events( + tool_name="get_weather", + tool_use_id="tool-abc-123", + input_deltas=['{"city":', ' "Paris"}'], + ) + + final_response = { + "output": { + "message": { + "role": "assistant", + "content": [{"text": "It is sunny in Paris."}], + } + }, + "usage": {"inputTokens": 20, "outputTokens": 10, "totalTokens": 30}, + } + + with patch.object(llm._client, "converse_stream") as mock_stream, \ + patch.object(llm._client, "converse") as mock_converse: + mock_stream.return_value = {"stream": iter(stream_events)} + mock_converse.return_value = final_response + + messages = [{"role": "user", "content": "Weather in Paris?"}] + result = llm.call(messages=messages, available_functions=available_functions) + + assert len(captured_args) == 1 + assert captured_args[0] == {"city": "Paris"} + assert "sunny" in result.lower() or "Paris" in result + + +def test_streaming_tool_call_with_multi_chunk_json(): + """Verify argument accumulation works when JSON is split across many chunks.""" + llm = LLM( + model="bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0", + stream=True, + ) + + captured_args: list[dict] = [] + + def mock_tool(city: str, units: str) -> str: + captured_args.append({"city": city, "units": units}) + return "25°C" + + available_functions = {"get_temperature": mock_tool} + + stream_events = _make_streaming_tool_events( + tool_name="get_temperature", + tool_use_id="tool-multi-chunk", + input_deltas=['{"ci', 'ty": "Tok', 'yo", "un', 'its": "celsius"}'], + ) + + final_response = { + "output": { + "message": { + "role": "assistant", + "content": [{"text": "25°C in Tokyo."}], + } + }, + "usage": {"inputTokens": 20, "outputTokens": 10, "totalTokens": 30}, + } + + with patch.object(llm._client, "converse_stream") as mock_stream, \ + patch.object(llm._client, "converse") as mock_converse: + mock_stream.return_value = {"stream": iter(stream_events)} + mock_converse.return_value = final_response + + result = llm.call( + messages=[{"role": "user", "content": "Temp in Tokyo?"}], + available_functions=available_functions, + ) + + assert len(captured_args) == 1 + assert captured_args[0] == {"city": "Tokyo", "units": "celsius"} + + +def test_streaming_tool_call_single_chunk(): + """Tool input arriving as a single delta should also work.""" + llm = LLM( + model="bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0", + stream=True, + ) + + captured_args: list[dict] = [] + + def mock_tool(query: str) -> str: + captured_args.append({"query": query}) + return "42" + + available_functions = {"search": mock_tool} + + stream_events = _make_streaming_tool_events( + tool_name="search", + tool_use_id="tool-single", + input_deltas=['{"query": "meaning of life"}'], + ) + + final_response = { + "output": { + "message": { + "role": "assistant", + "content": [{"text": "The answer is 42."}], + } + }, + "usage": {"inputTokens": 15, "outputTokens": 8, "totalTokens": 23}, + } + + with patch.object(llm._client, "converse_stream") as mock_stream, \ + patch.object(llm._client, "converse") as mock_converse: + mock_stream.return_value = {"stream": iter(stream_events)} + mock_converse.return_value = final_response + + result = llm.call( + messages=[{"role": "user", "content": "What is the meaning of life?"}], + available_functions=available_functions, + ) + + assert len(captured_args) == 1 + assert captured_args[0] == {"query": "meaning of life"} + + +@pytest.mark.asyncio +async def test_async_streaming_tool_call_receives_parsed_arguments(): + """Async streaming variant of the #6149 fix. + + Mirrors ``test_streaming_tool_call_receives_parsed_arguments`` but + exercises ``_ahandle_streaming_converse``. + """ + llm = LLM( + model="bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0", + stream=True, + ) + + captured_args: list[dict] = [] + + def mock_tool(city: str) -> str: + captured_args.append({"city": city}) + return f"Weather in {city}: rainy" + + available_functions = {"get_weather": mock_tool} + + stream_events = _make_streaming_tool_events( + tool_name="get_weather", + tool_use_id="tool-async-456", + input_deltas=['{"city":', ' "London"}'], + ) + + final_response = { + "output": { + "message": { + "role": "assistant", + "content": [{"text": "It is rainy in London."}], + } + }, + "usage": {"inputTokens": 20, "outputTokens": 10, "totalTokens": 30}, + } + + async def async_stream_iter(events): + for e in events: + yield e + + from unittest.mock import AsyncMock + + mock_async_client = MagicMock() + mock_async_client.converse_stream = AsyncMock( + return_value={"stream": async_stream_iter(stream_events)} + ) + mock_async_client.converse = AsyncMock(return_value=final_response) + + async def fake_ensure(): + return mock_async_client + + async def fake_ahandle_converse(*args, **kwargs): + return "It is rainy in London." + + with patch.object(llm, "_ensure_async_client", side_effect=fake_ensure), \ + patch.object(llm, "_ahandle_converse", side_effect=fake_ahandle_converse): + messages = [{"role": "user", "content": "Weather in London?"}] + result = await llm.acall( + messages=messages, + available_functions=available_functions, + ) + + assert len(captured_args) == 1 + assert captured_args[0] == {"city": "London"}