mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-05-28 04:18:23 +00:00
fix(otel): correct streaming finish_reason + bedrock response_id semantics
Two correctness fixes uncovered while landing the OTel finish_reason +
response_id plumbing:
- LiteLLM streaming (sync + async): `stream_options={"include_usage": True}`
causes LiteLLM to emit a final usage-only chunk with `choices=[]`. The
post-loop `_extract_finish_reason_and_response_id(last_chunk)` silently
returned `(None, None)` because the last chunk has no choices, even though
earlier chunks carried `finish_reason="stop"`. Track both fields
incrementally inside the loop (mirroring how OpenAI/Gemini/Azure already
handle their native streams) and use the tracked values for the
LLMCallCompletedEvent emission and the partial-response error path.
- Bedrock Converse: `ResponseMetadata.RequestId` is an AWS infra trace id,
not a model-level response id (semantically different from OpenAI's
`chatcmpl-XXX`). Return None for `response_id` rather than mislead
downstream telemetry consumers. The audit fix's async propagation chain is
unaffected: a None `response_id` passes through it unchanged.
Adds `test_llm_streaming_finish_reason.py` pinning both the sync and async
LiteLLM streaming paths against the include_usage chunk shape.
This commit is contained in:
@@ -722,6 +722,11 @@ class LLM(BaseLLM):
|
||||
last_chunk = None
|
||||
chunk_count = 0
|
||||
usage_info = None
|
||||
# Tracked across the loop: LiteLLM with include_usage emits a final
|
||||
# usage-only chunk with empty choices, so the post-loop last_chunk has
|
||||
# no finish_reason. Capture both incrementally instead.
|
||||
stream_finish_reason: str | None = None
|
||||
stream_response_id: str | None = None
|
||||
|
||||
accumulated_tool_args: defaultdict[int, AccumulatedToolArgs] = defaultdict(
|
||||
AccumulatedToolArgs
|
||||
@@ -741,6 +746,14 @@ class LLM(BaseLLM):
|
||||
if isinstance(chunk, ModelResponseBase):
|
||||
response_id = chunk.id
|
||||
|
||||
chunk_finish, chunk_id = self._extract_finish_reason_and_response_id(
|
||||
chunk
|
||||
)
|
||||
if chunk_finish:
|
||||
stream_finish_reason = chunk_finish
|
||||
if chunk_id and not stream_response_id:
|
||||
stream_response_id = chunk_id
|
||||
|
||||
try:
|
||||
choices = None
|
||||
if isinstance(chunk, dict) and "choices" in chunk:
|
||||
@@ -913,7 +926,8 @@ class LLM(BaseLLM):
|
||||
return tool_calls_list
|
||||
|
||||
finish_reason, response_id_last = (
|
||||
self._extract_finish_reason_and_response_id(last_chunk)
|
||||
stream_finish_reason,
|
||||
stream_response_id,
|
||||
)
|
||||
|
||||
if not tool_calls or not available_functions:
|
||||
@@ -979,7 +993,8 @@ class LLM(BaseLLM):
|
||||
if full_response.strip():
|
||||
logging.warning(f"Returning partial response despite error: {e!s}")
|
||||
finish_reason, response_id_last = (
|
||||
self._extract_finish_reason_and_response_id(last_chunk)
|
||||
stream_finish_reason,
|
||||
stream_response_id,
|
||||
)
|
||||
self._handle_emit_call_events(
|
||||
response=full_response,
|
||||
@@ -1438,6 +1453,10 @@ class LLM(BaseLLM):
|
||||
params["stream_options"] = {"include_usage": True}
|
||||
response_id = None
|
||||
last_chunk: Any | None = None
|
||||
# See sync sibling: incrementally track finish_reason/response_id so the
|
||||
# usage-only final chunk doesn't wipe them.
|
||||
stream_finish_reason: str | None = None
|
||||
stream_response_id: str | None = None
|
||||
|
||||
try:
|
||||
async for chunk in await litellm.acompletion(**params):
|
||||
@@ -1446,6 +1465,14 @@ class LLM(BaseLLM):
|
||||
last_chunk = chunk
|
||||
response_id = chunk.id if isinstance(chunk, ModelResponseBase) else None
|
||||
|
||||
chunk_finish, chunk_id = self._extract_finish_reason_and_response_id(
|
||||
chunk
|
||||
)
|
||||
if chunk_finish:
|
||||
stream_finish_reason = chunk_finish
|
||||
if chunk_id and not stream_response_id:
|
||||
stream_response_id = chunk_id
|
||||
|
||||
try:
|
||||
choices = None
|
||||
if isinstance(chunk, dict) and "choices" in chunk:
|
||||
@@ -1553,7 +1580,8 @@ class LLM(BaseLLM):
|
||||
|
||||
usage_dict = self._usage_to_dict(usage_info)
|
||||
finish_reason, response_id_last = (
|
||||
self._extract_finish_reason_and_response_id(last_chunk)
|
||||
stream_finish_reason,
|
||||
stream_response_id,
|
||||
)
|
||||
self._handle_emit_call_events(
|
||||
response=full_response,
|
||||
@@ -1578,7 +1606,8 @@ class LLM(BaseLLM):
|
||||
raise
|
||||
if full_response:
|
||||
finish_reason, response_id_last = (
|
||||
self._extract_finish_reason_and_response_id(last_chunk)
|
||||
stream_finish_reason,
|
||||
stream_response_id,
|
||||
)
|
||||
self._handle_emit_call_events(
|
||||
response=full_response,
|
||||
|
||||
@@ -2018,22 +2018,20 @@ class BedrockCompletion(BaseLLM):
|
||||
def _extract_finish_reason_and_id(
|
||||
response: Any,
|
||||
) -> tuple[str | None, str | None]:
|
||||
"""Extract raw finish_reason (``stopReason``) and response_id
|
||||
(``ResponseMetadata.RequestId``) from a Bedrock Converse response
|
||||
dict. Defensive — returns (None, None) on any failure. Raw provider
|
||||
value is kept; downstream telemetry owns the OTel enum coercion.
|
||||
"""Extract raw finish_reason (``stopReason``) from a Bedrock Converse
|
||||
response dict. Defensive — returns (None, None) on any failure.
|
||||
|
||||
Bedrock Converse has no model-level response id; ResponseMetadata.RequestId
|
||||
is an AWS infra trace id (semantically different from OpenAI's chatcmpl-XXX),
|
||||
so we omit response_id rather than mislead downstream telemetry consumers.
|
||||
"""
|
||||
finish_reason: str | None = None
|
||||
response_id: str | None = None
|
||||
try:
|
||||
if isinstance(response, dict):
|
||||
finish_reason = response.get("stopReason")
|
||||
metadata = response.get("ResponseMetadata") or {}
|
||||
response_id = metadata.get("RequestId") if metadata else None
|
||||
except (AttributeError, KeyError, TypeError, IndexError):
|
||||
finish_reason = None
|
||||
response_id = None
|
||||
return finish_reason, response_id
|
||||
return finish_reason, None
|
||||
|
||||
def _handle_client_error(self, e: ClientError) -> str:
|
||||
"""Handle AWS ClientError with specific error codes and return error message."""
|
||||
|
||||
96
lib/crewai/tests/test_llm_streaming_finish_reason.py
Normal file
96
lib/crewai/tests/test_llm_streaming_finish_reason.py
Normal file
@@ -0,0 +1,96 @@
|
||||
"""Regression: LiteLLM emits a final usage-only chunk (choices=[]) when
|
||||
``stream_options.include_usage`` is set. The old post-loop
|
||||
``_extract_finish_reason_and_response_id(last_chunk)`` then silently returned
|
||||
(None, None). These tests pin that we capture finish_reason/response_id
|
||||
incrementally during the stream loop instead.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
from crewai.events.event_bus import CrewAIEventsBus
|
||||
from crewai.events.types.llm_events import LLMCallCompletedEvent
|
||||
from crewai.llm import LLM
|
||||
|
||||
|
||||
@pytest.fixture
def mock_emit():
    """Patch ``CrewAIEventsBus.emit`` and yield the mock while the test runs.

    The patch is removed when the ``with`` block exits after the test.
    """
    with patch.object(CrewAIEventsBus, "emit") as mock:
        yield mock
|
||||
|
||||
|
||||
def _completed_event(mock_emit) -> LLMCallCompletedEvent:
    """Return the single ``LLMCallCompletedEvent`` passed to the patched ``emit``.

    Only calls that passed the event via the ``event=`` keyword are
    considered. Fails the test if no such event, or more than one, was
    recorded.
    """
    matches = [
        call.kwargs["event"]
        for call in mock_emit.call_args_list
        if isinstance(call.kwargs.get("event"), LLMCallCompletedEvent)
    ]
    assert matches, "expected an LLMCallCompletedEvent to be emitted"
    assert len(matches) == 1, f"expected one completed event, got {len(matches)}"
    return matches[0]
|
||||
|
||||
|
||||
def _chunks_with_usage_tail() -> list[dict[str, Any]]:
|
||||
"""Three-chunk stream mirroring LiteLLM's include_usage behavior:
|
||||
two content chunks where the second carries finish_reason="stop",
|
||||
then a final usage-only chunk with choices=[]."""
|
||||
return [
|
||||
{
|
||||
"id": "chatcmpl-stream-1",
|
||||
"choices": [
|
||||
{"delta": {"content": "hi"}, "finish_reason": None}
|
||||
],
|
||||
},
|
||||
{
|
||||
"id": "chatcmpl-stream-1",
|
||||
"choices": [
|
||||
{"delta": {"content": " there"}, "finish_reason": "stop"}
|
||||
],
|
||||
},
|
||||
{
|
||||
"id": "chatcmpl-stream-1",
|
||||
"choices": [],
|
||||
"usage": {
|
||||
"prompt_tokens": 1,
|
||||
"completion_tokens": 2,
|
||||
"total_tokens": 3,
|
||||
},
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
def test_sync_stream_emits_finish_reason_and_response_id_from_loop(mock_emit):
    """Sync streaming keeps finish_reason/response_id despite a usage-only tail.

    ``litellm.completion`` is patched to return an iterator over the three
    canned chunks, the last of which has ``choices=[]``.
    """
    llm = LLM(model="gpt-4o-mini", is_litellm=True, stream=True)

    with patch(
        "crewai.llm.litellm.completion",
        return_value=iter(_chunks_with_usage_tail()),
    ):
        result = llm.call("anything")

    assert result == "hi there"

    event = _completed_event(mock_emit)
    assert event.finish_reason == "stop"
    assert event.response_id == "chatcmpl-stream-1"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_async_stream_emits_finish_reason_and_response_id_from_loop(mock_emit):
    """Async streaming keeps finish_reason/response_id despite a usage-only tail.

    ``litellm.acompletion`` is patched with a coroutine that returns an async
    generator over the three canned chunks, the last of which has
    ``choices=[]``.
    """
    llm = LLM(model="gpt-4o-mini", is_litellm=True, stream=True)

    async def _aiter():
        for chunk in _chunks_with_usage_tail():
            yield chunk

    async def _acompletion(*_args, **_kwargs):
        # Return the generator from a coroutine so the patched call can be
        # awaited first and then iterated.
        return _aiter()

    with patch("crewai.llm.litellm.acompletion", side_effect=_acompletion):
        result = await llm.acall("anything")

    assert result == "hi there"

    event = _completed_event(mock_emit)
    assert event.finish_reason == "stop"
    assert event.response_id == "chatcmpl-stream-1"
|
||||
Reference in New Issue
Block a user