From b6a4af584deb58acddc92cc030dbacb68b87f53b Mon Sep 17 00:00:00 2001 From: lorenzejay Date: Thu, 2 Jul 2026 11:33:26 -0700 Subject: [PATCH] Document streamed tool call arguments --- docs/edge/en/concepts/streaming.mdx | 29 +++ docs/edge/en/learn/consuming-streams.mdx | 32 +++ .../en/learn/streaming-crew-execution.mdx | 9 +- .../en/learn/streaming-runtime-contract.mdx | 27 +++ .../src/crewai/events/event_listener.py | 8 + .../crewai/events/utils/console_formatter.py | 29 ++- lib/crewai/src/crewai/llms/base_llm.py | 34 +++ .../llms/providers/openai/completion.py | 205 +++++++++++++++++- lib/crewai/tests/llms/openai/test_openai.py | 190 ++++++++++++++++ .../test_console_formatter_pause_resume.py | 36 +++ scripts/stream_tool_call_runner.py | 84 +++++++ 11 files changed, 667 insertions(+), 16 deletions(-) create mode 100644 scripts/stream_tool_call_runner.py diff --git a/docs/edge/en/concepts/streaming.mdx b/docs/edge/en/concepts/streaming.mdx index 9813864b0..aa242dd03 100644 --- a/docs/edge/en/concepts/streaming.mdx +++ b/docs/edge/en/concepts/streaming.mdx @@ -61,6 +61,35 @@ Frames are grouped into high-level channels: The stream itself remains one ordered timeline. Channel projections let consumers focus on only part of that timeline. +### Tool Call Streaming + +Tool calls appear in two places because there are two distinct phases: + +| Phase | Channel | Event type | Meaning | +|-------|---------|------------|---------| +| Tool-call construction | `llm` | `llm_stream_chunk` | The model is streaming the tool name or arguments it intends to call | +| Tool execution | `tools` | `tool_usage_started`, `tool_usage_finished`, `tool_usage_error` | CrewAI is executing the tool and reporting the result | + +For streamed tool-call arguments, the latest provider delta is available as `frame.data["chunk"]`. The accumulated argument string is available at `frame.data["tool_call"]["function"]["arguments"]`. + +```python +with llm.stream_events("Check the weather in Paris.", tools=[weather_tool]) as stream: + for frame in stream.llm: + if frame.type != "llm_stream_chunk": + continue + + tool_call = frame.data.get("tool_call") + if tool_call: + function = tool_call["function"] + print("Tool:", function["name"]) + print("Latest argument delta:", frame.data["chunk"]) + print("Arguments so far:", function["arguments"]) + elif frame.content: + print(frame.content, end="", flush=True) +``` + +Use the `tools` channel when you want to display that a tool actually started, completed, or failed. Use the `llm` channel when you want to observe the model constructing the tool call before execution. + ```mermaid flowchart LR A["flow
flow_started"] --> B["llm
llm_call_started"] diff --git a/docs/edge/en/learn/consuming-streams.mdx b/docs/edge/en/learn/consuming-streams.mdx index 6ece26f3a..c39b05972 100644 --- a/docs/edge/en/learn/consuming-streams.mdx +++ b/docs/edge/en/learn/consuming-streams.mdx @@ -70,6 +70,38 @@ result = stream.result `frame.event` is the structured payload for the source event. Use it for metadata such as tool names, arguments, message roles, and runtime identifiers. +## Print Streamed Tool Call Arguments + +When a model supports streamed tool calling, CrewAI emits partial tool-call arguments as `llm_stream_chunk` frames on the `llm` channel. These frames are different from tool execution events: + +- `llm` channel: the model is constructing the tool call. +- `tools` channel: CrewAI is executing the tool. + +The current delta is stored in `frame.event["chunk"]`. The accumulated arguments are stored in `frame.event["tool_call"]["function"]["arguments"]`. + +```python +with llm.stream_events("Get the weather in Paris.", tools=[weather_tool]) as stream: + for frame in stream.interleave(["llm", "tools"]): + if frame.channel == "llm" and frame.type == "llm_stream_chunk": + tool_call = frame.event.get("tool_call") + if tool_call: + function = tool_call["function"] + print(f"\nPreparing tool: {function['name']}") + print(f"Arguments so far: {function['arguments']}") + elif frame.content: + print(frame.content, end="", flush=True) + + elif frame.channel == "tools" and frame.type == "tool_usage_started": + print(f"\nRunning tool: {frame.event.get('tool_name')}") + + elif frame.channel == "tools" and frame.type == "tool_usage_finished": + print(f"\nTool finished: {frame.event.get('tool_name')}") + +result = stream.result +``` + +Some providers emit arguments in small JSON fragments. For display, prefer the accumulated argument string over the latest delta. + ## Watch Flow Progress Flow lifecycle and method execution frames arrive on the `flow` channel: diff --git a/docs/edge/en/learn/streaming-crew-execution.mdx b/docs/edge/en/learn/streaming-crew-execution.mdx index ff0a3cd7f..40cc1743a 100644 --- a/docs/edge/en/learn/streaming-crew-execution.mdx +++ b/docs/edge/en/learn/streaming-crew-execution.mdx @@ -240,15 +240,18 @@ for chunk in streaming: ### TOOL_CALL Chunks -Information about tool calls being made: +Information about tool calls being made. Depending on the provider, CrewAI may receive tool-call arguments incrementally. In that case, each `TOOL_CALL` chunk contains the latest streamed content in `chunk.content`, while `chunk.tool_call.arguments` contains the accumulated argument string so far. ```python Code for chunk in streaming: if chunk.chunk_type == StreamChunkType.TOOL_CALL: print(f"\nCalling tool: {chunk.tool_call.tool_name}") - print(f"Arguments: {chunk.tool_call.arguments}") + print(f"Latest argument delta: {chunk.content}") + print(f"Arguments so far: {chunk.tool_call.arguments}") ``` +Actual tool execution is reported separately through tool usage events in the runtime event stream and through CrewAI's verbose console output. `TOOL_CALL` chunks represent the model constructing the tool request before or during execution. + ## Practical Example: Building a UI with Streaming Here's a complete example showing how to build an interactive application with streaming: @@ -381,4 +384,4 @@ except Exception as e: print("Streaming completed but an error occurred") ``` -By leveraging streaming, you can build more responsive and interactive applications with CrewAI, providing users with real-time visibility into agent execution and results. \ No newline at end of file +By leveraging streaming, you can build more responsive and interactive applications with CrewAI, providing users with real-time visibility into agent execution and results. diff --git a/docs/edge/en/learn/streaming-runtime-contract.mdx b/docs/edge/en/learn/streaming-runtime-contract.mdx index b7116d470..6434c2f8b 100644 --- a/docs/edge/en/learn/streaming-runtime-contract.mdx +++ b/docs/edge/en/learn/streaming-runtime-contract.mdx @@ -145,6 +145,33 @@ result = stream.result `llm.stream_events(...)` temporarily enables streaming for the wrapped call and restores the LLM's previous `stream` setting afterward. Provider integrations continue to emit the underlying LLM stream events; this helper provides a common iterator API over those events for every LLM provider. +### Tool Call Argument Deltas + +Native tool-call streaming uses `llm_stream_chunk` frames on the `llm` channel. This represents the model constructing a tool call. It is separate from the `tools` channel, which represents CrewAI executing that tool. + +For a streamed tool call, the frame payload includes: + +```python +frame.type == "llm_stream_chunk" +frame.channel == "llm" +frame.event["call_type"] == "tool_call" +frame.event["chunk"] # latest argument delta from the provider +frame.event["tool_call"] # accumulated tool-call state +``` + +The `tool_call` payload follows the OpenAI-style function call shape: + +```python +tool_call = frame.event["tool_call"] +tool_call["id"] +tool_call["index"] +tool_call["type"] # "function" +tool_call["function"]["name"] +tool_call["function"]["arguments"] # accumulated JSON argument string +``` + +Providers differ in granularity. OpenAI and Anthropic may stream arguments as small JSON fragments, while some providers emit the complete argument object in one chunk. Consumers should treat `frame.event["chunk"]` as the latest delta and `tool_call["function"]["arguments"]` as the current accumulated state. + ## Conversational Turns Conversational Flows can stream one user turn with `stream_turn()`: diff --git a/lib/crewai/src/crewai/events/event_listener.py b/lib/crewai/src/crewai/events/event_listener.py index a1a771f44..ae7fdc029 100644 --- a/lib/crewai/src/crewai/events/event_listener.py +++ b/lib/crewai/src/crewai/events/event_listener.py @@ -64,6 +64,7 @@ from crewai.events.types.llm_events import ( LLMCallCompletedEvent, LLMCallFailedEvent, LLMCallStartedEvent, + LLMCallType, LLMStreamChunkEvent, ) from crewai.events.types.llm_guardrail_events import ( @@ -455,6 +456,13 @@ class EventListener(BaseEventListener): @crewai_event_bus.on(LLMStreamChunkEvent) def on_llm_stream_chunk(_: Any, event: LLMStreamChunkEvent) -> None: + if event.call_type == LLMCallType.TOOL_CALL: + self.formatter.handle_llm_stream_chunk( + event.tool_call.function.arguments if event.tool_call else "", + event.call_type, + ) + return + self.text_stream.write(event.chunk) self.text_stream.seek(self.next_chunk) self.text_stream.read() diff --git a/lib/crewai/src/crewai/events/utils/console_formatter.py b/lib/crewai/src/crewai/events/utils/console_formatter.py index 604c7b051..501e29aa4 100644 --- a/lib/crewai/src/crewai/events/utils/console_formatter.py +++ b/lib/crewai/src/crewai/events/utils/console_formatter.py @@ -437,6 +437,8 @@ To enable tracing, do any one of these: if not self.verbose: return + self.pause_live_updates() + with self._tool_counts_lock: self.tool_usage_counts[tool_name] = ( self.tool_usage_counts.get(tool_name, 0) + 1 @@ -468,6 +470,8 @@ To enable tracing, do any one of these: if not self.verbose: return + self.pause_live_updates() + with self._tool_counts_lock: iteration = self.tool_usage_counts.get(tool_name, 1) @@ -495,6 +499,8 @@ To enable tracing, do any one of these: if not self.verbose: return + self.pause_live_updates() + with self._tool_counts_lock: iteration = self.tool_usage_counts.get(tool_name, 1) @@ -543,6 +549,15 @@ To enable tracing, do any one of these: if should_suppress_console_output(): return + from crewai.events.types.llm_events import LLMCallType + + if call_type == LLMCallType.TOOL_CALL: + self.pause_live_updates() + self._is_streaming = False + self._last_stream_call_type = call_type + self._just_streamed_final_answer = False + return + self._is_streaming = True self._last_stream_call_type = call_type @@ -554,17 +569,9 @@ To enable tracing, do any one of these: display_text = "...\n" + display_text content = Text() - - from crewai.events.types.llm_events import LLMCallType - - if call_type == LLMCallType.TOOL_CALL: - content.append(display_text, style="yellow") - title = "🔧 Tool Arguments" - border_style = "yellow" - else: - content.append(display_text, style="bright_green") - title = "✅ Agent Final Answer" - border_style = "green" + content.append(display_text, style="bright_green") + title = "✅ Agent Final Answer" + border_style = "green" streaming_panel = Panel( content, diff --git a/lib/crewai/src/crewai/llms/base_llm.py b/lib/crewai/src/crewai/llms/base_llm.py index d679bf670..ad846daba 100644 --- a/lib/crewai/src/crewai/llms/base_llm.py +++ b/lib/crewai/src/crewai/llms/base_llm.py @@ -685,6 +685,40 @@ class BaseLLM(BaseModel, ABC): ), ) + def _emit_tool_call_stream_chunk_event( + self, + *, + chunk: str, + tool_call_id: str | None, + tool_name: str | None, + arguments: str, + index: int, + from_task: Task | None = None, + from_agent: BaseAgent | None = None, + response_id: str | None = None, + ) -> None: + """Emit a normalized streamed tool-call chunk. + + ``chunk`` is the latest provider delta while ``arguments`` is the + accumulated argument string for consumers that want current state. + """ + self._emit_stream_chunk_event( + chunk=chunk, + from_task=from_task, + from_agent=from_agent, + tool_call={ + "id": tool_call_id, + "function": { + "name": tool_name or "", + "arguments": arguments, + }, + "type": "function", + "index": index, + }, + call_type=LLMCallType.TOOL_CALL, + response_id=response_id, + ) + def _emit_thinking_chunk_event( self, chunk: str, diff --git a/lib/crewai/src/crewai/llms/providers/openai/completion.py b/lib/crewai/src/crewai/llms/providers/openai/completion.py index 77d2bbbdd..d44d4fde8 100644 --- a/lib/crewai/src/crewai/llms/providers/openai/completion.py +++ b/lib/crewai/src/crewai/llms/providers/openai/completion.py @@ -1102,6 +1102,7 @@ class OpenAICompletion(BaseLLM): """Handle streaming Responses API call.""" full_response = "" function_calls: list[dict[str, Any]] = [] + streaming_function_calls: dict[str, dict[str, Any]] = {} final_response: Response | None = None usage: dict[str, Any] | None = None @@ -1123,11 +1124,46 @@ class OpenAICompletion(BaseLLM): ) elif event.type == "response.function_call_arguments.delta": - pass + self._process_responses_function_call_delta( + event=event, + function_calls=streaming_function_calls, + from_task=from_task, + from_agent=from_agent, + response_id=response_id_stream, + ) + + elif event.type == "response.function_call_arguments.done": + self._process_responses_function_call_done( + event=event, + function_calls=streaming_function_calls, + from_task=from_task, + from_agent=from_agent, + response_id=response_id_stream, + ) + + elif event.type == "response.output_item.added": + item = event.item + if item.type == "function_call": + self._process_responses_function_call_started( + event=event, + function_calls=streaming_function_calls, + from_task=from_task, + from_agent=from_agent, + response_id=response_id_stream, + ) elif event.type == "response.output_item.done": item = event.item if item.type == "function_call": + call_key = self._responses_function_call_key(event) + if call_key in streaming_function_calls: + streaming_function_calls[call_key].update( + { + "id": item.call_id, + "name": item.name, + "arguments": item.arguments, + } + ) function_calls.append( { "id": item.call_id, @@ -1239,6 +1275,7 @@ class OpenAICompletion(BaseLLM): """Handle async streaming Responses API call.""" full_response = "" function_calls: list[dict[str, Any]] = [] + streaming_function_calls: dict[str, dict[str, Any]] = {} final_response: Response | None = None usage: dict[str, Any] | None = None @@ -1260,11 +1297,46 @@ class OpenAICompletion(BaseLLM): ) elif event.type == "response.function_call_arguments.delta": - pass + self._process_responses_function_call_delta( + event=event, + function_calls=streaming_function_calls, + from_task=from_task, + from_agent=from_agent, + response_id=response_id_stream, + ) + + elif event.type == "response.function_call_arguments.done": + self._process_responses_function_call_done( + event=event, + function_calls=streaming_function_calls, + from_task=from_task, + from_agent=from_agent, + response_id=response_id_stream, + ) + + elif event.type == "response.output_item.added": + item = event.item + if item.type == "function_call": + self._process_responses_function_call_started( + event=event, + function_calls=streaming_function_calls, + from_task=from_task, + from_agent=from_agent, + response_id=response_id_stream, + ) elif event.type == "response.output_item.done": item = event.item if item.type == "function_call": + call_key = self._responses_function_call_key(event) + if call_key in streaming_function_calls: + streaming_function_calls[call_key].update( + { + "id": item.call_id, + "name": item.name, + "arguments": item.arguments, + } + ) function_calls.append( { "id": item.call_id, @@ -1363,6 +1435,135 @@ class OpenAICompletion(BaseLLM): return full_response + @staticmethod + def _responses_function_call_key(event: Any) -> str: + item = getattr(event, "item", None) + item_id = getattr(event, "item_id", None) or getattr(item, "id", None) + if item_id: + return str(item_id) + output_index = getattr(event, "output_index", None) + if output_index is not None: + return f"output:{output_index}" + return "output:0" + + def _process_responses_function_call_delta( + self, + *, + event: Any, + function_calls: dict[str, dict[str, Any]], + from_task: Any | None = None, + from_agent: Any | None = None, + response_id: str | None = None, + ) -> None: + call_key = self._responses_function_call_key(event) + output_index = getattr(event, "output_index", 0) or 0 + item_id = getattr(event, "item_id", None) + delta = getattr(event, "delta", "") or "" + call_data = function_calls.setdefault( + call_key, + { + "id": item_id, + "name": "", + "arguments": "", + "index": output_index, + "streamed_arguments": False, + }, + ) + if item_id and not call_data.get("id"): + call_data["id"] = item_id + call_data["arguments"] += delta + call_data["streamed_arguments"] = True + + self._emit_tool_call_stream_chunk_event( + chunk=delta, + tool_call_id=call_data.get("id"), + tool_name=call_data.get("name"), + arguments=call_data["arguments"], + index=call_data["index"], + from_task=from_task, + from_agent=from_agent, + response_id=response_id, + ) + + def _process_responses_function_call_started( + self, + *, + event: Any, + function_calls: dict[str, dict[str, Any]], + from_task: Any | None = None, + from_agent: Any | None = None, + response_id: str | None = None, + ) -> None: + call_key = self._responses_function_call_key(event) + output_index = getattr(event, "output_index", 0) or 0 + item = getattr(event, "item", None) + item_id = getattr(item, "id", None) + call_data = function_calls.setdefault( + call_key, + { + "id": getattr(item, "call_id", None) or item_id, + "name": getattr(item, "name", None) or "", + "arguments": "", + "index": output_index, + "streamed_arguments": False, + }, + ) + call_data["id"] = call_data.get("id") or getattr(item, "call_id", None) + call_data["name"] = call_data.get("name") or getattr(item, "name", None) or "" + + self._emit_tool_call_stream_chunk_event( + chunk="", + tool_call_id=call_data.get("id"), + tool_name=call_data.get("name"), + arguments=call_data["arguments"], + index=call_data["index"], + from_task=from_task, + from_agent=from_agent, + response_id=response_id, + ) + + def _process_responses_function_call_done( + self, + *, + event: Any, + function_calls: dict[str, dict[str, Any]], + from_task: Any | None = None, + from_agent: Any | None = None, + response_id: str | None = None, + ) -> None: + call_key = self._responses_function_call_key(event) + output_index = getattr(event, "output_index", 0) or 0 + item_id = getattr(event, "item_id", None) + arguments = getattr(event, "arguments", "") or "" + call_data = function_calls.setdefault( + call_key, + { + "id": item_id, + "name": "", + "arguments": "", + "index": output_index, + "streamed_arguments": False, + }, + ) + if item_id and not call_data.get("id"): + call_data["id"] = item_id + call_data["name"] = getattr(event, "name", None) or call_data.get("name", "") + + if not call_data.get("streamed_arguments") and arguments: + call_data["arguments"] = arguments + self._emit_tool_call_stream_chunk_event( + chunk=arguments, + tool_call_id=call_data.get("id"), + tool_name=call_data.get("name"), + arguments=call_data["arguments"], + index=call_data["index"], + from_task=from_task, + from_agent=from_agent, + response_id=response_id, + ) + elif arguments: + call_data["arguments"] = arguments + def _extract_function_calls_from_response( self, response: Response ) -> list[dict[str, Any]]: diff --git a/lib/crewai/tests/llms/openai/test_openai.py b/lib/crewai/tests/llms/openai/test_openai.py index 836abe838..0fc9ba4a3 100644 --- a/lib/crewai/tests/llms/openai/test_openai.py +++ b/lib/crewai/tests/llms/openai/test_openai.py @@ -7,6 +7,7 @@ import openai import pytest from crewai.llm import LLM +from crewai.events.types.llm_events import LLMCallType, LLMStreamChunkEvent from crewai.llms.providers.openai.completion import OpenAICompletion, ResponsesAPIResult from crewai.crew import Crew from crewai.agent import Agent @@ -1887,6 +1888,195 @@ def test_openai_responses_api_no_detail_fields_omitted(): assert "reasoning_tokens" not in usage +def test_openai_responses_streaming_emits_tool_call_argument_deltas(): + llm = OpenAICompletion(model="gpt-4o", api="responses", stream=True) + stream_events = [ + types.SimpleNamespace( + type="response.created", + response=types.SimpleNamespace(id="resp_123"), + ), + types.SimpleNamespace( + type="response.output_item.added", + output_index=0, + item=types.SimpleNamespace( + type="function_call", + id="fc_123", + call_id="call_123", + name="get_weather", + arguments="", + ), + ), + types.SimpleNamespace( + type="response.function_call_arguments.delta", + item_id="fc_123", + output_index=0, + delta='{"city"', + ), + types.SimpleNamespace( + type="response.function_call_arguments.delta", + item_id="fc_123", + output_index=0, + delta=': "Paris"}', + ), + types.SimpleNamespace( + type="response.function_call_arguments.done", + item_id="fc_123", + output_index=0, + name="get_weather", + arguments='{"city": "Paris"}', + ), + types.SimpleNamespace( + type="response.output_item.done", + output_index=0, + item=types.SimpleNamespace( + type="function_call", + id="fc_123", + call_id="call_123", + name="get_weather", + arguments='{"city": "Paris"}', + ), + ), + types.SimpleNamespace( + type="response.completed", + response=types.SimpleNamespace( + id="resp_123", + status="completed", + usage=None, + ), + ), + ] + fake_client = types.SimpleNamespace( + responses=types.SimpleNamespace(create=lambda **_: iter(stream_events)) + ) + + with patch.object(llm, "_get_sync_client", return_value=fake_client): + with patch("crewai.events.event_bus.CrewAIEventsBus.emit") as mock_emit: + llm._handle_streaming_responses({"input": []}) + + tool_call_events = [ + call.kwargs["event"] + for call in mock_emit.call_args_list + if isinstance(call.kwargs.get("event"), LLMStreamChunkEvent) + and call.kwargs["event"].call_type == LLMCallType.TOOL_CALL + ] + + assert [event.chunk for event in tool_call_events] == [ + "", + '{"city"', + ': "Paris"}', + ] + assert [ + event.tool_call.function.arguments for event in tool_call_events + ] == ["", '{"city"', '{"city": "Paris"}'] + assert all(event.tool_call.id == "call_123" for event in tool_call_events) + assert all( + event.tool_call.function.name == "get_weather" for event in tool_call_events + ) + + +@pytest.mark.asyncio +async def test_openai_responses_async_streaming_emits_tool_call_argument_deltas(): + llm = OpenAICompletion(model="gpt-4o", api="responses", stream=True) + stream_events = [ + types.SimpleNamespace( + type="response.created", + response=types.SimpleNamespace(id="resp_123"), + ), + types.SimpleNamespace( + type="response.output_item.added", + output_index=0, + item=types.SimpleNamespace( + type="function_call", + id="fc_123", + call_id="call_123", + name="get_weather", + arguments="", + ), + ), + types.SimpleNamespace( + type="response.function_call_arguments.delta", + item_id="fc_123", + output_index=0, + delta='{"city"', + ), + types.SimpleNamespace( + type="response.function_call_arguments.delta", + item_id="fc_123", + output_index=0, + delta=': "Paris"}', + ), + types.SimpleNamespace( + type="response.function_call_arguments.done", + item_id="fc_123", + output_index=0, + name="get_weather", + arguments='{"city": "Paris"}', + ), + types.SimpleNamespace( + type="response.output_item.done", + output_index=0, + item=types.SimpleNamespace( + type="function_call", + id="fc_123", + call_id="call_123", + name="get_weather", + arguments='{"city": "Paris"}', + ), + ), + types.SimpleNamespace( + type="response.completed", + response=types.SimpleNamespace( + id="resp_123", + status="completed", + usage=None, + ), + ), + ] + + class MockAsyncStream: + def __init__(self, events: list[Any]) -> None: + self._events = events + self._index = 0 + + def __aiter__(self) -> "MockAsyncStream": + return self + + async def __anext__(self) -> Any: + if self._index >= len(self._events): + raise StopAsyncIteration + event = self._events[self._index] + self._index += 1 + return event + + fake_client = types.SimpleNamespace( + responses=types.SimpleNamespace(create=lambda **_: MockAsyncStream(stream_events)) + ) + + with patch.object(llm, "_get_async_client", return_value=fake_client): + with patch("crewai.events.event_bus.CrewAIEventsBus.emit") as mock_emit: + await llm._ahandle_streaming_responses({"input": []}) + + tool_call_events = [ + call.kwargs["event"] + for call in mock_emit.call_args_list + if isinstance(call.kwargs.get("event"), LLMStreamChunkEvent) + and call.kwargs["event"].call_type == LLMCallType.TOOL_CALL + ] + + assert [event.chunk for event in tool_call_events] == [ + "", + '{"city"', + ': "Paris"}', + ] + assert [ + event.tool_call.function.arguments for event in tool_call_events + ] == ["", '{"city"', '{"city": "Paris"}'] + assert all(event.tool_call.id == "call_123" for event in tool_call_events) + assert all( + event.tool_call.function.name == "get_weather" for event in tool_call_events + ) + + @pytest.mark.asyncio async def test_openai_async_streaming_returns_tool_calls_without_available_functions(): """Test that async streaming returns tool calls list when available_functions is None. diff --git a/lib/crewai/tests/utilities/test_console_formatter_pause_resume.py b/lib/crewai/tests/utilities/test_console_formatter_pause_resume.py index 1ffbb3850..9c9fe2e28 100644 --- a/lib/crewai/tests/utilities/test_console_formatter_pause_resume.py +++ b/lib/crewai/tests/utilities/test_console_formatter_pause_resume.py @@ -1,5 +1,6 @@ from unittest.mock import MagicMock, patch from rich.live import Live +from crewai.events.types.llm_events import LLMCallType from crewai.events.utils.console_formatter import ConsoleFormatter @@ -110,3 +111,38 @@ class TestConsoleFormatterPauseResume: # Streaming again creates new session formatter.handle_llm_stream_chunk("chunk 2", call_type=None) assert formatter._streaming_live == mock_live_instance_2 + + def test_tool_call_stream_chunk_does_not_create_live_panel(self): + formatter = ConsoleFormatter(verbose=True) + + with patch("crewai.events.utils.console_formatter.Live") as mock_live_class: + formatter.handle_llm_stream_chunk( + '{"city":"Paris"}', call_type=LLMCallType.TOOL_CALL + ) + + mock_live_class.assert_not_called() + assert formatter._streaming_live is None + assert formatter._is_streaming is False + + def test_tool_call_stream_chunk_stops_existing_live_panel(self): + formatter = ConsoleFormatter(verbose=True) + mock_live = MagicMock(spec=Live) + formatter._streaming_live = mock_live + + formatter.handle_llm_stream_chunk( + '{"city":"Paris"}', call_type=LLMCallType.TOOL_CALL + ) + + mock_live.stop.assert_called_once() + assert formatter._streaming_live is None + + def test_tool_usage_started_pauses_existing_live_panel(self): + formatter = ConsoleFormatter(verbose=True) + mock_live = MagicMock(spec=Live) + formatter._streaming_live = mock_live + + with patch.object(formatter, "print_panel"): + formatter.handle_tool_usage_started("get_weather", {"city": "Paris"}) + + mock_live.stop.assert_called_once() + assert formatter._streaming_live is None diff --git a/scripts/stream_tool_call_runner.py b/scripts/stream_tool_call_runner.py new file mode 100644 index 000000000..da2a58e02 --- /dev/null +++ b/scripts/stream_tool_call_runner.py @@ -0,0 +1,84 @@ +"""Real OpenAI Responses API runner for streamed tool-call deltas. + +Fill in ``OPENAI_API_KEY`` below, then run from the repository root: + + uv run python scripts/stream_tool_call_runner.py +""" + +from __future__ import annotations + +import os + +# ruff: noqa: T201 +from crewai.llms.providers.openai.completion import OpenAICompletion +from dotenv import load_dotenv + + +load_dotenv() + + +MODEL = "gpt-4o-mini" + + +def get_weather(city: str) -> str: + return f"The weather in {city} is sunny and 72F." + + +def main() -> None: + + llm = OpenAICompletion( + model=MODEL, + api="responses", + api_key=os.getenv("OPENAI_API_KEY"), + stream=True, + additional_params={ + "tool_choice": {"type": "function", "name": "get_weather"}, + }, + ) + + tool_schema = { + "type": "function", + "function": { + "name": "get_weather", + "description": "Get the weather for a city.", + "parameters": { + "type": "object", + "properties": { + "city": { + "type": "string", + "description": "The city to check.", + } + }, + "required": ["city"], + "additionalProperties": False, + }, + }, + } + + with llm.stream_events( + "Call get_weather for Paris. Do not answer directly.", + tools=[tool_schema], + available_functions={"get_weather": get_weather}, + ) as stream: + for frame in stream.llm: + if frame.type != "llm_stream_chunk": + continue + + tool_call = frame.data.get("tool_call") + if not tool_call: + print(f"text delta: {frame.content!r}") + continue + + function = tool_call["function"] + print( + "tool delta: " + f"chunk={frame.data['chunk']!r} " + f"name={function['name']!r} " + f"arguments={function['arguments']!r}" + ) + + print(f"final result: {stream.result!r}") + + +if __name__ == "__main__": + main()