fix(otel): correct streaming finish_reason + bedrock response_id semantics

Two correctness fixes uncovered while landing the OTel finish_reason +
response_id plumbing:

- LiteLLM streaming (sync + async): `stream_options={"include_usage": True}`
  causes LiteLLM to emit a final usage-only chunk with `choices=[]`. The
  post-loop `_extract_finish_reason_and_response_id(last_chunk)` silently
  returned `(None, None)` because the last chunk has no choices, even though
  earlier chunks carried `finish_reason="stop"`. Track both fields
  incrementally inside the loop (mirroring how OpenAI/Gemini/Azure already
  handle their native streams) and use the tracked values for the
  LLMCallCompletedEvent emission and the partial-response error path.

- Bedrock Converse: `ResponseMetadata.RequestId` is an AWS infra trace id,
  not a model-level response id (semantically different from OpenAI's
  `chatcmpl-XXX`). Return None for `response_id` rather than mislead
  downstream telemetry consumers. The audit-fix's async propagation chain
  still works — None propagates through unchanged.

Adds `test_llm_streaming_finish_reason.py` pinning both the sync and async
LiteLLM streaming paths against the include_usage chunk shape.
This commit is contained in:
Lucas Gomide
2026-05-27 10:08:10 -03:00
parent 86f667bb3a
commit 49e5581b58
3 changed files with 136 additions and 13 deletions

View File

@@ -722,6 +722,11 @@ class LLM(BaseLLM):
last_chunk = None
chunk_count = 0
usage_info = None
# Tracked across the loop: LiteLLM with include_usage emits a final
# usage-only chunk with empty choices, so the post-loop last_chunk has
# no finish_reason. Capture both incrementally instead.
stream_finish_reason: str | None = None
stream_response_id: str | None = None
accumulated_tool_args: defaultdict[int, AccumulatedToolArgs] = defaultdict(
AccumulatedToolArgs
@@ -741,6 +746,14 @@ class LLM(BaseLLM):
if isinstance(chunk, ModelResponseBase):
response_id = chunk.id
chunk_finish, chunk_id = self._extract_finish_reason_and_response_id(
chunk
)
if chunk_finish:
stream_finish_reason = chunk_finish
if chunk_id and not stream_response_id:
stream_response_id = chunk_id
try:
choices = None
if isinstance(chunk, dict) and "choices" in chunk:
@@ -913,7 +926,8 @@ class LLM(BaseLLM):
return tool_calls_list
finish_reason, response_id_last = (
self._extract_finish_reason_and_response_id(last_chunk)
stream_finish_reason,
stream_response_id,
)
if not tool_calls or not available_functions:
@@ -979,7 +993,8 @@ class LLM(BaseLLM):
if full_response.strip():
logging.warning(f"Returning partial response despite error: {e!s}")
finish_reason, response_id_last = (
self._extract_finish_reason_and_response_id(last_chunk)
stream_finish_reason,
stream_response_id,
)
self._handle_emit_call_events(
response=full_response,
@@ -1438,6 +1453,10 @@ class LLM(BaseLLM):
params["stream_options"] = {"include_usage": True}
response_id = None
last_chunk: Any | None = None
# See sync sibling: incrementally track finish_reason/response_id so the
# usage-only final chunk doesn't wipe them.
stream_finish_reason: str | None = None
stream_response_id: str | None = None
try:
async for chunk in await litellm.acompletion(**params):
@@ -1446,6 +1465,14 @@ class LLM(BaseLLM):
last_chunk = chunk
response_id = chunk.id if isinstance(chunk, ModelResponseBase) else None
chunk_finish, chunk_id = self._extract_finish_reason_and_response_id(
chunk
)
if chunk_finish:
stream_finish_reason = chunk_finish
if chunk_id and not stream_response_id:
stream_response_id = chunk_id
try:
choices = None
if isinstance(chunk, dict) and "choices" in chunk:
@@ -1553,7 +1580,8 @@ class LLM(BaseLLM):
usage_dict = self._usage_to_dict(usage_info)
finish_reason, response_id_last = (
self._extract_finish_reason_and_response_id(last_chunk)
stream_finish_reason,
stream_response_id,
)
self._handle_emit_call_events(
response=full_response,
@@ -1578,7 +1606,8 @@ class LLM(BaseLLM):
raise
if full_response:
finish_reason, response_id_last = (
self._extract_finish_reason_and_response_id(last_chunk)
stream_finish_reason,
stream_response_id,
)
self._handle_emit_call_events(
response=full_response,

View File

@@ -2018,22 +2018,20 @@ class BedrockCompletion(BaseLLM):
def _extract_finish_reason_and_id(
response: Any,
) -> tuple[str | None, str | None]:
"""Extract raw finish_reason (``stopReason``) and response_id
(``ResponseMetadata.RequestId``) from a Bedrock Converse response
dict. Defensive — returns (None, None) on any failure. Raw provider
value is kept; downstream telemetry owns the OTel enum coercion.
"""Extract raw finish_reason (``stopReason``) from a Bedrock Converse
response dict. Defensive — returns (None, None) on any failure.
Bedrock Converse has no model-level response id; ResponseMetadata.RequestId
is an AWS infra trace id (semantically different from OpenAI's chatcmpl-XXX),
so we omit response_id rather than mislead downstream telemetry consumers.
"""
finish_reason: str | None = None
response_id: str | None = None
try:
if isinstance(response, dict):
finish_reason = response.get("stopReason")
metadata = response.get("ResponseMetadata") or {}
response_id = metadata.get("RequestId") if metadata else None
except (AttributeError, KeyError, TypeError, IndexError):
finish_reason = None
response_id = None
return finish_reason, response_id
return finish_reason, None
def _handle_client_error(self, e: ClientError) -> str:
"""Handle AWS ClientError with specific error codes and return error message."""

View File

@@ -0,0 +1,96 @@
"""Regression: LiteLLM emits a final usage-only chunk (choices=[]) when
``stream_options.include_usage`` is set. The old post-loop
``_extract_finish_reason_and_response_id(last_chunk)`` then silently returned
(None, None). These tests pin that we capture finish_reason/response_id
incrementally during the stream loop instead.
"""
from __future__ import annotations
from typing import Any
from unittest.mock import patch
import pytest
from crewai.events.event_bus import CrewAIEventsBus
from crewai.events.types.llm_events import LLMCallCompletedEvent
from crewai.llm import LLM
@pytest.fixture
def mock_emit():
with patch.object(CrewAIEventsBus, "emit") as mock:
yield mock
def _completed_event(mock_emit) -> LLMCallCompletedEvent:
matches = [
call.kwargs["event"]
for call in mock_emit.call_args_list
if isinstance(call.kwargs.get("event"), LLMCallCompletedEvent)
]
assert matches, "expected an LLMCallCompletedEvent to be emitted"
assert len(matches) == 1, f"expected one completed event, got {len(matches)}"
return matches[0]
def _chunks_with_usage_tail() -> list[dict[str, Any]]:
"""Three-chunk stream mirroring LiteLLM's include_usage behavior:
two content chunks where the second carries finish_reason="stop",
then a final usage-only chunk with choices=[]."""
return [
{
"id": "chatcmpl-stream-1",
"choices": [
{"delta": {"content": "hi"}, "finish_reason": None}
],
},
{
"id": "chatcmpl-stream-1",
"choices": [
{"delta": {"content": " there"}, "finish_reason": "stop"}
],
},
{
"id": "chatcmpl-stream-1",
"choices": [],
"usage": {
"prompt_tokens": 1,
"completion_tokens": 2,
"total_tokens": 3,
},
},
]
def test_sync_stream_emits_finish_reason_and_response_id_from_loop(mock_emit):
llm = LLM(model="gpt-4o-mini", is_litellm=True, stream=True)
with patch("crewai.llm.litellm.completion", return_value=iter(_chunks_with_usage_tail())):
result = llm.call("anything")
assert result == "hi there"
event = _completed_event(mock_emit)
assert event.finish_reason == "stop"
assert event.response_id == "chatcmpl-stream-1"
@pytest.mark.asyncio
async def test_async_stream_emits_finish_reason_and_response_id_from_loop(mock_emit):
llm = LLM(model="gpt-4o-mini", is_litellm=True, stream=True)
async def _aiter():
for chunk in _chunks_with_usage_tail():
yield chunk
async def _acompletion(*_args, **_kwargs):
return _aiter()
with patch("crewai.llm.litellm.acompletion", side_effect=_acompletion):
result = await llm.acall("anything")
assert result == "hi there"
event = _completed_event(mock_emit)
assert event.finish_reason == "stop"
assert event.response_id == "chatcmpl-stream-1"