diff --git a/src/crewai/llm.py b/src/crewai/llm.py
index 741544662..dad82a71c 100644
--- a/src/crewai/llm.py
+++ b/src/crewai/llm.py
@@ -4,7 +4,9 @@ import os
 import sys
 import threading
 import warnings
+from collections import defaultdict
 from contextlib import contextmanager
+from types import SimpleNamespace
 from typing import (
     Any,
     Dict,
@@ -371,6 +373,15 @@ class LLM(BaseLLM):
         last_chunk = None
         chunk_count = 0
         usage_info = None
+        tool_calls = None
+        # Per-index buffers that collect each streamed tool call's function
+        # name and argument fragments across chunks.
+        accumulated_tool_args = defaultdict(
+            lambda: SimpleNamespace(
+                function=SimpleNamespace(
+                    name="",
+                    arguments="",
+                ),
+            ),
+        )
 
         # --- 2) Make sure stream is set to True and include usage metrics
         params["stream"] = True
@@ -428,6 +439,19 @@
                     if chunk_content is None and isinstance(delta, dict):
                         # Some models might send empty content chunks
                         chunk_content = ""
+
+                    # Reset on every chunk: keeping a stale value would re-append
+                    # the same argument fragments on chunks without tool_calls.
+                    tool_calls = delta.get("tool_calls") if isinstance(delta, dict) else None
+
+                    if tool_calls:
+                        result = self._handle_streaming_tool_calls(
+                            tool_calls=tool_calls,
+                            accumulated_tool_args=accumulated_tool_args,
+                            available_functions=available_functions,
+                        )
+                        if result is not None:
+                            chunk_content = result
+
                 except Exception as e:
                     logging.debug(f"Error extracting content from chunk: {e}")
                     logging.debug(f"Chunk format: {type(chunk)}, content: {chunk}")
@@ -501,7 +525,7 @@
             )
 
         # --- 6) If still empty, raise an error instead of using a default response
-        if not full_response.strip():
+        if not full_response.strip() and not accumulated_tool_args:
             raise Exception(
                 "No content received from streaming response. Received empty chunks or failed to extract content."
) @@ -568,6 +592,52 @@ class LLM(BaseLLM): ) raise Exception(f"Failed to get streaming response: {str(e)}") + def _handle_streaming_tool_calls( + self, + tool_calls: List[Any], + accumulated_tool_args: Dict[int, SimpleNamespace], + available_functions: Optional[Dict[str, Any]] = None, + ) -> None | str: + + for tool_call in tool_calls: + if tool_call.function.name: + accumulated_tool_args[tool_call.index].function.name = ( + tool_call.function.name + ) + + if tool_call.function.arguments: + accumulated_tool_args[ + tool_call.index + ].function.arguments += tool_call.function.arguments + + crewai_event_bus.emit( + self, + event=LLMStreamChunkEvent( + tool_call=tool_call.to_dict(), + chunk=tool_call.function.arguments, + ), + ) + + if ( + accumulated_tool_args[tool_call.index].function.name + and accumulated_tool_args[tool_call.index].function.arguments + and available_functions + ): + try: + # Try to parse the accumulated arguments + json.loads( + accumulated_tool_args[tool_call.index].function.arguments + ) + + # Execute the tool call + return self._handle_tool_call( + [accumulated_tool_args[tool_call.index]], + available_functions, + ) + except json.JSONDecodeError: + # If JSON is incomplete, continue accumulating + continue + def _handle_streaming_callbacks( self, callbacks: Optional[List[Any]], diff --git a/src/crewai/utilities/events/llm_events.py b/src/crewai/utilities/events/llm_events.py index 07a17a48b..0d271f41b 100644 --- a/src/crewai/utilities/events/llm_events.py +++ b/src/crewai/utilities/events/llm_events.py @@ -46,3 +46,4 @@ class LLMStreamChunkEvent(BaseEvent): type: str = "llm_stream_chunk" chunk: str + tool_call: Optional[dict] = None