Compare commits

...

1 Commit

Author SHA1 Message Date
Joao Moura
42421740cf refactor: decouple internal plumbing from litellm (token counting, callbacks, feature detection, errors)
- Token counting: Make TokenCalcHandler standalone class that conditionally
  inherits from litellm.CustomLogger when litellm is available, works as
  plain object when not installed

- Callbacks: Guard set_callbacks() and set_env_callbacks() behind
  LITELLM_AVAILABLE checks - these only affect the litellm fallback path,
  native providers emit events via base_llm.py

- Feature detection: Guard supports_function_calling(), supports_stop_words(),
  and _validate_call_params() behind LITELLM_AVAILABLE checks with sensible
  defaults (True for function calling/stop words since all modern models
  support them)

- Error types: Replace litellm.exceptions.ContextWindowExceededError catches
  with pattern-based detection using LLMContextLengthExceededError._is_context_limit_error()

This decouples crewAI's internal infrastructure from litellm, allowing the
native providers (OpenAI, Anthropic, Azure, Bedrock, Gemini) to work without
litellm installed. The litellm fallback for niche providers still works when
litellm IS installed.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-03-24 07:21:08 -07:00
2 changed files with 121 additions and 65 deletions

View File

@@ -62,18 +62,6 @@ except ImportError:
if TYPE_CHECKING:
from litellm.exceptions import ContextWindowExceededError
from litellm.litellm_core_utils.get_supported_openai_params import (
get_supported_openai_params,
)
from litellm.types.utils import (
ChatCompletionDeltaToolCall,
Choices,
Function,
ModelResponse,
)
from litellm.utils import supports_response_schema
from crewai.agent.core import Agent
from crewai.llms.hooks.base import BaseInterceptor
from crewai.llms.providers.anthropic.completion import AnthropicThinkingConfig
@@ -83,8 +71,6 @@ if TYPE_CHECKING:
try:
import litellm
from litellm.exceptions import ContextWindowExceededError
from litellm.integrations.custom_logger import CustomLogger
from litellm.litellm_core_utils.get_supported_openai_params import (
get_supported_openai_params,
)
@@ -99,15 +85,13 @@ try:
LITELLM_AVAILABLE = True
except ImportError:
LITELLM_AVAILABLE = False
litellm = None # type: ignore
Choices = None # type: ignore
ContextWindowExceededError = Exception # type: ignore
get_supported_openai_params = None # type: ignore
ChatCompletionDeltaToolCall = None # type: ignore
Function = None # type: ignore
ModelResponse = None # type: ignore
supports_response_schema = None # type: ignore
CustomLogger = None # type: ignore
litellm = None # type: ignore[assignment]
Choices = None # type: ignore[assignment, misc]
get_supported_openai_params = None # type: ignore[assignment]
ChatCompletionDeltaToolCall = None # type: ignore[assignment, misc]
Function = None # type: ignore[assignment, misc]
ModelResponse = None # type: ignore[assignment, misc]
supports_response_schema = None # type: ignore[assignment]
load_dotenv()
@@ -1009,12 +993,15 @@ class LLM(BaseLLM):
)
return full_response
except ContextWindowExceededError as e:
# Catch context window errors from litellm and convert them to our own exception type.
# This exception is handled by CrewAgentExecutor._invoke_loop() which can then
# decide whether to summarize the content or abort based on the respect_context_window flag.
raise LLMContextLengthExceededError(str(e)) from e
except LLMContextLengthExceededError:
# Re-raise our own context length error
raise
except Exception as e:
# Check if this is a context window error and convert to our exception type
error_msg = str(e)
if LLMContextLengthExceededError._is_context_limit_error(error_msg):
raise LLMContextLengthExceededError(error_msg) from e
logging.error(f"Error in streaming response: {e!s}")
if full_response.strip():
logging.warning(f"Returning partial response despite error: {e!s}")
@@ -1195,10 +1182,15 @@ class LLM(BaseLLM):
usage_info = response.usage
self._track_token_usage_internal(usage_info)
except ContextWindowExceededError as e:
# Convert litellm's context window error to our own exception type
# for consistent handling in the rest of the codebase
raise LLMContextLengthExceededError(str(e)) from e
except LLMContextLengthExceededError:
# Re-raise our own context length error
raise
except Exception as e:
# Check if this is a context window error and convert to our exception type
error_msg = str(e)
if LLMContextLengthExceededError._is_context_limit_error(error_msg):
raise LLMContextLengthExceededError(error_msg) from e
raise
# --- 2) Handle structured output response (when response_model is provided)
if response_model is not None:
@@ -1330,8 +1322,15 @@ class LLM(BaseLLM):
usage_info = response.usage
self._track_token_usage_internal(usage_info)
except ContextWindowExceededError as e:
raise LLMContextLengthExceededError(str(e)) from e
except LLMContextLengthExceededError:
# Re-raise our own context length error
raise
except Exception as e:
# Check if this is a context window error and convert to our exception type
error_msg = str(e)
if LLMContextLengthExceededError._is_context_limit_error(error_msg):
raise LLMContextLengthExceededError(error_msg) from e
raise
if response_model is not None:
if isinstance(response, BaseModel):
@@ -1548,9 +1547,15 @@ class LLM(BaseLLM):
)
return full_response
except ContextWindowExceededError as e:
raise LLMContextLengthExceededError(str(e)) from e
except Exception:
except LLMContextLengthExceededError:
# Re-raise our own context length error
raise
except Exception as e:
# Check if this is a context window error and convert to our exception type
error_msg = str(e)
if LLMContextLengthExceededError._is_context_limit_error(error_msg):
raise LLMContextLengthExceededError(error_msg) from e
if chunk_count == 0:
raise
if full_response:
@@ -2157,7 +2162,15 @@ class LLM(BaseLLM):
- E.g., "openrouter/deepseek/deepseek-chat" yields "openrouter"
- "gemini/gemini-1.5-pro" yields "gemini"
- If no slash is present, "openai" is assumed.
Note: This validation only applies to the litellm fallback path.
Native providers have their own validation.
"""
if not LITELLM_AVAILABLE or supports_response_schema is None:
# When litellm is not available, skip validation
# (this path should only be reached for litellm fallback models)
return
provider = self._get_custom_llm_provider()
if self.response_format is not None and not supports_response_schema(
model=self.model,
@@ -2169,6 +2182,16 @@ class LLM(BaseLLM):
)
def supports_function_calling(self) -> bool:
"""Check if the model supports function calling.
Note: This method is only used by the litellm fallback path.
Native providers override this method with their own implementation.
"""
if not LITELLM_AVAILABLE:
# When litellm is not available, assume function calling is supported
# (all modern models support it)
return True
try:
provider = self._get_custom_llm_provider()
return litellm.utils.supports_function_calling(
@@ -2176,15 +2199,24 @@ class LLM(BaseLLM):
)
except Exception as e:
logging.error(f"Failed to check function calling support: {e!s}")
return False
return True # Default to True for modern models
def supports_stop_words(self) -> bool:
"""Check if the model supports stop words.
Note: This method is only used by the litellm fallback path.
Native providers override this method with their own implementation.
"""
if not LITELLM_AVAILABLE or get_supported_openai_params is None:
# When litellm is not available, assume stop words are supported
return True
try:
params = get_supported_openai_params(model=self.model)
return params is not None and "stop" in params
except Exception as e:
logging.error(f"Failed to get supported params: {e!s}")
return False
return True # Default to True
def get_context_window_size(self) -> int:
"""
@@ -2220,7 +2252,15 @@ class LLM(BaseLLM):
"""
Attempt to keep a single set of callbacks in litellm by removing old
duplicates and adding new ones.
Note: This only affects the litellm fallback path. Native providers
don't use litellm callbacks - they emit events via base_llm.py.
"""
if not LITELLM_AVAILABLE:
# When litellm is not available, callbacks are still stored
# but not registered with litellm globals
return
with suppress_warnings():
callback_types = [type(callback) for callback in callbacks]
for callback in litellm.success_callback[:]:
@@ -2245,6 +2285,9 @@ class LLM(BaseLLM):
If the environment variables are not set or are empty, the corresponding callback lists
will be set to empty lists.
Note: This only affects the litellm fallback path. Native providers
don't use litellm callbacks - they emit events via base_llm.py.
Examples:
LITELLM_SUCCESS_CALLBACKS="langfuse,langsmith"
LITELLM_FAILURE_CALLBACKS="langfuse"
@@ -2252,9 +2295,13 @@ class LLM(BaseLLM):
This will set `litellm.success_callback` to ["langfuse", "langsmith"] and
`litellm.failure_callback` to ["langfuse"].
"""
if not LITELLM_AVAILABLE:
# When litellm is not available, env callbacks have no effect
return
with suppress_warnings():
success_callbacks_str = os.environ.get("LITELLM_SUCCESS_CALLBACKS", "")
success_callbacks: list[str | Callable[..., Any] | CustomLogger] = []
success_callbacks: list[str | Callable[..., Any]] = []
if success_callbacks_str:
success_callbacks = [
cb.strip() for cb in success_callbacks_str.split(",") if cb.strip()
@@ -2262,7 +2309,7 @@ class LLM(BaseLLM):
failure_callbacks_str = os.environ.get("LITELLM_FAILURE_CALLBACKS", "")
if failure_callbacks_str:
failure_callbacks: list[str | Callable[..., Any] | CustomLogger] = [
failure_callbacks: list[str | Callable[..., Any]] = [
cb.strip() for cb in failure_callbacks_str.split(",") if cb.strip()
]

View File

@@ -1,37 +1,40 @@
"""Token counting callback handler for LLM interactions.
This module provides a callback handler that tracks token usage
for LLM API calls through the litellm library.
for LLM API calls. Works standalone and also integrates with litellm
when available (for the litellm fallback path).
"""
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from litellm.integrations.custom_logger import CustomLogger
from litellm.types.utils import Usage
else:
try:
from litellm.integrations.custom_logger import CustomLogger
from litellm.types.utils import Usage
except ImportError:
class CustomLogger:
"""Fallback CustomLogger when litellm is not available."""
class Usage:
"""Fallback Usage when litellm is not available."""
from typing import Any
from crewai.agents.agent_builder.utilities.base_token_process import TokenProcess
from crewai.utilities.logger_utils import suppress_warnings
class TokenCalcHandler(CustomLogger):
# Check if litellm is available for callback integration
try:
from litellm.integrations.custom_logger import CustomLogger as LiteLLMCustomLogger
LITELLM_AVAILABLE = True
except ImportError:
LiteLLMCustomLogger = None # type: ignore[misc, assignment]
LITELLM_AVAILABLE = False
# Create a base class that conditionally inherits from litellm's CustomLogger
# when available, or from object when not available
if LITELLM_AVAILABLE and LiteLLMCustomLogger is not None:
_BaseClass: type = LiteLLMCustomLogger
else:
_BaseClass = object
class TokenCalcHandler(_BaseClass): # type: ignore[misc]
"""Handler for calculating and tracking token usage in LLM calls.
This handler integrates with litellm's logging system to track
prompt tokens, completion tokens, and cached tokens across requests.
This handler tracks prompt tokens, completion tokens, and cached tokens
across requests. It works standalone and also integrates with litellm's
logging system when litellm is installed (for the fallback path).
Attributes:
token_cost_process: The token process tracker to accumulate usage metrics.
@@ -43,7 +46,9 @@ class TokenCalcHandler(CustomLogger):
Args:
token_cost_process: Optional token process tracker for accumulating metrics.
"""
super().__init__(**kwargs)
# Only call super().__init__ if we have a real parent class with __init__
if LITELLM_AVAILABLE and LiteLLMCustomLogger is not None:
super().__init__(**kwargs)
self.token_cost_process = token_cost_process
def log_success_event(
@@ -55,6 +60,10 @@ class TokenCalcHandler(CustomLogger):
) -> None:
"""Log successful LLM API call and track token usage.
This method has the same interface as litellm's CustomLogger.log_success_event()
so it can be used as a litellm callback when litellm is installed, or called
directly when litellm is not installed.
Args:
kwargs: The arguments passed to the LLM call.
response_obj: The response object from the LLM API.
@@ -66,7 +75,7 @@ class TokenCalcHandler(CustomLogger):
with suppress_warnings():
if isinstance(response_obj, dict) and "usage" in response_obj:
usage: Usage = response_obj["usage"]
usage = response_obj["usage"]
if usage:
self.token_cost_process.sum_successful_requests(1)
if hasattr(usage, "prompt_tokens"):