From 7de7e32bb2f704b05280f425b594f20bc1e0bba3 Mon Sep 17 00:00:00 2001 From: lorenzejay Date: Mon, 29 Jun 2026 15:35:18 -0700 Subject: [PATCH] Address streaming contract review feedback --- .../ar/learn/streaming-flow-execution.mdx | 74 ++++++++++--------- .../en/learn/streaming-flow-execution.mdx | 74 ++++++++++--------- lib/crewai/src/crewai/events/event_bus.py | 4 +- .../experimental/conversational_mixin.py | 4 +- .../src/crewai/flow/runtime/__init__.py | 2 + lib/crewai/src/crewai/llms/base_llm.py | 24 +++--- lib/crewai/tests/test_flow_conversation.py | 23 ++++++ lib/crewai/tests/test_stream_frames.py | 11 ++- lib/crewai/tests/test_streaming.py | 43 +++++++++++ .../tests/test_streaming_integration.py | 16 +++- .../utilities/events/test_async_event_bus.py | 19 +++++ 11 files changed, 201 insertions(+), 93 deletions(-) diff --git a/docs/edge/ar/learn/streaming-flow-execution.mdx b/docs/edge/ar/learn/streaming-flow-execution.mdx index 652eee7ce..4341ec932 100644 --- a/docs/edge/ar/learn/streaming-flow-execution.mdx +++ b/docs/edge/ar/learn/streaming-flow-execution.mdx @@ -11,7 +11,7 @@ mode: "wide" ## كيف يعمل بث التدفق -عند تفعيل البث في تدفق، يلتقط CrewAI ويبث المخرجات من أي أطقم أو استدعاءات LLM داخل التدفق. يقدم البث أجزاء منظمة تحتوي على المحتوى وسياق المهمة ومعلومات الوكيل مع تقدم التنفيذ. +عند تفعيل البث في تدفق، يلتقط CrewAI ويبث المخرجات من أي أطقم أو استدعاءات LLM أو أدوات أو أحداث دورة حياة داخل التدفق. يقدم البث عناصر `StreamFrame` مرتبة تحتوي على محتوى قابل للطباعة وبيانات حدث مهيكلة مع تقدم التنفيذ. ## تفعيل البث @@ -180,13 +180,14 @@ flow = MultiStepFlow() streaming = flow.kickoff() current_step = "" -for chunk in streaming: +for item in streaming: # Track which flow step is executing - if chunk.task_name != current_step: - current_step = chunk.task_name - print(f"\n\n=== {chunk.task_name} ===\n") + step_name = item.event.get("method_name") or item.event.get("task_name") + if step_name and step_name != current_step: + current_step = step_name + print(f"\n\n=== {step_name} ===\n") - print(chunk.content, end="", flush=True) + print(item.content, end="", flush=True) result = streaming.result print(f"\n\nFinal analysis: {result}") @@ -200,7 +201,6 @@ print(f"\n\nFinal analysis: {result}") import asyncio from crewai.flow.flow import Flow, listen, start from crewai import Agent, Crew, Task -from crewai.types.streaming import StreamChunkType class ResearchPipeline(Flow): stream = True @@ -253,33 +253,35 @@ async def run_with_dashboard(): current_agent = "" current_task = "" - chunk_count = 0 + frame_count = 0 - async for chunk in streaming: - chunk_count += 1 + async for item in streaming: + frame_count += 1 # Display phase transitions - if chunk.task_name != current_task: - current_task = chunk.task_name - current_agent = chunk.agent_role + task_name = item.event.get("task_name", "") + agent_role = item.event.get("agent_role", "") + if task_name and task_name != current_task: + current_task = task_name + current_agent = agent_role print(f"\n\n📋 Phase: {current_task}") print(f"👤 Agent: {current_agent}") print("-" * 60) # Display text output - if chunk.chunk_type == StreamChunkType.TEXT: - print(chunk.content, end="", flush=True) + if item.content: + print(item.content, end="", flush=True) # Display tool usage - elif chunk.chunk_type == StreamChunkType.TOOL_CALL and chunk.tool_call: - print(f"\n🔧 Tool: {chunk.tool_call.tool_name}") + elif item.channel == "tools": + print(f"\n🔧 Tool event: {item.type}") # Show completion summary result = streaming.result print(f"\n\n{'='*60}") print("PIPELINE COMPLETE") print(f"{'='*60}") - print(f"Total chunks: {chunk_count}") + print(f"Total frames: {frame_count}") print(f"Final output length: {len(str(result))} characters") asyncio.run(run_with_dashboard()) @@ -352,8 +354,8 @@ class StatefulStreamingFlow(Flow[AnalysisState]): flow = StatefulStreamingFlow() streaming = flow.kickoff(inputs={"topic": "quantum computing"}) -for chunk in streaming: - print(chunk.content, end="", flush=True) +for item in streaming: + print(item.content, end="", flush=True) result = streaming.result print(f"\n\nFinal state:") @@ -373,29 +375,29 @@ print(f"Insights length: {len(flow.state.insights)}") - **تتبع التقدم**: إظهار المرحلة الحالية من سير العمل للمستخدمين - **لوحات المعلومات الحية**: إنشاء واجهات مراقبة لتدفقات الإنتاج -## أنواع أجزاء البث +## قنوات إطارات البث -مثل بث الطاقم، يمكن أن تكون أجزاء التدفق من أنواع مختلفة: +ينتج بث التدفق عناصر `StreamFrame` عبر عدة قنوات: -### أجزاء TEXT +### إطارات LLM محتوى نصي قياسي من استجابات LLM: ```python Code -for chunk in streaming: - if chunk.chunk_type == StreamChunkType.TEXT: - print(chunk.content, end="", flush=True) +for item in streaming: + if item.channel == "llm" and item.content: + print(item.content, end="", flush=True) ``` -### أجزاء TOOL_CALL +### إطارات الأدوات معلومات حول استدعاءات الأدوات داخل التدفق: ```python Code -for chunk in streaming: - if chunk.chunk_type == StreamChunkType.TOOL_CALL and chunk.tool_call: - print(f"\nTool: {chunk.tool_call.tool_name}") - print(f"Args: {chunk.tool_call.arguments}") +for item in streaming: + if item.channel == "tools": + print(f"\nTool event: {item.type}") + print(f"Payload: {item.event}") ``` ## معالجة الأخطاء @@ -407,8 +409,8 @@ flow = ResearchFlow() streaming = flow.kickoff() try: - for chunk in streaming: - print(chunk.content, end="", flush=True) + for item in streaming: + print(item.content, end="", flush=True) result = streaming.result print(f"\nSuccess! Result: {result}") @@ -453,7 +455,7 @@ finally: - يجب التكرار عبر جميع عناصر stream قبل الوصول إلى خاصية `.result` - يعمل البث مع كل من حالة التدفق المنظمة وغير المنظمة - يلتقط بث التدفق المخرجات من جميع الأطقم واستدعاءات LLM في التدفق -- يتضمن كل جزء سياقاً حول الوكيل والمهمة التي ولدته +- يتضمن كل إطار سياق حدث مهيكلاً مثل القناة والنوع والنطاق والحمولة - يضيف البث حملاً ضئيلاً لتنفيذ التدفق ## الدمج مع تصور التدفق @@ -467,8 +469,8 @@ flow.plot("research_flow") # Creates HTML visualization # Run with streaming streaming = flow.kickoff() -for chunk in streaming: - print(chunk.content, end="", flush=True) +for item in streaming: + print(item.content, end="", flush=True) result = streaming.result print(f"\nFlow complete! View structure at: research_flow.html") diff --git a/docs/edge/en/learn/streaming-flow-execution.mdx b/docs/edge/en/learn/streaming-flow-execution.mdx index 429d794c4..e0c5ae2cc 100644 --- a/docs/edge/en/learn/streaming-flow-execution.mdx +++ b/docs/edge/en/learn/streaming-flow-execution.mdx @@ -11,7 +11,7 @@ CrewAI Flows support streaming output, allowing you to receive real-time updates ## How Flow Streaming Works -When streaming is enabled on a Flow, CrewAI captures and streams output from any crews or LLM calls within the flow. The stream delivers structured chunks containing the content, task context, and agent information as execution progresses. +When streaming is enabled on a Flow, CrewAI captures and streams output from any crews, LLM calls, tools, and lifecycle events within the flow. The stream delivers ordered `StreamFrame` items with printable content plus structured event data as execution progresses. ## Enabling Streaming @@ -180,13 +180,14 @@ flow = MultiStepFlow() streaming = flow.kickoff() current_step = "" -for chunk in streaming: +for item in streaming: # Track which flow step is executing - if chunk.task_name != current_step: - current_step = chunk.task_name - print(f"\n\n=== {chunk.task_name} ===\n") + step_name = item.event.get("method_name") or item.event.get("task_name") + if step_name and step_name != current_step: + current_step = step_name + print(f"\n\n=== {step_name} ===\n") - print(chunk.content, end="", flush=True) + print(item.content, end="", flush=True) result = streaming.result print(f"\n\nFinal analysis: {result}") @@ -200,7 +201,6 @@ Here's a complete example showing how to build a progress dashboard with streami import asyncio from crewai.flow.flow import Flow, listen, start from crewai import Agent, Crew, Task -from crewai.types.streaming import StreamChunkType class ResearchPipeline(Flow): stream = True @@ -253,33 +253,35 @@ async def run_with_dashboard(): current_agent = "" current_task = "" - chunk_count = 0 + frame_count = 0 - async for chunk in streaming: - chunk_count += 1 + async for item in streaming: + frame_count += 1 # Display phase transitions - if chunk.task_name != current_task: - current_task = chunk.task_name - current_agent = chunk.agent_role + task_name = item.event.get("task_name", "") + agent_role = item.event.get("agent_role", "") + if task_name and task_name != current_task: + current_task = task_name + current_agent = agent_role print(f"\n\n📋 Phase: {current_task}") print(f"👤 Agent: {current_agent}") print("-" * 60) # Display text output - if chunk.chunk_type == StreamChunkType.TEXT: - print(chunk.content, end="", flush=True) + if item.content: + print(item.content, end="", flush=True) # Display tool usage - elif chunk.chunk_type == StreamChunkType.TOOL_CALL and chunk.tool_call: - print(f"\n🔧 Tool: {chunk.tool_call.tool_name}") + elif item.channel == "tools": + print(f"\n🔧 Tool event: {item.type}") # Show completion summary result = streaming.result print(f"\n\n{'='*60}") print("PIPELINE COMPLETE") print(f"{'='*60}") - print(f"Total chunks: {chunk_count}") + print(f"Total frames: {frame_count}") print(f"Final output length: {len(str(result))} characters") asyncio.run(run_with_dashboard()) @@ -352,8 +354,8 @@ class StatefulStreamingFlow(Flow[AnalysisState]): flow = StatefulStreamingFlow() streaming = flow.kickoff(inputs={"topic": "quantum computing"}) -for chunk in streaming: - print(chunk.content, end="", flush=True) +for item in streaming: + print(item.content, end="", flush=True) result = streaming.result print(f"\n\nFinal state:") @@ -373,29 +375,29 @@ Flow streaming is particularly valuable for: - **Progress Tracking**: Show users which stage of the workflow is currently executing - **Live Dashboards**: Create monitoring interfaces for production flows -## Stream Chunk Types +## Stream Frame Channels -Like crew streaming, flow chunks can be of different types: +Flow streaming yields `StreamFrame` items across several channels: -### TEXT Chunks +### LLM Frames Standard text content from LLM responses: ```python Code -for chunk in streaming: - if chunk.chunk_type == StreamChunkType.TEXT: - print(chunk.content, end="", flush=True) +for item in streaming: + if item.channel == "llm" and item.content: + print(item.content, end="", flush=True) ``` -### TOOL_CALL Chunks +### Tool Frames Information about tool calls within the flow: ```python Code -for chunk in streaming: - if chunk.chunk_type == StreamChunkType.TOOL_CALL and chunk.tool_call: - print(f"\nTool: {chunk.tool_call.tool_name}") - print(f"Args: {chunk.tool_call.arguments}") +for item in streaming: + if item.channel == "tools": + print(f"\nTool event: {item.type}") + print(f"Payload: {item.event}") ``` ## Error Handling @@ -407,8 +409,8 @@ flow = ResearchFlow() streaming = flow.kickoff() try: - for chunk in streaming: - print(chunk.content, end="", flush=True) + for item in streaming: + print(item.content, end="", flush=True) result = streaming.result print(f"\nSuccess! Result: {result}") @@ -453,7 +455,7 @@ After cancellation, `streaming.is_cancelled` and `streaming.is_completed` are bo - You must iterate through all stream items before accessing the `.result` property - Streaming works with both structured and unstructured flow state - Flow streaming captures output from all crews and LLM calls in the flow -- Each chunk includes context about which agent and task generated it +- Each frame includes structured event context such as channel, type, namespace, and payload - Streaming adds minimal overhead to flow execution ## Combining with Flow Visualization @@ -467,8 +469,8 @@ flow.plot("research_flow") # Creates HTML visualization # Run with streaming streaming = flow.kickoff() -for chunk in streaming: - print(chunk.content, end="", flush=True) +for item in streaming: + print(item.content, end="", flush=True) result = streaming.result print(f"\nFlow complete! View structure at: research_flow.html") diff --git a/lib/crewai/src/crewai/events/event_bus.py b/lib/crewai/src/crewai/events/event_bus.py index b6536b728..69d950b59 100644 --- a/lib/crewai/src/crewai/events/event_bus.py +++ b/lib/crewai/src/crewai/events/event_bus.py @@ -777,9 +777,7 @@ class CrewAIEventsBus: source: The object emitting the event event: The event instance to emit """ - self._register_source(source) - event.emission_sequence = get_next_emission_sequence() - self._record_event(event) + self._prepare_event(source, event) event_type = type(event) diff --git a/lib/crewai/src/crewai/experimental/conversational_mixin.py b/lib/crewai/src/crewai/experimental/conversational_mixin.py index f83db93e0..709702892 100644 --- a/lib/crewai/src/crewai/experimental/conversational_mixin.py +++ b/lib/crewai/src/crewai/experimental/conversational_mixin.py @@ -45,6 +45,7 @@ from crewai.experimental.conversational import ( _conversational_only, message_to_llm_dict, ) +from crewai.flow.async_feedback import HumanFeedbackPending from crewai.flow.conversation import ( append_message as _append_conversation_message, get_conversation_messages, @@ -382,7 +383,6 @@ class _ConversationalMixin: self._pending_intents = list(intents) if intents else None self._pending_intent_llm = intent_llm - failed_event: ConversationTurnFailedEvent | None = None try: if "from_checkpoint" not in kickoff_kwargs: self._reset_turn_execution_state() @@ -407,6 +407,8 @@ class _ConversationalMixin: and self._is_public_turn_result(result) ): self.append_assistant_message(self._stringify_result(result)) + except HumanFeedbackPending as exc: + return exc except Exception as exc: failed_event = ConversationTurnFailedEvent( type="conversation_turn_failed", diff --git a/lib/crewai/src/crewai/flow/runtime/__init__.py b/lib/crewai/src/crewai/flow/runtime/__init__.py index ca1f9073f..86a366167 100644 --- a/lib/crewai/src/crewai/flow/runtime/__init__.py +++ b/lib/crewai/src/crewai/flow/runtime/__init__.py @@ -1950,6 +1950,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): return self.stream_events( inputs=inputs, input_files=input_files, + from_checkpoint=from_checkpoint, restore_from_state_id=restore_from_state_id, ) @@ -2015,6 +2016,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): return self.astream( inputs=inputs, input_files=input_files, + from_checkpoint=from_checkpoint, restore_from_state_id=restore_from_state_id, ) diff --git a/lib/crewai/src/crewai/llms/base_llm.py b/lib/crewai/src/crewai/llms/base_llm.py index a28592290..d805b04aa 100644 --- a/lib/crewai/src/crewai/llms/base_llm.py +++ b/lib/crewai/src/crewai/llms/base_llm.py @@ -339,20 +339,16 @@ class BaseLLM(BaseModel, ABC): output_holder: list[StreamSession[Any]] = [] def run_llm_call() -> Any: - original_stream = self.stream - try: - self.stream = True - return self.call( - messages=messages, - tools=tools, - callbacks=callbacks, - available_functions=available_functions, - from_task=from_task, - from_agent=from_agent, - response_model=response_model, - ) - finally: - self.stream = original_stream + streaming_llm = self.model_copy(update={"stream": True}) + return streaming_llm.call( + messages=messages, + tools=tools, + callbacks=callbacks, + available_functions=available_functions, + from_task=from_task, + from_agent=from_agent, + response_model=response_model, + ) stream_session: StreamSession[Any] = StreamSession( sync_iterator=create_frame_generator(state, run_llm_call, output_holder) diff --git a/lib/crewai/tests/test_flow_conversation.py b/lib/crewai/tests/test_flow_conversation.py index 8aa2ca296..52501fcca 100644 --- a/lib/crewai/tests/test_flow_conversation.py +++ b/lib/crewai/tests/test_flow_conversation.py @@ -29,6 +29,7 @@ from crewai.experimental import ( RouterConfig, ) from crewai.flow import Flow, ChatState, listen, start +from crewai.flow.async_feedback import HumanFeedbackPending, PendingFeedbackContext from crewai.flow.flow_context import ( current_flow_defer_trace_finalization, current_flow_id, @@ -220,6 +221,28 @@ class TestConversationalFlow: if frame.type == "llm_stream_chunk" ] == ["po", "ng"] + def test_stream_turn_returns_pending_feedback_without_failure_event(self) -> None: + flow = ConversationalFlow() + pending = HumanFeedbackPending( + context=PendingFeedbackContext( + flow_id="session-1", + flow_class="tests.PendingFeedbackFlow", + method_name="review", + method_output="draft", + message="Please review", + ) + ) + + def kickoff_side_effect(*_: Any, **__: Any) -> None: + raise pending + + with patch.object(flow, "kickoff", side_effect=kickoff_side_effect): + stream = flow.stream_turn("review this", session_id="session-1") + frames = list(stream.events) + + assert stream.result is pending + assert [frame.type for frame in frames] == ["conversation_turn_started"] + def test_deferred_multi_turn_emits_single_flow_finished(self) -> None: """A deferred multi-turn session lands as one trace: exactly one ``FlowFinishedEvent`` is emitted at ``finalize_session_traces()``, not diff --git a/lib/crewai/tests/test_stream_frames.py b/lib/crewai/tests/test_stream_frames.py index 272b70f2a..d844ae78f 100644 --- a/lib/crewai/tests/test_stream_frames.py +++ b/lib/crewai/tests/test_stream_frames.py @@ -4,7 +4,7 @@ from __future__ import annotations import asyncio from collections.abc import AsyncIterator -from typing import Any +from typing import Any, ClassVar import pytest @@ -59,7 +59,12 @@ class FrameFlow(Flow): class DirectStreamingLLM(BaseLLM): + call_stream_values: ClassVar[list[bool | None]] = [] + call_instance_ids: ClassVar[list[int]] = [] + def call(self, messages: Any, *args: Any, **kwargs: Any) -> str: + self.call_stream_values.append(self.stream) + self.call_instance_ids.append(id(self)) crewai_event_bus.emit( self, LLMStreamChunkEvent( @@ -149,6 +154,8 @@ def test_flow_streaming_returns_iterable_frame_session() -> None: def test_direct_llm_stream_events_scope_and_restore_stream_flag() -> None: + DirectStreamingLLM.call_stream_values = [] + DirectStreamingLLM.call_instance_ids = [] llm = DirectStreamingLLM(model="gpt-4o-mini", stream=False) with llm.stream_events("hello") as stream: @@ -158,6 +165,8 @@ def test_direct_llm_stream_events_scope_and_restore_stream_flag() -> None: assert frames[0].event["chunk"] == "hel" assert stream.result == "hello" assert llm.stream is False + assert DirectStreamingLLM.call_stream_values == [True] + assert DirectStreamingLLM.call_instance_ids != [id(llm)] @pytest.mark.asyncio diff --git a/lib/crewai/tests/test_streaming.py b/lib/crewai/tests/test_streaming.py index 60d41c77a..e3d84f4c9 100644 --- a/lib/crewai/tests/test_streaming.py +++ b/lib/crewai/tests/test_streaming.py @@ -11,6 +11,7 @@ from crewai import Agent, Crew, Task from crewai.events.event_bus import crewai_event_bus from crewai.events.types.llm_events import LLMStreamChunkEvent, ToolCall, FunctionCall from crewai.flow.flow import Flow, start +from crewai.state.checkpoint_config import CheckpointConfig from crewai.types.streaming import ( AsyncStreamSession, CrewStreamingOutput, @@ -509,6 +510,25 @@ class TestFlowKickoffStreaming: result = streaming.result assert result == "flow result" + def test_streaming_kickoff_passes_checkpoint_config_to_stream_events(self) -> None: + """stream=True preserves checkpoint config when routing to stream_events.""" + + class TestFlow(Flow[dict[str, Any]]): + @start() + def generate(self) -> str: + return "flow result" + + flow = TestFlow() + flow.stream = True + checkpoint = CheckpointConfig() + + with patch.object(flow, "stream_events", wraps=flow.stream_events) as spy: + streaming = flow.kickoff(from_checkpoint=checkpoint) + list(streaming) + + assert spy.call_args.kwargs["from_checkpoint"] is checkpoint + assert streaming.result == "flow result" + class TestFlowKickoffStreamingAsync: """Tests for Flow(stream=True).kickoff_async() method.""" @@ -611,6 +631,29 @@ class TestFlowKickoffStreamingAsync: result = streaming.result assert result == "async flow result" + @pytest.mark.asyncio + async def test_streaming_kickoff_async_passes_checkpoint_config_to_astream( + self, + ) -> None: + """stream=True preserves checkpoint config when routing to astream.""" + + class TestFlow(Flow[dict[str, Any]]): + @start() + async def generate(self) -> str: + return "async flow result" + + flow = TestFlow() + flow.stream = True + checkpoint = CheckpointConfig() + + with patch.object(flow, "astream", wraps=flow.astream) as spy: + streaming = await flow.kickoff_async(from_checkpoint=checkpoint) + async for _ in streaming: + pass + + assert spy.call_args.kwargs["from_checkpoint"] is checkpoint + assert streaming.result == "async flow result" + class TestStreamingEdgeCases: """Tests for edge cases in streaming functionality.""" diff --git a/lib/crewai/tests/test_streaming_integration.py b/lib/crewai/tests/test_streaming_integration.py index 93869f682..91f92f7cf 100644 --- a/lib/crewai/tests/test_streaming_integration.py +++ b/lib/crewai/tests/test_streaming_integration.py @@ -3,6 +3,8 @@ import pytest from crewai import Agent, Crew, Task +from crewai.events.event_bus import crewai_event_bus +from crewai.events.types.llm_events import LLMStreamChunkEvent from crewai.flow.flow import Flow, start from crewai.types.streaming import AsyncStreamSession, CrewStreamingOutput, StreamSession @@ -232,6 +234,14 @@ class TestStreamingFlowIntegration: @start() def execute(self) -> str: + crewai_event_bus.emit( + self, + LLMStreamChunkEvent( + type="llm_stream_chunk", + chunk="Flow result", + call_id="call-1", + ), + ) return "Flow result" flow = SimpleFlow() @@ -241,8 +251,10 @@ class TestStreamingFlowIntegration: pass assert streaming.is_completed is True - full_text = "".join(frame.content for frame in streaming.frames) - assert isinstance(full_text, str) + content_frames = [frame for frame in streaming.frames if frame.content] + full_text = "".join(frame.content for frame in content_frames) + assert full_text == "Flow result" + assert len(content_frames) == 1 assert len(streaming.frames) > 0 result = streaming.result diff --git a/lib/crewai/tests/utilities/events/test_async_event_bus.py b/lib/crewai/tests/utilities/events/test_async_event_bus.py index 00f8c88e5..cc21445e6 100644 --- a/lib/crewai/tests/utilities/events/test_async_event_bus.py +++ b/lib/crewai/tests/utilities/events/test_async_event_bus.py @@ -9,6 +9,7 @@ import pytest from crewai.events.base_events import BaseEvent from crewai.events.event_bus import crewai_event_bus +from crewai.events.stream_context import add_stream_sink, reset_stream_sinks class AsyncTestEvent(BaseEvent): @@ -53,6 +54,24 @@ async def test_aemit_with_async_handlers(): assert received_events[0] == event +@pytest.mark.asyncio +async def test_aemit_publishes_to_active_stream_sinks(): + published_events = [] + + def sink(source: object, event: BaseEvent) -> None: + published_events.append((source, event)) + + event = AsyncTestEvent(type="async_test") + token = add_stream_sink(sink) + try: + await crewai_event_bus.aemit("test_source", event) + finally: + reset_stream_sinks(token) + + assert published_events == [("test_source", event)] + assert event.emission_sequence is not None + + @pytest.mark.asyncio async def test_multiple_async_handlers(): received_events_1 = []