feat(otel): surface real finish_reason + sampling params + response.id on LLM events

Companion to the OTel GenAI emitter compliance work in crewai-enterprise
(CON-172). Today the enterprise emitter reads these fields off the OSS
LLM events via `getattr(..., None)`, so it produces valid (but partial)
spans against the existing OSS surface. This change makes those fields
first-class on the events so spans can carry the real provider data.

What this adds:

- `LLMCallStartedEvent` gains the sampling-param fields the emitter needs
  for `gen_ai.request.*`: `temperature`, `top_p`, `max_tokens`, `stream`,
  `seed`, `stop_sequences`, `frequency_penalty`, `presence_penalty`, `n`.
  All optional; existing call sites keep working.
- `BaseLLM._emit_call_started_event` introspects those values off `self`
  (the LLM instance) via `getattr(..., None)` so every provider gets the
  fields propagated for free without per-provider plumbing.
- `LLMCallCompletedEvent` gains `finish_reason: str | None` and
  `response_id: str | None`. A field validator coerces any non-string
  value (MagicMock, unexpected provider object) to None so the event
  never raises on construction.
- `LLM._emit_call_completed_event` accepts both as kwargs.
- `LLM` (LiteLLM path) gets a defensive `_extract_finish_reason_and_response_id`
  helper that handles both streaming (`StreamingChoices`) and non-streaming
  (`Choices`) shapes and is wired into every completion-event emission site.
- Provider completions extract native values from their SDK responses and
  pass them through:
  - OpenAI: `_extract_responses_finish_reason_and_id` for Responses-API,
    `_extract_finish_reason_and_id` for Chat-Completions.
  - Anthropic: `_extract_finish_reason_and_id` (Messages API + streaming).
  - Bedrock: `_extract_finish_reason_and_id` (`stopReason` from converse).
  - Gemini: `_extract_finish_reason_and_id` (`finish_reason` from candidates).
  - Azure: inherits via OpenAI sub-class; adds the helper for Azure-specific
    response shapes.
  - openai_compatible: inherits from OpenAICompletion, no edits needed.

Compatibility:

- All new fields are optional with sensible defaults. No existing call
  sites need to change.
- The validator on `LLMCallCompletedEvent` swallows non-string values for
  the new fields so legacy mocks / exotic provider types don't blow up
  event construction.
- Enterprise side already reads these fields defensively, so OSS and
  enterprise can merge independently and cut on the same synchronized
  release.

Tested against the full LLM + events + provider test suite — all green;
the 14 pre-existing multimodal failures on main are unrelated and
reproduce without this diff.
This commit is contained in:
Lucas Gomide
2026-05-26 20:09:50 -03:00
parent 2e36f06732
commit 44c95fbcb9
8 changed files with 644 additions and 10 deletions

View File

@@ -1,7 +1,7 @@
from enum import Enum
from typing import Any, Literal
from pydantic import BaseModel
from pydantic import BaseModel, field_validator
from crewai.events.base_events import BaseEvent
@@ -48,6 +48,17 @@ class LLMCallStartedEvent(LLMEventBase):
tools: list[dict[str, Any]] | None = None
callbacks: list[Any] | None = None
available_functions: dict[str, Any] | None = None
# Sampling/request parameters forwarded for OTel GenAI compliance.
# All optional so legacy emitters keep working unchanged.
temperature: float | None = None
top_p: float | None = None
max_tokens: int | None = None
stream: bool | None = None
seed: int | None = None
stop_sequences: list[str] | None = None
frequency_penalty: float | None = None
presence_penalty: float | None = None
n: int | None = None
class LLMCallCompletedEvent(LLMEventBase):
@@ -58,6 +69,23 @@ class LLMCallCompletedEvent(LLMEventBase):
response: Any
call_type: LLMCallType
usage: dict[str, Any] | None = None
finish_reason: str | None = None
response_id: str | None = None
@field_validator("finish_reason", "response_id", mode="before")
@classmethod
def _coerce_non_string_to_none(cls, value: Any) -> str | None:
"""Drop non-string values so test mocks and exotic provider types
(MagicMock, protobuf enums, etc.) never crash event construction.
Provider helpers are best-effort: when extraction returns something
non-string (e.g. a ``MagicMock`` in unit tests), we treat it as
"no value" rather than raising. Downstream telemetry already
handles the missing-attribute case.
"""
if value is None or isinstance(value, str):
return value
return None
class LLMCallFailedEvent(LLMEventBase):

View File

@@ -912,6 +912,10 @@ class LLM(BaseLLM):
if tool_calls_list:
return tool_calls_list
finish_reason, response_id_last = (
self._extract_finish_reason_and_response_id(last_chunk)
)
if not tool_calls or not available_functions:
if response_model and self.is_litellm:
instructor_instance = InternalInstructor(
@@ -929,6 +933,8 @@ class LLM(BaseLLM):
from_agent=from_agent,
messages=params["messages"],
usage=usage_dict,
finish_reason=finish_reason,
response_id=response_id_last,
)
return structured_response
@@ -940,6 +946,8 @@ class LLM(BaseLLM):
from_agent=from_agent,
messages=params["messages"],
usage=usage_dict,
finish_reason=finish_reason,
response_id=response_id_last,
)
return full_response
@@ -955,6 +963,8 @@ class LLM(BaseLLM):
from_agent=from_agent,
messages=params["messages"],
usage=usage_dict,
finish_reason=finish_reason,
response_id=response_id_last,
)
return full_response
@@ -968,6 +978,9 @@ class LLM(BaseLLM):
logging.error(f"Error in streaming response: {e!s}")
if full_response.strip():
logging.warning(f"Returning partial response despite error: {e!s}")
finish_reason, response_id_last = (
self._extract_finish_reason_and_response_id(last_chunk)
)
self._handle_emit_call_events(
response=full_response,
call_type=LLMCallType.LLM_CALL,
@@ -975,6 +988,8 @@ class LLM(BaseLLM):
from_agent=from_agent,
messages=params["messages"],
usage=self._usage_to_dict(usage_info),
finish_reason=finish_reason,
response_id=response_id_last,
)
return full_response
@@ -1159,6 +1174,10 @@ class LLM(BaseLLM):
else None
)
finish_reason, response_id = self._extract_finish_reason_and_response_id(
response
)
if response_model is not None:
# When using instructor/response_model, litellm returns a Pydantic model instance
if isinstance(response, BaseModel):
@@ -1170,6 +1189,8 @@ class LLM(BaseLLM):
from_agent=from_agent,
messages=params["messages"],
usage=response_usage,
finish_reason=finish_reason,
response_id=response_id,
)
return structured_response
@@ -1206,6 +1227,8 @@ class LLM(BaseLLM):
from_agent=from_agent,
messages=params["messages"],
usage=response_usage,
finish_reason=finish_reason,
response_id=response_id,
)
return text_response
@@ -1223,6 +1246,8 @@ class LLM(BaseLLM):
from_agent=from_agent,
messages=params["messages"],
usage=response_usage,
finish_reason=finish_reason,
response_id=response_id,
)
return text_response
@@ -1300,6 +1325,10 @@ class LLM(BaseLLM):
else None
)
finish_reason, response_id = self._extract_finish_reason_and_response_id(
response
)
if response_model is not None:
if isinstance(response, BaseModel):
structured_response = response.model_dump_json()
@@ -1310,6 +1339,8 @@ class LLM(BaseLLM):
from_agent=from_agent,
messages=params["messages"],
usage=response_usage,
finish_reason=finish_reason,
response_id=response_id,
)
return structured_response
@@ -1348,6 +1379,8 @@ class LLM(BaseLLM):
from_agent=from_agent,
messages=params["messages"],
usage=response_usage,
finish_reason=finish_reason,
response_id=response_id,
)
return text_response
@@ -1365,6 +1398,8 @@ class LLM(BaseLLM):
from_agent=from_agent,
messages=params["messages"],
usage=response_usage,
finish_reason=finish_reason,
response_id=response_id,
)
return text_response
@@ -1402,11 +1437,13 @@ class LLM(BaseLLM):
params["stream"] = True
params["stream_options"] = {"include_usage": True}
response_id = None
last_chunk: Any | None = None
try:
async for chunk in await litellm.acompletion(**params):
chunk_count += 1
chunk_content = None
last_chunk = chunk
response_id = chunk.id if isinstance(chunk, ModelResponseBase) else None
try:
@@ -1515,6 +1552,9 @@ class LLM(BaseLLM):
return tool_calls_list
usage_dict = self._usage_to_dict(usage_info)
finish_reason, response_id_last = (
self._extract_finish_reason_and_response_id(last_chunk)
)
self._handle_emit_call_events(
response=full_response,
call_type=LLMCallType.LLM_CALL,
@@ -1522,6 +1562,8 @@ class LLM(BaseLLM):
from_agent=from_agent,
messages=params.get("messages"),
usage=usage_dict,
finish_reason=finish_reason,
response_id=response_id_last or response_id,
)
return full_response
@@ -1535,6 +1577,9 @@ class LLM(BaseLLM):
if chunk_count == 0:
raise
if full_response:
finish_reason, response_id_last = (
self._extract_finish_reason_and_response_id(last_chunk)
)
self._handle_emit_call_events(
response=full_response,
call_type=LLMCallType.LLM_CALL,
@@ -1542,6 +1587,8 @@ class LLM(BaseLLM):
from_agent=from_agent,
messages=params.get("messages"),
usage=self._usage_to_dict(usage_info),
finish_reason=finish_reason,
response_id=response_id_last or response_id,
)
return full_response
raise
@@ -1680,6 +1727,15 @@ class LLM(BaseLLM):
from_agent=from_agent,
model=self.model,
call_id=call_id,
temperature=getattr(self, "temperature", None),
top_p=getattr(self, "top_p", None),
max_tokens=getattr(self, "max_tokens", None),
stream=getattr(self, "stream", None),
seed=getattr(self, "seed", None),
stop_sequences=getattr(self, "stop", None),
frequency_penalty=getattr(self, "frequency_penalty", None),
presence_penalty=getattr(self, "presence_penalty", None),
n=getattr(self, "n", None),
),
)
@@ -1824,6 +1880,15 @@ class LLM(BaseLLM):
from_agent=from_agent,
model=self.model,
call_id=call_id,
temperature=getattr(self, "temperature", None),
top_p=getattr(self, "top_p", None),
max_tokens=getattr(self, "max_tokens", None),
stream=getattr(self, "stream", None),
seed=getattr(self, "seed", None),
stop_sequences=getattr(self, "stop", None),
frequency_penalty=getattr(self, "frequency_penalty", None),
presence_penalty=getattr(self, "presence_penalty", None),
n=getattr(self, "n", None),
),
)
@@ -1934,6 +1999,8 @@ class LLM(BaseLLM):
from_agent: BaseAgent | None = None,
messages: str | list[LLMMessage] | None = None,
usage: dict[str, Any] | None = None,
finish_reason: str | None = None,
response_id: str | None = None,
) -> None:
"""Handle the events for the LLM call.
@@ -1944,6 +2011,10 @@ class LLM(BaseLLM):
from_agent: Optional agent object
messages: Optional messages object
usage: Optional token usage data
finish_reason: Raw provider finish reason (e.g. "stop", "length",
"tool_calls"). Optional; downstream telemetry coerces to the
OTel GenAI enum.
response_id: Raw provider response identifier. Optional.
"""
crewai_event_bus.emit(
self,
@@ -1956,9 +2027,53 @@ class LLM(BaseLLM):
model=self.model,
call_id=get_current_call_id(),
usage=usage,
finish_reason=finish_reason,
response_id=response_id,
),
)
@staticmethod
def _extract_finish_reason_and_response_id(
response_or_chunk: Any,
) -> tuple[str | None, str | None]:
"""Extract raw finish_reason and response_id from a LiteLLM response
or accumulated streaming chunk.
Defensive: never raises; returns (None, None) on any failure. Keeps
raw provider values without coercion — downstream telemetry owns the
OTel GenAI enum mapping.
"""
def _as_str(value: Any) -> str | None:
return value if isinstance(value, str) else None
finish_reason: str | None = None
response_id: str | None = None
try:
raw_id = getattr(response_or_chunk, "id", None)
if raw_id is None and isinstance(response_or_chunk, dict):
raw_id = response_or_chunk.get("id")
response_id = _as_str(raw_id)
except (AttributeError, TypeError):
response_id = None
try:
choices = None
if isinstance(response_or_chunk, dict) and "choices" in response_or_chunk:
choices = response_or_chunk["choices"]
else:
choices = getattr(response_or_chunk, "choices", None)
if choices:
first = choices[0]
if isinstance(first, dict):
raw_finish = first.get("finish_reason")
else:
raw_finish = getattr(first, "finish_reason", None)
finish_reason = _as_str(raw_finish)
except (AttributeError, IndexError, TypeError, KeyError):
finish_reason = None
return finish_reason, response_id
def _process_message_files(self, messages: list[LLMMessage]) -> list[LLMMessage]:
"""Process files attached to messages and format for provider.

View File

@@ -472,10 +472,47 @@ class BaseLLM(BaseModel, ABC):
available_functions: dict[str, Any] | None = None,
from_task: Task | None = None,
from_agent: BaseAgent | None = None,
temperature: float | None = None,
top_p: float | None = None,
max_tokens: int | None = None,
stream: bool | None = None,
seed: int | None = None,
stop_sequences: list[str] | None = None,
frequency_penalty: float | None = None,
presence_penalty: float | None = None,
n: int | None = None,
) -> None:
"""Emit LLM call started event."""
"""Emit LLM call started event.
Sampling params default to introspecting ``self`` (``self.temperature``,
``self.top_p``, ``self.stop`` -> ``stop_sequences``, ...) so providers
don't need to thread them through every emission site. Explicit
kwargs override the introspection.
"""
from crewai.utilities.serialization import to_serializable
if temperature is None:
temperature = getattr(self, "temperature", None)
if top_p is None:
top_p = getattr(self, "top_p", None)
if max_tokens is None:
max_tokens = getattr(self, "max_tokens", None)
if stream is None:
stream = getattr(self, "stream", None)
if seed is None:
seed = getattr(self, "seed", None)
if stop_sequences is None:
stop_attr = getattr(self, "stop", None) or getattr(
self, "stop_sequences", None
)
stop_sequences = stop_attr or None
if frequency_penalty is None:
frequency_penalty = getattr(self, "frequency_penalty", None)
if presence_penalty is None:
presence_penalty = getattr(self, "presence_penalty", None)
if n is None:
n = getattr(self, "n", None)
crewai_event_bus.emit(
self,
event=LLMCallStartedEvent(
@@ -487,6 +524,15 @@ class BaseLLM(BaseModel, ABC):
from_agent=from_agent,
model=self.model,
call_id=get_current_call_id(),
temperature=temperature,
top_p=top_p,
max_tokens=max_tokens,
stream=stream,
seed=seed,
stop_sequences=stop_sequences,
frequency_penalty=frequency_penalty,
presence_penalty=presence_penalty,
n=n,
),
)
@@ -498,6 +544,8 @@ class BaseLLM(BaseModel, ABC):
from_agent: BaseAgent | None = None,
messages: str | list[LLMMessage] | None = None,
usage: dict[str, Any] | None = None,
finish_reason: str | None = None,
response_id: str | None = None,
) -> None:
"""Emit LLM call completed event."""
from crewai.utilities.serialization import to_serializable
@@ -513,6 +561,8 @@ class BaseLLM(BaseModel, ABC):
model=self.model,
call_id=get_current_call_id(),
usage=usage,
finish_reason=finish_reason,
response_id=response_id,
),
)

View File

@@ -923,6 +923,8 @@ class AnthropicCompletion(BaseLLM):
usage = self._extract_anthropic_token_usage(response)
self._track_token_usage_internal(usage)
finish_reason, response_id = self._extract_finish_reason_and_id(response)
if _is_pydantic_model_class(response_model) and response.content:
if use_native_structured_output:
for block in response.content:
@@ -935,6 +937,8 @@ class AnthropicCompletion(BaseLLM):
from_agent=from_agent,
messages=params["messages"],
usage=usage,
finish_reason=finish_reason,
response_id=response_id,
)
return structured_data
else:
@@ -951,6 +955,8 @@ class AnthropicCompletion(BaseLLM):
from_agent=from_agent,
messages=params["messages"],
usage=usage,
finish_reason=finish_reason,
response_id=response_id,
)
return structured_data
@@ -973,6 +979,8 @@ class AnthropicCompletion(BaseLLM):
from_agent=from_agent,
messages=params["messages"],
usage=usage,
finish_reason=finish_reason,
response_id=response_id,
)
return list(tool_uses)
@@ -1005,6 +1013,8 @@ class AnthropicCompletion(BaseLLM):
from_agent=from_agent,
messages=params["messages"],
usage=usage,
finish_reason=finish_reason,
response_id=response_id,
)
if usage.get("total_tokens", 0) > 0:
@@ -1147,6 +1157,10 @@ class AnthropicCompletion(BaseLLM):
usage = self._extract_anthropic_token_usage(final_message)
self._track_token_usage_internal(usage)
finish_reason, final_response_id = self._extract_finish_reason_and_id(
final_message
)
if _is_pydantic_model_class(response_model):
if use_native_structured_output:
structured_data = response_model.model_validate_json(full_response)
@@ -1157,6 +1171,8 @@ class AnthropicCompletion(BaseLLM):
from_agent=from_agent,
messages=params["messages"],
usage=usage,
finish_reason=finish_reason,
response_id=final_response_id,
)
return structured_data
for block in final_message.content:
@@ -1172,6 +1188,8 @@ class AnthropicCompletion(BaseLLM):
from_agent=from_agent,
messages=params["messages"],
usage=usage,
finish_reason=finish_reason,
response_id=final_response_id,
)
return structured_data
@@ -1201,6 +1219,8 @@ class AnthropicCompletion(BaseLLM):
from_agent=from_agent,
messages=params["messages"],
usage=usage,
finish_reason=finish_reason,
response_id=final_response_id,
)
return self._invoke_after_llm_call_hooks(
@@ -1361,6 +1381,10 @@ class AnthropicCompletion(BaseLLM):
final_content = self._apply_stop_words(final_content)
finish_reason, final_response_id = self._extract_finish_reason_and_id(
final_response
)
self._emit_call_completed_event(
response=final_content,
call_type=LLMCallType.LLM_CALL,
@@ -1368,6 +1392,8 @@ class AnthropicCompletion(BaseLLM):
from_agent=from_agent,
messages=follow_up_params["messages"],
usage=follow_up_usage,
finish_reason=finish_reason,
response_id=final_response_id,
)
total_usage = {
@@ -1447,6 +1473,8 @@ class AnthropicCompletion(BaseLLM):
usage = self._extract_anthropic_token_usage(response)
self._track_token_usage_internal(usage)
finish_reason, response_id = self._extract_finish_reason_and_id(response)
if _is_pydantic_model_class(response_model) and response.content:
if use_native_structured_output:
for block in response.content:
@@ -1459,6 +1487,8 @@ class AnthropicCompletion(BaseLLM):
from_agent=from_agent,
messages=params["messages"],
usage=usage,
finish_reason=finish_reason,
response_id=response_id,
)
return structured_data
else:
@@ -1475,6 +1505,8 @@ class AnthropicCompletion(BaseLLM):
from_agent=from_agent,
messages=params["messages"],
usage=usage,
finish_reason=finish_reason,
response_id=response_id,
)
return structured_data
@@ -1495,6 +1527,8 @@ class AnthropicCompletion(BaseLLM):
from_agent=from_agent,
messages=params["messages"],
usage=usage,
finish_reason=finish_reason,
response_id=response_id,
)
return list(tool_uses)
@@ -1519,6 +1553,8 @@ class AnthropicCompletion(BaseLLM):
from_agent=from_agent,
messages=params["messages"],
usage=usage,
finish_reason=finish_reason,
response_id=response_id,
)
if usage.get("total_tokens", 0) > 0:
@@ -1647,6 +1683,10 @@ class AnthropicCompletion(BaseLLM):
usage = self._extract_anthropic_token_usage(final_message)
self._track_token_usage_internal(usage)
finish_reason, final_response_id = self._extract_finish_reason_and_id(
final_message
)
if _is_pydantic_model_class(response_model):
if use_native_structured_output:
structured_data = response_model.model_validate_json(full_response)
@@ -1657,6 +1697,8 @@ class AnthropicCompletion(BaseLLM):
from_agent=from_agent,
messages=params["messages"],
usage=usage,
finish_reason=finish_reason,
response_id=final_response_id,
)
return structured_data
for block in final_message.content:
@@ -1672,6 +1714,8 @@ class AnthropicCompletion(BaseLLM):
from_agent=from_agent,
messages=params["messages"],
usage=usage,
finish_reason=finish_reason,
response_id=final_response_id,
)
return structured_data
@@ -1701,6 +1745,8 @@ class AnthropicCompletion(BaseLLM):
from_agent=from_agent,
messages=params["messages"],
usage=usage,
finish_reason=finish_reason,
response_id=final_response_id,
)
return full_response
@@ -1753,6 +1799,10 @@ class AnthropicCompletion(BaseLLM):
final_content = self._apply_stop_words(final_content)
finish_reason, final_response_id = self._extract_finish_reason_and_id(
final_response
)
self._emit_call_completed_event(
response=final_content,
call_type=LLMCallType.LLM_CALL,
@@ -1760,6 +1810,8 @@ class AnthropicCompletion(BaseLLM):
from_agent=from_agent,
messages=follow_up_params["messages"],
usage=follow_up_usage,
finish_reason=finish_reason,
response_id=final_response_id,
)
total_usage = {
@@ -1813,6 +1865,27 @@ class AnthropicCompletion(BaseLLM):
return int(200000 * CONTEXT_WINDOW_USAGE_RATIO)
@staticmethod
def _extract_finish_reason_and_id(
message: Any,
) -> tuple[str | None, str | None]:
"""Extract raw finish_reason and response_id from an Anthropic
``Message`` / ``BetaMessage``. Anthropic exposes ``stop_reason`` (e.g.
``"end_turn"``, ``"max_tokens"``, ``"tool_use"``); we forward it raw
and let downstream telemetry map to the OTel GenAI enum.
"""
finish_reason: str | None = None
response_id: str | None = None
try:
response_id = getattr(message, "id", None)
except (AttributeError, TypeError):
response_id = None
try:
finish_reason = getattr(message, "stop_reason", None)
except (AttributeError, TypeError):
finish_reason = None
return finish_reason, response_id
@staticmethod
def _extract_anthropic_token_usage(
response: Message | BetaMessage,

View File

@@ -783,6 +783,8 @@ class AzureCompletion(BaseLLM):
from_task: Any | None = None,
from_agent: Any | None = None,
usage: dict[str, Any] | None = None,
finish_reason: str | None = None,
response_id: str | None = None,
) -> BaseModel:
"""Validate content against response model and emit completion event.
@@ -792,6 +794,8 @@ class AzureCompletion(BaseLLM):
params: Completion parameters containing messages
from_task: Task that initiated the call
from_agent: Agent that initiated the call
finish_reason: Raw provider finish reason.
response_id: Raw provider response id.
Returns:
Validated Pydantic model instance
@@ -809,6 +813,8 @@ class AzureCompletion(BaseLLM):
from_agent=from_agent,
messages=params["messages"],
usage=usage,
finish_reason=finish_reason,
response_id=response_id,
)
return structured_data
@@ -848,6 +854,8 @@ class AzureCompletion(BaseLLM):
usage = self._extract_azure_token_usage(response)
self._track_token_usage_internal(usage)
finish_reason, response_id = self._extract_finish_reason_and_id(response)
# Without available_functions, return tool_calls so the caller (executor) handles execution
if message.tool_calls and not available_functions:
self._emit_call_completed_event(
@@ -857,6 +865,8 @@ class AzureCompletion(BaseLLM):
from_agent=from_agent,
messages=params["messages"],
usage=usage,
finish_reason=finish_reason,
response_id=response_id,
)
return list(message.tool_calls)
@@ -892,6 +902,8 @@ class AzureCompletion(BaseLLM):
from_task=from_task,
from_agent=from_agent,
usage=usage,
finish_reason=finish_reason,
response_id=response_id,
)
content = self._apply_stop_words(content)
@@ -903,6 +915,8 @@ class AzureCompletion(BaseLLM):
from_agent=from_agent,
messages=params["messages"],
usage=usage,
finish_reason=finish_reason,
response_id=response_id,
)
return self._invoke_after_llm_call_hooks(
@@ -1011,6 +1025,8 @@ class AzureCompletion(BaseLLM):
from_task: Any | None = None,
from_agent: Any | None = None,
response_model: type[BaseModel] | None = None,
finish_reason: str | None = None,
response_id: str | None = None,
) -> str | Any:
"""Finalize streaming response with usage tracking, tool execution, and events.
@@ -1039,6 +1055,8 @@ class AzureCompletion(BaseLLM):
from_task=from_task,
from_agent=from_agent,
usage=usage_data,
finish_reason=finish_reason,
response_id=response_id,
)
# Without available_functions, return tool calls in OpenAI-compatible format for the executor
@@ -1061,6 +1079,8 @@ class AzureCompletion(BaseLLM):
from_agent=from_agent,
messages=params["messages"],
usage=usage_data,
finish_reason=finish_reason,
response_id=response_id,
)
return formatted_tool_calls
@@ -1094,6 +1114,8 @@ class AzureCompletion(BaseLLM):
from_agent=from_agent,
messages=params["messages"],
usage=usage_data,
finish_reason=finish_reason,
response_id=response_id,
)
return self._invoke_after_llm_call_hooks(
@@ -1113,6 +1135,8 @@ class AzureCompletion(BaseLLM):
tool_calls: dict[int, dict[str, Any]] = {}
usage_data: dict[str, Any] | None = None
stream_finish_reason: str | None = None
stream_response_id: str | None = None
for update in self._get_sync_client().complete(**params):
if isinstance(update, StreamingChatCompletionsUpdate):
if update.usage:
@@ -1124,6 +1148,12 @@ class AzureCompletion(BaseLLM):
}
continue
chunk_finish, chunk_id = self._extract_finish_reason_and_id(update)
if chunk_finish:
stream_finish_reason = chunk_finish
if chunk_id:
stream_response_id = chunk_id
full_response = self._process_streaming_update(
update=update,
full_response=full_response,
@@ -1141,6 +1171,8 @@ class AzureCompletion(BaseLLM):
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
finish_reason=stream_finish_reason,
response_id=stream_response_id,
)
async def _ahandle_completion(
@@ -1180,6 +1212,8 @@ class AzureCompletion(BaseLLM):
tool_calls: dict[int, dict[str, Any]] = {}
usage_data: dict[str, Any] | None = None
stream_finish_reason: str | None = None
stream_response_id: str | None = None
stream = await self._get_async_client().complete(**params)
async for update in stream:
@@ -1193,6 +1227,12 @@ class AzureCompletion(BaseLLM):
}
continue
chunk_finish, chunk_id = self._extract_finish_reason_and_id(update)
if chunk_finish:
stream_finish_reason = chunk_finish
if chunk_id:
stream_response_id = chunk_id
full_response = self._process_streaming_update(
update=update,
full_response=full_response,
@@ -1210,6 +1250,8 @@ class AzureCompletion(BaseLLM):
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
finish_reason=stream_finish_reason,
response_id=stream_response_id,
)
def supports_function_calling(self) -> bool:
@@ -1271,6 +1313,29 @@ class AzureCompletion(BaseLLM):
return int(8192 * CONTEXT_WINDOW_USAGE_RATIO)
@staticmethod
def _extract_finish_reason_and_id(
response_or_update: Any,
) -> tuple[str | None, str | None]:
"""Extract raw finish_reason and response_id from an Azure
``ChatCompletions`` or ``StreamingChatCompletionsUpdate`` object.
Defensive — returns (None, None) on any failure. Raw provider value
is kept; downstream telemetry owns OTel enum coercion.
"""
finish_reason: str | None = None
response_id: str | None = None
try:
response_id = getattr(response_or_update, "id", None)
except (AttributeError, TypeError):
response_id = None
try:
choices = getattr(response_or_update, "choices", None)
if choices:
finish_reason = getattr(choices[0], "finish_reason", None)
except (AttributeError, IndexError, TypeError):
finish_reason = None
return finish_reason, response_id
@staticmethod
def _extract_azure_token_usage(response: ChatCompletions) -> dict[str, Any]:
"""Extract token usage and response metadata from Azure response."""

View File

@@ -677,7 +677,7 @@ class BedrockCompletion(BaseLLM):
if usage:
self._track_token_usage_internal(usage)
stop_reason = response.get("stopReason")
stop_reason, response_id = self._extract_finish_reason_and_id(response)
if stop_reason:
logging.debug(f"Response stop reason: {stop_reason}")
if stop_reason == "max_tokens":
@@ -716,6 +716,8 @@ class BedrockCompletion(BaseLLM):
from_agent=from_agent,
messages=messages,
usage=usage,
finish_reason=stop_reason,
response_id=response_id,
)
return result
except Exception as e:
@@ -738,6 +740,8 @@ class BedrockCompletion(BaseLLM):
from_agent=from_agent,
messages=messages,
usage=usage,
finish_reason=stop_reason,
response_id=response_id,
)
return non_structured_output_tool_uses
@@ -812,6 +816,8 @@ class BedrockCompletion(BaseLLM):
from_agent=from_agent,
messages=messages,
usage=usage,
finish_reason=stop_reason,
response_id=response_id,
)
return self._invoke_after_llm_call_hooks(
@@ -951,7 +957,9 @@ class BedrockCompletion(BaseLLM):
)
stream = response.get("stream")
response_id = None
_, stream_response_id = self._extract_finish_reason_and_id(response)
response_id = stream_response_id
stream_finish_reason: str | None = None
if stream:
for event in stream:
if "messageStart" in event:
@@ -1049,6 +1057,8 @@ class BedrockCompletion(BaseLLM):
from_agent=from_agent,
messages=messages,
usage=usage_data,
finish_reason=stream_finish_reason,
response_id=response_id,
)
return result # type: ignore[return-value]
except Exception as e:
@@ -1102,6 +1112,7 @@ class BedrockCompletion(BaseLLM):
tool_use_id = None
elif "messageStop" in event:
stop_reason = event["messageStop"].get("stopReason")
stream_finish_reason = stop_reason
logging.debug(f"Streaming message stopped: {stop_reason}")
if stop_reason == "max_tokens":
logging.warning(
@@ -1147,6 +1158,8 @@ class BedrockCompletion(BaseLLM):
from_agent=from_agent,
messages=messages,
usage=usage_data,
finish_reason=stream_finish_reason,
response_id=response_id,
)
return full_response
@@ -1262,7 +1275,7 @@ class BedrockCompletion(BaseLLM):
if usage:
self._track_token_usage_internal(usage)
stop_reason = response.get("stopReason")
stop_reason, response_id = self._extract_finish_reason_and_id(response)
if stop_reason:
logging.debug(f"Response stop reason: {stop_reason}")
if stop_reason == "max_tokens":
@@ -1300,6 +1313,8 @@ class BedrockCompletion(BaseLLM):
from_agent=from_agent,
messages=messages,
usage=usage,
finish_reason=stop_reason,
response_id=response_id,
)
return result
except Exception as e:
@@ -1322,6 +1337,8 @@ class BedrockCompletion(BaseLLM):
from_agent=from_agent,
messages=messages,
usage=usage,
finish_reason=stop_reason,
response_id=response_id,
)
return non_structured_output_tool_uses
@@ -1988,6 +2005,27 @@ class BedrockCompletion(BaseLLM):
return config
@staticmethod
def _extract_finish_reason_and_id(
response: Any,
) -> tuple[str | None, str | None]:
"""Extract raw finish_reason (``stopReason``) and response_id
(``ResponseMetadata.RequestId``) from a Bedrock Converse response
dict. Defensive — returns (None, None) on any failure. Raw provider
value is kept; downstream telemetry owns the OTel enum coercion.
"""
finish_reason: str | None = None
response_id: str | None = None
try:
if isinstance(response, dict):
finish_reason = response.get("stopReason")
metadata = response.get("ResponseMetadata") or {}
response_id = metadata.get("RequestId") if metadata else None
except (AttributeError, KeyError, TypeError, IndexError):
finish_reason = None
response_id = None
return finish_reason, response_id
def _handle_client_error(self, e: ClientError) -> str:
"""Handle AWS ClientError with specific error codes and return error message."""
error_code = e.response.get("Error", {}).get("Code", "Unknown")

View File

@@ -682,6 +682,8 @@ class GeminiCompletion(BaseLLM):
from_task: Any | None = None,
from_agent: Any | None = None,
usage: dict[str, Any] | None = None,
finish_reason: str | None = None,
response_id: str | None = None,
) -> BaseModel:
"""Validate content against response model and emit completion event.
@@ -691,6 +693,8 @@ class GeminiCompletion(BaseLLM):
messages_for_event: Messages to include in event
from_task: Task that initiated the call
from_agent: Agent that initiated the call
finish_reason: Raw provider finish reason.
response_id: Raw provider response id.
Returns:
Validated Pydantic model instance
@@ -708,6 +712,8 @@ class GeminiCompletion(BaseLLM):
from_agent=from_agent,
messages=messages_for_event,
usage=usage,
finish_reason=finish_reason,
response_id=response_id,
)
return structured_data
@@ -724,6 +730,8 @@ class GeminiCompletion(BaseLLM):
from_task: Any | None = None,
from_agent: Any | None = None,
usage: dict[str, Any] | None = None,
finish_reason: str | None = None,
response_id: str | None = None,
) -> str | BaseModel:
"""Finalize completion response with validation and event emission.
@@ -747,6 +755,8 @@ class GeminiCompletion(BaseLLM):
from_task=from_task,
from_agent=from_agent,
usage=usage,
finish_reason=finish_reason,
response_id=response_id,
)
self._emit_call_completed_event(
@@ -756,6 +766,8 @@ class GeminiCompletion(BaseLLM):
from_agent=from_agent,
messages=messages_for_event,
usage=usage,
finish_reason=finish_reason,
response_id=response_id,
)
return self._invoke_after_llm_call_hooks(
@@ -770,6 +782,8 @@ class GeminiCompletion(BaseLLM):
from_task: Any | None = None,
from_agent: Any | None = None,
usage: dict[str, Any] | None = None,
finish_reason: str | None = None,
response_id: str | None = None,
) -> BaseModel:
"""Validate and emit event for structured_output tool call.
@@ -795,6 +809,8 @@ class GeminiCompletion(BaseLLM):
from_agent=from_agent,
messages=self._convert_contents_to_dict(contents),
usage=usage,
finish_reason=finish_reason,
response_id=response_id,
)
return validated_data
except Exception as e:
@@ -828,6 +844,8 @@ class GeminiCompletion(BaseLLM):
Returns:
Final response content or function call result
"""
finish_reason, response_id = self._extract_finish_reason_and_id(response)
if response.candidates and (self.tools or available_functions):
candidate = response.candidates[0]
if candidate.content and candidate.content.parts:
@@ -854,6 +872,8 @@ class GeminiCompletion(BaseLLM):
from_task=from_task,
from_agent=from_agent,
usage=usage,
finish_reason=finish_reason,
response_id=response_id,
)
non_structured_output_parts = [
@@ -875,6 +895,8 @@ class GeminiCompletion(BaseLLM):
from_agent=from_agent,
messages=self._convert_contents_to_dict(contents),
usage=usage,
finish_reason=finish_reason,
response_id=response_id,
)
return non_structured_output_parts
@@ -915,6 +937,8 @@ class GeminiCompletion(BaseLLM):
from_task=from_task,
from_agent=from_agent,
usage=usage,
finish_reason=finish_reason,
response_id=response_id,
)
def _process_stream_chunk(
@@ -925,7 +949,13 @@ class GeminiCompletion(BaseLLM):
usage_data: dict[str, int] | None,
from_task: Any | None = None,
from_agent: Any | None = None,
) -> tuple[str, dict[int, dict[str, Any]], dict[str, int] | None]:
) -> tuple[
str,
dict[int, dict[str, Any]],
dict[str, int] | None,
str | None,
str | None,
]:
"""Process a single streaming chunk.
Args:
@@ -937,9 +967,13 @@ class GeminiCompletion(BaseLLM):
from_agent: Agent that initiated the call
Returns:
Tuple of (updated full_response, updated function_calls, updated usage_data)
Tuple of (updated full_response, updated function_calls, updated
usage_data, chunk finish_reason, chunk response_id).
"""
response_id = chunk.response_id if hasattr(chunk, "response_id") else None
chunk_finish_reason, chunk_response_id = self._extract_finish_reason_and_id(
chunk
)
if chunk.usage_metadata:
usage_data = self._extract_token_usage(chunk)
@@ -996,7 +1030,13 @@ class GeminiCompletion(BaseLLM):
response_id=response_id,
)
return full_response, function_calls, usage_data
return (
full_response,
function_calls,
usage_data,
chunk_finish_reason,
chunk_response_id,
)
def _finalize_streaming_response(
self,
@@ -1008,6 +1048,8 @@ class GeminiCompletion(BaseLLM):
from_task: Any | None = None,
from_agent: Any | None = None,
response_model: type[BaseModel] | None = None,
finish_reason: str | None = None,
response_id: str | None = None,
) -> str | BaseModel | list[dict[str, Any]]:
"""Finalize streaming response with usage tracking, function execution, and events.
@@ -1038,6 +1080,8 @@ class GeminiCompletion(BaseLLM):
from_task=from_task,
from_agent=from_agent,
usage=usage_data,
finish_reason=finish_reason,
response_id=response_id,
)
non_structured_output_calls = {
@@ -1058,6 +1102,8 @@ class GeminiCompletion(BaseLLM):
from_agent=from_agent,
messages=self._convert_contents_to_dict(contents),
usage=usage_data,
finish_reason=finish_reason,
response_id=response_id,
)
return raw_parts
@@ -1095,6 +1141,8 @@ class GeminiCompletion(BaseLLM):
from_task=from_task,
from_agent=from_agent,
usage=usage_data,
finish_reason=finish_reason,
response_id=response_id,
)
def _handle_completion(
@@ -1148,6 +1196,8 @@ class GeminiCompletion(BaseLLM):
full_response = ""
function_calls: dict[int, dict[str, Any]] = {}
usage_data: dict[str, int] | None = None
stream_finish_reason: str | None = None
stream_response_id: str | None = None
# The API accepts list[Content] but mypy is overly strict about variance
contents_for_api: Any = contents
@@ -1156,7 +1206,13 @@ class GeminiCompletion(BaseLLM):
contents=contents_for_api,
config=config,
):
full_response, function_calls, usage_data = self._process_stream_chunk(
(
full_response,
function_calls,
usage_data,
chunk_finish_reason,
chunk_response_id,
) = self._process_stream_chunk(
chunk=chunk,
full_response=full_response,
function_calls=function_calls,
@@ -1164,6 +1220,10 @@ class GeminiCompletion(BaseLLM):
from_task=from_task,
from_agent=from_agent,
)
if chunk_finish_reason:
stream_finish_reason = chunk_finish_reason
if chunk_response_id:
stream_response_id = chunk_response_id
return self._finalize_streaming_response(
full_response=full_response,
@@ -1174,6 +1234,8 @@ class GeminiCompletion(BaseLLM):
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
finish_reason=stream_finish_reason,
response_id=stream_response_id,
)
async def _ahandle_completion(
@@ -1227,6 +1289,8 @@ class GeminiCompletion(BaseLLM):
full_response = ""
function_calls: dict[int, dict[str, Any]] = {}
usage_data: dict[str, int] | None = None
stream_finish_reason: str | None = None
stream_response_id: str | None = None
# The API accepts list[Content] but mypy is overly strict about variance
contents_for_api: Any = contents
@@ -1236,7 +1300,13 @@ class GeminiCompletion(BaseLLM):
config=config,
)
async for chunk in stream:
full_response, function_calls, usage_data = self._process_stream_chunk(
(
full_response,
function_calls,
usage_data,
chunk_finish_reason,
chunk_response_id,
) = self._process_stream_chunk(
chunk=chunk,
full_response=full_response,
function_calls=function_calls,
@@ -1244,6 +1314,10 @@ class GeminiCompletion(BaseLLM):
from_task=from_task,
from_agent=from_agent,
)
if chunk_finish_reason:
stream_finish_reason = chunk_finish_reason
if chunk_response_id:
stream_response_id = chunk_response_id
return self._finalize_streaming_response(
full_response=full_response,
@@ -1254,6 +1328,8 @@ class GeminiCompletion(BaseLLM):
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
finish_reason=stream_finish_reason,
response_id=stream_response_id,
)
def supports_function_calling(self) -> bool:
@@ -1300,6 +1376,33 @@ class GeminiCompletion(BaseLLM):
return int(1048576 * CONTEXT_WINDOW_USAGE_RATIO) # 1M tokens default
@staticmethod
def _extract_finish_reason_and_id(
response: Any,
) -> tuple[str | None, str | None]:
"""Extract raw finish_reason and response_id from a Gemini
``GenerateContentResponse``. ``finish_reason`` is the protobuf enum's
``.name`` attribute (e.g. ``"STOP"``, ``"MAX_TOKENS"``); we forward
it raw and let downstream telemetry map to the OTel GenAI enum.
"""
finish_reason: str | None = None
response_id: str | None = None
try:
response_id = getattr(response, "response_id", None)
except (AttributeError, TypeError):
response_id = None
try:
candidates = getattr(response, "candidates", None)
if candidates:
candidate_finish = getattr(candidates[0], "finish_reason", None)
if candidate_finish is not None:
finish_reason = getattr(candidate_finish, "name", None) or str(
candidate_finish
)
except (AttributeError, IndexError, TypeError):
finish_reason = None
return finish_reason, response_id
@staticmethod
def _extract_token_usage(response: GenerateContentResponse) -> dict[str, Any]:
"""Extract token usage and response metadata from Gemini response."""

View File

@@ -825,6 +825,10 @@ class OpenAICompletion(BaseLLM):
usage = self._extract_responses_token_usage(response)
self._track_token_usage_internal(usage)
finish_reason, response_id = self._extract_responses_finish_reason_and_id(
response
)
if self.parse_tool_outputs:
parsed_result = self._extract_builtin_tool_outputs(response)
parsed_result.text = self._apply_stop_words(parsed_result.text)
@@ -836,6 +840,8 @@ class OpenAICompletion(BaseLLM):
from_agent=from_agent,
messages=params.get("input", []),
usage=usage,
finish_reason=finish_reason,
response_id=response_id,
)
return parsed_result
@@ -849,6 +855,8 @@ class OpenAICompletion(BaseLLM):
from_agent=from_agent,
messages=params.get("input", []),
usage=usage,
finish_reason=finish_reason,
response_id=response_id,
)
return function_calls
@@ -887,6 +895,8 @@ class OpenAICompletion(BaseLLM):
from_agent=from_agent,
messages=params.get("input", []),
usage=usage,
finish_reason=finish_reason,
response_id=response_id,
)
return structured_result
except ValueError as e:
@@ -901,6 +911,8 @@ class OpenAICompletion(BaseLLM):
from_agent=from_agent,
messages=params.get("input", []),
usage=usage,
finish_reason=finish_reason,
response_id=response_id,
)
content = self._invoke_after_llm_call_hooks(
@@ -960,6 +972,10 @@ class OpenAICompletion(BaseLLM):
usage = self._extract_responses_token_usage(response)
self._track_token_usage_internal(usage)
finish_reason, response_id = self._extract_responses_finish_reason_and_id(
response
)
if self.parse_tool_outputs:
parsed_result = self._extract_builtin_tool_outputs(response)
parsed_result.text = self._apply_stop_words(parsed_result.text)
@@ -971,6 +987,8 @@ class OpenAICompletion(BaseLLM):
from_agent=from_agent,
messages=params.get("input", []),
usage=usage,
finish_reason=finish_reason,
response_id=response_id,
)
return parsed_result
@@ -984,6 +1002,8 @@ class OpenAICompletion(BaseLLM):
from_agent=from_agent,
messages=params.get("input", []),
usage=usage,
finish_reason=finish_reason,
response_id=response_id,
)
return function_calls
@@ -1022,6 +1042,8 @@ class OpenAICompletion(BaseLLM):
from_agent=from_agent,
messages=params.get("input", []),
usage=usage,
finish_reason=finish_reason,
response_id=response_id,
)
return structured_result
except ValueError as e:
@@ -1036,6 +1058,8 @@ class OpenAICompletion(BaseLLM):
from_agent=from_agent,
messages=params.get("input", []),
usage=usage,
finish_reason=finish_reason,
response_id=response_id,
)
except NotFoundError as e:
@@ -1123,6 +1147,12 @@ class OpenAICompletion(BaseLLM):
usage = self._extract_responses_token_usage(event.response)
self._track_token_usage_internal(usage)
finish_reason, response_id = (
self._extract_responses_finish_reason_and_id(final_response)
if final_response is not None
else (None, response_id_stream)
)
if self.parse_tool_outputs and final_response:
parsed_result = self._extract_builtin_tool_outputs(final_response)
parsed_result.text = self._apply_stop_words(parsed_result.text)
@@ -1134,6 +1164,8 @@ class OpenAICompletion(BaseLLM):
from_agent=from_agent,
messages=params.get("input", []),
usage=usage,
finish_reason=finish_reason,
response_id=response_id,
)
return parsed_result
@@ -1171,6 +1203,8 @@ class OpenAICompletion(BaseLLM):
from_agent=from_agent,
messages=params.get("input", []),
usage=usage,
finish_reason=finish_reason,
response_id=response_id,
)
return structured_result
except ValueError as e:
@@ -1185,6 +1219,8 @@ class OpenAICompletion(BaseLLM):
from_agent=from_agent,
messages=params.get("input", []),
usage=usage,
finish_reason=finish_reason,
response_id=response_id,
)
return self._invoke_after_llm_call_hooks(
@@ -1248,6 +1284,12 @@ class OpenAICompletion(BaseLLM):
usage = self._extract_responses_token_usage(event.response)
self._track_token_usage_internal(usage)
finish_reason, response_id = (
self._extract_responses_finish_reason_and_id(final_response)
if final_response is not None
else (None, response_id_stream)
)
if self.parse_tool_outputs and final_response:
parsed_result = self._extract_builtin_tool_outputs(final_response)
parsed_result.text = self._apply_stop_words(parsed_result.text)
@@ -1259,6 +1301,8 @@ class OpenAICompletion(BaseLLM):
from_agent=from_agent,
messages=params.get("input", []),
usage=usage,
finish_reason=finish_reason,
response_id=response_id,
)
return parsed_result
@@ -1296,6 +1340,8 @@ class OpenAICompletion(BaseLLM):
from_agent=from_agent,
messages=params.get("input", []),
usage=usage,
finish_reason=finish_reason,
response_id=response_id,
)
return structured_result
except ValueError as e:
@@ -1310,6 +1356,8 @@ class OpenAICompletion(BaseLLM):
from_agent=from_agent,
messages=params.get("input", []),
usage=usage,
finish_reason=finish_reason,
response_id=response_id,
)
return full_response
@@ -1603,6 +1651,9 @@ class OpenAICompletion(BaseLLM):
usage = self._extract_openai_token_usage(parsed_response)
self._track_token_usage_internal(usage)
parsed_finish_reason, parsed_response_id = (
self._extract_chat_finish_reason_and_id(parsed_response)
)
parsed_object = parsed_response.choices[0].message.parsed
if parsed_object:
self._emit_call_completed_event(
@@ -1612,6 +1663,8 @@ class OpenAICompletion(BaseLLM):
from_agent=from_agent,
messages=params["messages"],
usage=usage,
finish_reason=parsed_finish_reason,
response_id=parsed_response_id,
)
return parsed_object
@@ -1625,6 +1678,9 @@ class OpenAICompletion(BaseLLM):
choice: Choice = response.choices[0]
message = choice.message
finish_reason, response_id = self._extract_chat_finish_reason_and_id(
response
)
# Without available_functions, return tool_calls so the caller (executor) handles execution
if message.tool_calls and not available_functions:
@@ -1635,6 +1691,8 @@ class OpenAICompletion(BaseLLM):
from_agent=from_agent,
messages=params["messages"],
usage=usage,
finish_reason=finish_reason,
response_id=response_id,
)
return list(message.tool_calls)
@@ -1675,6 +1733,8 @@ class OpenAICompletion(BaseLLM):
from_agent=from_agent,
messages=params["messages"],
usage=usage,
finish_reason=finish_reason,
response_id=response_id,
)
return structured_result
except ValueError as e:
@@ -1689,6 +1749,8 @@ class OpenAICompletion(BaseLLM):
from_agent=from_agent,
messages=params["messages"],
usage=usage,
finish_reason=finish_reason,
response_id=response_id,
)
if usage.get("total_tokens", 0) > 0:
@@ -1734,6 +1796,8 @@ class OpenAICompletion(BaseLLM):
available_functions: dict[str, Any] | None = None,
from_task: Any | None = None,
from_agent: Any | None = None,
finish_reason: str | None = None,
response_id: str | None = None,
) -> str | list[dict[str, Any]]:
"""Finalize a streaming response with usage tracking, tool call handling, and events.
@@ -1745,6 +1809,9 @@ class OpenAICompletion(BaseLLM):
available_functions: Available functions for tool calling.
from_task: Task that initiated the call.
from_agent: Agent that initiated the call.
finish_reason: Raw provider finish reason (e.g. "stop", "length",
"tool_calls") extracted from the last streaming chunk.
response_id: Raw provider response id from any chunk.
Returns:
Tool calls list when tools were invoked without available_functions,
@@ -1774,6 +1841,8 @@ class OpenAICompletion(BaseLLM):
from_agent=from_agent,
messages=params["messages"],
usage=usage_data,
finish_reason=finish_reason,
response_id=response_id,
)
return tool_calls_list
@@ -1817,6 +1886,8 @@ class OpenAICompletion(BaseLLM):
from_agent=from_agent,
messages=params["messages"],
usage=usage_data,
finish_reason=finish_reason,
response_id=response_id,
)
return full_response
@@ -1861,6 +1932,9 @@ class OpenAICompletion(BaseLLM):
if final_completion:
usage = self._extract_openai_token_usage(final_completion)
self._track_token_usage_internal(usage)
parsed_finish_reason, parsed_response_id = (
self._extract_chat_finish_reason_and_id(final_completion)
)
if final_completion.choices:
parsed_result = final_completion.choices[0].message.parsed
if parsed_result:
@@ -1871,6 +1945,8 @@ class OpenAICompletion(BaseLLM):
from_agent=from_agent,
messages=params["messages"],
usage=usage,
finish_reason=parsed_finish_reason,
response_id=parsed_response_id,
)
return parsed_result
@@ -1882,11 +1958,15 @@ class OpenAICompletion(BaseLLM):
)
usage_data: dict[str, Any] | None = None
stream_finish_reason: str | None = None
stream_response_id: str | None = None
for completion_chunk in completion_stream:
response_id_stream = (
completion_chunk.id if hasattr(completion_chunk, "id") else None
)
if response_id_stream:
stream_response_id = response_id_stream
if hasattr(completion_chunk, "usage") and completion_chunk.usage:
usage_data = self._extract_openai_token_usage(completion_chunk)
@@ -1897,6 +1977,9 @@ class OpenAICompletion(BaseLLM):
choice = completion_chunk.choices[0]
chunk_delta: ChoiceDelta = choice.delta
chunk_finish = getattr(choice, "finish_reason", None)
if chunk_finish:
stream_finish_reason = chunk_finish
if chunk_delta.content:
full_response += chunk_delta.content
@@ -1954,6 +2037,8 @@ class OpenAICompletion(BaseLLM):
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
finish_reason=stream_finish_reason,
response_id=stream_response_id,
)
if isinstance(result, str):
return self._invoke_after_llm_call_hooks(
@@ -1989,6 +2074,9 @@ class OpenAICompletion(BaseLLM):
usage = self._extract_openai_token_usage(parsed_response)
self._track_token_usage_internal(usage)
parsed_finish_reason, parsed_response_id = (
self._extract_chat_finish_reason_and_id(parsed_response)
)
parsed_object = parsed_response.choices[0].message.parsed
if parsed_object:
self._emit_call_completed_event(
@@ -1998,6 +2086,8 @@ class OpenAICompletion(BaseLLM):
from_agent=from_agent,
messages=params["messages"],
usage=usage,
finish_reason=parsed_finish_reason,
response_id=parsed_response_id,
)
return parsed_object
@@ -2011,6 +2101,9 @@ class OpenAICompletion(BaseLLM):
choice: Choice = response.choices[0]
message = choice.message
finish_reason, response_id = self._extract_chat_finish_reason_and_id(
response
)
# Without available_functions, return tool_calls so the caller (executor) handles execution
if message.tool_calls and not available_functions:
@@ -2021,6 +2114,8 @@ class OpenAICompletion(BaseLLM):
from_agent=from_agent,
messages=params["messages"],
usage=usage,
finish_reason=finish_reason,
response_id=response_id,
)
return list(message.tool_calls)
@@ -2065,6 +2160,8 @@ class OpenAICompletion(BaseLLM):
from_agent=from_agent,
messages=params["messages"],
usage=usage,
finish_reason=finish_reason,
response_id=response_id,
)
return structured_result
except ValueError as e:
@@ -2079,6 +2176,8 @@ class OpenAICompletion(BaseLLM):
from_agent=from_agent,
messages=params["messages"],
usage=usage,
finish_reason=finish_reason,
response_id=response_id,
)
if usage.get("total_tokens", 0) > 0:
@@ -2130,8 +2229,12 @@ class OpenAICompletion(BaseLLM):
accumulated_content = ""
usage_data: dict[str, Any] | None = None
parsed_stream_finish_reason: str | None = None
parsed_stream_response_id: str | None = None
async for chunk in completion_stream:
response_id_stream = chunk.id if hasattr(chunk, "id") else None
if response_id_stream:
parsed_stream_response_id = response_id_stream
if hasattr(chunk, "usage") and chunk.usage:
usage_data = self._extract_openai_token_usage(chunk)
@@ -2142,6 +2245,9 @@ class OpenAICompletion(BaseLLM):
choice = chunk.choices[0]
delta: ChoiceDelta = choice.delta
chunk_finish = getattr(choice, "finish_reason", None)
if chunk_finish:
parsed_stream_finish_reason = chunk_finish
if delta.content:
accumulated_content += delta.content
@@ -2165,6 +2271,8 @@ class OpenAICompletion(BaseLLM):
from_agent=from_agent,
messages=params["messages"],
usage=usage_data,
finish_reason=parsed_stream_finish_reason,
response_id=parsed_stream_response_id,
)
return parsed_object
@@ -2177,6 +2285,8 @@ class OpenAICompletion(BaseLLM):
from_agent=from_agent,
messages=params["messages"],
usage=usage_data,
finish_reason=parsed_stream_finish_reason,
response_id=parsed_stream_response_id,
)
return accumulated_content
@@ -2185,9 +2295,13 @@ class OpenAICompletion(BaseLLM):
] = await self._get_async_client().chat.completions.create(**params)
usage_data = None
stream_finish_reason: str | None = None
stream_response_id: str | None = None
async for chunk in stream:
response_id_stream = chunk.id if hasattr(chunk, "id") else None
if response_id_stream:
stream_response_id = response_id_stream
if hasattr(chunk, "usage") and chunk.usage:
usage_data = self._extract_openai_token_usage(chunk)
@@ -2198,6 +2312,9 @@ class OpenAICompletion(BaseLLM):
choice = chunk.choices[0]
chunk_delta: ChoiceDelta = choice.delta
chunk_finish = getattr(choice, "finish_reason", None)
if chunk_finish:
stream_finish_reason = chunk_finish
if chunk_delta.content:
full_response += chunk_delta.content
@@ -2255,6 +2372,8 @@ class OpenAICompletion(BaseLLM):
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
finish_reason=stream_finish_reason,
response_id=stream_response_id,
)
def supports_function_calling(self) -> bool:
@@ -2305,6 +2424,49 @@ class OpenAICompletion(BaseLLM):
return int(8192 * CONTEXT_WINDOW_USAGE_RATIO)
@staticmethod
def _extract_chat_finish_reason_and_id(
response: Any,
) -> tuple[str | None, str | None]:
"""Extract raw finish_reason and response_id from a ChatCompletion or
ChatCompletionChunk-like object. Defensive — returns (None, None) on
any failure. Raw provider value is kept; downstream telemetry owns
OTel enum coercion.
"""
finish_reason: str | None = None
response_id: str | None = None
try:
response_id = getattr(response, "id", None)
except (AttributeError, TypeError):
response_id = None
try:
choices = getattr(response, "choices", None)
if choices:
finish_reason = getattr(choices[0], "finish_reason", None)
except (AttributeError, IndexError, TypeError):
finish_reason = None
return finish_reason, response_id
@staticmethod
def _extract_responses_finish_reason_and_id(
response: Any,
) -> tuple[str | None, str | None]:
"""Extract finish_reason and response_id from an OpenAI Responses
API ``Response`` object. The Responses API exposes ``status`` rather
than ``finish_reason``; we forward the raw status value.
"""
finish_reason: str | None = None
response_id: str | None = None
try:
response_id = getattr(response, "id", None)
except (AttributeError, TypeError):
response_id = None
try:
finish_reason = getattr(response, "status", None)
except (AttributeError, TypeError):
finish_reason = None
return finish_reason, response_id
def _extract_openai_token_usage(
self, response: ChatCompletion | ChatCompletionChunk
) -> dict[str, Any]: