Files
crewAI/lib/crewai/tests/test_streaming.py
Greyson LaLonde f3c5d1e351 feat: add streaming result support to flows and crews
* feat: add streaming result support to flows and crews
* docs: add streaming execution documentation and integration tests
2025-11-24 15:43:48 -05:00

718 lines
23 KiB
Python

"""Tests for streaming output functionality in crews and flows."""
import asyncio
from collections.abc import AsyncIterator, Generator
from typing import Any
from unittest.mock import MagicMock, patch
import pytest
from crewai import Agent, Crew, Task
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.llm_events import LLMStreamChunkEvent, ToolCall, FunctionCall
from crewai.flow.flow import Flow, start
from crewai.types.streaming import (
CrewStreamingOutput,
FlowStreamingOutput,
StreamChunk,
StreamChunkType,
ToolCallChunk,
)
@pytest.fixture
def researcher() -> Agent:
    """Provide a non-delegating "Researcher" agent for the tests."""
    agent = Agent(
        role="Researcher",
        goal="Research and analyze topics thoroughly",
        backstory="You are an expert researcher with deep analytical skills.",
        allow_delegation=False,
    )
    return agent
@pytest.fixture
def simple_task(researcher: Agent) -> Task:
    """Provide one analysis task assigned to the ``researcher`` agent."""
    task = Task(
        description="Write a brief analysis of AI trends",
        expected_output="A concise analysis of current AI trends",
        agent=researcher,
    )
    return task
@pytest.fixture
def simple_crew(researcher: Agent, simple_task: Task) -> Crew:
    """Provide a quiet crew made of the single agent and its single task."""
    crew = Crew(
        agents=[researcher],
        tasks=[simple_task],
        verbose=False,
    )
    return crew
@pytest.fixture
def streaming_crew(researcher: Agent, simple_task: Task) -> Crew:
    """Provide the same one-agent, one-task crew built with ``stream=True``."""
    crew = Crew(
        agents=[researcher],
        tasks=[simple_task],
        verbose=False,
        stream=True,
    )
    return crew
class TestStreamChunk:
    """Unit tests for building ``StreamChunk`` models."""

    def test_stream_chunk_creation(self) -> None:
        """A text chunk keeps the fields it was given and stringifies to its content."""
        text = "Hello, world!"
        task_name = "Test Task"

        chunk = StreamChunk(
            content=text,
            chunk_type=StreamChunkType.TEXT,
            task_index=0,
            task_name=task_name,
            task_id="task-123",
            agent_role="Researcher",
            agent_id="agent-456",
        )

        assert chunk.content == text
        assert chunk.chunk_type == StreamChunkType.TEXT
        assert chunk.task_index == 0
        assert chunk.task_name == task_name
        # ``str()`` of a chunk is expected to be just its content.
        assert str(chunk) == text

    def test_stream_chunk_with_tool_call(self) -> None:
        """A TOOL_CALL chunk carries the ``ToolCallChunk`` attached to it."""
        chunk = StreamChunk(
            content="",
            chunk_type=StreamChunkType.TOOL_CALL,
            tool_call=ToolCallChunk(
                tool_id="call-123",
                tool_name="search",
                arguments='{"query": "AI trends"}',
                index=0,
            ),
        )

        assert chunk.chunk_type == StreamChunkType.TOOL_CALL
        attached = chunk.tool_call
        assert attached is not None
        assert attached.tool_name == "search"
class TestCrewStreamingOutput:
    """Unit tests for ``CrewStreamingOutput`` driven by hand-made generators."""

    @staticmethod
    def _chunk_stream(*chunks: StreamChunk) -> Generator[StreamChunk, None, None]:
        """Yield the given chunks one by one as a plain generator."""
        yield from chunks

    def test_result_before_iteration_raises_error(self) -> None:
        """Reading ``result`` before the stream is consumed raises ``RuntimeError``."""
        source = self._chunk_stream(StreamChunk(content="test"))
        streaming = CrewStreamingOutput(sync_iterator=source)

        with pytest.raises(RuntimeError, match="Streaming has not completed yet"):
            _ = streaming.result

    def test_is_completed_property(self) -> None:
        """``is_completed`` flips from False to True once the stream is drained."""
        source = self._chunk_stream(StreamChunk(content="test"))
        streaming = CrewStreamingOutput(sync_iterator=source)

        assert streaming.is_completed is False
        for _ in streaming:
            pass
        assert streaming.is_completed is True

    def test_get_full_text(self) -> None:
        """``get_full_text`` joins the text chunks; the empty tool-call chunk adds nothing."""
        source = self._chunk_stream(
            StreamChunk(content="Hello "),
            StreamChunk(content="World!"),
            StreamChunk(content="", chunk_type=StreamChunkType.TOOL_CALL),
        )
        streaming = CrewStreamingOutput(sync_iterator=source)

        for _ in streaming:
            pass

        assert streaming.get_full_text() == "Hello World!"

    def test_chunks_property(self) -> None:
        """``chunks`` exposes every chunk seen during iteration, in order."""
        source = self._chunk_stream(
            StreamChunk(content="chunk1"),
            StreamChunk(content="chunk2"),
        )
        streaming = CrewStreamingOutput(sync_iterator=source)

        for _ in streaming:
            pass

        collected = streaming.chunks
        assert len(collected) == 2
        assert collected[0].content == "chunk1"
class TestFlowStreamingOutput:
    """Unit tests for ``FlowStreamingOutput`` driven by a hand-made generator."""

    @staticmethod
    def _one_chunk() -> Generator[StreamChunk, None, None]:
        """Yield a single text chunk with content ``"test"``."""
        yield StreamChunk(content="test")

    def test_result_before_iteration_raises_error(self) -> None:
        """Reading ``result`` before the stream is consumed raises ``RuntimeError``."""
        streaming = FlowStreamingOutput(sync_iterator=self._one_chunk())

        with pytest.raises(RuntimeError, match="Streaming has not completed yet"):
            _ = streaming.result

    def test_is_completed_property(self) -> None:
        """``is_completed`` flips from False to True once the stream is drained."""
        streaming = FlowStreamingOutput(sync_iterator=self._one_chunk())

        assert streaming.is_completed is False
        for _ in streaming:
            pass
        assert streaming.is_completed is True
class TestCrewKickoffStreaming:
    """Tests for ``Crew(stream=True).kickoff()``.

    Several tests replace ``Crew.kickoff`` with a counting stand-in: the first
    call is forwarded to the real method, every later call emits
    ``LLMStreamChunkEvent`` events on the event bus and returns a mock output.
    NOTE(review): the later call is presumably the nested, non-streaming
    ``kickoff`` issued by the streaming wrapper -- confirm against
    ``Crew.kickoff``.
    """

    def test_kickoff_streaming_returns_streaming_output(self, streaming_crew: Crew) -> None:
        """``kickoff()`` on a ``stream=True`` crew returns a ``CrewStreamingOutput``."""
        # Deliberately no patching.  An earlier version installed a MagicMock
        # over ``Crew.kickoff`` in a ``with`` block that closed before this
        # call (calling through the mock would return a MagicMock and fail
        # the isinstance check), so that patch never took part in the test.
        # The returned stream is not iterated here.
        streaming = streaming_crew.kickoff()

        assert isinstance(streaming, CrewStreamingOutput)

    def test_kickoff_streaming_captures_chunks(self, researcher: Agent, simple_task: Task) -> None:
        """LLM chunk events emitted during the run come out as stream chunks."""
        crew = Crew(
            agents=[researcher],
            tasks=[simple_task],
            verbose=False,
            stream=True,
        )
        final_output = MagicMock()
        final_output.raw = "Test output"
        real_kickoff = Crew.kickoff
        calls = 0

        def fake_kickoff(self: Any, inputs: Any = None) -> Any:
            nonlocal calls
            calls += 1
            if calls == 1:
                # Outer call: let the real method build the streaming wrapper.
                return real_kickoff(self, inputs)
            # Later call: pretend the LLM streamed two text chunks.
            for text in ("Hello ", "World!"):
                crewai_event_bus.emit(
                    crew,
                    LLMStreamChunkEvent(type="llm_stream_chunk", chunk=text),
                )
            return final_output

        # Iteration must happen while the patch is active, because that is
        # when the stand-in gets its second call.
        with patch.object(Crew, "kickoff", fake_kickoff):
            streaming = crew.kickoff()
            assert isinstance(streaming, CrewStreamingOutput)
            chunks = list(streaming)

        assert len(chunks) >= 2
        seen = [chunk.content for chunk in chunks]
        assert "Hello " in seen
        assert "World!" in seen

    def test_kickoff_streaming_result_available_after_iteration(
        self, researcher: Agent, simple_task: Task
    ) -> None:
        """``result`` returns the stored output once the stream is drained."""
        final_output = MagicMock()
        final_output.raw = "Final result"

        def one_chunk() -> Generator[StreamChunk, None, None]:
            yield StreamChunk(content="test chunk")

        streaming = CrewStreamingOutput(sync_iterator=one_chunk())

        # Drain the stream so it counts as completed.
        for _ in streaming:
            pass

        # Simulate what _finalize_streaming does: attach the final output.
        streaming._set_result(final_output)

        assert streaming.result.raw == "Final result"

    def test_kickoff_streaming_handles_tool_calls(self, researcher: Agent, simple_task: Task) -> None:
        """A chunk event carrying a tool call comes out as a TOOL_CALL chunk."""
        crew = Crew(
            agents=[researcher],
            tasks=[simple_task],
            verbose=False,
            stream=True,
        )
        final_output = MagicMock()
        final_output.raw = "Test output"
        real_kickoff = Crew.kickoff
        calls = 0

        def fake_kickoff(self: Any, inputs: Any = None) -> Any:
            nonlocal calls
            calls += 1
            if calls == 1:
                # Outer call: let the real method build the streaming wrapper.
                return real_kickoff(self, inputs)
            # Later call: emit one empty-text chunk that carries a tool call.
            search_call = ToolCall(
                id="call-123",
                function=FunctionCall(
                    name="search",
                    arguments='{"query": "test"}',
                ),
                type="function",
                index=0,
            )
            crewai_event_bus.emit(
                crew,
                LLMStreamChunkEvent(
                    type="llm_stream_chunk",
                    chunk="",
                    tool_call=search_call,
                ),
            )
            return final_output

        with patch.object(Crew, "kickoff", fake_kickoff):
            streaming = crew.kickoff()
            assert isinstance(streaming, CrewStreamingOutput)
            chunks = list(streaming)

        tool_chunks = [
            chunk for chunk in chunks if chunk.chunk_type == StreamChunkType.TOOL_CALL
        ]
        assert len(tool_chunks) >= 1
        first_call = tool_chunks[0].tool_call
        assert first_call is not None
        assert first_call.tool_name == "search"
class TestCrewKickoffStreamingAsync:
    """Tests for ``Crew(stream=True).kickoff_async()``.

    ``Crew.kickoff`` (the synchronous method) is what gets patched here;
    ``kickoff_async`` itself is left untouched.
    """

    @pytest.mark.asyncio
    async def test_kickoff_streaming_async_returns_streaming_output(
        self, researcher: Agent, simple_task: Task
    ) -> None:
        """``kickoff_async()`` on a streaming crew returns a ``CrewStreamingOutput``."""
        crew = Crew(
            agents=[researcher],
            tasks=[simple_task],
            verbose=False,
            stream=True,
        )
        final_output = MagicMock()
        final_output.raw = "Test output"
        real_kickoff = Crew.kickoff
        calls = 0

        def fake_kickoff(self: Any, inputs: Any = None) -> Any:
            # First call goes to the real method, later calls return the mock.
            nonlocal calls
            calls += 1
            if calls == 1:
                return real_kickoff(self, inputs)
            return final_output

        with patch.object(Crew, "kickoff", fake_kickoff):
            streaming = await crew.kickoff_async()
            assert isinstance(streaming, CrewStreamingOutput)

    @pytest.mark.asyncio
    async def test_kickoff_streaming_async_captures_chunks(
        self, researcher: Agent, simple_task: Task
    ) -> None:
        """LLM chunk events emitted during the run come out of the async stream."""
        crew = Crew(
            agents=[researcher],
            tasks=[simple_task],
            verbose=False,
            stream=True,
        )
        final_output = MagicMock()
        final_output.raw = "Test output"

        def fake_kickoff(self: Any, inputs: Any = None) -> Any:
            # No call counting here: every call emits the two chunks.
            for text in ("Async ", "Stream!"):
                crewai_event_bus.emit(
                    crew,
                    LLMStreamChunkEvent(type="llm_stream_chunk", chunk=text),
                )
            return final_output

        with patch.object(Crew, "kickoff", fake_kickoff):
            streaming = await crew.kickoff_async()
            assert isinstance(streaming, CrewStreamingOutput)
            chunks: list[StreamChunk] = [chunk async for chunk in streaming]

        assert len(chunks) >= 2
        seen = [chunk.content for chunk in chunks]
        assert "Async " in seen
        assert "Stream!" in seen

    @pytest.mark.asyncio
    async def test_kickoff_streaming_async_result_available_after_iteration(
        self, researcher: Agent, simple_task: Task
    ) -> None:
        """``result`` returns the stored output once the async stream is drained."""
        final_output = MagicMock()
        final_output.raw = "Async result"

        async def one_chunk() -> AsyncIterator[StreamChunk]:
            yield StreamChunk(content="test chunk")

        streaming = CrewStreamingOutput(async_iterator=one_chunk())

        # Drain the stream so it counts as completed.
        async for _chunk in streaming:
            pass

        # Simulate what _finalize_streaming does: attach the final output.
        streaming._set_result(final_output)

        assert streaming.result.raw == "Async result"
class TestFlowKickoffStreaming:
    """Tests for ``kickoff()`` on a flow whose ``stream`` attribute is True."""

    def test_kickoff_streaming_returns_streaming_output(self) -> None:
        """``kickoff()`` on a streaming flow returns a ``FlowStreamingOutput``."""

        class SimpleFlow(Flow[dict[str, Any]]):
            @start()
            def generate(self) -> str:
                return "result"

        flow = SimpleFlow()
        flow.stream = True

        assert isinstance(flow.kickoff(), FlowStreamingOutput)

    def test_flow_kickoff_streaming_captures_chunks(self) -> None:
        """LLM chunk events emitted during the run come out of the flow stream."""

        class TestFlow(Flow[dict[str, Any]]):
            @start()
            def run_crew(self) -> str:
                return "done"

        flow = TestFlow()
        flow.stream = True
        real_kickoff = Flow.kickoff
        calls = 0

        def fake_kickoff(self: Any, inputs: Any = None) -> Any:
            nonlocal calls
            calls += 1
            if calls == 1:
                # Outer call: forwarded to the real Flow.kickoff.
                return real_kickoff(self, inputs)
            # Later call: pretend the LLM streamed two text chunks.
            for text in ("Flow ", "output!"):
                crewai_event_bus.emit(
                    flow,
                    LLMStreamChunkEvent(type="llm_stream_chunk", chunk=text),
                )
            return "done"

        with patch.object(Flow, "kickoff", fake_kickoff):
            streaming = flow.kickoff()
            assert isinstance(streaming, FlowStreamingOutput)
            chunks = list(streaming)

        assert len(chunks) >= 2
        seen = [chunk.content for chunk in chunks]
        assert "Flow " in seen
        assert "output!" in seen

    def test_flow_kickoff_streaming_result_available(self) -> None:
        """``result`` holds the flow's return value after the stream is drained."""

        class TestFlow(Flow[dict[str, Any]]):
            @start()
            def generate(self) -> str:
                return "flow result"

        flow = TestFlow()
        flow.stream = True
        real_kickoff = Flow.kickoff
        calls = 0

        def fake_kickoff(self: Any, inputs: Any = None) -> Any:
            # First call goes to the real method, later calls return the value.
            nonlocal calls
            calls += 1
            if calls == 1:
                return real_kickoff(self, inputs)
            return "flow result"

        with patch.object(Flow, "kickoff", fake_kickoff):
            streaming = flow.kickoff()
            assert isinstance(streaming, FlowStreamingOutput)
            for _ in streaming:
                pass
            assert streaming.result == "flow result"
class TestFlowKickoffStreamingAsync:
    """Tests for ``kickoff_async()`` on a flow whose ``stream`` attribute is True."""

    @pytest.mark.asyncio
    async def test_kickoff_streaming_async_returns_streaming_output(self) -> None:
        """``kickoff_async()`` on a streaming flow returns a ``FlowStreamingOutput``."""

        class SimpleFlow(Flow[dict[str, Any]]):
            @start()
            async def generate(self) -> str:
                return "async result"

        flow = SimpleFlow()
        flow.stream = True

        assert isinstance(await flow.kickoff_async(), FlowStreamingOutput)

    @pytest.mark.asyncio
    async def test_flow_kickoff_streaming_async_captures_chunks(self) -> None:
        """LLM chunk events emitted during the run come out of the async flow stream."""

        class TestFlow(Flow[dict[str, Any]]):
            @start()
            async def run_crew(self) -> str:
                return "done"

        flow = TestFlow()
        flow.stream = True
        real_kickoff_async = Flow.kickoff_async
        calls = 0

        async def fake_kickoff_async(self: Any, inputs: Any = None) -> Any:
            nonlocal calls
            calls += 1
            if calls == 1:
                # Outer call: forwarded to the real Flow.kickoff_async.
                return await real_kickoff_async(self, inputs)
            # Later call: emit two chunks, yielding to the event loop before
            # each one and once more before returning.
            for text in ("Async flow ", "stream!"):
                await asyncio.sleep(0.01)
                crewai_event_bus.emit(
                    flow,
                    LLMStreamChunkEvent(type="llm_stream_chunk", chunk=text),
                )
            await asyncio.sleep(0.01)
            return "done"

        with patch.object(Flow, "kickoff_async", fake_kickoff_async):
            streaming = await flow.kickoff_async()
            assert isinstance(streaming, FlowStreamingOutput)
            chunks: list[StreamChunk] = [chunk async for chunk in streaming]

        assert len(chunks) >= 2
        seen = [chunk.content for chunk in chunks]
        assert "Async flow " in seen
        assert "stream!" in seen

    @pytest.mark.asyncio
    async def test_flow_kickoff_streaming_async_result_available(self) -> None:
        """``result`` holds the flow's return value after the async stream is drained."""

        class TestFlow(Flow[dict[str, Any]]):
            @start()
            async def generate(self) -> str:
                return "async flow result"

        flow = TestFlow()
        flow.stream = True
        real_kickoff_async = Flow.kickoff_async
        calls = 0

        async def fake_kickoff_async(self: Any, inputs: Any = None) -> Any:
            # First call goes to the real method, later calls return the value.
            nonlocal calls
            calls += 1
            if calls == 1:
                return await real_kickoff_async(self, inputs)
            return "async flow result"

        with patch.object(Flow, "kickoff_async", fake_kickoff_async):
            streaming = await flow.kickoff_async()
            assert isinstance(streaming, FlowStreamingOutput)
            async for _chunk in streaming:
                pass
            assert streaming.result == "async flow result"
class TestStreamingEdgeCases:
    """Edge-case tests: errors, empty chunks and multi-task crews."""

    def test_streaming_handles_exceptions(self, researcher: Agent, simple_task: Task) -> None:
        """An exception raised during the run surfaces when the stream is iterated."""
        crew = Crew(
            agents=[researcher],
            tasks=[simple_task],
            verbose=False,
            stream=True,
        )
        real_kickoff = Crew.kickoff
        calls = 0

        def fake_kickoff(self: Any, inputs: Any = None) -> Any:
            # First call goes to the real method; every later call blows up.
            nonlocal calls
            calls += 1
            if calls == 1:
                return real_kickoff(self, inputs)
            raise ValueError("Test error")

        with patch.object(Crew, "kickoff", fake_kickoff):
            streaming = crew.kickoff()
            with pytest.raises(ValueError, match="Test error"):
                for _ in streaming:
                    pass

    def test_streaming_with_empty_content_chunks(self) -> None:
        """A chunk with empty content is still delivered and completes the stream."""
        final_output = MagicMock()
        final_output.raw = "No streaming"

        def one_empty_chunk() -> Generator[StreamChunk, None, None]:
            yield StreamChunk(content="")

        streaming = CrewStreamingOutput(sync_iterator=one_empty_chunk())
        chunks = [chunk for chunk in streaming]

        assert streaming.is_completed
        assert len(chunks) == 1
        assert chunks[0].content == ""

        # Simulate what _finalize_streaming does: attach the final output.
        streaming._set_result(final_output)

        assert streaming.result.raw == "No streaming"

    def test_streaming_with_multiple_tasks(self, researcher: Agent) -> None:
        """A two-task streaming crew yields at least one chunk and completes.

        NOTE(review): only the chunk count and completion are asserted; the
        task context attached to the chunk is not checked here.
        """
        first_task = Task(
            description="First task",
            expected_output="First output",
            agent=researcher,
        )
        second_task = Task(
            description="Second task",
            expected_output="Second output",
            agent=researcher,
        )
        crew = Crew(
            agents=[researcher],
            tasks=[first_task, second_task],
            verbose=False,
            stream=True,
        )
        final_output = MagicMock()
        final_output.raw = "Multi-task output"
        real_kickoff = Crew.kickoff
        calls = 0

        def fake_kickoff(self: Any, inputs: Any = None) -> Any:
            nonlocal calls
            calls += 1
            if calls == 1:
                return real_kickoff(self, inputs)
            # Later call: emit one chunk tagged with the first task's name.
            task_chunk = LLMStreamChunkEvent(
                type="llm_stream_chunk",
                chunk="Task 1",
                task_name="First task",
            )
            crewai_event_bus.emit(crew, task_chunk)
            return final_output

        with patch.object(Crew, "kickoff", fake_kickoff):
            streaming = crew.kickoff()
            assert isinstance(streaming, CrewStreamingOutput)
            chunks = list(streaming)

        assert len(chunks) >= 1
        assert streaming.is_completed
class TestStreamingImports:
    """Checks that the public streaming types can be imported."""

    def test_streaming_types_importable_from_types_module(self) -> None:
        """Every streaming type is importable from ``crewai.types.streaming``."""
        # Imported inside the test so a missing name fails this test rather
        # than relying on the module-level import.
        from crewai.types.streaming import (
            CrewStreamingOutput,
            FlowStreamingOutput,
            StreamChunk,
            StreamChunkType,
            ToolCallChunk,
        )

        imported = (
            CrewStreamingOutput,
            FlowStreamingOutput,
            StreamChunk,
            StreamChunkType,
            ToolCallChunk,
        )
        for symbol in imported:
            assert symbol is not None