Address streaming contract review feedback

This commit is contained in:
lorenzejay
2026-06-29 15:35:18 -07:00
parent 01c7915528
commit 7de7e32bb2
11 changed files with 201 additions and 93 deletions

View File

@@ -11,7 +11,7 @@ mode: "wide"
## كيف يعمل بث التدفق
عند تفعيل البث في تدفق، يلتقط CrewAI ويبث المخرجات من أي أطقم أو استدعاءات LLM داخل التدفق. يقدم البث أجزاء منظمة تحتوي على المحتوى وسياق المهمة ومعلومات الوكيل مع تقدم التنفيذ.
عند تفعيل البث في تدفق، يلتقط CrewAI ويبث المخرجات من أي أطقم أو استدعاءات LLM أو أدوات أو أحداث دورة حياة داخل التدفق. يقدم البث عناصر `StreamFrame` مرتبة تحتوي على محتوى قابل للطباعة وبيانات حدث مهيكلة مع تقدم التنفيذ.
## تفعيل البث
@@ -180,13 +180,14 @@ flow = MultiStepFlow()
streaming = flow.kickoff()
current_step = ""
for chunk in streaming:
for item in streaming:
# Track which flow step is executing
if chunk.task_name != current_step:
current_step = chunk.task_name
print(f"\n\n=== {chunk.task_name} ===\n")
step_name = item.event.get("method_name") or item.event.get("task_name")
if step_name and step_name != current_step:
current_step = step_name
print(f"\n\n=== {step_name} ===\n")
print(chunk.content, end="", flush=True)
print(item.content, end="", flush=True)
result = streaming.result
print(f"\n\nFinal analysis: {result}")
@@ -200,7 +201,6 @@ print(f"\n\nFinal analysis: {result}")
import asyncio
from crewai.flow.flow import Flow, listen, start
from crewai import Agent, Crew, Task
from crewai.types.streaming import StreamChunkType
class ResearchPipeline(Flow):
stream = True
@@ -253,33 +253,35 @@ async def run_with_dashboard():
current_agent = ""
current_task = ""
chunk_count = 0
frame_count = 0
async for chunk in streaming:
chunk_count += 1
async for item in streaming:
frame_count += 1
# Display phase transitions
if chunk.task_name != current_task:
current_task = chunk.task_name
current_agent = chunk.agent_role
task_name = item.event.get("task_name", "")
agent_role = item.event.get("agent_role", "")
if task_name and task_name != current_task:
current_task = task_name
current_agent = agent_role
print(f"\n\n📋 Phase: {current_task}")
print(f"👤 Agent: {current_agent}")
print("-" * 60)
# Display text output
if chunk.chunk_type == StreamChunkType.TEXT:
print(chunk.content, end="", flush=True)
if item.content:
print(item.content, end="", flush=True)
# Display tool usage
elif chunk.chunk_type == StreamChunkType.TOOL_CALL and chunk.tool_call:
print(f"\n🔧 Tool: {chunk.tool_call.tool_name}")
elif item.channel == "tools":
print(f"\n🔧 Tool event: {item.type}")
# Show completion summary
result = streaming.result
print(f"\n\n{'='*60}")
print("PIPELINE COMPLETE")
print(f"{'='*60}")
print(f"Total chunks: {chunk_count}")
print(f"Total frames: {frame_count}")
print(f"Final output length: {len(str(result))} characters")
asyncio.run(run_with_dashboard())
@@ -352,8 +354,8 @@ class StatefulStreamingFlow(Flow[AnalysisState]):
flow = StatefulStreamingFlow()
streaming = flow.kickoff(inputs={"topic": "quantum computing"})
for chunk in streaming:
print(chunk.content, end="", flush=True)
for item in streaming:
print(item.content, end="", flush=True)
result = streaming.result
print(f"\n\nFinal state:")
@@ -373,29 +375,29 @@ print(f"Insights length: {len(flow.state.insights)}")
- **تتبع التقدم**: إظهار المرحلة الحالية من سير العمل للمستخدمين
- **لوحات المعلومات الحية**: إنشاء واجهات مراقبة لتدفقات الإنتاج
## أنواع أجزاء البث
## قنوات إطارات البث
مثل بث الطاقم، يمكن أن تكون أجزاء التدفق من أنواع مختلفة:
ينتج بث التدفق عناصر `StreamFrame` عبر عدة قنوات:
### أجزاء TEXT
### إطارات LLM
محتوى نصي قياسي من استجابات LLM:
```python Code
for chunk in streaming:
if chunk.chunk_type == StreamChunkType.TEXT:
print(chunk.content, end="", flush=True)
for item in streaming:
if item.channel == "llm" and item.content:
print(item.content, end="", flush=True)
```
### أجزاء TOOL_CALL
### إطارات الأدوات
معلومات حول استدعاءات الأدوات داخل التدفق:
```python Code
for chunk in streaming:
if chunk.chunk_type == StreamChunkType.TOOL_CALL and chunk.tool_call:
print(f"\nTool: {chunk.tool_call.tool_name}")
print(f"Args: {chunk.tool_call.arguments}")
for item in streaming:
if item.channel == "tools":
print(f"\nTool event: {item.type}")
print(f"Payload: {item.event}")
```
## معالجة الأخطاء
@@ -407,8 +409,8 @@ flow = ResearchFlow()
streaming = flow.kickoff()
try:
for chunk in streaming:
print(chunk.content, end="", flush=True)
for item in streaming:
print(item.content, end="", flush=True)
result = streaming.result
print(f"\nSuccess! Result: {result}")
@@ -453,7 +455,7 @@ finally:
- يجب التكرار عبر جميع عناصر stream قبل الوصول إلى خاصية `.result`
- يعمل البث مع كل من حالة التدفق المنظمة وغير المنظمة
- يلتقط بث التدفق المخرجات من جميع الأطقم واستدعاءات LLM والأدوات وأحداث دورة الحياة في التدفق
- يتضمن كل جزء سياقاً حول الوكيل والمهمة التي ولدته
- يتضمن كل إطار سياق حدث مهيكلاً مثل القناة والنوع والنطاق والحمولة
- يضيف البث حملاً ضئيلاً لتنفيذ التدفق
## الدمج مع تصور التدفق
@@ -467,8 +469,8 @@ flow.plot("research_flow") # Creates HTML visualization
# Run with streaming
streaming = flow.kickoff()
for chunk in streaming:
print(chunk.content, end="", flush=True)
for item in streaming:
print(item.content, end="", flush=True)
result = streaming.result
print(f"\nFlow complete! View structure at: research_flow.html")

View File

@@ -11,7 +11,7 @@ CrewAI Flows support streaming output, allowing you to receive real-time updates
## How Flow Streaming Works
When streaming is enabled on a Flow, CrewAI captures and streams output from any crews or LLM calls within the flow. The stream delivers structured chunks containing the content, task context, and agent information as execution progresses.
When streaming is enabled on a Flow, CrewAI captures and streams output from any crews, LLM calls, tools, and lifecycle events within the flow. The stream delivers ordered `StreamFrame` items with printable content plus structured event data as execution progresses.
## Enabling Streaming
@@ -180,13 +180,14 @@ flow = MultiStepFlow()
streaming = flow.kickoff()
current_step = ""
for chunk in streaming:
for item in streaming:
# Track which flow step is executing
if chunk.task_name != current_step:
current_step = chunk.task_name
print(f"\n\n=== {chunk.task_name} ===\n")
step_name = item.event.get("method_name") or item.event.get("task_name")
if step_name and step_name != current_step:
current_step = step_name
print(f"\n\n=== {step_name} ===\n")
print(chunk.content, end="", flush=True)
print(item.content, end="", flush=True)
result = streaming.result
print(f"\n\nFinal analysis: {result}")
@@ -200,7 +201,6 @@ Here's a complete example showing how to build a progress dashboard with streami
import asyncio
from crewai.flow.flow import Flow, listen, start
from crewai import Agent, Crew, Task
from crewai.types.streaming import StreamChunkType
class ResearchPipeline(Flow):
stream = True
@@ -253,33 +253,35 @@ async def run_with_dashboard():
current_agent = ""
current_task = ""
chunk_count = 0
frame_count = 0
async for chunk in streaming:
chunk_count += 1
async for item in streaming:
frame_count += 1
# Display phase transitions
if chunk.task_name != current_task:
current_task = chunk.task_name
current_agent = chunk.agent_role
task_name = item.event.get("task_name", "")
agent_role = item.event.get("agent_role", "")
if task_name and task_name != current_task:
current_task = task_name
current_agent = agent_role
print(f"\n\n📋 Phase: {current_task}")
print(f"👤 Agent: {current_agent}")
print("-" * 60)
# Display text output
if chunk.chunk_type == StreamChunkType.TEXT:
print(chunk.content, end="", flush=True)
if item.content:
print(item.content, end="", flush=True)
# Display tool usage
elif chunk.chunk_type == StreamChunkType.TOOL_CALL and chunk.tool_call:
print(f"\n🔧 Tool: {chunk.tool_call.tool_name}")
elif item.channel == "tools":
print(f"\n🔧 Tool event: {item.type}")
# Show completion summary
result = streaming.result
print(f"\n\n{'='*60}")
print("PIPELINE COMPLETE")
print(f"{'='*60}")
print(f"Total chunks: {chunk_count}")
print(f"Total frames: {frame_count}")
print(f"Final output length: {len(str(result))} characters")
asyncio.run(run_with_dashboard())
@@ -352,8 +354,8 @@ class StatefulStreamingFlow(Flow[AnalysisState]):
flow = StatefulStreamingFlow()
streaming = flow.kickoff(inputs={"topic": "quantum computing"})
for chunk in streaming:
print(chunk.content, end="", flush=True)
for item in streaming:
print(item.content, end="", flush=True)
result = streaming.result
print(f"\n\nFinal state:")
@@ -373,29 +375,29 @@ Flow streaming is particularly valuable for:
- **Progress Tracking**: Show users which stage of the workflow is currently executing
- **Live Dashboards**: Create monitoring interfaces for production flows
## Stream Chunk Types
## Stream Frame Channels
Like crew streaming, flow chunks can be of different types:
Flow streaming yields `StreamFrame` items across several channels:
### TEXT Chunks
### LLM Frames
Standard text content from LLM responses:
```python Code
for chunk in streaming:
if chunk.chunk_type == StreamChunkType.TEXT:
print(chunk.content, end="", flush=True)
for item in streaming:
if item.channel == "llm" and item.content:
print(item.content, end="", flush=True)
```
### TOOL_CALL Chunks
### Tool Frames
Information about tool calls within the flow:
```python Code
for chunk in streaming:
if chunk.chunk_type == StreamChunkType.TOOL_CALL and chunk.tool_call:
print(f"\nTool: {chunk.tool_call.tool_name}")
print(f"Args: {chunk.tool_call.arguments}")
for item in streaming:
if item.channel == "tools":
print(f"\nTool event: {item.type}")
print(f"Payload: {item.event}")
```
## Error Handling
@@ -407,8 +409,8 @@ flow = ResearchFlow()
streaming = flow.kickoff()
try:
for chunk in streaming:
print(chunk.content, end="", flush=True)
for item in streaming:
print(item.content, end="", flush=True)
result = streaming.result
print(f"\nSuccess! Result: {result}")
@@ -453,7 +455,7 @@ After cancellation, `streaming.is_cancelled` and `streaming.is_completed` are bo
- You must iterate through all stream items before accessing the `.result` property
- Streaming works with both structured and unstructured flow state
- Flow streaming captures output from all crews, LLM calls, tools, and lifecycle events in the flow
- Each chunk includes context about which agent and task generated it
- Each frame includes structured event context such as channel, type, namespace, and payload
- Streaming adds minimal overhead to flow execution
## Combining with Flow Visualization
@@ -467,8 +469,8 @@ flow.plot("research_flow") # Creates HTML visualization
# Run with streaming
streaming = flow.kickoff()
for chunk in streaming:
print(chunk.content, end="", flush=True)
for item in streaming:
print(item.content, end="", flush=True)
result = streaming.result
print(f"\nFlow complete! View structure at: research_flow.html")

View File

@@ -777,9 +777,7 @@ class CrewAIEventsBus:
source: The object emitting the event
event: The event instance to emit
"""
self._register_source(source)
event.emission_sequence = get_next_emission_sequence()
self._record_event(event)
self._prepare_event(source, event)
event_type = type(event)

View File

@@ -45,6 +45,7 @@ from crewai.experimental.conversational import (
_conversational_only,
message_to_llm_dict,
)
from crewai.flow.async_feedback import HumanFeedbackPending
from crewai.flow.conversation import (
append_message as _append_conversation_message,
get_conversation_messages,
@@ -382,7 +383,6 @@ class _ConversationalMixin:
self._pending_intents = list(intents) if intents else None
self._pending_intent_llm = intent_llm
failed_event: ConversationTurnFailedEvent | None = None
try:
if "from_checkpoint" not in kickoff_kwargs:
self._reset_turn_execution_state()
@@ -407,6 +407,8 @@ class _ConversationalMixin:
and self._is_public_turn_result(result)
):
self.append_assistant_message(self._stringify_result(result))
except HumanFeedbackPending as exc:
return exc
except Exception as exc:
failed_event = ConversationTurnFailedEvent(
type="conversation_turn_failed",

View File

@@ -1950,6 +1950,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
return self.stream_events(
inputs=inputs,
input_files=input_files,
from_checkpoint=from_checkpoint,
restore_from_state_id=restore_from_state_id,
)
@@ -2015,6 +2016,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
return self.astream(
inputs=inputs,
input_files=input_files,
from_checkpoint=from_checkpoint,
restore_from_state_id=restore_from_state_id,
)

View File

@@ -339,20 +339,16 @@ class BaseLLM(BaseModel, ABC):
output_holder: list[StreamSession[Any]] = []
def run_llm_call() -> Any:
original_stream = self.stream
try:
self.stream = True
return self.call(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
finally:
self.stream = original_stream
streaming_llm = self.model_copy(update={"stream": True})
return streaming_llm.call(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
stream_session: StreamSession[Any] = StreamSession(
sync_iterator=create_frame_generator(state, run_llm_call, output_holder)

View File

@@ -29,6 +29,7 @@ from crewai.experimental import (
RouterConfig,
)
from crewai.flow import Flow, ChatState, listen, start
from crewai.flow.async_feedback import HumanFeedbackPending, PendingFeedbackContext
from crewai.flow.flow_context import (
current_flow_defer_trace_finalization,
current_flow_id,
@@ -220,6 +221,28 @@ class TestConversationalFlow:
if frame.type == "llm_stream_chunk"
] == ["po", "ng"]
def test_stream_turn_returns_pending_feedback_without_failure_event(self) -> None:
# Patches flow.kickoff to raise HumanFeedbackPending, runs stream_turn, and
# checks that the pending object is exposed as the stream result while the
# only collected frame type is "conversation_turn_started".
flow = ConversationalFlow()
# Pending-feedback signal that the patched kickoff raises below.
pending = HumanFeedbackPending(
context=PendingFeedbackContext(
flow_id="session-1",
flow_class="tests.PendingFeedbackFlow",
method_name="review",
method_output="draft",
message="Please review",
)
)
# Side effect for the patched kickoff: ignore all arguments and raise.
def kickoff_side_effect(*_: Any, **__: Any) -> None:
raise pending
with patch.object(flow, "kickoff", side_effect=kickoff_side_effect):
stream = flow.stream_turn("review this", session_id="session-1")
# Drain the event stream so every emitted frame is collected.
frames = list(stream.events)
# Identity check: the result is the very exception instance that was raised.
assert stream.result is pending
# No other frame type (e.g. a failure frame) was collected.
assert [frame.type for frame in frames] == ["conversation_turn_started"]
def test_deferred_multi_turn_emits_single_flow_finished(self) -> None:
"""A deferred multi-turn session lands as one trace: exactly one
``FlowFinishedEvent`` is emitted at ``finalize_session_traces()``, not

View File

@@ -4,7 +4,7 @@ from __future__ import annotations
import asyncio
from collections.abc import AsyncIterator
from typing import Any
from typing import Any, ClassVar
import pytest
@@ -59,7 +59,12 @@ class FrameFlow(Flow):
class DirectStreamingLLM(BaseLLM):
call_stream_values: ClassVar[list[bool | None]] = []
call_instance_ids: ClassVar[list[int]] = []
def call(self, messages: Any, *args: Any, **kwargs: Any) -> str:
self.call_stream_values.append(self.stream)
self.call_instance_ids.append(id(self))
crewai_event_bus.emit(
self,
LLMStreamChunkEvent(
@@ -149,6 +154,8 @@ def test_flow_streaming_returns_iterable_frame_session() -> None:
def test_direct_llm_stream_events_scope_and_restore_stream_flag() -> None:
DirectStreamingLLM.call_stream_values = []
DirectStreamingLLM.call_instance_ids = []
llm = DirectStreamingLLM(model="gpt-4o-mini", stream=False)
with llm.stream_events("hello") as stream:
@@ -158,6 +165,8 @@ def test_direct_llm_stream_events_scope_and_restore_stream_flag() -> None:
assert frames[0].event["chunk"] == "hel"
assert stream.result == "hello"
assert llm.stream is False
assert DirectStreamingLLM.call_stream_values == [True]
assert DirectStreamingLLM.call_instance_ids != [id(llm)]
@pytest.mark.asyncio

View File

@@ -11,6 +11,7 @@ from crewai import Agent, Crew, Task
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.llm_events import LLMStreamChunkEvent, ToolCall, FunctionCall
from crewai.flow.flow import Flow, start
from crewai.state.checkpoint_config import CheckpointConfig
from crewai.types.streaming import (
AsyncStreamSession,
CrewStreamingOutput,
@@ -509,6 +510,25 @@ class TestFlowKickoffStreaming:
result = streaming.result
assert result == "flow result"
def test_streaming_kickoff_passes_checkpoint_config_to_stream_events(self) -> None:
"""stream=True preserves checkpoint config when routing to stream_events."""
# Minimal single-step flow whose start method returns a fixed string.
class TestFlow(Flow[dict[str, Any]]):
@start()
def generate(self) -> str:
return "flow result"
flow = TestFlow()
flow.stream = True
checkpoint = CheckpointConfig()
# wraps= keeps the real stream_events running while recording the call.
with patch.object(flow, "stream_events", wraps=flow.stream_events) as spy:
streaming = flow.kickoff(from_checkpoint=checkpoint)
# Exhaust the stream before reading .result below.
list(streaming)
# The same CheckpointConfig instance must reach stream_events as a keyword.
assert spy.call_args.kwargs["from_checkpoint"] is checkpoint
assert streaming.result == "flow result"
class TestFlowKickoffStreamingAsync:
"""Tests for Flow(stream=True).kickoff_async() method."""
@@ -611,6 +631,29 @@ class TestFlowKickoffStreamingAsync:
result = streaming.result
assert result == "async flow result"
@pytest.mark.asyncio
async def test_streaming_kickoff_async_passes_checkpoint_config_to_astream(
self,
) -> None:
"""stream=True preserves checkpoint config when routing to astream."""
# Minimal single-step flow whose async start method returns a fixed string.
class TestFlow(Flow[dict[str, Any]]):
@start()
async def generate(self) -> str:
return "async flow result"
flow = TestFlow()
flow.stream = True
checkpoint = CheckpointConfig()
# wraps= keeps the real astream running while recording the call.
with patch.object(flow, "astream", wraps=flow.astream) as spy:
streaming = await flow.kickoff_async(from_checkpoint=checkpoint)
# Exhaust the async stream before reading .result below.
async for _ in streaming:
pass
# The same CheckpointConfig instance must reach astream as a keyword.
assert spy.call_args.kwargs["from_checkpoint"] is checkpoint
assert streaming.result == "async flow result"
class TestStreamingEdgeCases:
"""Tests for edge cases in streaming functionality."""

View File

@@ -3,6 +3,8 @@
import pytest
from crewai import Agent, Crew, Task
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.llm_events import LLMStreamChunkEvent
from crewai.flow.flow import Flow, start
from crewai.types.streaming import AsyncStreamSession, CrewStreamingOutput, StreamSession
@@ -232,6 +234,14 @@ class TestStreamingFlowIntegration:
@start()
def execute(self) -> str:
crewai_event_bus.emit(
self,
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="Flow result",
call_id="call-1",
),
)
return "Flow result"
flow = SimpleFlow()
@@ -241,8 +251,10 @@ class TestStreamingFlowIntegration:
pass
assert streaming.is_completed is True
full_text = "".join(frame.content for frame in streaming.frames)
assert isinstance(full_text, str)
content_frames = [frame for frame in streaming.frames if frame.content]
full_text = "".join(frame.content for frame in content_frames)
assert full_text == "Flow result"
assert len(content_frames) == 1
assert len(streaming.frames) > 0
result = streaming.result

View File

@@ -9,6 +9,7 @@ import pytest
from crewai.events.base_events import BaseEvent
from crewai.events.event_bus import crewai_event_bus
from crewai.events.stream_context import add_stream_sink, reset_stream_sinks
class AsyncTestEvent(BaseEvent):
@@ -53,6 +54,24 @@ async def test_aemit_with_async_handlers():
assert received_events[0] == event
@pytest.mark.asyncio
async def test_aemit_publishes_to_active_stream_sinks():
# Registers a stream sink, emits one event through aemit, and checks that
# the sink received that (source, event) pair and that the event was given
# an emission sequence.
published_events = []
# Sink callback: record every (source, event) pair it is handed.
def sink(source: object, event: BaseEvent) -> None:
published_events.append((source, event))
event = AsyncTestEvent(type="async_test")
# The token returned by add_stream_sink is handed back to reset_stream_sinks.
token = add_stream_sink(sink)
try:
await crewai_event_bus.aemit("test_source", event)
finally:
# Always reset the sinks, even if aemit raises.
reset_stream_sinks(token)
# Exactly one pair was published, carrying the original source and event.
assert published_events == [("test_source", event)]
assert event.emission_sequence is not None
@pytest.mark.asyncio
async def test_multiple_async_handlers():
received_events_1 = []