commit cf45026e75 (parent c9f47e6a37)
Author: Lucas Gomide
Date:   2025-04-10 12:30:36 -03:00


@@ -5,6 +5,7 @@ import sys
 import threading
 import warnings
 from contextlib import contextmanager
+from types import SimpleNamespace
 from typing import (
     Any,
     Dict,
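
The new SimpleNamespace import is used further down to wrap the accumulated tool-call data in an object that exposes the attribute access _handle_tool_call expects. A minimal sketch of that pattern (the tool name and arguments here are illustrative, not from this codebase):

    from types import SimpleNamespace

    # Ad-hoc object whose shape mimics a provider tool-call object:
    # both tool_call.function.name and tool_call.function.arguments resolve.
    tool_call = SimpleNamespace(
        function=SimpleNamespace(name="get_weather", arguments='{"city": "Lisbon"}')
    )
    assert tool_call.function.name == "get_weather"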
@@ -371,6 +372,8 @@ class LLM(BaseLLM):
 last_chunk = None
 chunk_count = 0
 usage_info = None
+tool_calls = None
+accumulated_tool_args = {}  # Track tool call arguments by index

 # --- 2) Make sure stream is set to True and include usage metrics
 params["stream"] = True
@@ -392,7 +395,6 @@ class LLM(BaseLLM):
 if isinstance(chunk, dict) and "choices" in chunk:
     choices = chunk["choices"]
 elif hasattr(chunk, "choices"):
-    # Check if choices is not a type but an actual attribute with value
     if not isinstance(getattr(chunk, "choices"), type):
         choices = getattr(chunk, "choices")
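
The isinstance(..., type) guard kept here filters out the case where getattr resolves to a class-level attribute that is itself a type (for example an annotation placeholder on a stub object) rather than real payload. A standalone sketch of what the guard rejects (FakeChunk is a hypothetical illustration):

    class FakeChunk:
        choices = list  # a type hung on the class, not actual data

    chunk = FakeChunk()
    value = getattr(chunk, "choices")
    print(isinstance(value, type))  # True -> the guard skips this chunk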
@@ -400,14 +402,97 @@ class LLM(BaseLLM):
 if isinstance(chunk, dict) and "usage" in chunk:
     usage_info = chunk["usage"]
 elif hasattr(chunk, "usage"):
-    # Check if usage is not a type but an actual attribute with value
     if not isinstance(getattr(chunk, "usage"), type):
         usage_info = getattr(chunk, "usage")

 if choices and len(choices) > 0:
     choice = choices[0]

-    # Handle different delta formats
+    # Check for tool calls in the chunk
+    if "delta" in choice:
+        delta = choice["delta"]
+        if "tool_calls" in delta:
+            tool_calls = delta["tool_calls"]
+
+            # If we have tool calls and available functions, accumulate arguments
+            if tool_calls and available_functions:
+                for tool_call in tool_calls:
+                    if hasattr(tool_call, "index"):
+                        index = tool_call.index
+                        if index not in accumulated_tool_args:
+                            accumulated_tool_args[index] = {
+                                "name": None,
+                                "arguments": "",
+                            }
+
+                        # Update tool call name if available
+                        if hasattr(tool_call, "function") and hasattr(tool_call.function, "name"):
+                            if tool_call.function.name:
+                                accumulated_tool_args[index]["name"] = tool_call.function.name
+
+                        # Accumulate arguments
+                        if hasattr(tool_call, "function") and hasattr(tool_call.function, "arguments"):
+                            if tool_call.function.arguments:
+                                accumulated_tool_args[index]["arguments"] += tool_call.function.arguments
+
+                        # Check if we have a complete tool call
+                        if accumulated_tool_args[index]["name"] and accumulated_tool_args[index]["arguments"]:
+                            try:
+                                # Try to parse the accumulated arguments
+                                json.loads(accumulated_tool_args[index]["arguments"])
+
+                                # Execute the tool call
+                                tool_result = self._handle_tool_call(
+                                    [
+                                        SimpleNamespace(
+                                            function=SimpleNamespace(
+                                                name=accumulated_tool_args[index]["name"],
+                                                arguments=accumulated_tool_args[index]["arguments"],
+                                            ),
+                                        )
+                                    ],
+                                    available_functions,
+                                )
+                                if tool_result is not None:
+                                    # Stream the tool result
+                                    crewai_event_bus.emit(
+                                        self,
+                                        event=LLMStreamChunkEvent(chunk=tool_result),
+                                    )
+                                    return tool_result
+                            except json.JSONDecodeError:
+                                # If JSON is incomplete, continue accumulating
+                                continue
+
+    # Handle different delta formats for content
     delta = None
     if isinstance(choice, dict) and "delta" in choice:
         delta = choice["delta"]
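
The added block implements incremental tool-call handling: providers stream function arguments as JSON fragments, so the code accumulates them per tool-call index and only executes once the buffered string parses as valid JSON. A self-contained sketch of that accumulate-then-parse loop (the accumulate helper and the sample fragments are illustrative, not part of this codebase):

    import json

    def accumulate(deltas):
        """Merge streamed tool-call fragments keyed by index; return the
        (name, parsed_args) pair once the argument buffer is valid JSON."""
        acc = {}
        for d in deltas:
            entry = acc.setdefault(d["index"], {"name": None, "arguments": ""})
            if d.get("name"):
                entry["name"] = d["name"]
            entry["arguments"] += d.get("arguments", "")
            if entry["name"] and entry["arguments"]:
                try:
                    return entry["name"], json.loads(entry["arguments"])
                except json.JSONDecodeError:
                    continue  # arguments still incomplete; keep accumulating
        return None

    # Fragments of '{"city": "Lisbon"}' arriving across three chunks:
    fragments = [
        {"index": 0, "name": "get_weather", "arguments": '{"ci'},
        {"index": 0, "arguments": 'ty": "Lis'},
        {"index": 0, "arguments": 'bon"}'},
    ]
    print(accumulate(fragments))  # ('get_weather', {'city': 'Lisbon'})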
@@ -416,17 +501,13 @@ class LLM(BaseLLM):
     # Extract content from delta
     if delta:
-        # Handle dict format
         if isinstance(delta, dict):
             if "content" in delta and delta["content"] is not None:
                 chunk_content = delta["content"]
-        # Handle object format
         elif hasattr(delta, "content"):
             chunk_content = getattr(delta, "content")

-        # Handle case where content might be None or empty
         if chunk_content is None and isinstance(delta, dict):
-            # Some models might send empty content chunks
             chunk_content = ""
 except Exception as e:
     logging.debug(f"Error extracting content from chunk: {e}")
@@ -434,10 +515,7 @@ class LLM(BaseLLM):
 # Only add non-None content to the response
 if chunk_content is not None:
-    # Add the chunk content to the full response
     full_response += chunk_content
-
-    # Emit the chunk event
     crewai_event_bus.emit(
         self,
         event=LLMStreamChunkEvent(chunk=chunk_content),
@@ -450,9 +528,7 @@ class LLM(BaseLLM):
 )
 non_streaming_params = params.copy()
 non_streaming_params["stream"] = False
-non_streaming_params.pop(
-    "stream_options", None
-)  # Remove stream_options for non-streaming call
+non_streaming_params.pop("stream_options", None)
 return self._handle_non_streaming_response(
     non_streaming_params, callbacks, available_functions
 )
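
The fallback path reuses the original request params for a non-streaming retry; pop("stream_options", None) drops a streaming-only key without raising if it is absent. The same pattern in isolation (param values are illustrative):

    params = {"model": "gpt-4o", "stream": True, "stream_options": {"include_usage": True}}

    non_streaming_params = params.copy()
    non_streaming_params["stream"] = False
    non_streaming_params.pop("stream_options", None)  # no KeyError if the key is missing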
@@ -464,7 +540,6 @@ class LLM(BaseLLM):
 )
 if last_chunk is not None:
     try:
-        # Try to extract content from the last chunk's message
         choices = None
         if isinstance(last_chunk, dict) and "choices" in last_chunk:
             choices = last_chunk["choices"]
@@ -474,8 +549,6 @@ class LLM(BaseLLM):
 if choices and len(choices) > 0:
     choice = choices[0]
-
-    # Try to get content from message
     message = None
     if isinstance(choice, dict) and "message" in choice:
         message = choice["message"]
@@ -500,57 +573,14 @@ class LLM(BaseLLM):
f"Last chunk format: {type(last_chunk)}, content: {last_chunk}" f"Last chunk format: {type(last_chunk)}, content: {last_chunk}"
) )
# --- 6) If still empty, raise an error instead of using a default response # --- 6) If still empty, raise an error
if not full_response.strip(): if not full_response.strip():
raise Exception( raise Exception(
"No content received from streaming response. Received empty chunks or failed to extract content." "No content received from streaming response. Received empty chunks or failed to extract content."
) )
# --- 7) Check for tool calls in the final response # --- 7) Log token usage and emit completion event
tool_calls = None
try:
if last_chunk:
choices = None
if isinstance(last_chunk, dict) and "choices" in last_chunk:
choices = last_chunk["choices"]
elif hasattr(last_chunk, "choices"):
if not isinstance(getattr(last_chunk, "choices"), type):
choices = getattr(last_chunk, "choices")
if choices and len(choices) > 0:
choice = choices[0]
message = None
if isinstance(choice, dict) and "message" in choice:
message = choice["message"]
elif hasattr(choice, "message"):
message = getattr(choice, "message")
if message:
if isinstance(message, dict) and "tool_calls" in message:
tool_calls = message["tool_calls"]
elif hasattr(message, "tool_calls"):
tool_calls = getattr(message, "tool_calls")
except Exception as e:
logging.debug(f"Error checking for tool calls: {e}")
# --- 8) If no tool calls or no available functions, return the text response directly
if not tool_calls or not available_functions:
# Log token usage if available in streaming mode
self._handle_streaming_callbacks(callbacks, usage_info, last_chunk)
# Emit completion event and return response
self._handle_emit_call_events(full_response, LLMCallType.LLM_CALL)
return full_response
# --- 9) Handle tool calls if present
tool_result = self._handle_tool_call(tool_calls, available_functions)
if tool_result is not None:
return tool_result
# --- 10) Log token usage if available in streaming mode
self._handle_streaming_callbacks(callbacks, usage_info, last_chunk) self._handle_streaming_callbacks(callbacks, usage_info, last_chunk)
# --- 11) Emit completion event and return response
self._handle_emit_call_events(full_response, LLMCallType.LLM_CALL) self._handle_emit_call_events(full_response, LLMCallType.LLM_CALL)
return full_response return full_response
@@ -561,7 +591,6 @@ class LLM(BaseLLM):
 self._handle_emit_call_events(full_response, LLMCallType.LLM_CALL)
 return full_response

-# Emit failed event and re-raise the exception
 crewai_event_bus.emit(
     self,
     event=LLMCallFailedEvent(error=str(e)),