# Patch summary (git unified diff, three files).
#
# 1. lib/crewai/src/crewai/llm.py (class LLM, sync and async streaming paths)
#    - Adds two locals, stream_finish_reason and stream_response_id, initialised
#      to None before the chunk loop.
#    - Inside each chunk loop, calls
#      self._extract_finish_reason_and_response_id(chunk) and keeps the LATEST
#      truthy finish_reason and the FIRST truthy response id.
#    - The four places that previously called the same helper on last_chunk
#      after the loop now use the tracked pair instead. Per the in-patch
#      comment, the reason is that LiteLLM with include_usage ends the stream
#      with a usage-only chunk whose choices list is empty, so last_chunk
#      carries no finish_reason.
#
# 2. lib/crewai/src/crewai/llms/providers/bedrock/completion.py
#    - BedrockCompletion._extract_finish_reason_and_id stops reading
#      ResponseMetadata.RequestId and now returns (stopReason, None); the
#      return annotation tuple[str | None, str | None] is unchanged.
#    - NOTE(review): after this change the try body only calls .get() on a
#      value already checked with isinstance(response, dict), so the
#      except (AttributeError, KeyError, TypeError, IndexError) clause looks
#      unreachable for plain dicts -- confirm whether it is still wanted.
#
# 3. lib/crewai/tests/test_llm_streaming_finish_reason.py (new file)
#    - mock_emit fixture patches CrewAIEventsBus.emit.
#    - _completed_event() asserts exactly one LLMCallCompletedEvent was emitted
#      and returns it.
#    - _chunks_with_usage_tail() builds a three-chunk dict stream: two content
#      chunks (the second with finish_reason="stop") followed by a usage-only
#      chunk with choices=[].
#    - One sync test (patching crewai.llm.litellm.completion) and one async
#      test (patching crewai.llm.litellm.acompletion) assert the result is
#      "hi there", finish_reason == "stop" and
#      response_id == "chatcmpl-stream-1".
#
# NOTE(review): newlines and indentation inside the hunks below appear to have
# been collapsed into single spaces; in this form the text presumably cannot be
# applied with `git apply` -- verify against the original patch before use.
diff --git a/lib/crewai/src/crewai/llm.py b/lib/crewai/src/crewai/llm.py index dd48988dc..1c7e7e5af 100644 --- a/lib/crewai/src/crewai/llm.py +++ b/lib/crewai/src/crewai/llm.py @@ -722,6 +722,11 @@ class LLM(BaseLLM): last_chunk = None chunk_count = 0 usage_info = None + # Tracked across the loop: LiteLLM with include_usage emits a final + # usage-only chunk with empty choices, so the post-loop last_chunk has + # no finish_reason. Capture both incrementally instead. + stream_finish_reason: str | None = None + stream_response_id: str | None = None accumulated_tool_args: defaultdict[int, AccumulatedToolArgs] = defaultdict( AccumulatedToolArgs @@ -741,6 +746,14 @@ class LLM(BaseLLM): if isinstance(chunk, ModelResponseBase): response_id = chunk.id + chunk_finish, chunk_id = self._extract_finish_reason_and_response_id( + chunk + ) + if chunk_finish: + stream_finish_reason = chunk_finish + if chunk_id and not stream_response_id: + stream_response_id = chunk_id + try: choices = None if isinstance(chunk, dict) and "choices" in chunk: @@ -913,7 +926,8 @@ class LLM(BaseLLM): return tool_calls_list finish_reason, response_id_last = ( - self._extract_finish_reason_and_response_id(last_chunk) + stream_finish_reason, + stream_response_id, ) if not tool_calls or not available_functions: @@ -979,7 +993,8 @@ class LLM(BaseLLM): if full_response.strip(): logging.warning(f"Returning partial response despite error: {e!s}") finish_reason, response_id_last = ( - self._extract_finish_reason_and_response_id(last_chunk) + stream_finish_reason, + stream_response_id, ) self._handle_emit_call_events( response=full_response, @@ -1438,6 +1453,10 @@ class LLM(BaseLLM): params["stream_options"] = {"include_usage": True} response_id = None last_chunk: Any | None = None + # See sync sibling: incrementally track finish_reason/response_id so the + # usage-only final chunk doesn't wipe them. 
+ stream_finish_reason: str | None = None + stream_response_id: str | None = None try: async for chunk in await litellm.acompletion(**params): @@ -1446,6 +1465,14 @@ class LLM(BaseLLM): last_chunk = chunk response_id = chunk.id if isinstance(chunk, ModelResponseBase) else None + chunk_finish, chunk_id = self._extract_finish_reason_and_response_id( + chunk + ) + if chunk_finish: + stream_finish_reason = chunk_finish + if chunk_id and not stream_response_id: + stream_response_id = chunk_id + try: choices = None if isinstance(chunk, dict) and "choices" in chunk: @@ -1553,7 +1580,8 @@ class LLM(BaseLLM): usage_dict = self._usage_to_dict(usage_info) finish_reason, response_id_last = ( - self._extract_finish_reason_and_response_id(last_chunk) + stream_finish_reason, + stream_response_id, ) self._handle_emit_call_events( response=full_response, @@ -1578,7 +1606,8 @@ class LLM(BaseLLM): raise if full_response: finish_reason, response_id_last = ( - self._extract_finish_reason_and_response_id(last_chunk) + stream_finish_reason, + stream_response_id, ) self._handle_emit_call_events( response=full_response, diff --git a/lib/crewai/src/crewai/llms/providers/bedrock/completion.py b/lib/crewai/src/crewai/llms/providers/bedrock/completion.py index 14e36d944..a15a4e2aa 100644 --- a/lib/crewai/src/crewai/llms/providers/bedrock/completion.py +++ b/lib/crewai/src/crewai/llms/providers/bedrock/completion.py @@ -2018,22 +2018,20 @@ class BedrockCompletion(BaseLLM): def _extract_finish_reason_and_id( response: Any, ) -> tuple[str | None, str | None]: - """Extract raw finish_reason (``stopReason``) and response_id - (``ResponseMetadata.RequestId``) from a Bedrock Converse response - dict. Defensive — returns (None, None) on any failure. Raw provider - value is kept; downstream telemetry owns the OTel enum coercion. + """Extract raw finish_reason (``stopReason``) from a Bedrock Converse + response dict. Defensive — returns (None, None) on any failure. 
+ + Bedrock Converse has no model-level response id; ResponseMetadata.RequestId + is an AWS infra trace id (semantically different from OpenAI's chatcmpl-XXX), + so we omit response_id rather than mislead downstream telemetry consumers. """ finish_reason: str | None = None - response_id: str | None = None try: if isinstance(response, dict): finish_reason = response.get("stopReason") - metadata = response.get("ResponseMetadata") or {} - response_id = metadata.get("RequestId") if metadata else None except (AttributeError, KeyError, TypeError, IndexError): finish_reason = None - response_id = None - return finish_reason, response_id + return finish_reason, None def _handle_client_error(self, e: ClientError) -> str: """Handle AWS ClientError with specific error codes and return error message.""" diff --git a/lib/crewai/tests/test_llm_streaming_finish_reason.py b/lib/crewai/tests/test_llm_streaming_finish_reason.py new file mode 100644 index 000000000..ff8a94d4e --- /dev/null +++ b/lib/crewai/tests/test_llm_streaming_finish_reason.py @@ -0,0 +1,96 @@ +"""Regression: LiteLLM emits a final usage-only chunk (choices=[]) when +``stream_options.include_usage`` is set. The old post-loop +``_extract_finish_reason_and_response_id(last_chunk)`` then silently returned +(None, None). These tests pin that we capture finish_reason/response_id +incrementally during the stream loop instead. 
+""" +from __future__ import annotations + +from typing import Any +from unittest.mock import patch + +import pytest + +from crewai.events.event_bus import CrewAIEventsBus +from crewai.events.types.llm_events import LLMCallCompletedEvent +from crewai.llm import LLM + + +@pytest.fixture +def mock_emit(): + with patch.object(CrewAIEventsBus, "emit") as mock: + yield mock + + +def _completed_event(mock_emit) -> LLMCallCompletedEvent: + matches = [ + call.kwargs["event"] + for call in mock_emit.call_args_list + if isinstance(call.kwargs.get("event"), LLMCallCompletedEvent) + ] + assert matches, "expected an LLMCallCompletedEvent to be emitted" + assert len(matches) == 1, f"expected one completed event, got {len(matches)}" + return matches[0] + + +def _chunks_with_usage_tail() -> list[dict[str, Any]]: + """Three-chunk stream mirroring LiteLLM's include_usage behavior: + two content chunks where the second carries finish_reason="stop", + then a final usage-only chunk with choices=[].""" + return [ + { + "id": "chatcmpl-stream-1", + "choices": [ + {"delta": {"content": "hi"}, "finish_reason": None} + ], + }, + { + "id": "chatcmpl-stream-1", + "choices": [ + {"delta": {"content": " there"}, "finish_reason": "stop"} + ], + }, + { + "id": "chatcmpl-stream-1", + "choices": [], + "usage": { + "prompt_tokens": 1, + "completion_tokens": 2, + "total_tokens": 3, + }, + }, + ] + + +def test_sync_stream_emits_finish_reason_and_response_id_from_loop(mock_emit): + llm = LLM(model="gpt-4o-mini", is_litellm=True, stream=True) + + with patch("crewai.llm.litellm.completion", return_value=iter(_chunks_with_usage_tail())): + result = llm.call("anything") + + assert result == "hi there" + + event = _completed_event(mock_emit) + assert event.finish_reason == "stop" + assert event.response_id == "chatcmpl-stream-1" + + +@pytest.mark.asyncio +async def test_async_stream_emits_finish_reason_and_response_id_from_loop(mock_emit): + llm = LLM(model="gpt-4o-mini", is_litellm=True, stream=True) + + 
async def _aiter(): + for chunk in _chunks_with_usage_tail(): + yield chunk + + async def _acompletion(*_args, **_kwargs): + return _aiter() + + with patch("crewai.llm.litellm.acompletion", side_effect=_acompletion): + result = await llm.acall("anything") + + assert result == "hi there" + + event = _completed_event(mock_emit) + assert event.finish_reason == "stop" + assert event.response_id == "chatcmpl-stream-1"