Compare commits


1 Commit

Author         SHA1         Message   Date
Lucas Gomide   cf45026e75   wip       2025-04-10 12:30:36 -03:00


@@ -5,6 +5,7 @@ import sys
import threading
import warnings
from contextlib import contextmanager
from types import SimpleNamespace
from typing import (
    Any,
    Dict,
@@ -371,6 +372,8 @@ class LLM(BaseLLM):
        last_chunk = None
        chunk_count = 0
        usage_info = None
        tool_calls = None
        accumulated_tool_args = {}  # Track tool call arguments by index

        # --- 2) Make sure stream is set to True and include usage metrics
        params["stream"] = True
@@ -392,7 +395,6 @@ class LLM(BaseLLM):
if isinstance(chunk, dict) and "choices" in chunk:
choices = chunk["choices"]
elif hasattr(chunk, "choices"):
# Check if choices is not a type but an actual attribute with value
if not isinstance(getattr(chunk, "choices"), type):
choices = getattr(chunk, "choices")
@@ -400,14 +402,97 @@ class LLM(BaseLLM):
if isinstance(chunk, dict) and "usage" in chunk:
usage_info = chunk["usage"]
elif hasattr(chunk, "usage"):
# Check if usage is not a type but an actual attribute with value
if not isinstance(getattr(chunk, "usage"), type):
usage_info = getattr(chunk, "usage")
if choices and len(choices) > 0:
choice = choices[0]
# Handle different delta formats
# Check for tool calls in the chunk
if "delta" in choice:
delta = choice["delta"]
if "tool_calls" in delta:
tool_calls = delta["tool_calls"]
# If we have tool calls and available functions, accumulate arguments
if tool_calls and available_functions:
for tool_call in tool_calls:
if hasattr(tool_call, "index"):
index = tool_call.index
if index not in accumulated_tool_args:
accumulated_tool_args[index] = {
"name": None,
"arguments": "",
}
# Update tool call name if available
if hasattr(
tool_call, "function"
) and hasattr(tool_call.function, "name"):
if tool_call.function.name:
accumulated_tool_args[index][
"name"
] = tool_call.function.name
# Accumulate arguments
if hasattr(
tool_call, "function"
) and hasattr(
tool_call.function, "arguments"
):
if tool_call.function.arguments:
accumulated_tool_args[index][
"arguments"
] += tool_call.function.arguments
# Check if we have a complete tool call
if (
accumulated_tool_args[index]["name"]
and accumulated_tool_args[index][
"arguments"
]
):
try:
# Try to parse the accumulated arguments
json.loads(
accumulated_tool_args[index][
"arguments"
]
)
# Execute the tool call
tool_result = self._handle_tool_call(
[
SimpleNamespace(
function=SimpleNamespace(
name=accumulated_tool_args[
index
][
"name"
],
arguments=accumulated_tool_args[
index
][
"arguments"
],
),
)
],
available_functions,
)
if tool_result is not None:
# Stream the tool result
crewai_event_bus.emit(
self,
event=LLMStreamChunkEvent(
chunk=tool_result
),
)
return tool_result
except json.JSONDecodeError:
# If JSON is incomplete, continue accumulating
continue
# Handle different delta formats for content
delta = None
if isinstance(choice, dict) and "delta" in choice:
delta = choice["delta"]
@@ -416,17 +501,13 @@ class LLM(BaseLLM):
                        # Extract content from delta
                        if delta:
                            # Handle dict format
                            if isinstance(delta, dict):
                                if "content" in delta and delta["content"] is not None:
                                    chunk_content = delta["content"]
                            # Handle object format
                            elif hasattr(delta, "content"):
                                chunk_content = getattr(delta, "content")

                        # Handle case where content might be None or empty
                        if chunk_content is None and isinstance(delta, dict):
                            # Some models might send empty content chunks
                            chunk_content = ""
                except Exception as e:
                    logging.debug(f"Error extracting content from chunk: {e}")
@@ -434,10 +515,7 @@ class LLM(BaseLLM):
                # Only add non-None content to the response
                if chunk_content is not None:
                    # Add the chunk content to the full response
                    full_response += chunk_content

                    # Emit the chunk event
                    crewai_event_bus.emit(
                        self,
                        event=LLMStreamChunkEvent(chunk=chunk_content),
@@ -450,9 +528,7 @@ class LLM(BaseLLM):
                )
                non_streaming_params = params.copy()
                non_streaming_params["stream"] = False
                non_streaming_params.pop(
                    "stream_options", None
                )  # Remove stream_options for non-streaming call
                non_streaming_params.pop("stream_options", None)
                return self._handle_non_streaming_response(
                    non_streaming_params, callbacks, available_functions
                )
@@ -464,7 +540,6 @@ class LLM(BaseLLM):
                )
                if last_chunk is not None:
                    try:
                        # Try to extract content from the last chunk's message
                        choices = None
                        if isinstance(last_chunk, dict) and "choices" in last_chunk:
                            choices = last_chunk["choices"]
@@ -474,8 +549,6 @@ class LLM(BaseLLM):
                        if choices and len(choices) > 0:
                            choice = choices[0]

                            # Try to get content from message
                            message = None
                            if isinstance(choice, dict) and "message" in choice:
                                message = choice["message"]
@@ -500,57 +573,14 @@ class LLM(BaseLLM):
f"Last chunk format: {type(last_chunk)}, content: {last_chunk}"
)
# --- 6) If still empty, raise an error instead of using a default response
# --- 6) If still empty, raise an error
if not full_response.strip():
raise Exception(
"No content received from streaming response. Received empty chunks or failed to extract content."
)
# --- 7) Check for tool calls in the final response
tool_calls = None
try:
if last_chunk:
choices = None
if isinstance(last_chunk, dict) and "choices" in last_chunk:
choices = last_chunk["choices"]
elif hasattr(last_chunk, "choices"):
if not isinstance(getattr(last_chunk, "choices"), type):
choices = getattr(last_chunk, "choices")
if choices and len(choices) > 0:
choice = choices[0]
message = None
if isinstance(choice, dict) and "message" in choice:
message = choice["message"]
elif hasattr(choice, "message"):
message = getattr(choice, "message")
if message:
if isinstance(message, dict) and "tool_calls" in message:
tool_calls = message["tool_calls"]
elif hasattr(message, "tool_calls"):
tool_calls = getattr(message, "tool_calls")
except Exception as e:
logging.debug(f"Error checking for tool calls: {e}")
# --- 8) If no tool calls or no available functions, return the text response directly
if not tool_calls or not available_functions:
# Log token usage if available in streaming mode
self._handle_streaming_callbacks(callbacks, usage_info, last_chunk)
# Emit completion event and return response
self._handle_emit_call_events(full_response, LLMCallType.LLM_CALL)
return full_response
# --- 9) Handle tool calls if present
tool_result = self._handle_tool_call(tool_calls, available_functions)
if tool_result is not None:
return tool_result
# --- 10) Log token usage if available in streaming mode
# --- 7) Log token usage and emit completion event
self._handle_streaming_callbacks(callbacks, usage_info, last_chunk)
# --- 11) Emit completion event and return response
self._handle_emit_call_events(full_response, LLMCallType.LLM_CALL)
return full_response
@@ -561,7 +591,6 @@ class LLM(BaseLLM):
                self._handle_emit_call_events(full_response, LLMCallType.LLM_CALL)
                return full_response

            # Emit failed event and re-raise the exception
            crewai_event_bus.emit(
                self,
                event=LLMCallFailedEvent(error=str(e)),