diff --git a/docs/edge/en/concepts/streaming.mdx b/docs/edge/en/concepts/streaming.mdx
index 9813864b0..aa242dd03 100644
--- a/docs/edge/en/concepts/streaming.mdx
+++ b/docs/edge/en/concepts/streaming.mdx
@@ -61,6 +61,35 @@ Frames are grouped into high-level channels:
The stream itself remains one ordered timeline. Channel projections let consumers focus on only part of that timeline.
+### Tool Call Streaming
+
+Tool calls appear in two places because there are two distinct phases:
+
+| Phase | Channel | Event type | Meaning |
+|-------|---------|------------|---------|
+| Tool-call construction | `llm` | `llm_stream_chunk` | The model is streaming the tool name or arguments it intends to call |
+| Tool execution | `tools` | `tool_usage_started`, `tool_usage_finished`, `tool_usage_error` | CrewAI is executing the tool and reporting the result |
+
+For streamed tool-call arguments, the latest provider delta is available as `frame.data["chunk"]`. The accumulated argument string is available at `frame.data["tool_call"]["function"]["arguments"]`.
+
+```python
+with llm.stream_events("Check the weather in Paris.", tools=[weather_tool]) as stream:
+ for frame in stream.llm:
+ if frame.type != "llm_stream_chunk":
+ continue
+
+ tool_call = frame.data.get("tool_call")
+ if tool_call:
+ function = tool_call["function"]
+ print("Tool:", function["name"])
+ print("Latest argument delta:", frame.data["chunk"])
+ print("Arguments so far:", function["arguments"])
+ elif frame.content:
+ print(frame.content, end="", flush=True)
+```
+
+Use the `tools` channel when you want to display that a tool actually started, completed, or failed. Use the `llm` channel when you want to observe the model constructing the tool call before execution.
+
```mermaid
flowchart LR
A["flow
flow_started"] --> B["llm
llm_call_started"]
diff --git a/docs/edge/en/learn/consuming-streams.mdx b/docs/edge/en/learn/consuming-streams.mdx
index 6ece26f3a..c39b05972 100644
--- a/docs/edge/en/learn/consuming-streams.mdx
+++ b/docs/edge/en/learn/consuming-streams.mdx
@@ -70,6 +70,38 @@ result = stream.result
`frame.event` is the structured payload for the source event. Use it for metadata such as tool names, arguments, message roles, and runtime identifiers.
+## Print Streamed Tool Call Arguments
+
+When a model supports streamed tool calling, CrewAI emits partial tool-call arguments as `llm_stream_chunk` frames on the `llm` channel. These frames are different from tool execution events:
+
+- `llm` channel: the model is constructing the tool call.
+- `tools` channel: CrewAI is executing the tool.
+
+The current delta is stored in `frame.event["chunk"]`. The accumulated arguments are stored in `frame.event["tool_call"]["function"]["arguments"]`.
+
+```python
+with llm.stream_events("Get the weather in Paris.", tools=[weather_tool]) as stream:
+ for frame in stream.interleave(["llm", "tools"]):
+ if frame.channel == "llm" and frame.type == "llm_stream_chunk":
+ tool_call = frame.event.get("tool_call")
+ if tool_call:
+ function = tool_call["function"]
+ print(f"\nPreparing tool: {function['name']}")
+ print(f"Arguments so far: {function['arguments']}")
+ elif frame.content:
+ print(frame.content, end="", flush=True)
+
+ elif frame.channel == "tools" and frame.type == "tool_usage_started":
+ print(f"\nRunning tool: {frame.event.get('tool_name')}")
+
+ elif frame.channel == "tools" and frame.type == "tool_usage_finished":
+ print(f"\nTool finished: {frame.event.get('tool_name')}")
+
+result = stream.result
+```
+
+Some providers emit arguments in small JSON fragments. For display, prefer the accumulated argument string over the latest delta.
+
## Watch Flow Progress
Flow lifecycle and method execution frames arrive on the `flow` channel:
diff --git a/docs/edge/en/learn/streaming-crew-execution.mdx b/docs/edge/en/learn/streaming-crew-execution.mdx
index ff0a3cd7f..40cc1743a 100644
--- a/docs/edge/en/learn/streaming-crew-execution.mdx
+++ b/docs/edge/en/learn/streaming-crew-execution.mdx
@@ -240,15 +240,18 @@ for chunk in streaming:
### TOOL_CALL Chunks
-Information about tool calls being made:
+Information about tool calls being made. Depending on the provider, CrewAI may receive tool-call arguments incrementally. In that case, each `TOOL_CALL` chunk contains the latest streamed content in `chunk.content`, while `chunk.tool_call.arguments` contains the accumulated argument string so far.
```python Code
for chunk in streaming:
if chunk.chunk_type == StreamChunkType.TOOL_CALL:
print(f"\nCalling tool: {chunk.tool_call.tool_name}")
- print(f"Arguments: {chunk.tool_call.arguments}")
+ print(f"Latest argument delta: {chunk.content}")
+ print(f"Arguments so far: {chunk.tool_call.arguments}")
```
+Actual tool execution is reported separately through tool usage events in the runtime event stream and through CrewAI's verbose console output. `TOOL_CALL` chunks represent the model constructing the tool request before or during execution.
+
## Practical Example: Building a UI with Streaming
Here's a complete example showing how to build an interactive application with streaming:
@@ -381,4 +384,4 @@ except Exception as e:
print("Streaming completed but an error occurred")
```
-By leveraging streaming, you can build more responsive and interactive applications with CrewAI, providing users with real-time visibility into agent execution and results.
\ No newline at end of file
+By leveraging streaming, you can build more responsive and interactive applications with CrewAI, providing users with real-time visibility into agent execution and results.
diff --git a/docs/edge/en/learn/streaming-runtime-contract.mdx b/docs/edge/en/learn/streaming-runtime-contract.mdx
index b7116d470..6434c2f8b 100644
--- a/docs/edge/en/learn/streaming-runtime-contract.mdx
+++ b/docs/edge/en/learn/streaming-runtime-contract.mdx
@@ -145,6 +145,33 @@ result = stream.result
`llm.stream_events(...)` temporarily enables streaming for the wrapped call and restores the LLM's previous `stream` setting afterward. Provider integrations continue to emit the underlying LLM stream events; this helper provides a common iterator API over those events for every LLM provider.
+### Tool Call Argument Deltas
+
+Native tool-call streaming uses `llm_stream_chunk` frames on the `llm` channel. This represents the model constructing a tool call. It is separate from the `tools` channel, which represents CrewAI executing that tool.
+
+For a streamed tool call, the frame payload includes:
+
+```python
+frame.type == "llm_stream_chunk"
+frame.channel == "llm"
+frame.event["call_type"] == "tool_call"
+frame.event["chunk"] # latest argument delta from the provider
+frame.event["tool_call"] # accumulated tool-call state
+```
+
+The `tool_call` payload follows the OpenAI-style function call shape:
+
+```python
+tool_call = frame.event["tool_call"]
+tool_call["id"]
+tool_call["index"]
+tool_call["type"] # "function"
+tool_call["function"]["name"]
+tool_call["function"]["arguments"] # accumulated JSON argument string
+```
+
+Providers differ in granularity. OpenAI and Anthropic may stream arguments as small JSON fragments, while some providers emit the complete argument object in one chunk. Consumers should treat `frame.event["chunk"]` as the latest delta and `tool_call["function"]["arguments"]` as the current accumulated state.
+
## Conversational Turns
Conversational Flows can stream one user turn with `stream_turn()`:
diff --git a/lib/crewai/src/crewai/events/event_listener.py b/lib/crewai/src/crewai/events/event_listener.py
index a1a771f44..ae7fdc029 100644
--- a/lib/crewai/src/crewai/events/event_listener.py
+++ b/lib/crewai/src/crewai/events/event_listener.py
@@ -64,6 +64,7 @@ from crewai.events.types.llm_events import (
LLMCallCompletedEvent,
LLMCallFailedEvent,
LLMCallStartedEvent,
+ LLMCallType,
LLMStreamChunkEvent,
)
from crewai.events.types.llm_guardrail_events import (
@@ -455,6 +456,13 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(LLMStreamChunkEvent)
def on_llm_stream_chunk(_: Any, event: LLMStreamChunkEvent) -> None:
+ if event.call_type == LLMCallType.TOOL_CALL:
+ self.formatter.handle_llm_stream_chunk(
+ event.tool_call.function.arguments if event.tool_call else "",
+ event.call_type,
+ )
+ return
+
self.text_stream.write(event.chunk)
self.text_stream.seek(self.next_chunk)
self.text_stream.read()
diff --git a/lib/crewai/src/crewai/events/utils/console_formatter.py b/lib/crewai/src/crewai/events/utils/console_formatter.py
index 604c7b051..501e29aa4 100644
--- a/lib/crewai/src/crewai/events/utils/console_formatter.py
+++ b/lib/crewai/src/crewai/events/utils/console_formatter.py
@@ -437,6 +437,8 @@ To enable tracing, do any one of these:
if not self.verbose:
return
+ self.pause_live_updates()
+
with self._tool_counts_lock:
self.tool_usage_counts[tool_name] = (
self.tool_usage_counts.get(tool_name, 0) + 1
@@ -468,6 +470,8 @@ To enable tracing, do any one of these:
if not self.verbose:
return
+ self.pause_live_updates()
+
with self._tool_counts_lock:
iteration = self.tool_usage_counts.get(tool_name, 1)
@@ -495,6 +499,8 @@ To enable tracing, do any one of these:
if not self.verbose:
return
+ self.pause_live_updates()
+
with self._tool_counts_lock:
iteration = self.tool_usage_counts.get(tool_name, 1)
@@ -543,6 +549,15 @@ To enable tracing, do any one of these:
if should_suppress_console_output():
return
+ from crewai.events.types.llm_events import LLMCallType
+
+ if call_type == LLMCallType.TOOL_CALL:
+ self.pause_live_updates()
+ self._is_streaming = False
+ self._last_stream_call_type = call_type
+ self._just_streamed_final_answer = False
+ return
+
self._is_streaming = True
self._last_stream_call_type = call_type
@@ -554,17 +569,9 @@ To enable tracing, do any one of these:
display_text = "...\n" + display_text
content = Text()
-
- from crewai.events.types.llm_events import LLMCallType
-
- if call_type == LLMCallType.TOOL_CALL:
- content.append(display_text, style="yellow")
- title = "🔧 Tool Arguments"
- border_style = "yellow"
- else:
- content.append(display_text, style="bright_green")
- title = "✅ Agent Final Answer"
- border_style = "green"
+ content.append(display_text, style="bright_green")
+ title = "✅ Agent Final Answer"
+ border_style = "green"
streaming_panel = Panel(
content,
diff --git a/lib/crewai/src/crewai/llms/base_llm.py b/lib/crewai/src/crewai/llms/base_llm.py
index d679bf670..ad846daba 100644
--- a/lib/crewai/src/crewai/llms/base_llm.py
+++ b/lib/crewai/src/crewai/llms/base_llm.py
@@ -685,6 +685,40 @@ class BaseLLM(BaseModel, ABC):
),
)
+ def _emit_tool_call_stream_chunk_event(
+ self,
+ *,
+ chunk: str,
+ tool_call_id: str | None,
+ tool_name: str | None,
+ arguments: str,
+ index: int,
+ from_task: Task | None = None,
+ from_agent: BaseAgent | None = None,
+ response_id: str | None = None,
+ ) -> None:
+ """Emit a normalized streamed tool-call chunk.
+
+ ``chunk`` is the latest provider delta while ``arguments`` is the
+ accumulated argument string for consumers that want current state.
+ """
+ self._emit_stream_chunk_event(
+ chunk=chunk,
+ from_task=from_task,
+ from_agent=from_agent,
+ tool_call={
+ "id": tool_call_id,
+ "function": {
+ "name": tool_name or "",
+ "arguments": arguments,
+ },
+ "type": "function",
+ "index": index,
+ },
+ call_type=LLMCallType.TOOL_CALL,
+ response_id=response_id,
+ )
+
def _emit_thinking_chunk_event(
self,
chunk: str,
diff --git a/lib/crewai/src/crewai/llms/providers/openai/completion.py b/lib/crewai/src/crewai/llms/providers/openai/completion.py
index 77d2bbbdd..d44d4fde8 100644
--- a/lib/crewai/src/crewai/llms/providers/openai/completion.py
+++ b/lib/crewai/src/crewai/llms/providers/openai/completion.py
@@ -1102,6 +1102,7 @@ class OpenAICompletion(BaseLLM):
"""Handle streaming Responses API call."""
full_response = ""
function_calls: list[dict[str, Any]] = []
+ streaming_function_calls: dict[str, dict[str, Any]] = {}
final_response: Response | None = None
usage: dict[str, Any] | None = None
@@ -1123,11 +1124,46 @@ class OpenAICompletion(BaseLLM):
)
elif event.type == "response.function_call_arguments.delta":
- pass
+ self._process_responses_function_call_delta(
+ event=event,
+ function_calls=streaming_function_calls,
+ from_task=from_task,
+ from_agent=from_agent,
+ response_id=response_id_stream,
+ )
+
+ elif event.type == "response.function_call_arguments.done":
+ self._process_responses_function_call_done(
+ event=event,
+ function_calls=streaming_function_calls,
+ from_task=from_task,
+ from_agent=from_agent,
+ response_id=response_id_stream,
+ )
+
+ elif event.type == "response.output_item.added":
+ item = event.item
+ if item.type == "function_call":
+ self._process_responses_function_call_started(
+ event=event,
+ function_calls=streaming_function_calls,
+ from_task=from_task,
+ from_agent=from_agent,
+ response_id=response_id_stream,
+ )
elif event.type == "response.output_item.done":
item = event.item
if item.type == "function_call":
+ call_key = self._responses_function_call_key(event)
+ if call_key in streaming_function_calls:
+ streaming_function_calls[call_key].update(
+ {
+ "id": item.call_id,
+ "name": item.name,
+ "arguments": item.arguments,
+ }
+ )
function_calls.append(
{
"id": item.call_id,
@@ -1239,6 +1275,7 @@ class OpenAICompletion(BaseLLM):
"""Handle async streaming Responses API call."""
full_response = ""
function_calls: list[dict[str, Any]] = []
+ streaming_function_calls: dict[str, dict[str, Any]] = {}
final_response: Response | None = None
usage: dict[str, Any] | None = None
@@ -1260,11 +1297,46 @@ class OpenAICompletion(BaseLLM):
)
elif event.type == "response.function_call_arguments.delta":
- pass
+ self._process_responses_function_call_delta(
+ event=event,
+ function_calls=streaming_function_calls,
+ from_task=from_task,
+ from_agent=from_agent,
+ response_id=response_id_stream,
+ )
+
+ elif event.type == "response.function_call_arguments.done":
+ self._process_responses_function_call_done(
+ event=event,
+ function_calls=streaming_function_calls,
+ from_task=from_task,
+ from_agent=from_agent,
+ response_id=response_id_stream,
+ )
+
+ elif event.type == "response.output_item.added":
+ item = event.item
+ if item.type == "function_call":
+ self._process_responses_function_call_started(
+ event=event,
+ function_calls=streaming_function_calls,
+ from_task=from_task,
+ from_agent=from_agent,
+ response_id=response_id_stream,
+ )
elif event.type == "response.output_item.done":
item = event.item
if item.type == "function_call":
+ call_key = self._responses_function_call_key(event)
+ if call_key in streaming_function_calls:
+ streaming_function_calls[call_key].update(
+ {
+ "id": item.call_id,
+ "name": item.name,
+ "arguments": item.arguments,
+ }
+ )
function_calls.append(
{
"id": item.call_id,
@@ -1363,6 +1435,135 @@ class OpenAICompletion(BaseLLM):
return full_response
+ @staticmethod
+ def _responses_function_call_key(event: Any) -> str:
+ item = getattr(event, "item", None)
+ item_id = getattr(event, "item_id", None) or getattr(item, "id", None)
+ if item_id:
+ return str(item_id)
+ output_index = getattr(event, "output_index", None)
+ if output_index is not None:
+ return f"output:{output_index}"
+ return "output:0"
+
+ def _process_responses_function_call_delta(
+ self,
+ *,
+ event: Any,
+ function_calls: dict[str, dict[str, Any]],
+ from_task: Any | None = None,
+ from_agent: Any | None = None,
+ response_id: str | None = None,
+ ) -> None:
+ call_key = self._responses_function_call_key(event)
+ output_index = getattr(event, "output_index", 0) or 0
+ item_id = getattr(event, "item_id", None)
+ delta = getattr(event, "delta", "") or ""
+ call_data = function_calls.setdefault(
+ call_key,
+ {
+ "id": item_id,
+ "name": "",
+ "arguments": "",
+ "index": output_index,
+ "streamed_arguments": False,
+ },
+ )
+ if item_id and not call_data.get("id"):
+ call_data["id"] = item_id
+ call_data["arguments"] += delta
+ call_data["streamed_arguments"] = True
+
+ self._emit_tool_call_stream_chunk_event(
+ chunk=delta,
+ tool_call_id=call_data.get("id"),
+ tool_name=call_data.get("name"),
+ arguments=call_data["arguments"],
+ index=call_data["index"],
+ from_task=from_task,
+ from_agent=from_agent,
+ response_id=response_id,
+ )
+
+ def _process_responses_function_call_started(
+ self,
+ *,
+ event: Any,
+ function_calls: dict[str, dict[str, Any]],
+ from_task: Any | None = None,
+ from_agent: Any | None = None,
+ response_id: str | None = None,
+ ) -> None:
+ call_key = self._responses_function_call_key(event)
+ output_index = getattr(event, "output_index", 0) or 0
+ item = getattr(event, "item", None)
+ item_id = getattr(item, "id", None)
+ call_data = function_calls.setdefault(
+ call_key,
+ {
+ "id": getattr(item, "call_id", None) or item_id,
+ "name": getattr(item, "name", None) or "",
+ "arguments": "",
+ "index": output_index,
+ "streamed_arguments": False,
+ },
+ )
+ call_data["id"] = call_data.get("id") or getattr(item, "call_id", None)
+ call_data["name"] = call_data.get("name") or getattr(item, "name", None) or ""
+
+ self._emit_tool_call_stream_chunk_event(
+ chunk="",
+ tool_call_id=call_data.get("id"),
+ tool_name=call_data.get("name"),
+ arguments=call_data["arguments"],
+ index=call_data["index"],
+ from_task=from_task,
+ from_agent=from_agent,
+ response_id=response_id,
+ )
+
+ def _process_responses_function_call_done(
+ self,
+ *,
+ event: Any,
+ function_calls: dict[str, dict[str, Any]],
+ from_task: Any | None = None,
+ from_agent: Any | None = None,
+ response_id: str | None = None,
+ ) -> None:
+ call_key = self._responses_function_call_key(event)
+ output_index = getattr(event, "output_index", 0) or 0
+ item_id = getattr(event, "item_id", None)
+ arguments = getattr(event, "arguments", "") or ""
+ call_data = function_calls.setdefault(
+ call_key,
+ {
+ "id": item_id,
+ "name": "",
+ "arguments": "",
+ "index": output_index,
+ "streamed_arguments": False,
+ },
+ )
+ if item_id and not call_data.get("id"):
+ call_data["id"] = item_id
+ call_data["name"] = getattr(event, "name", None) or call_data.get("name", "")
+
+ if not call_data.get("streamed_arguments") and arguments:
+ call_data["arguments"] = arguments
+ self._emit_tool_call_stream_chunk_event(
+ chunk=arguments,
+ tool_call_id=call_data.get("id"),
+ tool_name=call_data.get("name"),
+ arguments=call_data["arguments"],
+ index=call_data["index"],
+ from_task=from_task,
+ from_agent=from_agent,
+ response_id=response_id,
+ )
+ elif arguments:
+ call_data["arguments"] = arguments
+
def _extract_function_calls_from_response(
self, response: Response
) -> list[dict[str, Any]]:
diff --git a/lib/crewai/tests/llms/openai/test_openai.py b/lib/crewai/tests/llms/openai/test_openai.py
index 836abe838..0fc9ba4a3 100644
--- a/lib/crewai/tests/llms/openai/test_openai.py
+++ b/lib/crewai/tests/llms/openai/test_openai.py
@@ -7,6 +7,7 @@ import openai
import pytest
from crewai.llm import LLM
+from crewai.events.types.llm_events import LLMCallType, LLMStreamChunkEvent
from crewai.llms.providers.openai.completion import OpenAICompletion, ResponsesAPIResult
from crewai.crew import Crew
from crewai.agent import Agent
@@ -1887,6 +1888,195 @@ def test_openai_responses_api_no_detail_fields_omitted():
assert "reasoning_tokens" not in usage
+def test_openai_responses_streaming_emits_tool_call_argument_deltas():
+ llm = OpenAICompletion(model="gpt-4o", api="responses", stream=True)
+ stream_events = [
+ types.SimpleNamespace(
+ type="response.created",
+ response=types.SimpleNamespace(id="resp_123"),
+ ),
+ types.SimpleNamespace(
+ type="response.output_item.added",
+ output_index=0,
+ item=types.SimpleNamespace(
+ type="function_call",
+ id="fc_123",
+ call_id="call_123",
+ name="get_weather",
+ arguments="",
+ ),
+ ),
+ types.SimpleNamespace(
+ type="response.function_call_arguments.delta",
+ item_id="fc_123",
+ output_index=0,
+ delta='{"city"',
+ ),
+ types.SimpleNamespace(
+ type="response.function_call_arguments.delta",
+ item_id="fc_123",
+ output_index=0,
+ delta=': "Paris"}',
+ ),
+ types.SimpleNamespace(
+ type="response.function_call_arguments.done",
+ item_id="fc_123",
+ output_index=0,
+ name="get_weather",
+ arguments='{"city": "Paris"}',
+ ),
+ types.SimpleNamespace(
+ type="response.output_item.done",
+ output_index=0,
+ item=types.SimpleNamespace(
+ type="function_call",
+ id="fc_123",
+ call_id="call_123",
+ name="get_weather",
+ arguments='{"city": "Paris"}',
+ ),
+ ),
+ types.SimpleNamespace(
+ type="response.completed",
+ response=types.SimpleNamespace(
+ id="resp_123",
+ status="completed",
+ usage=None,
+ ),
+ ),
+ ]
+ fake_client = types.SimpleNamespace(
+ responses=types.SimpleNamespace(create=lambda **_: iter(stream_events))
+ )
+
+ with patch.object(llm, "_get_sync_client", return_value=fake_client):
+ with patch("crewai.events.event_bus.CrewAIEventsBus.emit") as mock_emit:
+ llm._handle_streaming_responses({"input": []})
+
+ tool_call_events = [
+ call.kwargs["event"]
+ for call in mock_emit.call_args_list
+ if isinstance(call.kwargs.get("event"), LLMStreamChunkEvent)
+ and call.kwargs["event"].call_type == LLMCallType.TOOL_CALL
+ ]
+
+ assert [event.chunk for event in tool_call_events] == [
+ "",
+ '{"city"',
+ ': "Paris"}',
+ ]
+ assert [
+ event.tool_call.function.arguments for event in tool_call_events
+ ] == ["", '{"city"', '{"city": "Paris"}']
+ assert all(event.tool_call.id == "call_123" for event in tool_call_events)
+ assert all(
+ event.tool_call.function.name == "get_weather" for event in tool_call_events
+ )
+
+
+@pytest.mark.asyncio
+async def test_openai_responses_async_streaming_emits_tool_call_argument_deltas():
+ llm = OpenAICompletion(model="gpt-4o", api="responses", stream=True)
+ stream_events = [
+ types.SimpleNamespace(
+ type="response.created",
+ response=types.SimpleNamespace(id="resp_123"),
+ ),
+ types.SimpleNamespace(
+ type="response.output_item.added",
+ output_index=0,
+ item=types.SimpleNamespace(
+ type="function_call",
+ id="fc_123",
+ call_id="call_123",
+ name="get_weather",
+ arguments="",
+ ),
+ ),
+ types.SimpleNamespace(
+ type="response.function_call_arguments.delta",
+ item_id="fc_123",
+ output_index=0,
+ delta='{"city"',
+ ),
+ types.SimpleNamespace(
+ type="response.function_call_arguments.delta",
+ item_id="fc_123",
+ output_index=0,
+ delta=': "Paris"}',
+ ),
+ types.SimpleNamespace(
+ type="response.function_call_arguments.done",
+ item_id="fc_123",
+ output_index=0,
+ name="get_weather",
+ arguments='{"city": "Paris"}',
+ ),
+ types.SimpleNamespace(
+ type="response.output_item.done",
+ output_index=0,
+ item=types.SimpleNamespace(
+ type="function_call",
+ id="fc_123",
+ call_id="call_123",
+ name="get_weather",
+ arguments='{"city": "Paris"}',
+ ),
+ ),
+ types.SimpleNamespace(
+ type="response.completed",
+ response=types.SimpleNamespace(
+ id="resp_123",
+ status="completed",
+ usage=None,
+ ),
+ ),
+ ]
+
+ class MockAsyncStream:
+ def __init__(self, events: list[Any]) -> None:
+ self._events = events
+ self._index = 0
+
+ def __aiter__(self) -> "MockAsyncStream":
+ return self
+
+ async def __anext__(self) -> Any:
+ if self._index >= len(self._events):
+ raise StopAsyncIteration
+ event = self._events[self._index]
+ self._index += 1
+ return event
+
+ fake_client = types.SimpleNamespace(
+ responses=types.SimpleNamespace(create=lambda **_: MockAsyncStream(stream_events))
+ )
+
+ with patch.object(llm, "_get_async_client", return_value=fake_client):
+ with patch("crewai.events.event_bus.CrewAIEventsBus.emit") as mock_emit:
+ await llm._ahandle_streaming_responses({"input": []})
+
+ tool_call_events = [
+ call.kwargs["event"]
+ for call in mock_emit.call_args_list
+ if isinstance(call.kwargs.get("event"), LLMStreamChunkEvent)
+ and call.kwargs["event"].call_type == LLMCallType.TOOL_CALL
+ ]
+
+ assert [event.chunk for event in tool_call_events] == [
+ "",
+ '{"city"',
+ ': "Paris"}',
+ ]
+ assert [
+ event.tool_call.function.arguments for event in tool_call_events
+ ] == ["", '{"city"', '{"city": "Paris"}']
+ assert all(event.tool_call.id == "call_123" for event in tool_call_events)
+ assert all(
+ event.tool_call.function.name == "get_weather" for event in tool_call_events
+ )
+
+
@pytest.mark.asyncio
async def test_openai_async_streaming_returns_tool_calls_without_available_functions():
"""Test that async streaming returns tool calls list when available_functions is None.
diff --git a/lib/crewai/tests/utilities/test_console_formatter_pause_resume.py b/lib/crewai/tests/utilities/test_console_formatter_pause_resume.py
index 1ffbb3850..9c9fe2e28 100644
--- a/lib/crewai/tests/utilities/test_console_formatter_pause_resume.py
+++ b/lib/crewai/tests/utilities/test_console_formatter_pause_resume.py
@@ -1,5 +1,6 @@
from unittest.mock import MagicMock, patch
from rich.live import Live
+from crewai.events.types.llm_events import LLMCallType
from crewai.events.utils.console_formatter import ConsoleFormatter
@@ -110,3 +111,38 @@ class TestConsoleFormatterPauseResume:
# Streaming again creates new session
formatter.handle_llm_stream_chunk("chunk 2", call_type=None)
assert formatter._streaming_live == mock_live_instance_2
+
+ def test_tool_call_stream_chunk_does_not_create_live_panel(self):
+ formatter = ConsoleFormatter(verbose=True)
+
+ with patch("crewai.events.utils.console_formatter.Live") as mock_live_class:
+ formatter.handle_llm_stream_chunk(
+ '{"city":"Paris"}', call_type=LLMCallType.TOOL_CALL
+ )
+
+ mock_live_class.assert_not_called()
+ assert formatter._streaming_live is None
+ assert formatter._is_streaming is False
+
+ def test_tool_call_stream_chunk_stops_existing_live_panel(self):
+ formatter = ConsoleFormatter(verbose=True)
+ mock_live = MagicMock(spec=Live)
+ formatter._streaming_live = mock_live
+
+ formatter.handle_llm_stream_chunk(
+ '{"city":"Paris"}', call_type=LLMCallType.TOOL_CALL
+ )
+
+ mock_live.stop.assert_called_once()
+ assert formatter._streaming_live is None
+
+ def test_tool_usage_started_pauses_existing_live_panel(self):
+ formatter = ConsoleFormatter(verbose=True)
+ mock_live = MagicMock(spec=Live)
+ formatter._streaming_live = mock_live
+
+ with patch.object(formatter, "print_panel"):
+ formatter.handle_tool_usage_started("get_weather", {"city": "Paris"})
+
+ mock_live.stop.assert_called_once()
+ assert formatter._streaming_live is None
diff --git a/scripts/stream_tool_call_runner.py b/scripts/stream_tool_call_runner.py
new file mode 100644
index 000000000..da2a58e02
--- /dev/null
+++ b/scripts/stream_tool_call_runner.py
@@ -0,0 +1,84 @@
+"""Real OpenAI Responses API runner for streamed tool-call deltas.
+
+Fill in ``OPENAI_API_KEY`` below, then run from the repository root:
+
+ uv run python scripts/stream_tool_call_runner.py
+"""
+
+from __future__ import annotations
+
+import os
+
+# ruff: noqa: T201
+from crewai.llms.providers.openai.completion import OpenAICompletion
+from dotenv import load_dotenv
+
+
+load_dotenv()
+
+
+MODEL = "gpt-4o-mini"
+
+
+def get_weather(city: str) -> str:
+ return f"The weather in {city} is sunny and 72F."
+
+
+def main() -> None:
+
+ llm = OpenAICompletion(
+ model=MODEL,
+ api="responses",
+ api_key=os.getenv("OPENAI_API_KEY"),
+ stream=True,
+ additional_params={
+ "tool_choice": {"type": "function", "name": "get_weather"},
+ },
+ )
+
+ tool_schema = {
+ "type": "function",
+ "function": {
+ "name": "get_weather",
+ "description": "Get the weather for a city.",
+ "parameters": {
+ "type": "object",
+ "properties": {
+ "city": {
+ "type": "string",
+ "description": "The city to check.",
+ }
+ },
+ "required": ["city"],
+ "additionalProperties": False,
+ },
+ },
+ }
+
+ with llm.stream_events(
+ "Call get_weather for Paris. Do not answer directly.",
+ tools=[tool_schema],
+ available_functions={"get_weather": get_weather},
+ ) as stream:
+ for frame in stream.llm:
+ if frame.type != "llm_stream_chunk":
+ continue
+
+ tool_call = frame.data.get("tool_call")
+ if not tool_call:
+ print(f"text delta: {frame.content!r}")
+ continue
+
+ function = tool_call["function"]
+ print(
+ "tool delta: "
+ f"chunk={frame.data['chunk']!r} "
+ f"name={function['name']!r} "
+ f"arguments={function['arguments']!r}"
+ )
+
+ print(f"final result: {stream.result!r}")
+
+
+if __name__ == "__main__":
+ main()