diff --git a/src/crewai/llm.py b/src/crewai/llm.py
index 741544662..0d8b95e67 100644
--- a/src/crewai/llm.py
+++ b/src/crewai/llm.py
@@ -5,6 +5,7 @@ import sys
 import threading
 import warnings
 from contextlib import contextmanager
+from types import SimpleNamespace
 from typing import (
     Any,
     Dict,
@@ -371,6 +372,8 @@ class LLM(BaseLLM):
         last_chunk = None
         chunk_count = 0
         usage_info = None
+        tool_calls = None
+        accumulated_tool_args = {}  # Track tool call arguments by index
 
         # --- 2) Make sure stream is set to True and include usage metrics
         params["stream"] = True
@@ -392,7 +395,6 @@ class LLM(BaseLLM):
                     if isinstance(chunk, dict) and "choices" in chunk:
                         choices = chunk["choices"]
                     elif hasattr(chunk, "choices"):
-                        # Check if choices is not a type but an actual attribute with value
                         if not isinstance(getattr(chunk, "choices"), type):
                             choices = getattr(chunk, "choices")
 
@@ -400,14 +402,97 @@
                     if isinstance(chunk, dict) and "usage" in chunk:
                         usage_info = chunk["usage"]
                     elif hasattr(chunk, "usage"):
-                        # Check if usage is not a type but an actual attribute with value
                        if not isinstance(getattr(chunk, "usage"), type):
                             usage_info = getattr(chunk, "usage")
 
                     if choices and len(choices) > 0:
                         choice = choices[0]
 
-                        # Handle different delta formats
+                        # Check for tool calls in the chunk
+                        if "delta" in choice:
+                            delta = choice["delta"]
+                            if "tool_calls" in delta:
+                                tool_calls = delta["tool_calls"]
+                                # If we have tool calls and available functions, accumulate arguments
+                                if tool_calls and available_functions:
+                                    for tool_call in tool_calls:
+                                        if hasattr(tool_call, "index"):
+                                            index = tool_call.index
+                                            if index not in accumulated_tool_args:
+                                                accumulated_tool_args[index] = {
+                                                    "name": None,
+                                                    "arguments": "",
+                                                }
+
+                                            # Update tool call name if available
+                                            if hasattr(
+                                                tool_call, "function"
+                                            ) and hasattr(tool_call.function, "name"):
+                                                if tool_call.function.name:
+                                                    accumulated_tool_args[index][
+                                                        "name"
+                                                    ] = tool_call.function.name
+
+                                            # Accumulate arguments
+                                            if hasattr(
+                                                tool_call, "function"
+                                            ) and hasattr(
+                                                tool_call.function, "arguments"
+                                            ):
+                                                if tool_call.function.arguments:
+                                                    accumulated_tool_args[index][
+                                                        "arguments"
+                                                    ] += tool_call.function.arguments
+
+                                            # Check if we have a complete tool call
+                                            if (
+                                                accumulated_tool_args[index]["name"]
+                                                and accumulated_tool_args[index][
+                                                    "arguments"
+                                                ]
+                                            ):
+                                                try:
+                                                    # Try to parse the accumulated arguments
+                                                    json.loads(
+                                                        accumulated_tool_args[index][
+                                                            "arguments"
+                                                        ]
+                                                    )
+                                                    # Execute the tool call
+                                                    tool_result = self._handle_tool_call(
+                                                        [
+                                                            SimpleNamespace(
+                                                                function=SimpleNamespace(
+                                                                    name=accumulated_tool_args[
+                                                                        index
+                                                                    ][
+                                                                        "name"
+                                                                    ],
+                                                                    arguments=accumulated_tool_args[
+                                                                        index
+                                                                    ][
+                                                                        "arguments"
+                                                                    ],
+                                                                ),
+                                                            )
+                                                        ],
+                                                        available_functions,
+                                                    )
+
+                                                    if tool_result is not None:
+                                                        # Stream the tool result
+                                                        crewai_event_bus.emit(
+                                                            self,
+                                                            event=LLMStreamChunkEvent(
+                                                                chunk=tool_result
+                                                            ),
+                                                        )
+                                                        return tool_result
+                                                except json.JSONDecodeError:
+                                                    # If JSON is incomplete, continue accumulating
+                                                    continue
+
+                        # Handle different delta formats for content
                         delta = None
                         if isinstance(choice, dict) and "delta" in choice:
                             delta = choice["delta"]
@@ -416,17 +501,13 @@
 
                         # Extract content from delta
                         if delta:
-                            # Handle dict format
                             if isinstance(delta, dict):
                                 if "content" in delta and delta["content"] is not None:
                                     chunk_content = delta["content"]
-                            # Handle object format
                             elif hasattr(delta, "content"):
                                 chunk_content = getattr(delta, "content")
 
-                        # Handle case where content might be None or empty
                         if chunk_content is None and isinstance(delta, dict):
-                            # Some models might send empty content chunks
                             chunk_content = ""
                 except Exception as e:
                     logging.debug(f"Error extracting content from chunk: {e}")
@@ -434,10 +515,7 @@
 
                 # Only add non-None content to the response
                 if chunk_content is not None:
-                    # Add the chunk content to the full response
                     full_response += chunk_content
-
-                    # Emit the chunk event
                     crewai_event_bus.emit(
                         self,
                         event=LLMStreamChunkEvent(chunk=chunk_content),
@@ -450,9 +528,7 @@
                 )
                 non_streaming_params = params.copy()
                 non_streaming_params["stream"] = False
-                non_streaming_params.pop(
-                    "stream_options", None
-                )  # Remove stream_options for non-streaming call
+                non_streaming_params.pop("stream_options", None)
                 return self._handle_non_streaming_response(
                     non_streaming_params, callbacks, available_functions
                 )
@@ -464,7 +540,6 @@
                 )
                 if last_chunk is not None:
                     try:
-                        # Try to extract content from the last chunk's message
                         choices = None
                         if isinstance(last_chunk, dict) and "choices" in last_chunk:
                             choices = last_chunk["choices"]
@@ -474,8 +549,6 @@
 
                         if choices and len(choices) > 0:
                             choice = choices[0]
-
-                            # Try to get content from message
                             message = None
                             if isinstance(choice, dict) and "message" in choice:
                                 message = choice["message"]
@@ -500,57 +573,14 @@
                         f"Last chunk format: {type(last_chunk)}, content: {last_chunk}"
                     )
 
-            # --- 6) If still empty, raise an error instead of using a default response
+            # --- 6) If still empty, raise an error
             if not full_response.strip():
                 raise Exception(
                     "No content received from streaming response. Received empty chunks or failed to extract content."
                 )
 
-            # --- 7) Check for tool calls in the final response
-            tool_calls = None
-            try:
-                if last_chunk:
-                    choices = None
-                    if isinstance(last_chunk, dict) and "choices" in last_chunk:
-                        choices = last_chunk["choices"]
-                    elif hasattr(last_chunk, "choices"):
-                        if not isinstance(getattr(last_chunk, "choices"), type):
-                            choices = getattr(last_chunk, "choices")
-
-                    if choices and len(choices) > 0:
-                        choice = choices[0]
-
-                        message = None
-                        if isinstance(choice, dict) and "message" in choice:
-                            message = choice["message"]
-                        elif hasattr(choice, "message"):
-                            message = getattr(choice, "message")
-
-                        if message:
-                            if isinstance(message, dict) and "tool_calls" in message:
-                                tool_calls = message["tool_calls"]
-                            elif hasattr(message, "tool_calls"):
-                                tool_calls = getattr(message, "tool_calls")
-            except Exception as e:
-                logging.debug(f"Error checking for tool calls: {e}")
-
-            # --- 8) If no tool calls or no available functions, return the text response directly
-            if not tool_calls or not available_functions:
-                # Log token usage if available in streaming mode
-                self._handle_streaming_callbacks(callbacks, usage_info, last_chunk)
-                # Emit completion event and return response
-                self._handle_emit_call_events(full_response, LLMCallType.LLM_CALL)
-                return full_response
-
-            # --- 9) Handle tool calls if present
-            tool_result = self._handle_tool_call(tool_calls, available_functions)
-            if tool_result is not None:
-                return tool_result
-
-            # --- 10) Log token usage if available in streaming mode
+            # --- 7) Log token usage and emit completion event
             self._handle_streaming_callbacks(callbacks, usage_info, last_chunk)
-
-            # --- 11) Emit completion event and return response
             self._handle_emit_call_events(full_response, LLMCallType.LLM_CALL)
             return full_response
 
@@ -561,7 +591,6 @@ class LLM(BaseLLM):
                 self._handle_emit_call_events(full_response, LLMCallType.LLM_CALL)
                 return full_response
 
-            # Emit failed event and re-raise the exception
            crewai_event_bus.emit(
                self,
                event=LLMCallFailedEvent(error=str(e)),
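
The diff moves tool-call handling into the streaming loop: fragments are keyed by `index`, the `arguments` string is concatenated chunk by chunk, and `_handle_tool_call` is invoked only once the accumulated string parses as JSON, with `SimpleNamespace` used to mimic the shape of a non-streamed tool call. The snippet below is a minimal, standalone sketch of that accumulation pattern under stated assumptions; `accumulate_tool_call` and the sample deltas are illustrative only and not part of the crewai API.

```python
import json
from types import SimpleNamespace


def accumulate_tool_call(deltas):
    """Merge streamed tool-call fragments keyed by index; return an object with
    .function.name / .function.arguments once the arguments form valid JSON,
    or None if the call is still incomplete."""
    accumulated = {}
    for delta in deltas:
        entry = accumulated.setdefault(delta.index, {"name": None, "arguments": ""})
        if delta.function.name:
            entry["name"] = delta.function.name
        if delta.function.arguments:
            entry["arguments"] += delta.function.arguments

        if entry["name"] and entry["arguments"]:
            try:
                json.loads(entry["arguments"])  # complete only when this succeeds
            except json.JSONDecodeError:
                continue  # keep accumulating on the next chunk
            return SimpleNamespace(
                function=SimpleNamespace(
                    name=entry["name"], arguments=entry["arguments"]
                )
            )
    return None


# Example: fragments as a provider might stream them for a single tool call.
streamed_deltas = [
    SimpleNamespace(index=0, function=SimpleNamespace(name="get_weather", arguments='{"city": ')),
    SimpleNamespace(index=0, function=SimpleNamespace(name=None, arguments='"Paris"}')),
]
call = accumulate_tool_call(streamed_deltas)
print(call.function.name, call.function.arguments)  # get_weather {"city": "Paris"}
```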