mirror of https://github.com/crewAIInc/crewAI.git (synced 2026-01-09 08:08:32 +00:00)
stream working too
@@ -286,6 +286,7 @@ class LLM:
         Args:
             params: Parameters for the completion call
             callbacks: Optional list of callback functions
             available_functions: Dict of available functions

         Returns:
@@ -295,10 +296,10 @@ class LLM:
         full_response = ""
         last_chunk = None
         chunk_count = 0
         debug_info = []

-        # --- 2) Make sure stream is set to True
+        # --- 2) Make sure stream is set to True and include usage metrics
         params["stream"] = True
+        params["stream_options"] = {"include_usage": True}

         try:
             # --- 3) Process each chunk in the stream
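
The stream_options addition follows the OpenAI-style streaming API, which litellm passes through to the provider: with include_usage set, the stream ends with an extra chunk whose choices list is empty and whose usage field carries the token counts. A minimal sketch of that behavior, assuming litellm is installed and an API key is configured (the model name is illustrative):

import litellm

response = litellm.completion(
    model="gpt-4o-mini",  # illustrative model
    messages=[{"role": "user", "content": "Say hello"}],
    stream=True,
    stream_options={"include_usage": True},
)

text, usage = "", None
for chunk in response:
    choices = getattr(chunk, "choices", [])
    if choices and getattr(choices[0].delta, "content", None):
        text += choices[0].delta.content
    # the final chunk typically has empty choices and a populated usage block
    usage = getattr(chunk, "usage", None) or usage

print(text, usage)
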
@@ -306,28 +307,21 @@ class LLM:
                 chunk_count += 1
                 last_chunk = chunk

                 # Add debug info
                 debug_info.append(f"Chunk type: {type(chunk)}")

                 # Extract content from the chunk
                 chunk_content = None

                 # Handle ModelResponse objects
                 if isinstance(chunk, ModelResponse):
                     debug_info.append("Chunk is ModelResponse")

-                    # Get usage information from the chunk
+                    # Get usage information from the chunk (if any)
                     usage_info = getattr(chunk, "usage", None)

                     choices = getattr(chunk, "choices", [])
                     if choices and len(choices) > 0:
                         choice = choices[0]
                         debug_info.append(f"Choice type: {type(choice)}")

                         # Handle dictionary-style choices
                         if isinstance(choice, dict):
                             delta = choice.get("delta", {})
                             debug_info.append(f"Delta: {delta}")
                             if (
                                 isinstance(delta, dict)
                                 and "content" in delta
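
The getattr(..., None) guards in this hunk are what let one loop body serve providers whose chunks may or may not carry a usage or choices field. A two-line illustration of the pattern (FakeChunk is a stand-in, not a crewAI type):

class FakeChunk:
    pass

chunk = FakeChunk()
assert getattr(chunk, "usage", None) is None  # missing attr -> None, no AttributeError
chunk.usage = {"total_tokens": 42}
assert getattr(chunk, "usage", None)["total_tokens"] == 42
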
@@ -338,7 +332,6 @@ class LLM:
                         # Handle object-style choices
                         else:
                             delta = getattr(choice, "delta", None)
-                            debug_info.append(f"Delta: {delta}")

                             if delta is not None:
                                 if (
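
Taken together, the dict-style and object-style branches in the two hunks above implement one pattern: pull the text delta out of whichever shape the provider returns. A condensed sketch of that pattern, assuming nothing beyond the standard library (the helper name is illustrative, not part of the codebase):

from typing import Any, Optional

def extract_chunk_content(chunk: Any) -> Optional[str]:
    # Tolerate both dict-style and attribute-style choices/deltas.
    choices = chunk.get("choices") if isinstance(chunk, dict) else getattr(chunk, "choices", None)
    if not choices:
        return None  # e.g. a usage-only final chunk
    choice = choices[0]
    delta = choice.get("delta", {}) if isinstance(choice, dict) else getattr(choice, "delta", None)
    if isinstance(delta, dict):
        return delta.get("content")
    return getattr(delta, "content", None) if delta is not None else None
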
@@ -356,8 +349,6 @@ class LLM:
                                     self,
                                     event=LLMStreamChunkEvent(chunk=chunk_content),
                                 )
-                else:
-                    debug_info.append(f"No content found in chunk: {chunk}")

             # --- 4) Fallback to non-streaming if no content received
             if not full_response.strip() and chunk_count == 0:
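
The LLMStreamChunkEvent emitted per content chunk is what downstream consumers subscribe to. A sketch of a listener, assuming a crewAI version that exposes crewai_event_bus and LLMStreamChunkEvent from crewai.utilities.events:

from crewai.utilities.events import crewai_event_bus, LLMStreamChunkEvent

@crewai_event_bus.on(LLMStreamChunkEvent)
def print_chunk(source, event):
    # each event carries one text delta from the stream
    print(event.chunk, end="", flush=True)
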
@@ -366,15 +357,15 @@ class LLM:
                 )
                 non_streaming_params = params.copy()
                 non_streaming_params["stream"] = False
+                non_streaming_params.pop(
+                    "stream_options", None
+                )  # Remove stream_options for non-streaming call
                 return self._handle_non_streaming_response(
                     non_streaming_params, callbacks, available_functions
                 )

             # --- 5) Handle empty response with chunks
             if not full_response.strip() and chunk_count > 0:
-                logging.warning(
-                    f"Received {chunk_count} chunks but no content. Debug info: {debug_info}"
-                )
                 if last_chunk is not None and isinstance(last_chunk, ModelResponse):
                     usage_info = getattr(last_chunk, "usage", None)
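
Both fallback paths (here and in the except handler further down) rebuild the params the same way. The invariant is that stream_options must never reach a non-streaming call, since OpenAI-style APIs reject it when stream is false. A minimal sketch of that rule (the helper name is illustrative):

def to_non_streaming(params: dict) -> dict:
    fallback = params.copy()  # never mutate the caller's params
    fallback["stream"] = False
    fallback.pop("stream_options", None)  # only valid alongside stream=True
    return fallback
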
@@ -398,6 +389,10 @@ class LLM:
                 logging.warning("Using default response as fallback")
                 full_response = "I apologize, but I couldn't generate a proper response. Please try again or rephrase your request."

+            print("LAST CHUNK:", last_chunk)
+            if hasattr(last_chunk, "usage"):
+                print("LAST CHUNK USAGE:", last_chunk.usage)
+
             # --- 7) Check for tool calls in the final response
             if isinstance(last_chunk, ModelResponse):
                 usage_info = getattr(last_chunk, "usage", None)
@@ -414,26 +409,29 @@ class LLM:
                     return tool_result

             # --- 8) Log token usage if available in streaming mode
-            # Use usage info from the last chunk if present
-            usage_info = getattr(last_chunk, "usage", None) if last_chunk else None
-            if usage_info and self.callbacks and len(self.callbacks) > 0:
-                for callback in self.callbacks:
+            # Safely handle callbacks with usage info
+            if callbacks and len(callbacks) > 0:
+                for callback in callbacks:
                     if hasattr(callback, "log_success_event"):
-                        callback.log_success_event(
-                            kwargs=params,
-                            response_obj={"usage": usage_info},
-                            start_time=0,
-                            end_time=0,
-                        )
+                        usage_info = (
+                            getattr(last_chunk, "usage", None) if last_chunk else None
+                        )
+                        print("USAGE INFO", usage_info)
+                        if usage_info:
+                            callback.log_success_event(
+                                kwargs=params,
+                                response_obj={"usage": usage_info},
+                                start_time=0,
+                                end_time=0,
+                            )

             # --- 9) Emit completion event and return response
             self._handle_emit_call_events(full_response, LLMCallType.LLM_CALL)
             return full_response

         except Exception as e:
-            logging.error(
-                f"Error in streaming response: {str(e)}, Debug info: {debug_info}"
-            )
+            logging.error(f"Error in streaming response: {str(e)}")
             if full_response.strip():
                 logging.warning(f"Returning partial response despite error: {str(e)}")
                 self._handle_emit_call_events(full_response, LLMCallType.LLM_CALL)
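
The callbacks driven by step 8 only need a log_success_event method with this signature; litellm's custom-logger interface uses the same method name. A hedged sketch of a token-counting callback (the class name and counter attribute are illustrative):

class TokenUsageCallback:
    """Accumulates total tokens from the usage blocks passed by step 8."""

    def __init__(self) -> None:
        self.total_tokens = 0

    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        usage = response_obj.get("usage")
        if usage is None:
            return
        # usage may be a dict or an object depending on the provider
        total = usage.get("total_tokens") if isinstance(usage, dict) else getattr(usage, "total_tokens", 0)
        self.total_tokens += total or 0
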
@@ -442,6 +440,9 @@ class LLM:
             logging.warning("Falling back to non-streaming after error")
             non_streaming_params = params.copy()
             non_streaming_params["stream"] = False
+            non_streaming_params.pop(
+                "stream_options", None
+            )  # Remove stream_options for non-streaming call
             return self._handle_non_streaming_response(
                 non_streaming_params, callbacks, available_functions
             )