Document streamed tool call arguments

lorenzejay
2026-07-02 11:33:26 -07:00
parent 559a9c65c4
commit b6a4af584d
11 changed files with 667 additions and 16 deletions


@@ -61,6 +61,35 @@ Frames are grouped into high-level channels:
The stream itself remains one ordered timeline. Channel projections let consumers focus on only part of that timeline.

### Tool Call Streaming

Tool calls appear in two places because they pass through two distinct phases:

| Phase | Channel | Event type | Meaning |
|-------|---------|------------|---------|
| Tool-call construction | `llm` | `llm_stream_chunk` | The model is streaming the name and arguments of the tool call it intends to make |
| Tool execution | `tools` | `tool_usage_started`, `tool_usage_finished`, `tool_usage_error` | CrewAI is executing the tool and reporting the result |

For streamed tool-call arguments, the latest provider delta is available as `frame.data["chunk"]`. The accumulated argument string is available at `frame.data["tool_call"]["function"]["arguments"]`.

```python
with llm.stream_events("Check the weather in Paris.", tools=[weather_tool]) as stream:
    for frame in stream.llm:
        if frame.type != "llm_stream_chunk":
            continue

        tool_call = frame.data.get("tool_call")
        if tool_call:
            function = tool_call["function"]
            print("Tool:", function["name"])
            print("Latest argument delta:", frame.data["chunk"])
            print("Arguments so far:", function["arguments"])
        elif frame.content:
            print(frame.content, end="", flush=True)
```

Use the `tools` channel when you want to display that a tool actually started, completed, or failed. Use the `llm` channel when you want to observe the model constructing the tool call before execution.

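
The two phases can be turned into a simple status label for a UI. The following is a minimal sketch, not part of CrewAI: `FakeFrame`, `phase_for`, and `collapse_phases` are hypothetical names, and the frames are hand-written stand-ins so the snippet runs without a provider. Only the channel names and event types come from the table above.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeFrame:
    """Hypothetical stand-in for a stream frame; not a CrewAI class."""

    channel: str
    type: str
    has_tool_call: bool = False  # stands in for a "tool_call" entry in the payload


# Tool execution event types, as listed in the table above.
EXECUTION_PHASES = {
    "tool_usage_started": "running",
    "tool_usage_finished": "finished",
    "tool_usage_error": "failed",
}


def phase_for(frame: FakeFrame) -> Optional[str]:
    """Return a display phase, or None when the frame is not tool-related."""
    if frame.channel == "llm" and frame.type == "llm_stream_chunk":
        # Text deltas share this event type, so only count chunks with a tool call.
        return "constructing" if frame.has_tool_call else None
    if frame.channel == "tools":
        return EXECUTION_PHASES.get(frame.type)
    return None


def collapse_phases(frames: list[FakeFrame]) -> list[str]:
    """Map frames to phases, skipping unrelated frames and consecutive repeats."""
    phases: list[str] = []
    for frame in frames:
        phase = phase_for(frame)
        if phase is not None and (not phases or phases[-1] != phase):
            phases.append(phase)
    return phases


sample = [
    FakeFrame("llm", "llm_stream_chunk"),  # plain text delta
    FakeFrame("llm", "llm_stream_chunk", has_tool_call=True),
    FakeFrame("llm", "llm_stream_chunk", has_tool_call=True),
    FakeFrame("tools", "tool_usage_started"),
    FakeFrame("tools", "tool_usage_finished"),
]
print(collapse_phases(sample))  # ['constructing', 'running', 'finished']
```

With real frames, the `has_tool_call` flag would presumably be replaced by a check for a `tool_call` entry in the frame payload, as in the example above.
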
```mermaid
flowchart LR
A["flow<br/>flow_started"] --> B["llm<br/>llm_call_started"]


@@ -70,6 +70,38 @@ result = stream.result
`frame.event` is the structured payload for the source event. Use it for metadata such as tool names, arguments, message roles, and runtime identifiers.

## Print Streamed Tool Call Arguments

When a model supports streamed tool calling, CrewAI emits partial tool-call arguments as `llm_stream_chunk` frames on the `llm` channel. These frames are different from tool execution events:

- `llm` channel: the model is constructing the tool call.
- `tools` channel: CrewAI is executing the tool.

The current delta is stored in `frame.event["chunk"]`. The accumulated arguments are stored in `frame.event["tool_call"]["function"]["arguments"]`.

```python
with llm.stream_events("Get the weather in Paris.", tools=[weather_tool]) as stream:
    for frame in stream.interleave(["llm", "tools"]):
        if frame.channel == "llm" and frame.type == "llm_stream_chunk":
            tool_call = frame.event.get("tool_call")
            if tool_call:
                function = tool_call["function"]
                print(f"\nPreparing tool: {function['name']}")
                print(f"Arguments so far: {function['arguments']}")
            elif frame.content:
                print(frame.content, end="", flush=True)
        elif frame.channel == "tools" and frame.type == "tool_usage_started":
            print(f"\nRunning tool: {frame.event.get('tool_name')}")
        elif frame.channel == "tools" and frame.type == "tool_usage_finished":
            print(f"\nTool finished: {frame.event.get('tool_name')}")

result = stream.result
```

Some providers emit arguments in small JSON fragments. For display, prefer the accumulated argument string over the latest delta.

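
Because the accumulated string is often incomplete JSON while the model is still streaming, a display layer may want to fall back to the raw text until the string parses. A minimal sketch, assuming nothing about CrewAI beyond the accumulated string itself; `preview_arguments` is a hypothetical helper name:

```python
import json


def preview_arguments(accumulated: str) -> str:
    """Return normalized JSON once the string parses; otherwise the raw partial text."""
    try:
        parsed = json.loads(accumulated)
    except json.JSONDecodeError:
        # Still streaming (or empty): show the fragment exactly as received.
        return accumulated
    return json.dumps(parsed, sort_keys=True)


# Snapshots of an accumulated argument string as it grows.
for snapshot in ['{"city"', '{"city": "Pa', '{"city": "Paris"}']:
    print(preview_arguments(snapshot))
```

The first two snapshots are printed unchanged because they are not yet valid JSON; only the last one is parsed and re-serialized.
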
## Watch Flow Progress
Flow lifecycle and method execution frames arrive on the `flow` channel:


@@ -240,15 +240,18 @@ for chunk in streaming:
### TOOL_CALL Chunks

`TOOL_CALL` chunks carry information about tool calls being made. Depending on the provider, CrewAI may receive tool-call arguments incrementally. In that case, each `TOOL_CALL` chunk contains the latest streamed content in `chunk.content`, while `chunk.tool_call.arguments` contains the accumulated argument string so far.

```python Code
for chunk in streaming:
    if chunk.chunk_type == StreamChunkType.TOOL_CALL:
        print(f"\nCalling tool: {chunk.tool_call.tool_name}")
        print(f"Latest argument delta: {chunk.content}")
        print(f"Arguments so far: {chunk.tool_call.arguments}")
```

Actual tool execution is reported separately through tool usage events in the runtime event stream and through CrewAI's verbose console output. `TOOL_CALL` chunks represent the model constructing the tool request before or during execution.

## Practical Example: Building a UI with Streaming
Here's a complete example showing how to build an interactive application with streaming:
@@ -381,4 +384,4 @@ except Exception as e:
print("Streaming completed but an error occurred")
```
By leveraging streaming, you can build more responsive and interactive applications with CrewAI, providing users with real-time visibility into agent execution and results.


@@ -145,6 +145,33 @@ result = stream.result
`llm.stream_events(...)` temporarily enables streaming for the wrapped call and restores the LLM's previous `stream` setting afterward. Provider integrations continue to emit the underlying LLM stream events; this helper provides a common iterator API over those events for every LLM provider.

### Tool Call Argument Deltas

Native tool-call streaming uses `llm_stream_chunk` frames on the `llm` channel. These frames represent the model constructing a tool call. They are separate from the `tools` channel, which represents CrewAI executing that tool.

For a streamed tool call, the frame payload includes:

```python
frame.type == "llm_stream_chunk"
frame.channel == "llm"
frame.event["call_type"] == "tool_call"
frame.event["chunk"]      # latest argument delta from the provider
frame.event["tool_call"]  # accumulated tool-call state
```

The `tool_call` payload follows the OpenAI-style function call shape:

```python
tool_call = frame.event["tool_call"]
tool_call["id"]
tool_call["index"]
tool_call["type"]  # "function"
tool_call["function"]["name"]
tool_call["function"]["arguments"]  # accumulated JSON argument string
```

Providers differ in granularity. OpenAI and Anthropic may stream arguments as small JSON fragments, while some providers emit the complete argument object in one chunk. Consumers should treat `frame.event["chunk"]` as the latest delta and `tool_call["function"]["arguments"]` as the current accumulated state.

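
One consequence of this contract is that a consumer can ignore deltas entirely and keep only the last accumulated state per tool-call index, which behaves the same for fragment-by-fragment and single-chunk providers. The sketch below illustrates this with plain dictionaries shaped like the payload described above; `make_payload` and `final_arguments` are hypothetical helpers, and no CrewAI objects are involved.

```python
from typing import Any


def make_payload(chunk: str, arguments: str, index: int = 0) -> dict[str, Any]:
    """Build a dict shaped like the streamed tool-call payload described above."""
    return {
        "call_type": "tool_call",
        "chunk": chunk,
        "tool_call": {
            "id": "call_123",
            "index": index,
            "type": "function",
            "function": {"name": "get_weather", "arguments": arguments},
        },
    }


def final_arguments(payloads: list[dict[str, Any]]) -> dict[int, str]:
    """Return the last accumulated argument string seen for each tool-call index."""
    latest: dict[int, str] = {}
    for payload in payloads:
        tool_call = payload.get("tool_call")
        if not tool_call:
            continue  # plain text chunk, nothing to track
        latest[tool_call["index"]] = tool_call["function"]["arguments"]
    return latest


# A provider that streams small JSON fragments.
fragmented = [
    make_payload("", ""),
    make_payload('{"city"', '{"city"'),
    make_payload(': "Paris"}', '{"city": "Paris"}'),
]
# A provider that emits the complete argument object in one chunk.
single = [make_payload('{"city": "Paris"}', '{"city": "Paris"}')]

print(final_arguments(fragmented) == final_arguments(single))
```

Reading the accumulated field avoids re-implementing concatenation on the consumer side, at the cost of not seeing how the arguments were split.
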
## Conversational Turns
Conversational Flows can stream one user turn with `stream_turn()`:


@@ -64,6 +64,7 @@ from crewai.events.types.llm_events import (
LLMCallCompletedEvent,
LLMCallFailedEvent,
LLMCallStartedEvent,
LLMCallType,
LLMStreamChunkEvent,
)
from crewai.events.types.llm_guardrail_events import (
@@ -455,6 +456,13 @@ class EventListener(BaseEventListener):
        @crewai_event_bus.on(LLMStreamChunkEvent)
        def on_llm_stream_chunk(_: Any, event: LLMStreamChunkEvent) -> None:
            if event.call_type == LLMCallType.TOOL_CALL:
                self.formatter.handle_llm_stream_chunk(
                    event.tool_call.function.arguments if event.tool_call else "",
                    event.call_type,
                )
                return

            self.text_stream.write(event.chunk)
            self.text_stream.seek(self.next_chunk)
            self.text_stream.read()


@@ -437,6 +437,8 @@ To enable tracing, do any one of these:
if not self.verbose:
return
self.pause_live_updates()
with self._tool_counts_lock:
self.tool_usage_counts[tool_name] = (
self.tool_usage_counts.get(tool_name, 0) + 1
@@ -468,6 +470,8 @@ To enable tracing, do any one of these:
if not self.verbose:
return
self.pause_live_updates()
with self._tool_counts_lock:
iteration = self.tool_usage_counts.get(tool_name, 1)
@@ -495,6 +499,8 @@ To enable tracing, do any one of these:
if not self.verbose:
return
self.pause_live_updates()
with self._tool_counts_lock:
iteration = self.tool_usage_counts.get(tool_name, 1)
@@ -543,6 +549,15 @@ To enable tracing, do any one of these:
if should_suppress_console_output():
return
from crewai.events.types.llm_events import LLMCallType
if call_type == LLMCallType.TOOL_CALL:
self.pause_live_updates()
self._is_streaming = False
self._last_stream_call_type = call_type
self._just_streamed_final_answer = False
return
self._is_streaming = True
self._last_stream_call_type = call_type
@@ -554,17 +569,9 @@ To enable tracing, do any one of these:
display_text = "...\n" + display_text
content = Text()
from crewai.events.types.llm_events import LLMCallType
if call_type == LLMCallType.TOOL_CALL:
content.append(display_text, style="yellow")
title = "🔧 Tool Arguments"
border_style = "yellow"
else:
content.append(display_text, style="bright_green")
title = "✅ Agent Final Answer"
border_style = "green"
content.append(display_text, style="bright_green")
title = "✅ Agent Final Answer"
border_style = "green"
streaming_panel = Panel(
content,


@@ -685,6 +685,40 @@ class BaseLLM(BaseModel, ABC):
),
)

    def _emit_tool_call_stream_chunk_event(
        self,
        *,
        chunk: str,
        tool_call_id: str | None,
        tool_name: str | None,
        arguments: str,
        index: int,
        from_task: Task | None = None,
        from_agent: BaseAgent | None = None,
        response_id: str | None = None,
    ) -> None:
        """Emit a normalized streamed tool-call chunk.

        ``chunk`` is the latest provider delta while ``arguments`` is the
        accumulated argument string for consumers that want current state.
        """
        self._emit_stream_chunk_event(
            chunk=chunk,
            from_task=from_task,
            from_agent=from_agent,
            tool_call={
                "id": tool_call_id,
                "function": {
                    "name": tool_name or "",
                    "arguments": arguments,
                },
                "type": "function",
                "index": index,
            },
            call_type=LLMCallType.TOOL_CALL,
            response_id=response_id,
        )

def _emit_thinking_chunk_event(
self,
chunk: str,


@@ -1102,6 +1102,7 @@ class OpenAICompletion(BaseLLM):
"""Handle streaming Responses API call."""
full_response = ""
function_calls: list[dict[str, Any]] = []
streaming_function_calls: dict[str, dict[str, Any]] = {}
final_response: Response | None = None
usage: dict[str, Any] | None = None
@@ -1123,11 +1124,46 @@ class OpenAICompletion(BaseLLM):
)
elif event.type == "response.function_call_arguments.delta":
pass
self._process_responses_function_call_delta(
event=event,
function_calls=streaming_function_calls,
from_task=from_task,
from_agent=from_agent,
response_id=response_id_stream,
)
elif event.type == "response.function_call_arguments.done":
self._process_responses_function_call_done(
event=event,
function_calls=streaming_function_calls,
from_task=from_task,
from_agent=from_agent,
response_id=response_id_stream,
)
elif event.type == "response.output_item.added":
item = event.item
if item.type == "function_call":
self._process_responses_function_call_started(
event=event,
function_calls=streaming_function_calls,
from_task=from_task,
from_agent=from_agent,
response_id=response_id_stream,
)
elif event.type == "response.output_item.done":
item = event.item
if item.type == "function_call":
call_key = self._responses_function_call_key(event)
if call_key in streaming_function_calls:
streaming_function_calls[call_key].update(
{
"id": item.call_id,
"name": item.name,
"arguments": item.arguments,
}
)
function_calls.append(
{
"id": item.call_id,
@@ -1239,6 +1275,7 @@ class OpenAICompletion(BaseLLM):
"""Handle async streaming Responses API call."""
full_response = ""
function_calls: list[dict[str, Any]] = []
streaming_function_calls: dict[str, dict[str, Any]] = {}
final_response: Response | None = None
usage: dict[str, Any] | None = None
@@ -1260,11 +1297,46 @@ class OpenAICompletion(BaseLLM):
)
elif event.type == "response.function_call_arguments.delta":
pass
self._process_responses_function_call_delta(
event=event,
function_calls=streaming_function_calls,
from_task=from_task,
from_agent=from_agent,
response_id=response_id_stream,
)
elif event.type == "response.function_call_arguments.done":
self._process_responses_function_call_done(
event=event,
function_calls=streaming_function_calls,
from_task=from_task,
from_agent=from_agent,
response_id=response_id_stream,
)
elif event.type == "response.output_item.added":
item = event.item
if item.type == "function_call":
self._process_responses_function_call_started(
event=event,
function_calls=streaming_function_calls,
from_task=from_task,
from_agent=from_agent,
response_id=response_id_stream,
)
elif event.type == "response.output_item.done":
item = event.item
if item.type == "function_call":
call_key = self._responses_function_call_key(event)
if call_key in streaming_function_calls:
streaming_function_calls[call_key].update(
{
"id": item.call_id,
"name": item.name,
"arguments": item.arguments,
}
)
function_calls.append(
{
"id": item.call_id,
@@ -1363,6 +1435,135 @@ class OpenAICompletion(BaseLLM):
return full_response

    @staticmethod
    def _responses_function_call_key(event: Any) -> str:
        # Prefer the item id (from the event or its item); fall back to the
        # output index so events without an id still map to a stable key.
        item = getattr(event, "item", None)
        item_id = getattr(event, "item_id", None) or getattr(item, "id", None)
        if item_id:
            return str(item_id)
        output_index = getattr(event, "output_index", None)
        if output_index is not None:
            return f"output:{output_index}"
        return "output:0"

    def _process_responses_function_call_delta(
        self,
        *,
        event: Any,
        function_calls: dict[str, dict[str, Any]],
        from_task: Any | None = None,
        from_agent: Any | None = None,
        response_id: str | None = None,
    ) -> None:
        call_key = self._responses_function_call_key(event)
        output_index = getattr(event, "output_index", 0) or 0
        item_id = getattr(event, "item_id", None)
        delta = getattr(event, "delta", "") or ""
        call_data = function_calls.setdefault(
            call_key,
            {
                "id": item_id,
                "name": "",
                "arguments": "",
                "index": output_index,
                "streamed_arguments": False,
            },
        )
        if item_id and not call_data.get("id"):
            call_data["id"] = item_id

        call_data["arguments"] += delta
        call_data["streamed_arguments"] = True
        self._emit_tool_call_stream_chunk_event(
            chunk=delta,
            tool_call_id=call_data.get("id"),
            tool_name=call_data.get("name"),
            arguments=call_data["arguments"],
            index=call_data["index"],
            from_task=from_task,
            from_agent=from_agent,
            response_id=response_id,
        )

    def _process_responses_function_call_started(
        self,
        *,
        event: Any,
        function_calls: dict[str, dict[str, Any]],
        from_task: Any | None = None,
        from_agent: Any | None = None,
        response_id: str | None = None,
    ) -> None:
        call_key = self._responses_function_call_key(event)
        output_index = getattr(event, "output_index", 0) or 0
        item = getattr(event, "item", None)
        item_id = getattr(item, "id", None)
        call_data = function_calls.setdefault(
            call_key,
            {
                "id": getattr(item, "call_id", None) or item_id,
                "name": getattr(item, "name", None) or "",
                "arguments": "",
                "index": output_index,
                "streamed_arguments": False,
            },
        )
        call_data["id"] = call_data.get("id") or getattr(item, "call_id", None)
        call_data["name"] = call_data.get("name") or getattr(item, "name", None) or ""

        self._emit_tool_call_stream_chunk_event(
            chunk="",
            tool_call_id=call_data.get("id"),
            tool_name=call_data.get("name"),
            arguments=call_data["arguments"],
            index=call_data["index"],
            from_task=from_task,
            from_agent=from_agent,
            response_id=response_id,
        )

    def _process_responses_function_call_done(
        self,
        *,
        event: Any,
        function_calls: dict[str, dict[str, Any]],
        from_task: Any | None = None,
        from_agent: Any | None = None,
        response_id: str | None = None,
    ) -> None:
        call_key = self._responses_function_call_key(event)
        output_index = getattr(event, "output_index", 0) or 0
        item_id = getattr(event, "item_id", None)
        arguments = getattr(event, "arguments", "") or ""
        call_data = function_calls.setdefault(
            call_key,
            {
                "id": item_id,
                "name": "",
                "arguments": "",
                "index": output_index,
                "streamed_arguments": False,
            },
        )
        if item_id and not call_data.get("id"):
            call_data["id"] = item_id
        call_data["name"] = getattr(event, "name", None) or call_data.get("name", "")

        if not call_data.get("streamed_arguments") and arguments:
            # No deltas were emitted for this call, so send the complete
            # argument string as a single chunk.
            call_data["arguments"] = arguments
            self._emit_tool_call_stream_chunk_event(
                chunk=arguments,
                tool_call_id=call_data.get("id"),
                tool_name=call_data.get("name"),
                arguments=call_data["arguments"],
                index=call_data["index"],
                from_task=from_task,
                from_agent=from_agent,
                response_id=response_id,
            )
        elif arguments:
            # Deltas were already emitted; only sync the final argument string.
            call_data["arguments"] = arguments

def _extract_function_calls_from_response(
self, response: Response
) -> list[dict[str, Any]]:


@@ -7,6 +7,7 @@ import openai
import pytest
from crewai.llm import LLM
from crewai.events.types.llm_events import LLMCallType, LLMStreamChunkEvent
from crewai.llms.providers.openai.completion import OpenAICompletion, ResponsesAPIResult
from crewai.crew import Crew
from crewai.agent import Agent
@@ -1887,6 +1888,195 @@ def test_openai_responses_api_no_detail_fields_omitted():
assert "reasoning_tokens" not in usage
def test_openai_responses_streaming_emits_tool_call_argument_deltas():
llm = OpenAICompletion(model="gpt-4o", api="responses", stream=True)
stream_events = [
types.SimpleNamespace(
type="response.created",
response=types.SimpleNamespace(id="resp_123"),
),
types.SimpleNamespace(
type="response.output_item.added",
output_index=0,
item=types.SimpleNamespace(
type="function_call",
id="fc_123",
call_id="call_123",
name="get_weather",
arguments="",
),
),
types.SimpleNamespace(
type="response.function_call_arguments.delta",
item_id="fc_123",
output_index=0,
delta='{"city"',
),
types.SimpleNamespace(
type="response.function_call_arguments.delta",
item_id="fc_123",
output_index=0,
delta=': "Paris"}',
),
types.SimpleNamespace(
type="response.function_call_arguments.done",
item_id="fc_123",
output_index=0,
name="get_weather",
arguments='{"city": "Paris"}',
),
types.SimpleNamespace(
type="response.output_item.done",
output_index=0,
item=types.SimpleNamespace(
type="function_call",
id="fc_123",
call_id="call_123",
name="get_weather",
arguments='{"city": "Paris"}',
),
),
types.SimpleNamespace(
type="response.completed",
response=types.SimpleNamespace(
id="resp_123",
status="completed",
usage=None,
),
),
]
fake_client = types.SimpleNamespace(
responses=types.SimpleNamespace(create=lambda **_: iter(stream_events))
)
with patch.object(llm, "_get_sync_client", return_value=fake_client):
with patch("crewai.events.event_bus.CrewAIEventsBus.emit") as mock_emit:
llm._handle_streaming_responses({"input": []})
tool_call_events = [
call.kwargs["event"]
for call in mock_emit.call_args_list
if isinstance(call.kwargs.get("event"), LLMStreamChunkEvent)
and call.kwargs["event"].call_type == LLMCallType.TOOL_CALL
]
assert [event.chunk for event in tool_call_events] == [
"",
'{"city"',
': "Paris"}',
]
assert [
event.tool_call.function.arguments for event in tool_call_events
] == ["", '{"city"', '{"city": "Paris"}']
assert all(event.tool_call.id == "call_123" for event in tool_call_events)
assert all(
event.tool_call.function.name == "get_weather" for event in tool_call_events
)
@pytest.mark.asyncio
async def test_openai_responses_async_streaming_emits_tool_call_argument_deltas():
llm = OpenAICompletion(model="gpt-4o", api="responses", stream=True)
stream_events = [
types.SimpleNamespace(
type="response.created",
response=types.SimpleNamespace(id="resp_123"),
),
types.SimpleNamespace(
type="response.output_item.added",
output_index=0,
item=types.SimpleNamespace(
type="function_call",
id="fc_123",
call_id="call_123",
name="get_weather",
arguments="",
),
),
types.SimpleNamespace(
type="response.function_call_arguments.delta",
item_id="fc_123",
output_index=0,
delta='{"city"',
),
types.SimpleNamespace(
type="response.function_call_arguments.delta",
item_id="fc_123",
output_index=0,
delta=': "Paris"}',
),
types.SimpleNamespace(
type="response.function_call_arguments.done",
item_id="fc_123",
output_index=0,
name="get_weather",
arguments='{"city": "Paris"}',
),
types.SimpleNamespace(
type="response.output_item.done",
output_index=0,
item=types.SimpleNamespace(
type="function_call",
id="fc_123",
call_id="call_123",
name="get_weather",
arguments='{"city": "Paris"}',
),
),
types.SimpleNamespace(
type="response.completed",
response=types.SimpleNamespace(
id="resp_123",
status="completed",
usage=None,
),
),
]
class MockAsyncStream:
def __init__(self, events: list[Any]) -> None:
self._events = events
self._index = 0
def __aiter__(self) -> "MockAsyncStream":
return self
async def __anext__(self) -> Any:
if self._index >= len(self._events):
raise StopAsyncIteration
event = self._events[self._index]
self._index += 1
return event
fake_client = types.SimpleNamespace(
responses=types.SimpleNamespace(create=lambda **_: MockAsyncStream(stream_events))
)
with patch.object(llm, "_get_async_client", return_value=fake_client):
with patch("crewai.events.event_bus.CrewAIEventsBus.emit") as mock_emit:
await llm._ahandle_streaming_responses({"input": []})
tool_call_events = [
call.kwargs["event"]
for call in mock_emit.call_args_list
if isinstance(call.kwargs.get("event"), LLMStreamChunkEvent)
and call.kwargs["event"].call_type == LLMCallType.TOOL_CALL
]
assert [event.chunk for event in tool_call_events] == [
"",
'{"city"',
': "Paris"}',
]
assert [
event.tool_call.function.arguments for event in tool_call_events
] == ["", '{"city"', '{"city": "Paris"}']
assert all(event.tool_call.id == "call_123" for event in tool_call_events)
assert all(
event.tool_call.function.name == "get_weather" for event in tool_call_events
)
@pytest.mark.asyncio
async def test_openai_async_streaming_returns_tool_calls_without_available_functions():
"""Test that async streaming returns tool calls list when available_functions is None.


@@ -1,5 +1,6 @@
from unittest.mock import MagicMock, patch
from rich.live import Live
from crewai.events.types.llm_events import LLMCallType
from crewai.events.utils.console_formatter import ConsoleFormatter
@@ -110,3 +111,38 @@ class TestConsoleFormatterPauseResume:
# Streaming again creates new session
formatter.handle_llm_stream_chunk("chunk 2", call_type=None)
assert formatter._streaming_live == mock_live_instance_2
def test_tool_call_stream_chunk_does_not_create_live_panel(self):
formatter = ConsoleFormatter(verbose=True)
with patch("crewai.events.utils.console_formatter.Live") as mock_live_class:
formatter.handle_llm_stream_chunk(
'{"city":"Paris"}', call_type=LLMCallType.TOOL_CALL
)
mock_live_class.assert_not_called()
assert formatter._streaming_live is None
assert formatter._is_streaming is False
def test_tool_call_stream_chunk_stops_existing_live_panel(self):
formatter = ConsoleFormatter(verbose=True)
mock_live = MagicMock(spec=Live)
formatter._streaming_live = mock_live
formatter.handle_llm_stream_chunk(
'{"city":"Paris"}', call_type=LLMCallType.TOOL_CALL
)
mock_live.stop.assert_called_once()
assert formatter._streaming_live is None
def test_tool_usage_started_pauses_existing_live_panel(self):
formatter = ConsoleFormatter(verbose=True)
mock_live = MagicMock(spec=Live)
formatter._streaming_live = mock_live
with patch.object(formatter, "print_panel"):
formatter.handle_tool_usage_started("get_weather", {"city": "Paris"})
mock_live.stop.assert_called_once()
assert formatter._streaming_live is None


@@ -0,0 +1,84 @@
"""Real OpenAI Responses API runner for streamed tool-call deltas.

Set ``OPENAI_API_KEY`` in your environment or in a ``.env`` file, then run
from the repository root:

    uv run python scripts/stream_tool_call_runner.py
"""

from __future__ import annotations

import os

# ruff: noqa: T201
from crewai.llms.providers.openai.completion import OpenAICompletion
from dotenv import load_dotenv


load_dotenv()

MODEL = "gpt-4o-mini"


def get_weather(city: str) -> str:
    return f"The weather in {city} is sunny and 72F."


def main() -> None:
    llm = OpenAICompletion(
        model=MODEL,
        api="responses",
        api_key=os.getenv("OPENAI_API_KEY"),
        stream=True,
        additional_params={
            # Force the model to call the tool so argument deltas are streamed.
            "tool_choice": {"type": "function", "name": "get_weather"},
        },
    )
    tool_schema = {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get the weather for a city.",
            "parameters": {
                "type": "object",
                "properties": {
                    "city": {
                        "type": "string",
                        "description": "The city to check.",
                    }
                },
                "required": ["city"],
                "additionalProperties": False,
            },
        },
    }

    with llm.stream_events(
        "Call get_weather for Paris. Do not answer directly.",
        tools=[tool_schema],
        available_functions={"get_weather": get_weather},
    ) as stream:
        for frame in stream.llm:
            if frame.type != "llm_stream_chunk":
                continue

            tool_call = frame.data.get("tool_call")
            if not tool_call:
                print(f"text delta: {frame.content!r}")
                continue

            function = tool_call["function"]
            print(
                "tool delta: "
                f"chunk={frame.data['chunk']!r} "
                f"name={function['name']!r} "
                f"arguments={function['arguments']!r}"
            )

    print(f"final result: {stream.result!r}")


if __name__ == "__main__":
    main()