Mirror of https://github.com/crewAIInc/crewAI.git, synced 2026-04-09 20:42:35 +00:00
refactor: decouple internal plumbing from litellm (token counting, callbacks, feature detection, errors)
- Token counting: Make TokenCalcHandler a standalone class that conditionally inherits from litellm.CustomLogger when litellm is available and works as a plain object when it is not installed.
- Callbacks: Guard set_callbacks() and set_env_callbacks() behind LITELLM_AVAILABLE checks - these only affect the litellm fallback path; native providers emit events via base_llm.py.
- Feature detection: Guard supports_function_calling(), supports_stop_words(), and _validate_call_params() behind LITELLM_AVAILABLE checks with sensible defaults (True for function calling and stop words, since all modern models support them).
- Error types: Replace litellm.exceptions.ContextWindowExceededError catches with pattern-based detection using LLMContextLengthExceededError._is_context_limit_error() (see the sketch below).

This decouples crewAI's internal infrastructure from litellm, allowing the native providers (OpenAI, Anthropic, Azure, Bedrock, Gemini) to work without litellm installed. The litellm fallback for niche providers still works when litellm IS installed.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
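The error-handling portion of this change applies the same pattern at every call site: catch any exception, test its message against known context-window phrasings, and re-raise it as crewAI's own error type. The snippet below is a minimal, self-contained sketch of that approach only; the phrase list, ContextLimitError, and call_with_context_guard are illustrative stand-ins, not the actual implementation of LLMContextLengthExceededError._is_context_limit_error(), which lives in crewAI's exception module.

# Illustrative sketch of pattern-based context-window error detection.
# All names below are placeholders for demonstration; crewAI's real check is
# LLMContextLengthExceededError._is_context_limit_error().


class ContextLimitError(Exception):
    """Stand-in for crewAI's LLMContextLengthExceededError (illustrative only)."""

    # Substrings providers commonly put in context-overflow messages (assumed list).
    CONTEXT_LIMIT_PATTERNS = (
        "context window",
        "context length",
        "maximum context",
        "too many tokens",
    )

    @classmethod
    def is_context_limit_error(cls, message: str) -> bool:
        """Return True if the error message looks like a context-window overflow."""
        lowered = message.lower()
        return any(pattern in lowered for pattern in cls.CONTEXT_LIMIT_PATTERNS)


def call_with_context_guard(call_model, *args, **kwargs):
    """Run an LLM call and convert provider-specific overflow errors to ContextLimitError."""
    try:
        return call_model(*args, **kwargs)
    except ContextLimitError:
        # Already our own error type - re-raise unchanged.
        raise
    except Exception as e:
        # No litellm-specific exception class needed: inspect the message instead.
        if ContextLimitError.is_context_limit_error(str(e)):
            raise ContextLimitError(str(e)) from e
        raise

The advantage over catching litellm.exceptions.ContextWindowExceededError directly is that the same except block works for native provider SDKs, whose exception classes differ, and it degrades gracefully when litellm is not importable.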
@@ -62,18 +62,6 @@ except ImportError:

if TYPE_CHECKING:
    from litellm.exceptions import ContextWindowExceededError
    from litellm.litellm_core_utils.get_supported_openai_params import (
        get_supported_openai_params,
    )
    from litellm.types.utils import (
        ChatCompletionDeltaToolCall,
        Choices,
        Function,
        ModelResponse,
    )
    from litellm.utils import supports_response_schema

    from crewai.agent.core import Agent
    from crewai.llms.hooks.base import BaseInterceptor
    from crewai.llms.providers.anthropic.completion import AnthropicThinkingConfig

@@ -83,8 +71,6 @@ if TYPE_CHECKING:

try:
    import litellm
    from litellm.exceptions import ContextWindowExceededError
    from litellm.integrations.custom_logger import CustomLogger
    from litellm.litellm_core_utils.get_supported_openai_params import (
        get_supported_openai_params,
    )

@@ -99,15 +85,13 @@ try:

    LITELLM_AVAILABLE = True
except ImportError:
    LITELLM_AVAILABLE = False
    litellm = None  # type: ignore
    Choices = None  # type: ignore
    ContextWindowExceededError = Exception  # type: ignore
    get_supported_openai_params = None  # type: ignore
    ChatCompletionDeltaToolCall = None  # type: ignore
    Function = None  # type: ignore
    ModelResponse = None  # type: ignore
    supports_response_schema = None  # type: ignore
    CustomLogger = None  # type: ignore
    litellm = None  # type: ignore[assignment]
    Choices = None  # type: ignore[assignment, misc]
    get_supported_openai_params = None  # type: ignore[assignment]
    ChatCompletionDeltaToolCall = None  # type: ignore[assignment, misc]
    Function = None  # type: ignore[assignment, misc]
    ModelResponse = None  # type: ignore[assignment, misc]
    supports_response_schema = None  # type: ignore[assignment]


load_dotenv()
@@ -1009,12 +993,15 @@ class LLM(BaseLLM):
            )
            return full_response

        except ContextWindowExceededError as e:
            # Catch context window errors from litellm and convert them to our own exception type.
            # This exception is handled by CrewAgentExecutor._invoke_loop() which can then
            # decide whether to summarize the content or abort based on the respect_context_window flag.
            raise LLMContextLengthExceededError(str(e)) from e
        except LLMContextLengthExceededError:
            # Re-raise our own context length error
            raise
        except Exception as e:
            # Check if this is a context window error and convert to our exception type
            error_msg = str(e)
            if LLMContextLengthExceededError._is_context_limit_error(error_msg):
                raise LLMContextLengthExceededError(error_msg) from e

            logging.error(f"Error in streaming response: {e!s}")
            if full_response.strip():
                logging.warning(f"Returning partial response despite error: {e!s}")
@@ -1195,10 +1182,15 @@ class LLM(BaseLLM):
            usage_info = response.usage
            self._track_token_usage_internal(usage_info)

        except ContextWindowExceededError as e:
            # Convert litellm's context window error to our own exception type
            # for consistent handling in the rest of the codebase
            raise LLMContextLengthExceededError(str(e)) from e
        except LLMContextLengthExceededError:
            # Re-raise our own context length error
            raise
        except Exception as e:
            # Check if this is a context window error and convert to our exception type
            error_msg = str(e)
            if LLMContextLengthExceededError._is_context_limit_error(error_msg):
                raise LLMContextLengthExceededError(error_msg) from e
            raise

        # --- 2) Handle structured output response (when response_model is provided)
        if response_model is not None:
@@ -1330,8 +1322,15 @@ class LLM(BaseLLM):
            usage_info = response.usage
            self._track_token_usage_internal(usage_info)

        except ContextWindowExceededError as e:
            raise LLMContextLengthExceededError(str(e)) from e
        except LLMContextLengthExceededError:
            # Re-raise our own context length error
            raise
        except Exception as e:
            # Check if this is a context window error and convert to our exception type
            error_msg = str(e)
            if LLMContextLengthExceededError._is_context_limit_error(error_msg):
                raise LLMContextLengthExceededError(error_msg) from e
            raise

        if response_model is not None:
            if isinstance(response, BaseModel):
@@ -1548,9 +1547,15 @@ class LLM(BaseLLM):
            )
            return full_response

        except ContextWindowExceededError as e:
            raise LLMContextLengthExceededError(str(e)) from e
        except Exception:
        except LLMContextLengthExceededError:
            # Re-raise our own context length error
            raise
        except Exception as e:
            # Check if this is a context window error and convert to our exception type
            error_msg = str(e)
            if LLMContextLengthExceededError._is_context_limit_error(error_msg):
                raise LLMContextLengthExceededError(error_msg) from e

            if chunk_count == 0:
                raise
            if full_response:
@@ -2157,7 +2162,15 @@ class LLM(BaseLLM):
        - E.g., "openrouter/deepseek/deepseek-chat" yields "openrouter"
        - "gemini/gemini-1.5-pro" yields "gemini"
        - If no slash is present, "openai" is assumed.

        Note: This validation only applies to the litellm fallback path.
        Native providers have their own validation.
        """
        if not LITELLM_AVAILABLE or supports_response_schema is None:
            # When litellm is not available, skip validation
            # (this path should only be reached for litellm fallback models)
            return

        provider = self._get_custom_llm_provider()
        if self.response_format is not None and not supports_response_schema(
            model=self.model,
@@ -2169,6 +2182,16 @@ class LLM(BaseLLM):
            )

    def supports_function_calling(self) -> bool:
        """Check if the model supports function calling.

        Note: This method is only used by the litellm fallback path.
        Native providers override this method with their own implementation.
        """
        if not LITELLM_AVAILABLE:
            # When litellm is not available, assume function calling is supported
            # (all modern models support it)
            return True

        try:
            provider = self._get_custom_llm_provider()
            return litellm.utils.supports_function_calling(
@@ -2176,15 +2199,24 @@ class LLM(BaseLLM):
            )
        except Exception as e:
            logging.error(f"Failed to check function calling support: {e!s}")
            return False
            return True  # Default to True for modern models

    def supports_stop_words(self) -> bool:
        """Check if the model supports stop words.

        Note: This method is only used by the litellm fallback path.
        Native providers override this method with their own implementation.
        """
        if not LITELLM_AVAILABLE or get_supported_openai_params is None:
            # When litellm is not available, assume stop words are supported
            return True

        try:
            params = get_supported_openai_params(model=self.model)
            return params is not None and "stop" in params
        except Exception as e:
            logging.error(f"Failed to get supported params: {e!s}")
            return False
            return True  # Default to True

    def get_context_window_size(self) -> int:
        """
@@ -2220,7 +2252,15 @@ class LLM(BaseLLM):
        """
        Attempt to keep a single set of callbacks in litellm by removing old
        duplicates and adding new ones.

        Note: This only affects the litellm fallback path. Native providers
        don't use litellm callbacks - they emit events via base_llm.py.
        """
        if not LITELLM_AVAILABLE:
            # When litellm is not available, callbacks are still stored
            # but not registered with litellm globals
            return

        with suppress_warnings():
            callback_types = [type(callback) for callback in callbacks]
            for callback in litellm.success_callback[:]:
@@ -2245,6 +2285,9 @@ class LLM(BaseLLM):
        If the environment variables are not set or are empty, the corresponding callback lists
        will be set to empty lists.

        Note: This only affects the litellm fallback path. Native providers
        don't use litellm callbacks - they emit events via base_llm.py.

        Examples:
            LITELLM_SUCCESS_CALLBACKS="langfuse,langsmith"
            LITELLM_FAILURE_CALLBACKS="langfuse"

@@ -2252,9 +2295,13 @@ class LLM(BaseLLM):
        This will set `litellm.success_callback` to ["langfuse", "langsmith"] and
        `litellm.failure_callback` to ["langfuse"].
        """
        if not LITELLM_AVAILABLE:
            # When litellm is not available, env callbacks have no effect
            return

        with suppress_warnings():
            success_callbacks_str = os.environ.get("LITELLM_SUCCESS_CALLBACKS", "")
            success_callbacks: list[str | Callable[..., Any] | CustomLogger] = []
            success_callbacks: list[str | Callable[..., Any]] = []
            if success_callbacks_str:
                success_callbacks = [
                    cb.strip() for cb in success_callbacks_str.split(",") if cb.strip()

@@ -2262,7 +2309,7 @@ class LLM(BaseLLM):

            failure_callbacks_str = os.environ.get("LITELLM_FAILURE_CALLBACKS", "")
            if failure_callbacks_str:
                failure_callbacks: list[str | Callable[..., Any] | CustomLogger] = [
                failure_callbacks: list[str | Callable[..., Any]] = [
                    cb.strip() for cb in failure_callbacks_str.split(",") if cb.strip()
                ]
@@ -1,37 +1,40 @@
"""Token counting callback handler for LLM interactions.

This module provides a callback handler that tracks token usage
for LLM API calls through the litellm library.
for LLM API calls. Works standalone and also integrates with litellm
when available (for the litellm fallback path).
"""

from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    from litellm.integrations.custom_logger import CustomLogger
    from litellm.types.utils import Usage
else:
    try:
        from litellm.integrations.custom_logger import CustomLogger
        from litellm.types.utils import Usage
    except ImportError:

        class CustomLogger:
            """Fallback CustomLogger when litellm is not available."""

        class Usage:
            """Fallback Usage when litellm is not available."""

from typing import Any

from crewai.agents.agent_builder.utilities.base_token_process import TokenProcess
from crewai.utilities.logger_utils import suppress_warnings


class TokenCalcHandler(CustomLogger):
# Check if litellm is available for callback integration
try:
    from litellm.integrations.custom_logger import CustomLogger as LiteLLMCustomLogger

    LITELLM_AVAILABLE = True
except ImportError:
    LiteLLMCustomLogger = None  # type: ignore[misc, assignment]
    LITELLM_AVAILABLE = False


# Create a base class that conditionally inherits from litellm's CustomLogger
# when available, or from object when not available
if LITELLM_AVAILABLE and LiteLLMCustomLogger is not None:
    _BaseClass: type = LiteLLMCustomLogger
else:
    _BaseClass = object


class TokenCalcHandler(_BaseClass):  # type: ignore[misc]
    """Handler for calculating and tracking token usage in LLM calls.

    This handler integrates with litellm's logging system to track
    prompt tokens, completion tokens, and cached tokens across requests.
    This handler tracks prompt tokens, completion tokens, and cached tokens
    across requests. It works standalone and also integrates with litellm's
    logging system when litellm is installed (for the fallback path).

    Attributes:
        token_cost_process: The token process tracker to accumulate usage metrics.
@@ -43,7 +46,9 @@ class TokenCalcHandler(CustomLogger):
        Args:
            token_cost_process: Optional token process tracker for accumulating metrics.
        """
        super().__init__(**kwargs)
        # Only call super().__init__ if we have a real parent class with __init__
        if LITELLM_AVAILABLE and LiteLLMCustomLogger is not None:
            super().__init__(**kwargs)
        self.token_cost_process = token_cost_process

    def log_success_event(
@@ -55,6 +60,10 @@ class TokenCalcHandler(CustomLogger):
    ) -> None:
        """Log successful LLM API call and track token usage.

        This method has the same interface as litellm's CustomLogger.log_success_event()
        so it can be used as a litellm callback when litellm is installed, or called
        directly when litellm is not installed.

        Args:
            kwargs: The arguments passed to the LLM call.
            response_obj: The response object from the LLM API.
@@ -66,7 +75,7 @@ class TokenCalcHandler(CustomLogger):

        with suppress_warnings():
            if isinstance(response_obj, dict) and "usage" in response_obj:
                usage: Usage = response_obj["usage"]
                usage = response_obj["usage"]
                if usage:
                    self.token_cost_process.sum_successful_requests(1)
                    if hasattr(usage, "prompt_tokens"):
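For reference, here is the conditional-inheritance pattern from the new token handler restated as a generic, self-contained sketch. somepackage, BaseLogger, and UsageTracker are placeholders invented for illustration; only the structure (module-level try/except, base-class selection, guarded super().__init__) mirrors the diff above.

# Generic sketch of the conditional-inheritance pattern used for TokenCalcHandler.
# "somepackage" is a placeholder optional dependency, not a real library.
try:
    from somepackage.logging import BaseLogger as _OptionalBase  # placeholder import
    OPTIONAL_AVAILABLE = True
except ImportError:
    _OptionalBase = None  # type: ignore[assignment]
    OPTIONAL_AVAILABLE = False

# Pick the real base class when the optional package is importable, `object` otherwise.
_Base: type = _OptionalBase if OPTIONAL_AVAILABLE and _OptionalBase is not None else object


class UsageTracker(_Base):  # type: ignore[misc]
    """Tracks successful calls; usable with or without the optional package."""

    def __init__(self, **kwargs) -> None:
        # object.__init__ rejects keyword arguments, so only delegate when a real
        # parent class exists (mirrors the guarded super().__init__ in the diff).
        if OPTIONAL_AVAILABLE and _OptionalBase is not None:
            super().__init__(**kwargs)
        self.successful_requests = 0

    def log_success_event(self, kwargs, response_obj, start_time, end_time) -> None:
        # Same hook shape as a typical logger callback, so the instance can be
        # registered with the optional package when installed, or called directly.
        usage = response_obj.get("usage") if isinstance(response_obj, dict) else None
        if usage:
            self.successful_requests += 1


# Works with or without the optional dependency installed:
tracker = UsageTracker()
tracker.log_success_event({}, {"usage": {"prompt_tokens": 12}}, None, None)
print(tracker.successful_requests)  # 1

The guard around super().__init__ matters because object.__init__ does not accept keyword arguments; delegating unconditionally with **kwargs would raise a TypeError whenever the optional dependency is missing and keyword arguments are passed.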