mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-07-01 21:28:10 +00:00
refactor(otel): unify LLM event introspection + drop redundant defensive code
Three cohesion cleanups uncovered during PR review, all behavior-preserving:

- LLM.call / LLM.acall in llm.py now delegate to BaseLLM._emit_call_started_event instead of constructing LLMCallStartedEvent inline. The base helper already introspects sampling params off self via getattr; the inline duplication was accidental, not justified, and would drift out of sync if anyone adds a tenth OTel sampling param later.
- Extracted lib/crewai/llms/_finish_reason_utils.py:extract_choices_finish_reason_and_id as the shared extractor for the choices-based response shape. OpenAI Chat, Azure, and LiteLLM all read the same shape (response.id + choices[0].finish_reason) as both object attrs and dict keys. Providers with genuinely different shapes - Anthropic (stop_reason), Bedrock (stopReason), Gemini (protobuf enum), OpenAI Responses (status) - keep their own provider-specific helpers.
- Dropped redundant try/except (AttributeError, TypeError) wrappers around bare getattr(obj, "field", None) calls across the new extraction helpers. getattr with a default already suppresses AttributeError, and the inner isinstance / dict.get / int-coercion ops can't raise TypeError in practice. Kept the catches that legitimately guard against IndexError (e.g. choices[0] on an empty list).

Tests: 600 passed, 23 skipped, 14 pre-existing multimodal failures unchanged. Added 12 parametrized tests for the shared helper covering object + dict shapes, missing fields, non-string coercion, and never-raises invariants.
This commit is contained in:
@@ -23,7 +23,6 @@ from crewai.events.event_bus import crewai_event_bus
|
||||
from crewai.events.types.llm_events import (
|
||||
LLMCallCompletedEvent,
|
||||
LLMCallFailedEvent,
|
||||
LLMCallStartedEvent,
|
||||
LLMCallType,
|
||||
LLMStreamChunkEvent,
|
||||
)
|
||||
@@ -32,6 +31,7 @@ from crewai.events.types.tool_usage_events import (
|
||||
ToolUsageFinishedEvent,
|
||||
ToolUsageStartedEvent,
|
||||
)
|
||||
from crewai.llms._finish_reason_utils import extract_choices_finish_reason_and_id
|
||||
from crewai.llms.base_llm import (
|
||||
BaseLLM,
|
||||
JsonResponseFormat,
|
||||
@@ -1744,28 +1744,14 @@ class LLM(BaseLLM):
|
||||
ValueError: If response format is not supported
|
||||
LLMContextLengthExceededError: If input exceeds model's context limit
|
||||
"""
|
||||
with llm_call_context() as call_id:
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
event=LLMCallStartedEvent(
|
||||
messages=messages,
|
||||
tools=tools,
|
||||
callbacks=callbacks,
|
||||
available_functions=available_functions,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
model=self.model,
|
||||
call_id=call_id,
|
||||
temperature=getattr(self, "temperature", None),
|
||||
top_p=getattr(self, "top_p", None),
|
||||
max_tokens=getattr(self, "max_tokens", None),
|
||||
stream=getattr(self, "stream", None),
|
||||
seed=getattr(self, "seed", None),
|
||||
stop_sequences=getattr(self, "stop", None),
|
||||
frequency_penalty=getattr(self, "frequency_penalty", None),
|
||||
presence_penalty=getattr(self, "presence_penalty", None),
|
||||
n=getattr(self, "n", None),
|
||||
),
|
||||
with llm_call_context():
|
||||
self._emit_call_started_event(
|
||||
messages=messages,
|
||||
tools=tools,
|
||||
callbacks=callbacks,
|
||||
available_functions=available_functions,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
)
|
||||
|
||||
self._validate_call_params()
|
||||
@@ -1897,28 +1883,14 @@ class LLM(BaseLLM):
|
||||
ValueError: If response format is not supported
|
||||
LLMContextLengthExceededError: If input exceeds model's context limit
|
||||
"""
|
||||
with llm_call_context() as call_id:
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
event=LLMCallStartedEvent(
|
||||
messages=messages,
|
||||
tools=tools,
|
||||
callbacks=callbacks,
|
||||
available_functions=available_functions,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
model=self.model,
|
||||
call_id=call_id,
|
||||
temperature=getattr(self, "temperature", None),
|
||||
top_p=getattr(self, "top_p", None),
|
||||
max_tokens=getattr(self, "max_tokens", None),
|
||||
stream=getattr(self, "stream", None),
|
||||
seed=getattr(self, "seed", None),
|
||||
stop_sequences=getattr(self, "stop", None),
|
||||
frequency_penalty=getattr(self, "frequency_penalty", None),
|
||||
presence_penalty=getattr(self, "presence_penalty", None),
|
||||
n=getattr(self, "n", None),
|
||||
),
|
||||
with llm_call_context():
|
||||
self._emit_call_started_event(
|
||||
messages=messages,
|
||||
tools=tools,
|
||||
callbacks=callbacks,
|
||||
available_functions=available_functions,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
)
|
||||
|
||||
self._validate_call_params()
|
||||
@@ -2065,43 +2037,10 @@ class LLM(BaseLLM):
|
||||
def _extract_finish_reason_and_response_id(
|
||||
response_or_chunk: Any,
|
||||
) -> tuple[str | None, str | None]:
|
||||
"""Extract raw finish_reason and response_id from a LiteLLM response
|
||||
or accumulated streaming chunk.
|
||||
|
||||
Defensive: never raises; returns (None, None) on any failure. Keeps
|
||||
raw provider values without coercion — downstream telemetry owns the
|
||||
OTel GenAI enum mapping.
|
||||
"""LiteLLM responses/chunks share the choices-shape with OpenAI/Azure;
|
||||
delegate to the shared extractor.
|
||||
"""
|
||||
def _as_str(value: Any) -> str | None:
|
||||
return value if isinstance(value, str) else None
|
||||
|
||||
finish_reason: str | None = None
|
||||
response_id: str | None = None
|
||||
try:
|
||||
raw_id = getattr(response_or_chunk, "id", None)
|
||||
if raw_id is None and isinstance(response_or_chunk, dict):
|
||||
raw_id = response_or_chunk.get("id")
|
||||
response_id = _as_str(raw_id)
|
||||
except (AttributeError, TypeError):
|
||||
response_id = None
|
||||
|
||||
try:
|
||||
choices = None
|
||||
if isinstance(response_or_chunk, dict) and "choices" in response_or_chunk:
|
||||
choices = response_or_chunk["choices"]
|
||||
else:
|
||||
choices = getattr(response_or_chunk, "choices", None)
|
||||
if choices:
|
||||
first = choices[0]
|
||||
if isinstance(first, dict):
|
||||
raw_finish = first.get("finish_reason")
|
||||
else:
|
||||
raw_finish = getattr(first, "finish_reason", None)
|
||||
finish_reason = _as_str(raw_finish)
|
||||
except (AttributeError, IndexError, TypeError, KeyError):
|
||||
finish_reason = None
|
||||
|
||||
return finish_reason, response_id
|
||||
return extract_choices_finish_reason_and_id(response_or_chunk)
|
||||
|
||||
def _process_message_files(self, messages: list[LLMMessage]) -> list[LLMMessage]:
|
||||
"""Process files attached to messages and format for provider.
|
||||
|
||||
54
lib/crewai/src/crewai/llms/_finish_reason_utils.py
Normal file
54
lib/crewai/src/crewai/llms/_finish_reason_utils.py
Normal file
@@ -0,0 +1,54 @@
|
||||
"""Shared extractors for ``finish_reason`` + ``response_id`` across LLM providers.
|
||||
|
||||
OpenAI Chat Completions, Azure AI Inference, and LiteLLM all expose the same
|
||||
choices-based response shape (``response.id`` + ``response.choices[0].finish_reason``),
|
||||
both as object attributes and (for LiteLLM stream chunks) as dict keys. This
|
||||
module centralises that introspection so every provider doesn't reinvent the
|
||||
defensive walk. Providers with genuinely different shapes — Anthropic
|
||||
(``stop_reason``), Bedrock (``stopReason``), Gemini (protobuf enum), OpenAI
|
||||
Responses (``status``) — keep their own helpers.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
|
||||
def _as_str(value: Any) -> str | None:
|
||||
return value if isinstance(value, str) else None
|
||||
|
||||
|
||||
def extract_choices_finish_reason_and_id(
|
||||
response_or_chunk: Any,
|
||||
) -> tuple[str | None, str | None]:
|
||||
"""Extract ``(finish_reason, response_id)`` from a choices-shaped response.
|
||||
|
||||
Handles both object-style (``response.id``, ``response.choices[0].finish_reason``)
|
||||
and dict-style (``response["id"]``, ``response["choices"][0]["finish_reason"]``)
|
||||
inputs. Returns ``(None, None)`` on any failure; never raises. Non-string
|
||||
raw values are coerced to ``None`` so test mocks and exotic provider types
|
||||
(MagicMock, protobuf enums, etc.) don't propagate downstream.
|
||||
"""
|
||||
raw_id = getattr(response_or_chunk, "id", None)
|
||||
if raw_id is None and isinstance(response_or_chunk, dict):
|
||||
raw_id = response_or_chunk.get("id")
|
||||
response_id = _as_str(raw_id)
|
||||
|
||||
if isinstance(response_or_chunk, dict):
|
||||
choices = response_or_chunk.get("choices")
|
||||
else:
|
||||
choices = getattr(response_or_chunk, "choices", None)
|
||||
|
||||
finish_reason: str | None = None
|
||||
if choices:
|
||||
try:
|
||||
first = choices[0]
|
||||
except (IndexError, TypeError, KeyError):
|
||||
first = None
|
||||
if first is not None:
|
||||
if isinstance(first, dict):
|
||||
raw_finish = first.get("finish_reason")
|
||||
else:
|
||||
raw_finish = getattr(first, "finish_reason", None)
|
||||
finish_reason = _as_str(raw_finish)
|
||||
|
||||
return finish_reason, response_id
|
||||
@@ -1874,17 +1874,10 @@ class AnthropicCompletion(BaseLLM):
|
||||
``"end_turn"``, ``"max_tokens"``, ``"tool_use"``); we forward it raw
|
||||
and let downstream telemetry map to the OTel GenAI enum.
|
||||
"""
|
||||
finish_reason: str | None = None
|
||||
response_id: str | None = None
|
||||
try:
|
||||
response_id = getattr(message, "id", None)
|
||||
except (AttributeError, TypeError):
|
||||
response_id = None
|
||||
try:
|
||||
finish_reason = getattr(message, "stop_reason", None)
|
||||
except (AttributeError, TypeError):
|
||||
finish_reason = None
|
||||
return finish_reason, response_id
|
||||
return (
|
||||
getattr(message, "stop_reason", None),
|
||||
getattr(message, "id", None),
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _extract_anthropic_token_usage(
|
||||
|
||||
@@ -9,6 +9,7 @@ from urllib.parse import urlparse
|
||||
from pydantic import BaseModel, PrivateAttr, model_validator
|
||||
from typing_extensions import Self
|
||||
|
||||
from crewai.llms._finish_reason_utils import extract_choices_finish_reason_and_id
|
||||
from crewai.llms.hooks.base import BaseInterceptor
|
||||
from crewai.utilities.agent_utils import is_context_length_exceeded
|
||||
from crewai.utilities.exceptions.context_window_exceeding_exception import (
|
||||
@@ -1317,24 +1318,10 @@ class AzureCompletion(BaseLLM):
|
||||
def _extract_finish_reason_and_id(
|
||||
response_or_update: Any,
|
||||
) -> tuple[str | None, str | None]:
|
||||
"""Extract raw finish_reason and response_id from an Azure
|
||||
``ChatCompletions`` or ``StreamingChatCompletionsUpdate`` object.
|
||||
Defensive — returns (None, None) on any failure. Raw provider value
|
||||
is kept; downstream telemetry owns OTel enum coercion.
|
||||
"""Azure ``ChatCompletions`` / ``StreamingChatCompletionsUpdate``
|
||||
share the choices-shape; delegate to the shared extractor.
|
||||
"""
|
||||
finish_reason: str | None = None
|
||||
response_id: str | None = None
|
||||
try:
|
||||
response_id = getattr(response_or_update, "id", None)
|
||||
except (AttributeError, TypeError):
|
||||
response_id = None
|
||||
try:
|
||||
choices = getattr(response_or_update, "choices", None)
|
||||
if choices:
|
||||
finish_reason = getattr(choices[0], "finish_reason", None)
|
||||
except (AttributeError, IndexError, TypeError):
|
||||
finish_reason = None
|
||||
return finish_reason, response_id
|
||||
return extract_choices_finish_reason_and_id(response_or_update)
|
||||
|
||||
@staticmethod
|
||||
def _extract_azure_token_usage(response: ChatCompletions) -> dict[str, Any]:
|
||||
|
||||
@@ -1385,22 +1385,18 @@ class GeminiCompletion(BaseLLM):
|
||||
``.name`` attribute (e.g. ``"STOP"``, ``"MAX_TOKENS"``); we forward
|
||||
it raw and let downstream telemetry map to the OTel GenAI enum.
|
||||
"""
|
||||
response_id = getattr(response, "response_id", None)
|
||||
finish_reason: str | None = None
|
||||
response_id: str | None = None
|
||||
try:
|
||||
response_id = getattr(response, "response_id", None)
|
||||
except (AttributeError, TypeError):
|
||||
response_id = None
|
||||
try:
|
||||
candidates = getattr(response, "candidates", None)
|
||||
if candidates:
|
||||
candidates = getattr(response, "candidates", None)
|
||||
if candidates:
|
||||
try:
|
||||
candidate_finish = getattr(candidates[0], "finish_reason", None)
|
||||
if candidate_finish is not None:
|
||||
finish_reason = getattr(candidate_finish, "name", None) or str(
|
||||
candidate_finish
|
||||
)
|
||||
except (AttributeError, IndexError, TypeError):
|
||||
finish_reason = None
|
||||
except (IndexError, TypeError, KeyError):
|
||||
candidate_finish = None
|
||||
if candidate_finish is not None:
|
||||
finish_reason = getattr(candidate_finish, "name", None) or str(
|
||||
candidate_finish
|
||||
)
|
||||
return finish_reason, response_id
|
||||
|
||||
@staticmethod
|
||||
|
||||
@@ -29,6 +29,7 @@ from openai.types.responses import (
|
||||
from pydantic import BaseModel, PrivateAttr, model_validator
|
||||
|
||||
from crewai.events.types.llm_events import LLMCallType
|
||||
from crewai.llms._finish_reason_utils import extract_choices_finish_reason_and_id
|
||||
from crewai.llms.base_llm import BaseLLM, JsonResponseFormat, llm_call_context
|
||||
from crewai.llms.hooks.base import BaseInterceptor
|
||||
from crewai.llms.hooks.transport import AsyncHTTPTransport, HTTPTransport
|
||||
@@ -2428,24 +2429,10 @@ class OpenAICompletion(BaseLLM):
|
||||
def _extract_chat_finish_reason_and_id(
|
||||
response: Any,
|
||||
) -> tuple[str | None, str | None]:
|
||||
"""Extract raw finish_reason and response_id from a ChatCompletion or
|
||||
ChatCompletionChunk-like object. Defensive — returns (None, None) on
|
||||
any failure. Raw provider value is kept; downstream telemetry owns
|
||||
OTel enum coercion.
|
||||
"""ChatCompletion / ChatCompletionChunk share the choices-shape;
|
||||
delegate to the shared extractor.
|
||||
"""
|
||||
finish_reason: str | None = None
|
||||
response_id: str | None = None
|
||||
try:
|
||||
response_id = getattr(response, "id", None)
|
||||
except (AttributeError, TypeError):
|
||||
response_id = None
|
||||
try:
|
||||
choices = getattr(response, "choices", None)
|
||||
if choices:
|
||||
finish_reason = getattr(choices[0], "finish_reason", None)
|
||||
except (AttributeError, IndexError, TypeError):
|
||||
finish_reason = None
|
||||
return finish_reason, response_id
|
||||
return extract_choices_finish_reason_and_id(response)
|
||||
|
||||
@staticmethod
|
||||
def _extract_responses_finish_reason_and_id(
|
||||
@@ -2455,17 +2442,10 @@ class OpenAICompletion(BaseLLM):
|
||||
API ``Response`` object. The Responses API exposes ``status`` rather
|
||||
than ``finish_reason``; we forward the raw status value.
|
||||
"""
|
||||
finish_reason: str | None = None
|
||||
response_id: str | None = None
|
||||
try:
|
||||
response_id = getattr(response, "id", None)
|
||||
except (AttributeError, TypeError):
|
||||
response_id = None
|
||||
try:
|
||||
finish_reason = getattr(response, "status", None)
|
||||
except (AttributeError, TypeError):
|
||||
finish_reason = None
|
||||
return finish_reason, response_id
|
||||
return (
|
||||
getattr(response, "status", None),
|
||||
getattr(response, "id", None),
|
||||
)
|
||||
|
||||
def _extract_openai_token_usage(
|
||||
self, response: ChatCompletion | ChatCompletionChunk
|
||||
|
||||
@@ -11,6 +11,7 @@ from crewai.events.types.llm_events import (
|
||||
LLMCallType,
|
||||
)
|
||||
from crewai.llm import LLM
|
||||
from crewai.llms._finish_reason_utils import extract_choices_finish_reason_and_id
|
||||
from crewai.llms.base_llm import BaseLLM
|
||||
|
||||
|
||||
@@ -234,3 +235,60 @@ class TestLLMExtractFinishReasonAndResponseId:
|
||||
assert LLM._extract_finish_reason_and_response_id(None) == (None, None)
|
||||
assert LLM._extract_finish_reason_and_response_id(42) == (None, None)
|
||||
assert LLM._extract_finish_reason_and_response_id("string") == (None, None)
|
||||
|
||||
|
||||
class TestExtractChoicesFinishReasonAndIdHelper:
    """Direct-call tests for ``extract_choices_finish_reason_and_id``.

    The shared extractor is consumed by LLM (LiteLLM), OpenAI Chat, and Azure.
    TestLLMExtractFinishReasonAndResponseId exercises the choices-shape paths
    transitively; these tests cover the direct-call surface and the
    import contract.
    """

    @pytest.mark.parametrize(
        "response, expected",
        [
            (
                SimpleNamespace(
                    id="resp-1", choices=[SimpleNamespace(finish_reason="stop")]
                ),
                ("stop", "resp-1"),
            ),
            (
                {"id": "resp-2", "choices": [{"finish_reason": "length"}]},
                ("length", "resp-2"),
            ),
            (
                SimpleNamespace(
                    id="resp-3", choices=[{"finish_reason": "tool_calls"}]
                ),
                ("tool_calls", "resp-3"),
            ),
            (
                {
                    "id": "resp-4",
                    "choices": [SimpleNamespace(finish_reason="content_filter")],
                },
                ("content_filter", "resp-4"),
            ),
        ],
        ids=[
            "object-with-object-choice",
            "dict-with-dict-choice",
            "object-with-dict-choice",
            "dict-with-object-choice",
        ],
    )
    def test_extracts_choices_shape(
        self, response: Any, expected: tuple[str | None, str | None]
    ) -> None:
        """Object, dict, and mixed shapes all yield ``(finish_reason, id)``."""
        assert extract_choices_finish_reason_and_id(response) == expected

    @pytest.mark.parametrize(
        "bad_input",
        [
            None,
            42,
            "string",
            {},
            SimpleNamespace(),
            SimpleNamespace(choices=[]),
            SimpleNamespace(choices=[SimpleNamespace()]),
            {"id": 12345, "choices": [{"finish_reason": MagicMock()}]},
        ],
        ids=[
            "none",
            "int",
            "str",
            "empty-dict",
            "empty-object",
            "empty-choices",
            "choice-without-finish-reason",
            "non-string-id-and-finish-reason",
        ],
    )
    def test_never_raises_returns_nones_or_coerces(self, bad_input: Any) -> None:
        """Malformed or non-string inputs collapse to exactly ``(None, None)``.

        Every case here either lacks the fields entirely or carries non-string
        values, so asserting the exact tuple catches a helper that wrongly
        returns some string instead of ``None``.
        """
        assert extract_choices_finish_reason_and_id(bad_input) == (None, None)
|
||||
|
||||
Reference in New Issue
Block a user