diff --git a/lib/crewai/src/crewai/events/types/llm_events.py b/lib/crewai/src/crewai/events/types/llm_events.py
index 73d743804..4b8c96d9e 100644
--- a/lib/crewai/src/crewai/events/types/llm_events.py
+++ b/lib/crewai/src/crewai/events/types/llm_events.py
@@ -57,6 +57,7 @@ class LLMCallCompletedEvent(LLMEventBase):
     messages: str | list[dict[str, Any]] | None = None
     response: Any
     call_type: LLMCallType
+    usage: dict[str, Any] | None = None
 
 
 class LLMCallFailedEvent(LLMEventBase):
diff --git a/lib/crewai/src/crewai/llm.py b/lib/crewai/src/crewai/llm.py
index 4e7303347..873c1b7dd 100644
--- a/lib/crewai/src/crewai/llm.py
+++ b/lib/crewai/src/crewai/llm.py
@@ -970,21 +970,25 @@ class LLM(BaseLLM):
             )
             result = instructor_instance.to_pydantic()
             structured_response = result.model_dump_json()
+            usage_dict = self._usage_to_dict(usage_info)
             self._handle_emit_call_events(
                 response=structured_response,
                 call_type=LLMCallType.LLM_CALL,
                 from_task=from_task,
                 from_agent=from_agent,
                 messages=params["messages"],
+                usage=usage_dict,
             )
             return structured_response
 
+        usage_dict = self._usage_to_dict(usage_info)
         self._handle_emit_call_events(
             response=full_response,
             call_type=LLMCallType.LLM_CALL,
             from_task=from_task,
             from_agent=from_agent,
             messages=params["messages"],
+            usage=usage_dict,
         )
         return full_response
 
@@ -994,12 +998,14 @@ class LLM(BaseLLM):
             return tool_result
 
         # --- 10) Emit completion event and return response
+        usage_dict = self._usage_to_dict(usage_info)
         self._handle_emit_call_events(
             response=full_response,
             call_type=LLMCallType.LLM_CALL,
             from_task=from_task,
             from_agent=from_agent,
             messages=params["messages"],
+            usage=usage_dict,
         )
         return full_response
 
@@ -1021,6 +1027,7 @@ class LLM(BaseLLM):
             from_task=from_task,
             from_agent=from_agent,
             messages=params["messages"],
+            usage=self._usage_to_dict(usage_info),
         )
         return full_response
 
@@ -1172,6 +1179,7 @@ class LLM(BaseLLM):
                 from_task=from_task,
                 from_agent=from_agent,
                 messages=params["messages"],
+                usage=None,
             )
             return structured_response
 
@@ -1202,6 +1210,8 @@ class LLM(BaseLLM):
                 raise LLMContextLengthExceededError(error_msg) from e
             raise
 
+        response_usage = self._usage_to_dict(getattr(response, "usage", None))
+
         # --- 2) Handle structured output response (when response_model is provided)
         if response_model is not None:
             # When using instructor/response_model, litellm returns a Pydantic model instance
@@ -1213,6 +1223,7 @@ class LLM(BaseLLM):
                 from_task=from_task,
                 from_agent=from_agent,
                 messages=params["messages"],
+                usage=response_usage,
             )
             return structured_response
 
@@ -1244,6 +1255,7 @@ class LLM(BaseLLM):
                 from_task=from_task,
                 from_agent=from_agent,
                 messages=params["messages"],
+                usage=response_usage,
             )
             return text_response
 
@@ -1267,6 +1279,7 @@ class LLM(BaseLLM):
             from_task=from_task,
             from_agent=from_agent,
             messages=params["messages"],
+            usage=response_usage,
         )
         return text_response
 
@@ -1316,6 +1329,7 @@ class LLM(BaseLLM):
             from_task=from_task,
             from_agent=from_agent,
             messages=params["messages"],
+            usage=None,
         )
         return structured_response
 
@@ -1342,6 +1356,8 @@ class LLM(BaseLLM):
                 raise LLMContextLengthExceededError(error_msg) from e
             raise
 
+        response_usage = self._usage_to_dict(getattr(response, "usage", None))
+
         if response_model is not None:
             if isinstance(response, BaseModel):
                 structured_response = response.model_dump_json()
@@ -1351,6 +1367,7 @@ class LLM(BaseLLM):
                 from_task=from_task,
                 from_agent=from_agent,
                 messages=params["messages"],
+                usage=response_usage,
             )
             return structured_response
 
@@ -1380,6 +1397,7 @@ class LLM(BaseLLM):
             from_task=from_task,
             from_agent=from_agent,
             messages=params["messages"],
+            usage=response_usage,
         )
         return text_response
 
@@ -1402,6 +1420,7 @@ class LLM(BaseLLM):
             from_task=from_task,
             from_agent=from_agent,
             messages=params["messages"],
+            usage=response_usage,
         )
         return text_response
 
@@ -1548,12 +1567,14 @@ class LLM(BaseLLM):
         if result is not None:
             return result
 
+        usage_dict = self._usage_to_dict(usage_info)
         self._handle_emit_call_events(
             response=full_response,
             call_type=LLMCallType.LLM_CALL,
             from_task=from_task,
             from_agent=from_agent,
             messages=params.get("messages"),
+            usage=usage_dict,
         )
         return full_response
 
@@ -1575,6 +1596,7 @@ class LLM(BaseLLM):
                 from_task=from_task,
                 from_agent=from_agent,
                 messages=params.get("messages"),
+                usage=self._usage_to_dict(usage_info),
             )
             return full_response
         raise
@@ -1961,6 +1983,19 @@ class LLM(BaseLLM):
             )
             raise
 
+    @staticmethod
+    def _usage_to_dict(usage: Any) -> dict[str, Any] | None:
+        if usage is None:
+            return None
+        if isinstance(usage, dict):
+            return usage
+        if hasattr(usage, "model_dump"):
+            result: dict[str, Any] = usage.model_dump()
+            return result
+        if hasattr(usage, "__dict__"):
+            return {k: v for k, v in vars(usage).items() if not k.startswith("_")}
+        return None
+
     def _handle_emit_call_events(
         self,
         response: Any,
@@ -1968,6 +2003,7 @@ class LLM(BaseLLM):
         from_task: Task | None = None,
         from_agent: Agent | None = None,
         messages: str | list[LLMMessage] | None = None,
+        usage: dict[str, Any] | None = None,
     ) -> None:
         """Handle the events for the LLM call.
 
@@ -1977,6 +2013,7 @@ class LLM(BaseLLM):
             from_task: Optional task object
             from_agent: Optional agent object
             messages: Optional messages object
+            usage: Optional token usage data
         """
         crewai_event_bus.emit(
             self,
@@ -1988,6 +2025,7 @@ class LLM(BaseLLM):
                 from_agent=from_agent,
                 model=self.model,
                 call_id=get_current_call_id(),
+                usage=usage,
             ),
         )
diff --git a/lib/crewai/src/crewai/llms/base_llm.py b/lib/crewai/src/crewai/llms/base_llm.py
index 857c2707d..a0bf7c56a 100644
--- a/lib/crewai/src/crewai/llms/base_llm.py
+++ b/lib/crewai/src/crewai/llms/base_llm.py
@@ -460,6 +460,7 @@ class BaseLLM(BaseModel, ABC):
         from_task: Task | None = None,
         from_agent: Agent | None = None,
         messages: str | list[LLMMessage] | None = None,
+        usage: dict[str, Any] | None = None,
     ) -> None:
         """Emit LLM call completed event."""
         from crewai.utilities.serialization import to_serializable
@@ -474,6 +475,7 @@ class BaseLLM(BaseModel, ABC):
                 from_agent=from_agent,
                 model=self.model,
                 call_id=get_current_call_id(),
+                usage=usage,
             ),
         )
diff --git a/lib/crewai/src/crewai/llms/providers/anthropic/completion.py b/lib/crewai/src/crewai/llms/providers/anthropic/completion.py
index 1c77d2bc7..d710404bd 100644
--- a/lib/crewai/src/crewai/llms/providers/anthropic/completion.py
+++ b/lib/crewai/src/crewai/llms/providers/anthropic/completion.py
@@ -811,6 +811,7 @@ class AnthropicCompletion(BaseLLM):
                     from_task=from_task,
                     from_agent=from_agent,
                     messages=params["messages"],
+                    usage=usage,
                 )
                 return structured_data
             else:
@@ -826,6 +827,7 @@ class AnthropicCompletion(BaseLLM):
                     from_task=from_task,
                     from_agent=from_agent,
                     messages=params["messages"],
+                    usage=usage,
                 )
                 return structured_data
 
@@ -848,6 +850,7 @@ class AnthropicCompletion(BaseLLM):
                 from_task=from_task,
                 from_agent=from_agent,
                 messages=params["messages"],
+                usage=usage,
             )
             return list(tool_uses)
 
@@ -879,6 +882,7 @@ class AnthropicCompletion(BaseLLM):
             from_task=from_task,
             from_agent=from_agent,
             messages=params["messages"],
+            usage=usage,
         )
 
         if usage.get("total_tokens", 0) > 0:
@@ -1028,6 +1032,7 @@ class AnthropicCompletion(BaseLLM):
                     from_task=from_task,
                     from_agent=from_agent,
                     messages=params["messages"],
+                    usage=usage,
                 )
                 return structured_data
         for block in final_message.content:
@@ -1042,6 +1047,7 @@ class AnthropicCompletion(BaseLLM):
                     from_task=from_task,
                     from_agent=from_agent,
                     messages=params["messages"],
+                    usage=usage,
                 )
                 return structured_data
 
@@ -1071,6 +1077,7 @@ class AnthropicCompletion(BaseLLM):
             from_task=from_task,
             from_agent=from_agent,
             messages=params["messages"],
+            usage=usage,
         )
 
         return self._invoke_after_llm_call_hooks(
@@ -1241,6 +1248,7 @@ class AnthropicCompletion(BaseLLM):
             from_task=from_task,
             from_agent=from_agent,
             messages=follow_up_params["messages"],
+            usage=follow_up_usage,
         )
 
         # Log combined token usage
@@ -1332,6 +1340,7 @@ class AnthropicCompletion(BaseLLM):
                     from_task=from_task,
                     from_agent=from_agent,
                     messages=params["messages"],
+                    usage=usage,
                 )
                 return structured_data
             else:
@@ -1347,6 +1356,7 @@ class AnthropicCompletion(BaseLLM):
                     from_task=from_task,
                     from_agent=from_agent,
                     messages=params["messages"],
+                    usage=usage,
                 )
                 return structured_data
 
@@ -1367,6 +1377,7 @@ class AnthropicCompletion(BaseLLM):
                 from_task=from_task,
                 from_agent=from_agent,
                 messages=params["messages"],
+                usage=usage,
             )
             return list(tool_uses)
 
@@ -1390,6 +1401,7 @@ class AnthropicCompletion(BaseLLM):
             from_task=from_task,
             from_agent=from_agent,
             messages=params["messages"],
+            usage=usage,
         )
 
         if usage.get("total_tokens", 0) > 0:
@@ -1527,6 +1539,7 @@ class AnthropicCompletion(BaseLLM):
                     from_task=from_task,
                     from_agent=from_agent,
                     messages=params["messages"],
+                    usage=usage,
                 )
                 return structured_data
         for block in final_message.content:
@@ -1541,6 +1554,7 @@ class AnthropicCompletion(BaseLLM):
                     from_task=from_task,
                     from_agent=from_agent,
                     messages=params["messages"],
+                    usage=usage,
                 )
                 return structured_data
 
@@ -1569,6 +1583,7 @@ class AnthropicCompletion(BaseLLM):
             from_task=from_task,
             from_agent=from_agent,
             messages=params["messages"],
+            usage=usage,
         )
 
         return full_response
 
@@ -1627,6 +1642,7 @@ class AnthropicCompletion(BaseLLM):
             from_task=from_task,
             from_agent=from_agent,
             messages=follow_up_params["messages"],
+            usage=follow_up_usage,
         )
 
         total_usage = {
diff --git a/lib/crewai/src/crewai/llms/providers/azure/completion.py b/lib/crewai/src/crewai/llms/providers/azure/completion.py
index cae50d0c6..52bf05531 100644
--- a/lib/crewai/src/crewai/llms/providers/azure/completion.py
+++ b/lib/crewai/src/crewai/llms/providers/azure/completion.py
@@ -569,6 +569,7 @@ class AzureCompletion(BaseLLM):
         params: AzureCompletionParams,
         from_task: Any | None = None,
         from_agent: Any | None = None,
+        usage: dict[str, Any] | None = None,
     ) -> BaseModel:
         """Validate content against response model and emit completion event.
 
@@ -594,6 +595,7 @@ class AzureCompletion(BaseLLM):
             from_task=from_task,
             from_agent=from_agent,
             messages=params["messages"],
+            usage=usage,
         )
 
         return structured_data
@@ -643,6 +645,7 @@ class AzureCompletion(BaseLLM):
                 from_task=from_task,
                 from_agent=from_agent,
                 messages=params["messages"],
+                usage=usage,
             )
             return list(message.tool_calls)
 
@@ -680,6 +683,7 @@ class AzureCompletion(BaseLLM):
                 params=params,
                 from_task=from_task,
                 from_agent=from_agent,
+                usage=usage,
             )
 
         content = self._apply_stop_words(content)
@@ -691,6 +695,7 @@ class AzureCompletion(BaseLLM):
             from_task=from_task,
             from_agent=from_agent,
             messages=params["messages"],
+            usage=usage,
         )
 
         return self._invoke_after_llm_call_hooks(
@@ -794,7 +799,7 @@ class AzureCompletion(BaseLLM):
         self,
         full_response: str,
         tool_calls: dict[int, dict[str, Any]],
-        usage_data: dict[str, int],
+        usage_data: dict[str, Any] | None,
         params: AzureCompletionParams,
         available_functions: dict[str, Any] | None = None,
         from_task: Any | None = None,
@@ -806,7 +811,7 @@ class AzureCompletion(BaseLLM):
         Args:
             full_response: The complete streamed response content
             tool_calls: Dictionary of tool calls accumulated during streaming
-            usage_data: Token usage data from the stream
+            usage_data: Token usage data from the stream, or None if unavailable
             params: Completion parameters containing messages
             available_functions: Available functions for tool calling
             from_task: Task that initiated the call
@@ -816,7 +821,8 @@ class AzureCompletion(BaseLLM):
         Returns:
             Final response content after processing, or structured output
         """
-        self._track_token_usage_internal(usage_data)
+        if usage_data:
+            self._track_token_usage_internal(usage_data)
 
         # Handle structured output validation
         if response_model and self.is_openai_model:
@@ -826,6 +832,7 @@ class AzureCompletion(BaseLLM):
                 params=params,
                 from_task=from_task,
                 from_agent=from_agent,
+                usage=usage_data,
             )
 
         # If there are tool_calls but no available_functions, return them
@@ -848,6 +855,7 @@ class AzureCompletion(BaseLLM):
                 from_task=from_task,
                 from_agent=from_agent,
                 messages=params["messages"],
+                usage=usage_data,
             )
             return formatted_tool_calls
 
@@ -884,6 +892,7 @@ class AzureCompletion(BaseLLM):
             from_task=from_task,
             from_agent=from_agent,
             messages=params["messages"],
+            usage=usage_data,
         )
 
         return self._invoke_after_llm_call_hooks(
@@ -902,7 +911,7 @@ class AzureCompletion(BaseLLM):
 
         full_response = ""
         tool_calls: dict[int, dict[str, Any]] = {}
-        usage_data = {"total_tokens": 0}
+        usage_data: dict[str, Any] | None = None
 
         for update in self._client.complete(**params):
             if isinstance(update, StreamingChatCompletionsUpdate):
                 if update.usage:
@@ -968,7 +977,7 @@ class AzureCompletion(BaseLLM):
 
         full_response = ""
         tool_calls: dict[int, dict[str, Any]] = {}
-        usage_data = {"total_tokens": 0}
+        usage_data: dict[str, Any] | None = None
 
         stream = await self._async_client.complete(**params)
         async for update in stream:
diff --git a/lib/crewai/src/crewai/llms/providers/bedrock/completion.py b/lib/crewai/src/crewai/llms/providers/bedrock/completion.py
index 510c84cc7..6fcf3581d 100644
--- a/lib/crewai/src/crewai/llms/providers/bedrock/completion.py
+++ b/lib/crewai/src/crewai/llms/providers/bedrock/completion.py
@@ -664,8 +664,9 @@ class BedrockCompletion(BaseLLM):
             )
 
         # Track token usage according to AWS response format
-        if "usage" in response:
-            self._track_token_usage_internal(response["usage"])
+        usage = response.get("usage")
+        if usage:
+            self._track_token_usage_internal(usage)
 
         stop_reason = response.get("stopReason")
         if stop_reason:
@@ -705,6 +706,7 @@ class BedrockCompletion(BaseLLM):
                     from_task=from_task,
                     from_agent=from_agent,
                     messages=messages,
+                    usage=usage,
                 )
                 return result
             except Exception as e:
@@ -727,6 +729,7 @@ class BedrockCompletion(BaseLLM):
                 from_task=from_task,
                 from_agent=from_agent,
                 messages=messages,
+                usage=usage,
             )
             return non_structured_output_tool_uses
 
@@ -806,6 +809,7 @@ class BedrockCompletion(BaseLLM):
             from_task=from_task,
             from_agent=from_agent,
             messages=messages,
+            usage=usage,
         )
 
         return self._invoke_after_llm_call_hooks(
@@ -936,6 +940,7 @@ class BedrockCompletion(BaseLLM):
         tool_use_id: str | None = None
         tool_use_index = 0
         accumulated_tool_input = ""
+        usage_data: dict[str, Any] | None = None
 
         try:
             response = self._client.converse_stream(
@@ -1045,6 +1050,7 @@ class BedrockCompletion(BaseLLM):
                         from_task=from_task,
                         from_agent=from_agent,
                         messages=messages,
+                        usage=usage_data,
                     )
                     return result  # type: ignore[return-value]
                 except Exception as e:
@@ -1112,6 +1118,7 @@ class BedrockCompletion(BaseLLM):
                 metadata = event["metadata"]
                 if "usage" in metadata:
                     usage_metrics = metadata["usage"]
+                    usage_data = usage_metrics
                     self._track_token_usage_internal(usage_metrics)
                     logging.debug(f"Token usage: {usage_metrics}")
                 if "trace" in metadata:
@@ -1141,6 +1148,7 @@ class BedrockCompletion(BaseLLM):
             from_task=from_task,
             from_agent=from_agent,
             messages=messages,
+            usage=usage_data,
         )
 
         return full_response
 
@@ -1252,8 +1260,9 @@ class BedrockCompletion(BaseLLM):
             **body,
         )
 
-        if "usage" in response:
-            self._track_token_usage_internal(response["usage"])
+        usage = response.get("usage")
+        if usage:
+            self._track_token_usage_internal(usage)
 
         stop_reason = response.get("stopReason")
         if stop_reason:
@@ -1292,6 +1301,7 @@ class BedrockCompletion(BaseLLM):
                     from_task=from_task,
                     from_agent=from_agent,
                     messages=messages,
+                    usage=usage,
                 )
                 return result
             except Exception as e:
@@ -1314,6 +1324,7 @@ class BedrockCompletion(BaseLLM):
                 from_task=from_task,
                 from_agent=from_agent,
                 messages=messages,
+                usage=usage,
             )
             return non_structured_output_tool_uses
 
@@ -1388,6 +1399,7 @@ class BedrockCompletion(BaseLLM):
             from_task=from_task,
             from_agent=from_agent,
             messages=messages,
+            usage=usage,
         )
 
         return text_content
 
@@ -1508,6 +1520,7 @@ class BedrockCompletion(BaseLLM):
         tool_use_id: str | None = None
         tool_use_index = 0
         accumulated_tool_input = ""
+        usage_data: dict[str, Any] | None = None
 
         try:
             async_client = await self._ensure_async_client()
@@ -1619,6 +1632,7 @@ class BedrockCompletion(BaseLLM):
                         from_task=from_task,
                         from_agent=from_agent,
                         messages=messages,
+                        usage=usage_data,
                     )
                     return result  # type: ignore[return-value]
                 except Exception as e:
@@ -1691,6 +1705,7 @@ class BedrockCompletion(BaseLLM):
                 metadata = event["metadata"]
                 if "usage" in metadata:
                     usage_metrics = metadata["usage"]
+                    usage_data = usage_metrics
                     self._track_token_usage_internal(usage_metrics)
                     logging.debug(f"Token usage: {usage_metrics}")
                 if "trace" in metadata:
@@ -1720,6 +1735,7 @@ class BedrockCompletion(BaseLLM):
             from_task=from_task,
             from_agent=from_agent,
             messages=messages,
+            usage=usage_data,
         )
 
         return self._invoke_after_llm_call_hooks(
diff --git a/lib/crewai/src/crewai/llms/providers/gemini/completion.py b/lib/crewai/src/crewai/llms/providers/gemini/completion.py
index 827df750c..f790e22cf 100644
--- a/lib/crewai/src/crewai/llms/providers/gemini/completion.py
+++ b/lib/crewai/src/crewai/llms/providers/gemini/completion.py
@@ -665,6 +665,7 @@ class GeminiCompletion(BaseLLM):
         messages_for_event: list[LLMMessage],
         from_task: Any | None = None,
         from_agent: Any | None = None,
+        usage: dict[str, Any] | None = None,
     ) -> BaseModel:
         """Validate content against response model and emit completion event.
 
@@ -690,6 +691,7 @@ class GeminiCompletion(BaseLLM):
             from_task=from_task,
             from_agent=from_agent,
             messages=messages_for_event,
+            usage=usage,
         )
 
         return structured_data
@@ -705,6 +707,7 @@ class GeminiCompletion(BaseLLM):
         response_model: type[BaseModel] | None = None,
         from_task: Any | None = None,
         from_agent: Any | None = None,
+        usage: dict[str, Any] | None = None,
     ) -> str | BaseModel:
         """Finalize completion response with validation and event emission.
 
@@ -728,6 +731,7 @@ class GeminiCompletion(BaseLLM):
                 messages_for_event=messages_for_event,
                 from_task=from_task,
                 from_agent=from_agent,
+                usage=usage,
             )
 
         self._emit_call_completed_event(
@@ -736,6 +740,7 @@ class GeminiCompletion(BaseLLM):
             from_task=from_task,
             from_agent=from_agent,
             messages=messages_for_event,
+            usage=usage,
         )
 
         return self._invoke_after_llm_call_hooks(
@@ -749,6 +754,7 @@ class GeminiCompletion(BaseLLM):
         contents: list[types.Content],
         from_task: Any | None = None,
         from_agent: Any | None = None,
+        usage: dict[str, Any] | None = None,
     ) -> BaseModel:
         """Validate and emit event for structured_output tool call.
 
@@ -773,6 +779,7 @@ class GeminiCompletion(BaseLLM):
                 from_task=from_task,
                 from_agent=from_agent,
                 messages=self._convert_contents_to_dict(contents),
+                usage=usage,
             )
             return validated_data
         except Exception as e:
@@ -791,6 +798,7 @@ class GeminiCompletion(BaseLLM):
         from_task: Any | None = None,
         from_agent: Any | None = None,
         response_model: type[BaseModel] | None = None,
+        usage: dict[str, Any] | None = None,
     ) -> str | Any:
         """Process response, execute function calls, and finalize completion.
 
@@ -831,6 +839,7 @@ class GeminiCompletion(BaseLLM):
                     contents=contents,
                     from_task=from_task,
                     from_agent=from_agent,
+                    usage=usage,
                 )
 
             # Filter out structured_output from function calls returned to executor
@@ -852,6 +861,7 @@ class GeminiCompletion(BaseLLM):
                 from_task=from_task,
                 from_agent=from_agent,
                 messages=self._convert_contents_to_dict(contents),
+                usage=usage,
             )
             return non_structured_output_parts
 
@@ -893,6 +903,7 @@ class GeminiCompletion(BaseLLM):
             response_model=effective_response_model,
             from_task=from_task,
             from_agent=from_agent,
+            usage=usage,
         )
 
     def _process_stream_chunk(
@@ -900,10 +911,10 @@ class GeminiCompletion(BaseLLM):
         chunk: GenerateContentResponse,
         full_response: str,
         function_calls: dict[int, dict[str, Any]],
-        usage_data: dict[str, int],
+        usage_data: dict[str, int] | None,
         from_task: Any | None = None,
         from_agent: Any | None = None,
-    ) -> tuple[str, dict[int, dict[str, Any]], dict[str, int]]:
+    ) -> tuple[str, dict[int, dict[str, Any]], dict[str, int] | None]:
         """Process a single streaming chunk.
 
         Args:
@@ -979,7 +990,7 @@ class GeminiCompletion(BaseLLM):
         self,
         full_response: str,
         function_calls: dict[int, dict[str, Any]],
-        usage_data: dict[str, int],
+        usage_data: dict[str, int] | None,
         contents: list[types.Content],
         available_functions: dict[str, Any] | None = None,
         from_task: Any | None = None,
@@ -991,7 +1002,7 @@ class GeminiCompletion(BaseLLM):
         Args:
             full_response: The complete streamed response content
             function_calls: Dictionary of function calls accumulated during streaming
-            usage_data: Token usage data from the stream
+            usage_data: Token usage data from the stream, or None if unavailable
             contents: Original contents for event conversion
             available_functions: Available functions for function calling
             from_task: Task that initiated the call
@@ -1001,7 +1012,8 @@ class GeminiCompletion(BaseLLM):
         Returns:
             Final response content after processing
         """
-        self._track_token_usage_internal(usage_data)
+        if usage_data:
+            self._track_token_usage_internal(usage_data)
 
         if response_model and function_calls:
             for call_data in function_calls.values():
@@ -1013,6 +1025,7 @@ class GeminiCompletion(BaseLLM):
                     contents=contents,
                     from_task=from_task,
                     from_agent=from_agent,
+                    usage=usage_data,
                 )
 
             non_structured_output_calls = {
@@ -1041,6 +1054,7 @@ class GeminiCompletion(BaseLLM):
                 from_task=from_task,
                 from_agent=from_agent,
                 messages=self._convert_contents_to_dict(contents),
+                usage=usage_data,
             )
             return formatted_function_calls
 
@@ -1081,6 +1095,7 @@ class GeminiCompletion(BaseLLM):
             response_model=effective_response_model,
             from_task=from_task,
             from_agent=from_agent,
+            usage=usage_data,
         )
 
     def _handle_completion(
@@ -1118,6 +1133,7 @@ class GeminiCompletion(BaseLLM):
             from_task=from_task,
             from_agent=from_agent,
             response_model=response_model,
+            usage=usage,
         )
 
     def _handle_streaming_completion(
@@ -1132,7 +1148,7 @@ class GeminiCompletion(BaseLLM):
         """Handle streaming content generation."""
         full_response = ""
         function_calls: dict[int, dict[str, Any]] = {}
-        usage_data = {"total_tokens": 0}
+        usage_data: dict[str, int] | None = None
 
         # The API accepts list[Content] but mypy is overly strict about variance
         contents_for_api: Any = contents
@@ -1196,6 +1212,7 @@ class GeminiCompletion(BaseLLM):
             from_task=from_task,
             from_agent=from_agent,
             response_model=response_model,
+            usage=usage,
         )
 
     async def _ahandle_streaming_completion(
@@ -1210,7 +1227,7 @@ class GeminiCompletion(BaseLLM):
         """Handle async streaming content generation."""
         full_response = ""
         function_calls: dict[int, dict[str, Any]] = {}
-        usage_data = {"total_tokens": 0}
+        usage_data: dict[str, int] | None = None
 
         # The API accepts list[Content] but mypy is overly strict about variance
         contents_for_api: Any = contents
diff --git a/lib/crewai/src/crewai/llms/providers/openai/completion.py b/lib/crewai/src/crewai/llms/providers/openai/completion.py
index 8870fcd85..d58e6b0d9 100644
--- a/lib/crewai/src/crewai/llms/providers/openai/completion.py
+++ b/lib/crewai/src/crewai/llms/providers/openai/completion.py
@@ -809,6 +809,7 @@ class OpenAICompletion(BaseLLM):
                 from_task=from_task,
                 from_agent=from_agent,
                 messages=params.get("input", []),
+                usage=usage,
             )
             return parsed_result
 
@@ -821,6 +822,7 @@ class OpenAICompletion(BaseLLM):
                 from_task=from_task,
                 from_agent=from_agent,
                 messages=params.get("input", []),
+                usage=usage,
             )
             return function_calls
 
@@ -858,6 +860,7 @@ class OpenAICompletion(BaseLLM):
                     from_task=from_task,
                     from_agent=from_agent,
                     messages=params.get("input", []),
+                    usage=usage,
                 )
                 return structured_result
             except ValueError as e:
@@ -871,6 +874,7 @@ class OpenAICompletion(BaseLLM):
             from_task=from_task,
             from_agent=from_agent,
             messages=params.get("input", []),
+            usage=usage,
         )
 
         content = self._invoke_after_llm_call_hooks(
@@ -941,6 +945,7 @@ class OpenAICompletion(BaseLLM):
                 from_task=from_task,
                 from_agent=from_agent,
                 messages=params.get("input", []),
+                usage=usage,
             )
             return parsed_result
 
@@ -953,6 +958,7 @@ class OpenAICompletion(BaseLLM):
                 from_task=from_task,
                 from_agent=from_agent,
                 messages=params.get("input", []),
+                usage=usage,
             )
             return function_calls
 
@@ -990,6 +996,7 @@ class OpenAICompletion(BaseLLM):
                     from_task=from_task,
                     from_agent=from_agent,
                     messages=params.get("input", []),
+                    usage=usage,
                 )
                 return structured_result
             except ValueError as e:
@@ -1003,6 +1010,7 @@ class OpenAICompletion(BaseLLM):
                 from_task=from_task,
                 from_agent=from_agent,
                 messages=params.get("input", []),
+                usage=usage,
             )
 
         except NotFoundError as e:
@@ -1045,6 +1053,7 @@ class OpenAICompletion(BaseLLM):
         full_response = ""
         function_calls: list[dict[str, Any]] = []
         final_response: Response | None = None
+        usage: dict[str, Any] | None = None
 
         stream = self._client.responses.create(**params)
         response_id_stream = None
@@ -1102,6 +1111,7 @@ class OpenAICompletion(BaseLLM):
                 from_task=from_task,
                 from_agent=from_agent,
                 messages=params.get("input", []),
+                usage=usage,
             )
             return parsed_result
 
@@ -1138,6 +1148,7 @@ class OpenAICompletion(BaseLLM):
                     from_task=from_task,
                     from_agent=from_agent,
                     messages=params.get("input", []),
+                    usage=usage,
                 )
                 return structured_result
             except ValueError as e:
@@ -1151,6 +1162,7 @@ class OpenAICompletion(BaseLLM):
             from_task=from_task,
             from_agent=from_agent,
             messages=params.get("input", []),
+            usage=usage,
         )
 
         return self._invoke_after_llm_call_hooks(
@@ -1169,6 +1181,7 @@ class OpenAICompletion(BaseLLM):
         full_response = ""
         function_calls: list[dict[str, Any]] = []
         final_response: Response | None = None
+        usage: dict[str, Any] | None = None
 
         stream = await self._async_client.responses.create(**params)
         response_id_stream = None
@@ -1226,6 +1239,7 @@ class OpenAICompletion(BaseLLM):
                 from_task=from_task,
                 from_agent=from_agent,
                 messages=params.get("input", []),
+                usage=usage,
             )
             return parsed_result
 
@@ -1262,6 +1276,7 @@ class OpenAICompletion(BaseLLM):
                     from_task=from_task,
                     from_agent=from_agent,
                     messages=params.get("input", []),
+                    usage=usage,
                 )
                 return structured_result
             except ValueError as e:
@@ -1275,6 +1290,7 @@ class OpenAICompletion(BaseLLM):
             from_task=from_task,
             from_agent=from_agent,
             messages=params.get("input", []),
+            usage=usage,
         )
 
         return full_response
 
@@ -1580,6 +1596,7 @@ class OpenAICompletion(BaseLLM):
                 from_task=from_task,
                 from_agent=from_agent,
                 messages=params["messages"],
+                usage=usage,
             )
             return parsed_object
 
@@ -1601,6 +1618,7 @@ class OpenAICompletion(BaseLLM):
                 from_task=from_task,
                 from_agent=from_agent,
                 messages=params["messages"],
+                usage=usage,
             )
             return list(message.tool_calls)
 
@@ -1639,6 +1657,7 @@ class OpenAICompletion(BaseLLM):
                     from_task=from_task,
                     from_agent=from_agent,
                     messages=params["messages"],
+                    usage=usage,
                 )
                 return structured_result
             except ValueError as e:
@@ -1652,6 +1671,7 @@ class OpenAICompletion(BaseLLM):
             from_task=from_task,
             from_agent=from_agent,
             messages=params["messages"],
+            usage=usage,
         )
 
         if usage.get("total_tokens", 0) > 0:
@@ -1693,7 +1713,7 @@ class OpenAICompletion(BaseLLM):
         self,
         full_response: str,
         tool_calls: dict[int, dict[str, Any]],
-        usage_data: dict[str, int],
+        usage_data: dict[str, Any] | None,
         params: dict[str, Any],
         available_functions: dict[str, Any] | None = None,
         from_task: Any | None = None,
@@ -1704,7 +1724,7 @@ class OpenAICompletion(BaseLLM):
         Args:
             full_response: The accumulated text response from the stream.
             tool_calls: Accumulated tool calls from the stream, keyed by index.
-            usage_data: Token usage data from the stream.
+            usage_data: Token usage data from the stream, or None if unavailable.
             params: The completion parameters containing messages.
             available_functions: Available functions for tool calling.
             from_task: Task that initiated the call.
@@ -1715,7 +1735,8 @@ class OpenAICompletion(BaseLLM):
             tool execution result when available_functions is provided, or the
             text response string.
         """
-        self._track_token_usage_internal(usage_data)
+        if usage_data:
+            self._track_token_usage_internal(usage_data)
 
         if tool_calls and not available_functions:
             tool_calls_list = [
@@ -1736,6 +1757,7 @@ class OpenAICompletion(BaseLLM):
                 from_task=from_task,
                 from_agent=from_agent,
                 messages=params["messages"],
+                usage=usage_data,
             )
             return tool_calls_list
 
@@ -1778,6 +1800,7 @@ class OpenAICompletion(BaseLLM):
             from_task=from_task,
             from_agent=from_agent,
             messages=params["messages"],
+            usage=usage_data,
         )
 
         return full_response
 
@@ -1831,6 +1854,7 @@ class OpenAICompletion(BaseLLM):
                 from_task=from_task,
                 from_agent=from_agent,
                 messages=params["messages"],
+                usage=usage,
             )
             return parsed_result
 
@@ -1841,7 +1865,7 @@ class OpenAICompletion(BaseLLM):
             self._client.chat.completions.create(**params)
         )
 
-        usage_data = {"total_tokens": 0}
+        usage_data: dict[str, Any] | None = None
 
         for completion_chunk in completion_stream:
             response_id_stream = (
@@ -1955,6 +1979,7 @@ class OpenAICompletion(BaseLLM):
                 from_task=from_task,
                 from_agent=from_agent,
                 messages=params["messages"],
+                usage=usage,
             )
             return parsed_object
 
@@ -1978,6 +2003,7 @@ class OpenAICompletion(BaseLLM):
                 from_task=from_task,
                 from_agent=from_agent,
                 messages=params["messages"],
+                usage=usage,
             )
             return list(message.tool_calls)
 
@@ -2016,6 +2042,7 @@ class OpenAICompletion(BaseLLM):
                     from_task=from_task,
                     from_agent=from_agent,
                     messages=params["messages"],
+                    usage=usage,
                 )
                 return structured_result
             except ValueError as e:
@@ -2029,6 +2056,7 @@ class OpenAICompletion(BaseLLM):
             from_task=from_task,
             from_agent=from_agent,
             messages=params["messages"],
+            usage=usage,
         )
 
         if usage.get("total_tokens", 0) > 0:
@@ -2079,7 +2107,7 @@ class OpenAICompletion(BaseLLM):
         ] = await self._async_client.chat.completions.create(**params)
 
         accumulated_content = ""
-        usage_data = {"total_tokens": 0}
+        usage_data: dict[str, Any] | None = None
 
         async for chunk in completion_stream:
             response_id_stream = chunk.id if hasattr(chunk, "id") else None
@@ -2102,7 +2130,8 @@ class OpenAICompletion(BaseLLM):
                 response_id=response_id_stream,
             )
 
-        self._track_token_usage_internal(usage_data)
+        if usage_data:
+            self._track_token_usage_internal(usage_data)
 
         try:
             parsed_object = response_model.model_validate_json(accumulated_content)
@@ -2113,6 +2142,7 @@ class OpenAICompletion(BaseLLM):
                 from_task=from_task,
                 from_agent=from_agent,
                 messages=params["messages"],
+                usage=usage_data,
             )
             return parsed_object
 
@@ -2124,6 +2154,7 @@ class OpenAICompletion(BaseLLM):
                 from_task=from_task,
                 from_agent=from_agent,
                 messages=params["messages"],
+                usage=usage_data,
             )
 
             return accumulated_content
 
@@ -2131,7 +2162,7 @@
             ChatCompletionChunk
         ] = await self._async_client.chat.completions.create(**params)
 
-        usage_data = {"total_tokens": 0}
+        usage_data = None
 
         async for chunk in stream:
             response_id_stream = chunk.id if hasattr(chunk, "id") else None
diff --git a/lib/crewai/tests/cassettes/utilities/test_llm_completed_event_includes_usage.yaml b/lib/crewai/tests/cassettes/utilities/test_llm_completed_event_includes_usage.yaml
new file mode 100644
index 000000000..cc9245041
--- /dev/null
+++ b/lib/crewai/tests/cassettes/utilities/test_llm_completed_event_includes_usage.yaml
@@ -0,0 +1,108 @@
+interactions:
+- request:
+    body: '{"messages":[{"role":"user","content":"Say hello"}],"model":"gpt-4o-mini"}'
+    headers:
+      User-Agent:
+      - X-USER-AGENT-XXX
+      accept:
+      - application/json
+      accept-encoding:
+      - ACCEPT-ENCODING-XXX
+      authorization:
+      - AUTHORIZATION-XXX
+      connection:
+      - keep-alive
+      content-length:
+      - '74'
+      content-type:
+      - application/json
+      host:
+      - api.openai.com
+      x-stainless-arch:
+      - X-STAINLESS-ARCH-XXX
+      x-stainless-async:
+      - 'false'
+      x-stainless-lang:
+      - python
+      x-stainless-os:
+      - X-STAINLESS-OS-XXX
+      x-stainless-package-version:
+      - 1.83.0
+      x-stainless-read-timeout:
+      - X-STAINLESS-READ-TIMEOUT-XXX
+      x-stainless-retry-count:
+      - '0'
+      x-stainless-runtime:
+      - CPython
+      x-stainless-runtime-version:
+      - 3.13.2
+    method: POST
+    uri: https://api.openai.com/v1/chat/completions
+  response:
+    body:
+      string: "{\n  \"id\": \"chatcmpl-DPS8YQSwQ3pZKZztIoIe1eYodMqh2\",\n  \"object\": \"chat.completion\",\n  \"created\": 1774958730,\n  \"model\": \"gpt-4o-mini-2024-07-18\",\n  \"choices\": [\n    {\n      \"index\": 0,\n      \"message\": {\n        \"role\": \"assistant\",\n        \"content\": \"Hello! How can I assist you today?\",\n        \"refusal\": null,\n        \"annotations\": []\n      },\n      \"logprobs\": null,\n      \"finish_reason\": \"stop\"\n    }\n  ],\n  \"usage\": {\n    \"prompt_tokens\": 9,\n    \"completion_tokens\": 9,\n    \"total_tokens\": 18,\n    \"prompt_tokens_details\": {\n      \"cached_tokens\": 0,\n      \"audio_tokens\": 0\n    },\n    \"completion_tokens_details\": {\n      \"reasoning_tokens\": 0,\n      \"audio_tokens\": 0,\n      \"accepted_prediction_tokens\": 0,\n      \"rejected_prediction_tokens\": 0\n    }\n  },\n  \"service_tier\": \"default\",\n  \"system_fingerprint\": \"fp_709f182cb4\"\n}\n"
+    headers:
+      CF-Cache-Status:
+      - DYNAMIC
+      CF-Ray:
+      - 9e4f38fc5d9d82e8-GIG
+      Connection:
+      - keep-alive
+      Content-Type:
+      - application/json
+      Date:
+      - Tue, 31 Mar 2026 12:05:30 GMT
+      Server:
+      - cloudflare
+      Strict-Transport-Security:
+      - STS-XXX
+      Transfer-Encoding:
+      - chunked
+      X-Content-Type-Options:
+      - X-CONTENT-TYPE-XXX
+      access-control-expose-headers:
+      - ACCESS-CONTROL-XXX
+      alt-svc:
+      - h3=":443"; ma=86400
+      content-length:
+      - '839'
+      openai-organization:
+      - OPENAI-ORG-XXX
+      openai-processing-ms:
+      - '680'
+      openai-project:
+      - OPENAI-PROJECT-XXX
+      openai-version:
+      - '2020-10-01'
+      set-cookie:
+      - SET-COOKIE-XXX
+      x-openai-proxy-wasm:
+      - v0.1
+      x-ratelimit-limit-requests:
+      - X-RATELIMIT-LIMIT-REQUESTS-XXX
+      x-ratelimit-limit-tokens:
+      - X-RATELIMIT-LIMIT-TOKENS-XXX
+      x-ratelimit-remaining-requests:
+      - X-RATELIMIT-REMAINING-REQUESTS-XXX
+      x-ratelimit-remaining-tokens:
+      - X-RATELIMIT-REMAINING-TOKENS-XXX
+      x-ratelimit-reset-requests:
+      - X-RATELIMIT-RESET-REQUESTS-XXX
+      x-ratelimit-reset-tokens:
+      - X-RATELIMIT-RESET-TOKENS-XXX
+      x-request-id:
+      - X-REQUEST-ID-XXX
+    status:
+      code: 200
+      message: OK
+version: 1
diff --git a/lib/crewai/tests/events/test_llm_usage_event.py b/lib/crewai/tests/events/test_llm_usage_event.py
new file mode 100644
index 000000000..f19f07b47
--- /dev/null
+++ b/lib/crewai/tests/events/test_llm_usage_event.py
@@ -0,0 +1,176 @@
+from typing import Any
+from unittest.mock import patch
+
+import pytest
+from pydantic import BaseModel
+
+from crewai.events.event_bus import CrewAIEventsBus
+from crewai.events.types.llm_events import LLMCallCompletedEvent, LLMCallType
+from crewai.llm import LLM
+from crewai.llms.base_llm import BaseLLM
+
+
+class TestLLMCallCompletedEventUsageField:
+    def test_accepts_usage_dict(self):
+        event = LLMCallCompletedEvent(
+            response="hello",
+            call_type=LLMCallType.LLM_CALL,
+            call_id="test-id",
+            usage={"prompt_tokens": 10, "completion_tokens": 20, "total_tokens": 30},
+        )
+        assert event.usage == {
+            "prompt_tokens": 10,
+            "completion_tokens": 20,
+            "total_tokens": 30,
+        }
+
+    def test_usage_defaults_to_none(self):
+        event = LLMCallCompletedEvent(
+            response="hello",
+            call_type=LLMCallType.LLM_CALL,
+            call_id="test-id",
+        )
+        assert event.usage is None
+
+    def test_accepts_none_usage(self):
+        event = LLMCallCompletedEvent(
+            response="hello",
+            call_type=LLMCallType.LLM_CALL,
+            call_id="test-id",
+            usage=None,
+        )
+        assert event.usage is None
+
+    def test_accepts_nested_usage_dict(self):
+        usage = {
+            "prompt_tokens": 100,
+            "completion_tokens": 200,
+            "total_tokens": 300,
+            "prompt_tokens_details": {"cached_tokens": 50},
+        }
+        event = LLMCallCompletedEvent(
+            response="hello",
+            call_type=LLMCallType.LLM_CALL,
+            call_id="test-id",
+            usage=usage,
+        )
+        assert event.usage["prompt_tokens_details"]["cached_tokens"] == 50
+
+
+class TestUsageToDict:
+    def test_none_returns_none(self):
+        assert LLM._usage_to_dict(None) is None
+
+    def test_dict_passes_through(self):
+        usage = {"prompt_tokens": 10, "total_tokens": 30}
+        assert LLM._usage_to_dict(usage) is usage
+
+    def test_pydantic_model_uses_model_dump(self):
+        class Usage(BaseModel):
+            prompt_tokens: int = 10
+            completion_tokens: int = 20
+            total_tokens: int = 30
+
+        result = LLM._usage_to_dict(Usage())
+        assert result == {
+            "prompt_tokens": 10,
+            "completion_tokens": 20,
+            "total_tokens": 30,
+        }
+
+    def test_object_with_dict_attr(self):
+        class UsageObj:
+            def __init__(self):
+                self.prompt_tokens = 5
+                self.completion_tokens = 15
+                self.total_tokens = 20
+
+        result = LLM._usage_to_dict(UsageObj())
+        assert result == {
+            "prompt_tokens": 5,
+            "completion_tokens": 15,
+            "total_tokens": 20,
+        }
+
+    def test_object_with_dict_excludes_private_attrs(self):
+        class UsageObj:
+            def __init__(self):
+                self.total_tokens = 42
+                self._internal = "hidden"
+
+        result = LLM._usage_to_dict(UsageObj())
+        assert result == {"total_tokens": 42}
+        assert "_internal" not in result
+
+    def test_unsupported_type_returns_none(self):
+        assert LLM._usage_to_dict(42) is None
+        assert LLM._usage_to_dict("string") is None
+
+
+class _StubLLM(BaseLLM):
+    """Minimal concrete BaseLLM for testing event emission."""
+
+    model: str = "test-model"
+
+    def call(self, *args: Any, **kwargs: Any) -> str:
+        return ""
+
+    async def acall(self, *args: Any, **kwargs: Any) -> str:
+        return ""
+
+    def supports_function_calling(self) -> bool:
+        return False
+
+    def supports_stop_words(self) -> bool:
+        return True
+
+
+class TestEmitCallCompletedEventPassesUsage:
+    @pytest.fixture
+    def mock_emit(self):
+        with patch.object(CrewAIEventsBus, "emit") as mock:
+            yield mock
+
+    @pytest.fixture
+    def llm(self):
+        return _StubLLM(model="test-model")
+
+    def test_usage_is_passed_to_event(self, mock_emit, llm):
+        usage_data = {"prompt_tokens": 10, "completion_tokens": 20, "total_tokens": 30}
+
+        llm._emit_call_completed_event(
+            response="hello",
+            call_type=LLMCallType.LLM_CALL,
+            messages="test prompt",
+            usage=usage_data,
+        )
+
+        mock_emit.assert_called_once()
+        event = mock_emit.call_args[1]["event"]
+        assert isinstance(event, LLMCallCompletedEvent)
+        assert event.usage == usage_data
+
+    def test_none_usage_is_passed_to_event(self, mock_emit, llm):
+        llm._emit_call_completed_event(
+            response="hello",
+            call_type=LLMCallType.LLM_CALL,
+            messages="test prompt",
+            usage=None,
+        )
+
+        mock_emit.assert_called_once()
+        event = mock_emit.call_args[1]["event"]
+        assert isinstance(event, LLMCallCompletedEvent)
+        assert event.usage is None
+
+    def test_usage_omitted_defaults_to_none(self, mock_emit, llm):
+        llm._emit_call_completed_event(
+            response="hello",
+            call_type=LLMCallType.LLM_CALL,
+            messages="test prompt",
+        )
+
+        mock_emit.assert_called_once()
+        event = mock_emit.call_args[1]["event"]
+        assert isinstance(event, LLMCallCompletedEvent)
+        assert event.usage is None
diff --git a/lib/crewai/tests/test_llm.py b/lib/crewai/tests/test_llm.py
index 52b00753b..413504f31 100644
--- a/lib/crewai/tests/test_llm.py
+++ b/lib/crewai/tests/test_llm.py
@@ -752,11 +752,7 @@ def test_litellm_retry_catches_litellm_unsupported_params_error(caplog):
             raise litellm_error
         return MagicMock(
             choices=[MagicMock(message=MagicMock(content="Paris", tool_calls=None))],
-            usage=MagicMock(
-                prompt_tokens=10,
-                completion_tokens=5,
-                total_tokens=15,
-            ),
+            usage={"prompt_tokens": 10, "completion_tokens": 5, "total_tokens": 15},
         )
 
     with patch("litellm.completion", side_effect=mock_completion):
@@ -787,11 +783,7 @@ def test_litellm_retry_catches_openai_api_stop_error(caplog):
             raise api_error
         return MagicMock(
             choices=[MagicMock(message=MagicMock(content="Paris", tool_calls=None))],
-            usage=MagicMock(
-                prompt_tokens=10,
-                completion_tokens=5,
-                total_tokens=15,
-            ),
+            usage={"prompt_tokens": 10, "completion_tokens": 5, "total_tokens": 15},
         )
 
     with patch("litellm.completion", side_effect=mock_completion):
diff --git a/lib/crewai/tests/utilities/test_events.py b/lib/crewai/tests/utilities/test_events.py
index 6b7c1783c..0cd9d90e5 100644
--- a/lib/crewai/tests/utilities/test_events.py
+++ b/lib/crewai/tests/utilities/test_events.py
@@ -879,6 +879,35 @@ def test_llm_emits_call_started_event():
     assert started_events[0].task_id is None
 
 
+@pytest.mark.vcr()
+def test_llm_completed_event_includes_usage():
+    completed_events: list[LLMCallCompletedEvent] = []
+    condition = threading.Condition()
+
+    @crewai_event_bus.on(LLMCallCompletedEvent)
+    def handle_llm_call_completed(source, event):
+        with condition:
+            completed_events.append(event)
+            condition.notify()
+
+    llm = LLM(model="gpt-4o-mini")
+    llm.call("Say hello")
+
+    with condition:
+        success = condition.wait_for(
+            lambda: len(completed_events) >= 1,
+            timeout=10,
+        )
+        assert success, "Timeout waiting for LLMCallCompletedEvent"
+
+    event = completed_events[0]
+    assert event.usage is not None
+    assert isinstance(event.usage, dict)
+    assert event.usage.get("prompt_tokens", 0) > 0
+    assert event.usage.get("completion_tokens", 0) > 0
+    assert event.usage.get("total_tokens", 0) > 0
+
+
 @pytest.mark.vcr()
 def test_llm_emits_call_failed_event():
     received_events = []