feat: add streaming result support to flows and crews

* feat: add streaming result support to flows and crews
* docs: add streaming execution documentation and integration tests
Greyson LaLonde
2025-11-24 15:43:48 -05:00
committed by GitHub
parent a978267fa2
commit f3c5d1e351
21 changed files with 15819 additions and 37 deletions

View File

@@ -33,6 +33,7 @@ A crew in crewAI represents a collaborative group of agents working together to
| **Planning** *(optional)* | `planning` | Adds planning ability to the Crew. When activated before each Crew iteration, all Crew data is sent to an AgentPlanner that will plan the tasks and this plan will be added to each task description. |
| **Planning LLM** *(optional)* | `planning_llm` | The language model used by the AgentPlanner in a planning process. |
| **Knowledge Sources** _(optional)_ | `knowledge_sources` | Knowledge sources available at the crew level, accessible to all the agents. |
| **Stream** _(optional)_ | `stream` | Enable streaming to receive real-time output during crew execution. When enabled, `kickoff()` returns a `CrewStreamingOutput` object that can be iterated over for chunks. Defaults to `False`. |
<Tip>
**Crew Max RPM**: The `max_rpm` attribute sets the maximum number of requests per minute the crew can perform to avoid rate limits and will override individual agents' `max_rpm` settings if you set it.
@@ -338,6 +339,29 @@ for async_result in async_results:
These methods provide flexibility in how you manage and execute tasks within your crew, allowing for both synchronous and asynchronous workflows tailored to your needs.
### Streaming Crew Execution
For real-time visibility into crew execution, you can enable streaming to receive output as it's generated:
```python Code
# Enable streaming
crew = Crew(
agents=[researcher],
tasks=[task],
stream=True
)
# Iterate over streaming output
streaming = crew.kickoff(inputs={"topic": "AI"})
for chunk in streaming:
print(chunk.content, end="", flush=True)
# Access final result
result = streaming.result
```
Learn more about streaming in the [Streaming Crew Execution](/en/learn/streaming-crew-execution) guide.
### Replaying from a Specific Task
You can now replay from a specific task using our CLI command `replay`.

View File

@@ -897,6 +897,31 @@ flow = ExampleFlow()
result = flow.kickoff()
```
### Streaming Flow Execution
For real-time visibility into flow execution, you can enable streaming to receive output as it's generated:
```python Code
from crewai.flow.flow import Flow, start

class StreamingFlow(Flow):
stream = True # Enable streaming
@start()
def research(self):
# Your flow implementation
pass
# Iterate over streaming output
flow = StreamingFlow()
streaming = flow.kickoff()
for chunk in streaming:
print(chunk.content, end="", flush=True)
# Access final result
result = streaming.result
```
Learn more about streaming in the [Streaming Flow Execution](/en/learn/streaming-flow-execution) guide.
### Using the CLI
Starting from version 0.103.0, you can run flows using the `crewai run` command:

View File

@@ -0,0 +1,320 @@
---
title: Streaming Crew Execution
description: Stream real-time output from your CrewAI crew execution
icon: wave-pulse
mode: "wide"
---
## Introduction
CrewAI provides the ability to stream real-time output during crew execution, allowing you to display results as they're generated rather than waiting for the entire process to complete. This feature is particularly useful for building interactive applications, providing user feedback, and monitoring long-running processes.
## How Streaming Works
When streaming is enabled, CrewAI captures LLM responses and tool calls as they happen, packaging them into structured chunks that include context about which task and agent are executing. You can iterate over these chunks in real time and access the final result once execution completes.
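Each chunk is a `StreamChunk` Pydantic model. As a rough sketch of its shape (field names taken from the `crewai.types.streaming` module added in this commit; the values shown are illustrative):
```python Code
from crewai.types.streaming import StreamChunk, StreamChunkType

# Illustrative chunk showing the available fields
chunk = StreamChunk(
    content="partial text...",        # streamed text or tool-call fragment
    chunk_type=StreamChunkType.TEXT,  # TEXT or TOOL_CALL
    task_index=0,                     # 0-based index of the current task
    task_name="Research the topic",
    agent_role="Research Analyst",
)
print(str(chunk))  # __str__ returns the chunk's content
```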
## Enabling Streaming
To enable streaming, set the `stream` parameter to `True` when creating your crew:
```python Code
from crewai import Agent, Crew, Task
# Create your agents and tasks
researcher = Agent(
role="Research Analyst",
goal="Gather comprehensive information on topics",
backstory="You are an experienced researcher with excellent analytical skills.",
)
task = Task(
description="Research the latest developments in AI",
expected_output="A detailed report on recent AI advancements",
agent=researcher,
)
# Enable streaming
crew = Crew(
agents=[researcher],
tasks=[task],
stream=True # Enable streaming output
)
```
## Synchronous Streaming
When you call `kickoff()` on a crew with streaming enabled, it returns a `CrewStreamingOutput` object that you can iterate over to receive chunks as they arrive:
```python Code
# Start streaming execution
streaming = crew.kickoff(inputs={"topic": "artificial intelligence"})
# Iterate over chunks as they arrive
for chunk in streaming:
print(chunk.content, end="", flush=True)
# Access the final result after streaming completes
result = streaming.result
print(f"\n\nFinal output: {result.raw}")
```
### Stream Chunk Information
Each chunk provides rich context about the execution:
```python Code
streaming = crew.kickoff(inputs={"topic": "AI"})
for chunk in streaming:
print(f"Task: {chunk.task_name} (index {chunk.task_index})")
print(f"Agent: {chunk.agent_role}")
print(f"Content: {chunk.content}")
print(f"Type: {chunk.chunk_type}") # TEXT or TOOL_CALL
if chunk.tool_call:
print(f"Tool: {chunk.tool_call.tool_name}")
print(f"Arguments: {chunk.tool_call.arguments}")
```
### Accessing Streaming Results
The `CrewStreamingOutput` object provides several useful properties:
```python Code
streaming = crew.kickoff(inputs={"topic": "AI"})
# Iterate and collect chunks
for chunk in streaming:
print(chunk.content, end="", flush=True)
# After iteration completes
print(f"\nCompleted: {streaming.is_completed}")
print(f"Full text: {streaming.get_full_text()}")
print(f"All chunks: {len(streaming.chunks)}")
print(f"Final result: {streaming.result.raw}")
```
## Asynchronous Streaming
For async applications, use `kickoff_async()` with async iteration:
```python Code
import asyncio
async def stream_crew():
crew = Crew(
agents=[researcher],
tasks=[task],
stream=True
)
# Start async streaming
streaming = await crew.kickoff_async(inputs={"topic": "AI"})
# Async iteration over chunks
async for chunk in streaming:
print(chunk.content, end="", flush=True)
# Access final result
result = streaming.result
print(f"\n\nFinal output: {result.raw}")
asyncio.run(stream_crew())
```
## Streaming with kickoff_for_each
When executing a crew for multiple inputs with `kickoff_for_each()`, streaming works differently depending on whether you use sync or async:
### Synchronous kickoff_for_each
With synchronous `kickoff_for_each()`, you get a list of `CrewStreamingOutput` objects, one for each input:
```python Code
crew = Crew(
agents=[researcher],
tasks=[task],
stream=True
)
inputs_list = [
{"topic": "AI in healthcare"},
{"topic": "AI in finance"}
]
# Returns list of streaming outputs
streaming_outputs = crew.kickoff_for_each(inputs=inputs_list)
# Iterate over each streaming output
for i, streaming in enumerate(streaming_outputs):
print(f"\n=== Input {i + 1} ===")
for chunk in streaming:
print(chunk.content, end="", flush=True)
result = streaming.result
print(f"\n\nResult {i + 1}: {result.raw}")
```
### Asynchronous kickoff_for_each_async
With async `kickoff_for_each_async()`, you get a single `CrewStreamingOutput` that yields chunks from all crews as they arrive concurrently:
```python Code
import asyncio
async def stream_multiple_crews():
crew = Crew(
agents=[researcher],
tasks=[task],
stream=True
)
inputs_list = [
{"topic": "AI in healthcare"},
{"topic": "AI in finance"}
]
# Returns single streaming output for all crews
streaming = await crew.kickoff_for_each_async(inputs=inputs_list)
# Chunks from all crews arrive as they're generated
async for chunk in streaming:
print(f"[{chunk.task_name}] {chunk.content}", end="", flush=True)
# Access all results
results = streaming.results # List of CrewOutput objects
for i, result in enumerate(results):
print(f"\n\nResult {i + 1}: {result.raw}")
asyncio.run(stream_multiple_crews())
```
## Stream Chunk Types
Chunks can be of different types, indicated by the `chunk_type` field:
### TEXT Chunks
Standard text content from LLM responses:
```python Code
from crewai.types.streaming import StreamChunkType

for chunk in streaming:
if chunk.chunk_type == StreamChunkType.TEXT:
print(chunk.content, end="", flush=True)
```
### TOOL_CALL Chunks
Information about tool calls being made:
```python Code
for chunk in streaming:
if chunk.chunk_type == StreamChunkType.TOOL_CALL:
print(f"\nCalling tool: {chunk.tool_call.tool_name}")
print(f"Arguments: {chunk.tool_call.arguments}")
```
## Practical Example: Building a UI with Streaming
Here's a complete example showing how to build an interactive application with streaming:
```python Code
import asyncio
from crewai import Agent, Crew, Task
from crewai.types.streaming import StreamChunkType
async def interactive_research():
# Create crew with streaming enabled
researcher = Agent(
role="Research Analyst",
goal="Provide detailed analysis on any topic",
backstory="You are an expert researcher with broad knowledge.",
)
task = Task(
description="Research and analyze: {topic}",
expected_output="A comprehensive analysis with key insights",
agent=researcher,
)
crew = Crew(
agents=[researcher],
tasks=[task],
stream=True,
verbose=False
)
# Get user input
topic = input("Enter a topic to research: ")
print(f"\n{'='*60}")
print(f"Researching: {topic}")
print(f"{'='*60}\n")
# Start streaming execution
streaming = await crew.kickoff_async(inputs={"topic": topic})
current_task = ""
async for chunk in streaming:
# Show task transitions
if chunk.task_name != current_task:
current_task = chunk.task_name
print(f"\n[{chunk.agent_role}] Working on: {chunk.task_name}")
print("-" * 60)
# Display text chunks
if chunk.chunk_type == StreamChunkType.TEXT:
print(chunk.content, end="", flush=True)
# Display tool calls
elif chunk.chunk_type == StreamChunkType.TOOL_CALL and chunk.tool_call:
print(f"\n🔧 Using tool: {chunk.tool_call.tool_name}")
# Show final result
result = streaming.result
print(f"\n\n{'='*60}")
print("Analysis Complete!")
print(f"{'='*60}")
print(f"\nToken Usage: {result.token_usage}")
asyncio.run(interactive_research())
```
## Use Cases
Streaming is particularly valuable for:
- **Interactive Applications**: Provide real-time feedback to users as agents work
- **Long-Running Tasks**: Show progress for research, analysis, or content generation
- **Debugging and Monitoring**: Observe agent behavior and decision-making in real time
- **User Experience**: Reduce perceived latency by showing incremental results
- **Live Dashboards**: Build monitoring interfaces that display crew execution status
## Important Notes
- Streaming automatically enables LLM streaming for all agents in the crew
- You must iterate through all chunks before accessing the `.result` property (see the sketch after this list)
- For `kickoff_for_each_async()` with streaming, use `.results` (plural) to get all outputs
- Streaming adds minimal overhead and can actually improve perceived performance
- Each chunk includes full context (task, agent, chunk type) for rich UIs
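To illustrate the iteration requirement above: accessing `.result` before the stream has been fully consumed raises a `RuntimeError` (per the behavior of `StreamingOutputBase` in this commit):
```python Code
streaming = crew.kickoff(inputs={"topic": "AI"})

try:
    _ = streaming.result  # stream not consumed yet
except RuntimeError as e:
    print(e)  # "Streaming has not completed yet. Iterate over all chunks..."

for chunk in streaming:
    pass  # drain the stream

result = streaming.result  # now available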
## Error Handling
Handle errors during streaming execution:
```python Code
streaming = crew.kickoff(inputs={"topic": "AI"})
try:
for chunk in streaming:
print(chunk.content, end="", flush=True)
result = streaming.result
print(f"\nSuccess: {result.raw}")
except Exception as e:
print(f"\nError during streaming: {e}")
if streaming.is_completed:
print("Streaming completed but an error occurred")
```
By leveraging streaming, you can build more responsive and interactive applications with CrewAI, providing users with real-time visibility into agent execution and results.

View File

@@ -0,0 +1,450 @@
---
title: Streaming Flow Execution
description: Stream real-time output from your CrewAI flow execution
icon: wave-pulse
mode: "wide"
---
## Introduction
CrewAI Flows support streaming output, allowing you to receive real-time updates as your flow executes. This feature enables you to build responsive applications that display results incrementally, provide live progress updates, and create better user experiences for long-running workflows.
## How Flow Streaming Works
When streaming is enabled on a Flow, CrewAI captures and streams output from any crews or LLM calls within the flow. The stream delivers structured chunks containing the content, task context, and agent information as execution progresses.
## Enabling Streaming
To enable streaming, set the `stream` attribute to `True` on your Flow class:
```python Code
from crewai.flow.flow import Flow, listen, start
from crewai import Agent, Crew, Task
class ResearchFlow(Flow):
stream = True # Enable streaming for the entire flow
@start()
def initialize(self):
return {"topic": "AI trends"}
@listen(initialize)
def research_topic(self, data):
researcher = Agent(
role="Research Analyst",
goal="Research topics thoroughly",
backstory="Expert researcher with analytical skills",
)
task = Task(
description="Research {topic} and provide insights",
expected_output="Detailed research findings",
agent=researcher,
)
crew = Crew(
agents=[researcher],
tasks=[task],
)
return crew.kickoff(inputs=data)
```
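Note that enabling streaming changes what `kickoff()` returns: instead of the flow's final value, you receive a `FlowStreamingOutput`. A minimal sketch for code that must handle both modes (import path taken from this commit):
```python Code
from crewai.types.streaming import FlowStreamingOutput

flow = ResearchFlow()
output = flow.kickoff()
if isinstance(output, FlowStreamingOutput):
    for chunk in output:
        print(chunk.content, end="", flush=True)
    final = output.result
else:
    final = output  # non-streaming flows return the final value directly
```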
## Synchronous Streaming
When you call `kickoff()` on a flow with streaming enabled, it returns a `FlowStreamingOutput` object that you can iterate over:
```python Code
flow = ResearchFlow()
# Start streaming execution
streaming = flow.kickoff()
# Iterate over chunks as they arrive
for chunk in streaming:
print(chunk.content, end="", flush=True)
# Access the final result after streaming completes
result = streaming.result
print(f"\n\nFinal output: {result}")
```
### Stream Chunk Information
Each chunk provides context about where it originated in the flow:
```python Code
streaming = flow.kickoff()
for chunk in streaming:
print(f"Agent: {chunk.agent_role}")
print(f"Task: {chunk.task_name}")
print(f"Content: {chunk.content}")
print(f"Type: {chunk.chunk_type}") # TEXT or TOOL_CALL
```
### Accessing Streaming Properties
The `FlowStreamingOutput` object provides useful properties and methods:
```python Code
streaming = flow.kickoff()
# Iterate and collect chunks
for chunk in streaming:
print(chunk.content, end="", flush=True)
# After iteration completes
print(f"\nCompleted: {streaming.is_completed}")
print(f"Full text: {streaming.get_full_text()}")
print(f"Total chunks: {len(streaming.chunks)}")
print(f"Final result: {streaming.result}")
```
## Asynchronous Streaming
For async applications, use `kickoff_async()` with async iteration:
```python Code
import asyncio
async def stream_flow():
flow = ResearchFlow()
# Start async streaming
streaming = await flow.kickoff_async()
# Async iteration over chunks
async for chunk in streaming:
print(chunk.content, end="", flush=True)
# Access final result
result = streaming.result
print(f"\n\nFinal output: {result}")
asyncio.run(stream_flow())
```
## Streaming with Multi-Step Flows
Streaming works seamlessly across multiple flow steps, including flows that execute multiple crews:
```python Code
from crewai.flow.flow import Flow, listen, start
from crewai import Agent, Crew, Task
class MultiStepFlow(Flow):
stream = True
@start()
def research_phase(self):
"""First crew: Research the topic."""
researcher = Agent(
role="Research Analyst",
goal="Gather comprehensive information",
backstory="Expert at finding relevant information",
)
task = Task(
description="Research AI developments in healthcare",
expected_output="Research findings on AI in healthcare",
agent=researcher,
)
crew = Crew(agents=[researcher], tasks=[task])
result = crew.kickoff()
self.state["research"] = result.raw
return result.raw
@listen(research_phase)
def analysis_phase(self, research_data):
"""Second crew: Analyze the research."""
analyst = Agent(
role="Data Analyst",
goal="Analyze information and extract insights",
backstory="Expert at identifying patterns and trends",
)
task = Task(
description="Analyze this research: {research}",
expected_output="Key insights and trends",
agent=analyst,
)
crew = Crew(agents=[analyst], tasks=[task])
return crew.kickoff(inputs={"research": research_data})
# Stream across both phases
flow = MultiStepFlow()
streaming = flow.kickoff()
current_step = ""
for chunk in streaming:
# Track which flow step is executing
if chunk.task_name != current_step:
current_step = chunk.task_name
print(f"\n\n=== {chunk.task_name} ===\n")
print(chunk.content, end="", flush=True)
result = streaming.result
print(f"\n\nFinal analysis: {result}")
```
## Practical Example: Progress Dashboard
Here's a complete example showing how to build a progress dashboard with streaming:
```python Code
import asyncio
from crewai.flow.flow import Flow, listen, start
from crewai import Agent, Crew, Task
from crewai.types.streaming import StreamChunkType
class ResearchPipeline(Flow):
stream = True
@start()
def gather_data(self):
researcher = Agent(
role="Data Gatherer",
goal="Collect relevant information",
backstory="Skilled at finding quality sources",
)
task = Task(
description="Gather data on renewable energy trends",
expected_output="Collection of relevant data points",
agent=researcher,
)
crew = Crew(agents=[researcher], tasks=[task])
result = crew.kickoff()
self.state["data"] = result.raw
return result.raw
@listen(gather_data)
def analyze_data(self, data):
analyst = Agent(
role="Data Analyst",
goal="Extract meaningful insights",
backstory="Expert at data analysis",
)
task = Task(
description="Analyze: {data}",
expected_output="Key insights and trends",
agent=analyst,
)
crew = Crew(agents=[analyst], tasks=[task])
return crew.kickoff(inputs={"data": data})
async def run_with_dashboard():
flow = ResearchPipeline()
print("="*60)
print("RESEARCH PIPELINE DASHBOARD")
print("="*60)
streaming = await flow.kickoff_async()
current_agent = ""
current_task = ""
chunk_count = 0
async for chunk in streaming:
chunk_count += 1
# Display phase transitions
if chunk.task_name != current_task:
current_task = chunk.task_name
current_agent = chunk.agent_role
print(f"\n\n📋 Phase: {current_task}")
print(f"👤 Agent: {current_agent}")
print("-" * 60)
# Display text output
if chunk.chunk_type == StreamChunkType.TEXT:
print(chunk.content, end="", flush=True)
# Display tool usage
elif chunk.chunk_type == StreamChunkType.TOOL_CALL and chunk.tool_call:
print(f"\n🔧 Tool: {chunk.tool_call.tool_name}")
# Show completion summary
result = streaming.result
print(f"\n\n{'='*60}")
print("PIPELINE COMPLETE")
print(f"{'='*60}")
print(f"Total chunks: {chunk_count}")
print(f"Final output length: {len(str(result))} characters")
asyncio.run(run_with_dashboard())
```
## Streaming with State Management
Streaming works naturally with Flow state management:
```python Code
from pydantic import BaseModel
class AnalysisState(BaseModel):
topic: str = ""
research: str = ""
insights: str = ""
class StatefulStreamingFlow(Flow[AnalysisState]):
stream = True
@start()
def research(self):
# State is available during streaming
topic = self.state.topic
print(f"Researching: {topic}")
researcher = Agent(
role="Researcher",
goal="Research topics thoroughly",
backstory="Expert researcher",
)
task = Task(
description=f"Research {topic}",
expected_output="Research findings",
agent=researcher,
)
crew = Crew(agents=[researcher], tasks=[task])
result = crew.kickoff()
self.state.research = result.raw
return result.raw
@listen(research)
def analyze(self, research):
# Access updated state
print(f"Analyzing {len(self.state.research)} chars of research")
analyst = Agent(
role="Analyst",
goal="Extract insights",
backstory="Expert analyst",
)
task = Task(
description="Analyze: {research}",
expected_output="Key insights",
agent=analyst,
)
crew = Crew(agents=[analyst], tasks=[task])
result = crew.kickoff(inputs={"research": research})
self.state.insights = result.raw
return result.raw
# Run with streaming
flow = StatefulStreamingFlow()
streaming = flow.kickoff(inputs={"topic": "quantum computing"})
for chunk in streaming:
print(chunk.content, end="", flush=True)
result = streaming.result
print(f"\n\nFinal state:")
print(f"Topic: {flow.state.topic}")
print(f"Research length: {len(flow.state.research)}")
print(f"Insights length: {len(flow.state.insights)}")
```
## Use Cases
Flow streaming is particularly valuable for:
- **Multi-Stage Workflows**: Show progress across research, analysis, and synthesis phases
- **Complex Pipelines**: Provide visibility into long-running data processing flows
- **Interactive Applications**: Build responsive UIs that display intermediate results
- **Monitoring and Debugging**: Observe flow execution and crew interactions in real time
- **Progress Tracking**: Show users which stage of the workflow is currently executing
- **Live Dashboards**: Create monitoring interfaces for production flows
## Stream Chunk Types
Like crew streaming, flow chunks can be of different types:
### TEXT Chunks
Standard text content from LLM responses:
```python Code
from crewai.types.streaming import StreamChunkType

for chunk in streaming:
if chunk.chunk_type == StreamChunkType.TEXT:
print(chunk.content, end="", flush=True)
```
### TOOL_CALL Chunks
Information about tool calls within the flow:
```python Code
for chunk in streaming:
if chunk.chunk_type == StreamChunkType.TOOL_CALL and chunk.tool_call:
print(f"\nTool: {chunk.tool_call.tool_name}")
print(f"Args: {chunk.tool_call.arguments}")
```
## Error Handling
Handle errors gracefully during streaming:
```python Code
flow = ResearchFlow()
streaming = flow.kickoff()
try:
for chunk in streaming:
print(chunk.content, end="", flush=True)
result = streaming.result
print(f"\nSuccess! Result: {result}")
except Exception as e:
print(f"\nError during flow execution: {e}")
if streaming.is_completed:
print("Streaming completed but flow encountered an error")
```
## Important Notes
- Streaming automatically enables LLM streaming for any crews used within the flow
- You must iterate through all chunks before accessing the `.result` property (see the sketch after this list)
- Streaming works with both structured and unstructured flow state
- Flow streaming captures output from all crews and LLM calls in the flow
- Each chunk includes context about which agent and task generated it
- Streaming adds minimal overhead to flow execution
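If you only care about the final result but streaming is enabled, you still need to drain the iterator first, as noted above. A minimal sketch:
```python Code
streaming = flow.kickoff()

# Drain the stream; .result is only available after full iteration
for _ in streaming:
    pass

result = streaming.result
print(f"Collected {len(streaming.chunks)} chunks")
```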
## Combining with Flow Visualization
You can combine streaming with flow visualization to provide a complete picture:
```python Code
# Generate flow visualization
flow = ResearchFlow()
flow.plot("research_flow") # Creates HTML visualization
# Run with streaming
streaming = flow.kickoff()
for chunk in streaming:
print(chunk.content, end="", flush=True)
result = streaming.result
print(f"\nFlow complete! View structure at: research_flow.html")
```
By leveraging flow streaming, you can build sophisticated, responsive applications that provide users with real-time visibility into complex multi-stage workflows, making your AI automations more transparent and engaging.

View File

@@ -74,6 +74,7 @@ from crewai.tasks.conditional_task import ConditionalTask
from crewai.tasks.task_output import TaskOutput
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.tools.base_tool import BaseTool
from crewai.types.streaming import CrewStreamingOutput, FlowStreamingOutput
from crewai.types.usage_metrics import UsageMetrics
from crewai.utilities.constants import NOT_SPECIFIED, TRAINING_DATA_FILE
from crewai.utilities.crew.models import CrewContext
@@ -90,6 +91,14 @@ from crewai.utilities.logger import Logger
from crewai.utilities.planning_handler import CrewPlanner
from crewai.utilities.printer import PrinterColor
from crewai.utilities.rpm_controller import RPMController
from crewai.utilities.streaming import (
TaskInfo,
create_async_chunk_generator,
create_chunk_generator,
create_streaming_state,
signal_end,
signal_error,
)
from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler
from crewai.utilities.training_handler import CrewTrainingHandler
@@ -225,6 +234,10 @@ class Crew(FlowTrackable, BaseModel):
"It may be used to adjust the output of the crew."
),
)
stream: bool = Field(
default=False,
description="Whether to stream output from the crew execution.",
)
max_rpm: int | None = Field(
default=None,
description=(
@@ -660,7 +673,43 @@ class Crew(FlowTrackable, BaseModel):
def kickoff(
self,
inputs: dict[str, Any] | None = None,
) -> CrewOutput:
) -> CrewOutput | CrewStreamingOutput:
if self.stream:
for agent in self.agents:
if agent.llm is not None:
agent.llm.stream = True
result_holder: list[CrewOutput] = []
current_task_info: TaskInfo = {
"index": 0,
"name": "",
"id": "",
"agent_role": "",
"agent_id": "",
}
state = create_streaming_state(current_task_info, result_holder)
output_holder: list[CrewStreamingOutput | FlowStreamingOutput] = []
def run_crew() -> None:
"""Execute the crew and capture the result."""
try:
self.stream = False
crew_result = self.kickoff(inputs=inputs)
if isinstance(crew_result, CrewOutput):
result_holder.append(crew_result)
except Exception as exc:
signal_error(state, exc)
finally:
self.stream = True
signal_end(state)
streaming_output = CrewStreamingOutput(
sync_iterator=create_chunk_generator(state, run_crew, output_holder)
)
output_holder.append(streaming_output)
return streaming_output
ctx = baggage.set_baggage(
"crew_context", CrewContext(id=str(self.id), key=self.key)
)
@@ -726,11 +775,16 @@ class Crew(FlowTrackable, BaseModel):
finally:
detach(token)
def kickoff_for_each(self, inputs: list[dict[str, Any]]) -> list[CrewOutput]:
"""Executes the Crew's workflow for each input and aggregates results."""
results: list[CrewOutput] = []
def kickoff_for_each(
self, inputs: list[dict[str, Any]]
) -> list[CrewOutput | CrewStreamingOutput]:
"""Executes the Crew's workflow for each input and aggregates results.
If stream=True, returns a list of CrewStreamingOutput objects that must
each be iterated to get stream chunks and access results.
"""
results: list[CrewOutput | CrewStreamingOutput] = []
# Initialize the parent crew's usage metrics
total_usage_metrics = UsageMetrics()
for input_data in inputs:
@@ -738,43 +792,161 @@ class Crew(FlowTrackable, BaseModel):
output = crew.kickoff(inputs=input_data)
if crew.usage_metrics:
if not self.stream and crew.usage_metrics:
total_usage_metrics.add_usage_metrics(crew.usage_metrics)
results.append(output)
self.usage_metrics = total_usage_metrics
if not self.stream:
self.usage_metrics = total_usage_metrics
self._task_output_handler.reset()
return results
async def kickoff_async(self, inputs: dict[str, Any] | None = None) -> CrewOutput:
"""Asynchronous kickoff method to start the crew execution."""
async def kickoff_async(
self, inputs: dict[str, Any] | None = None
) -> CrewOutput | CrewStreamingOutput:
"""Asynchronous kickoff method to start the crew execution.
If stream=True, returns a CrewStreamingOutput that can be async-iterated
to get stream chunks. After iteration completes, access the final result
via .result.
"""
inputs = inputs or {}
if self.stream:
for agent in self.agents:
if agent.llm is not None:
agent.llm.stream = True
result_holder: list[CrewOutput] = []
current_task_info: TaskInfo = {
"index": 0,
"name": "",
"id": "",
"agent_role": "",
"agent_id": "",
}
state = create_streaming_state(
current_task_info, result_holder, use_async=True
)
output_holder: list[CrewStreamingOutput | FlowStreamingOutput] = []
async def run_crew() -> None:
try:
self.stream = False
result = await asyncio.to_thread(self.kickoff, inputs)
if isinstance(result, CrewOutput):
result_holder.append(result)
except Exception as e:
signal_error(state, e, is_async=True)
finally:
self.stream = True
signal_end(state, is_async=True)
streaming_output = CrewStreamingOutput(
async_iterator=create_async_chunk_generator(
state, run_crew, output_holder
)
)
output_holder.append(streaming_output)
return streaming_output
return await asyncio.to_thread(self.kickoff, inputs)
async def kickoff_for_each_async(
self, inputs: list[dict[str, Any]]
) -> list[CrewOutput]:
) -> list[CrewOutput | CrewStreamingOutput] | CrewStreamingOutput:
"""Executes the Crew's workflow for each input asynchronously.
If stream=True, returns a single CrewStreamingOutput that yields chunks
from all crews as they arrive. After iteration, access results via .results
(list of CrewOutput).
"""
crew_copies = [self.copy() for _ in inputs]
async def run_crew(crew: Self, input_data: Any) -> CrewOutput:
return await crew.kickoff_async(inputs=input_data)
if self.stream:
result_holder: list[list[CrewOutput]] = [[]]
current_task_info: TaskInfo = {
"index": 0,
"name": "",
"id": "",
"agent_role": "",
"agent_id": "",
}
state = create_streaming_state(
current_task_info, result_holder, use_async=True
)
output_holder: list[CrewStreamingOutput | FlowStreamingOutput] = []
async def run_all_crews() -> None:
"""Run all crew copies and aggregate their streaming outputs."""
try:
streaming_outputs: list[CrewStreamingOutput] = []
for i, crew in enumerate(crew_copies):
streaming = await crew.kickoff_async(inputs=inputs[i])
if isinstance(streaming, CrewStreamingOutput):
streaming_outputs.append(streaming)
async def consume_stream(
stream_output: CrewStreamingOutput,
) -> CrewOutput:
"""Consume stream chunks and forward to parent queue.
Args:
stream_output: The streaming output to consume.
Returns:
The final CrewOutput result.
"""
async for chunk in stream_output:
if state.async_queue is not None and state.loop is not None:
state.loop.call_soon_threadsafe(
state.async_queue.put_nowait, chunk
)
return stream_output.result
crew_results = await asyncio.gather(
*[consume_stream(s) for s in streaming_outputs]
)
result_holder[0] = list(crew_results)
except Exception as e:
signal_error(state, e, is_async=True)
finally:
signal_end(state, is_async=True)
streaming_output = CrewStreamingOutput(
async_iterator=create_async_chunk_generator(
state, run_all_crews, output_holder
)
)
def set_results_wrapper(result: Any) -> None:
"""Wrap _set_results to match _set_result signature."""
streaming_output._set_results(result)
streaming_output._set_result = set_results_wrapper # type: ignore[method-assign]
output_holder.append(streaming_output)
return streaming_output
tasks = [
asyncio.create_task(run_crew(crew_copies[i], inputs[i]))
for i in range(len(inputs))
asyncio.create_task(crew_copy.kickoff_async(inputs=input_data))
for crew_copy, input_data in zip(crew_copies, inputs, strict=True)
]
results = await asyncio.gather(*tasks)
total_usage_metrics = UsageMetrics()
for crew in crew_copies:
if crew.usage_metrics:
total_usage_metrics.add_usage_metrics(crew.usage_metrics)
for crew_copy in crew_copies:
if crew_copy.usage_metrics:
total_usage_metrics.add_usage_metrics(crew_copy.usage_metrics)
self.usage_metrics = total_usage_metrics
self._task_output_handler.reset()
return results
return list(results)
def _handle_crew_planning(self) -> None:
"""Handles the Crew planning."""

View File

@@ -409,6 +409,8 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(LLMCallStartedEvent)
def on_llm_call_started(_: Any, event: LLMCallStartedEvent) -> None:
self.text_stream = StringIO()
self.next_chunk = 0
# Capture the returned tool branch and update the current_tool_branch reference
thinking_branch = self.formatter.handle_llm_call_started(
self.formatter.current_agent_branch,
@@ -420,6 +422,7 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(LLMCallCompletedEvent)
def on_llm_call_completed(_: Any, event: LLMCallCompletedEvent) -> None:
self.formatter.handle_llm_stream_completed()
self.formatter.handle_llm_call_completed(
self.formatter.current_tool_branch,
self.formatter.current_agent_branch,
@@ -428,6 +431,7 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(LLMCallFailedEvent)
def on_llm_call_failed(_: Any, event: LLMCallFailedEvent) -> None:
self.formatter.handle_llm_stream_completed()
self.formatter.handle_llm_call_failed(
self.formatter.current_tool_branch,
event.error,
@@ -441,6 +445,14 @@ class EventListener(BaseEventListener):
self.text_stream.read()
self.next_chunk = self.text_stream.tell()
accumulated_text = self.text_stream.getvalue()
self.formatter.handle_llm_stream_chunk(
event.chunk,
accumulated_text,
self.formatter.current_crew_tree,
event.call_type,
)
# ----------- LLM GUARDRAIL EVENTS -----------
@crewai_event_bus.on(LLMGuardrailStartedEvent)

View File

@@ -10,7 +10,7 @@ class LLMEventBase(BaseEvent):
from_task: Any | None = None
from_agent: Any | None = None
def __init__(self, **data):
def __init__(self, **data: Any) -> None:
if data.get("from_task"):
task = data["from_task"]
data["task_id"] = str(task.id)
@@ -84,3 +84,4 @@ class LLMStreamChunkEvent(LLMEventBase):
type: str = "llm_stream_chunk"
chunk: str
tool_call: ToolCall | None = None
call_type: LLMCallType | None = None

View File

@@ -21,7 +21,7 @@ class ConsoleFormatter:
current_reasoning_branch: Tree | None = None
_live_paused: bool = False
current_llm_tool_tree: Tree | None = None
current_a2a_conversation_branch: Tree | None = None
current_a2a_conversation_branch: Tree | str | None = None
current_a2a_turn_count: int = 0
_pending_a2a_message: str | None = None
_pending_a2a_agent_role: str | None = None
@@ -39,6 +39,10 @@ class ConsoleFormatter:
# Once any non-Tree renderable is printed we stop the Live session so the
# final Tree persists on the terminal.
self._live: Live | None = None
self._streaming_live: Live | None = None
self._is_streaming: bool = False
self._just_streamed_final_answer: bool = False
self._last_stream_call_type: Any = None
def create_panel(self, content: Text, title: str, style: str = "blue") -> Panel:
"""Create a standardized panel with consistent styling."""
@@ -146,6 +150,9 @@ To enable tracing, do any one of these:
if len(args) == 1 and isinstance(args[0], Tree):
tree = args[0]
if self._is_streaming:
return
if not self._live:
# Start a new Live session for the first tree
self._live = Live(tree, console=self.console, refresh_per_second=4)
@@ -554,7 +561,7 @@ To enable tracing, do any one of these:
self,
tool_name: str,
tool_args: dict[str, Any] | str,
) -> None:
) -> Tree:
# Create status content for the tool usage
content = self.create_status_content(
"Tool Usage Started", tool_name, Status="In Progress", tool_args=tool_args
@@ -762,11 +769,14 @@ To enable tracing, do any one of these:
thinking_branch_to_remove = None
removed = False
# Method 1: Use the provided tool_branch if it's a thinking node
if tool_branch is not None and "Thinking" in str(tool_branch.label):
# Method 1: Use the provided tool_branch if it's a thinking/streaming node
if tool_branch is not None and (
"Thinking" in str(tool_branch.label)
or "Streaming" in str(tool_branch.label)
):
thinking_branch_to_remove = tool_branch
# Method 2: Fallback - search for any thinking node if tool_branch is None or not thinking
# Method 2: Fallback - search for any thinking/streaming node if tool_branch is None or not found
if thinking_branch_to_remove is None:
parents = [
self.current_lite_agent_branch,
@@ -777,7 +787,8 @@ To enable tracing, do any one of these:
for parent in parents:
if isinstance(parent, Tree):
for child in parent.children:
if "Thinking" in str(child.label):
label_str = str(child.label)
if "Thinking" in label_str or "Streaming" in label_str:
thinking_branch_to_remove = child
break
if thinking_branch_to_remove:
@@ -821,11 +832,13 @@ To enable tracing, do any one of these:
# Find the thinking branch to update (similar to completion logic)
thinking_branch_to_update = None
# Method 1: Use the provided tool_branch if it's a thinking node
if tool_branch is not None and "Thinking" in str(tool_branch.label):
if tool_branch is not None and (
"Thinking" in str(tool_branch.label)
or "Streaming" in str(tool_branch.label)
):
thinking_branch_to_update = tool_branch
# Method 2: Fallback - search for any thinking node if tool_branch is None or not thinking
# Method 2: Fallback - search for any thinking/streaming node if tool_branch is None or not found
if thinking_branch_to_update is None:
parents = [
self.current_lite_agent_branch,
@@ -836,7 +849,8 @@ To enable tracing, do any one of these:
for parent in parents:
if isinstance(parent, Tree):
for child in parent.children:
if "Thinking" in str(child.label):
label_str = str(child.label)
if "Thinking" in label_str or "Streaming" in label_str:
thinking_branch_to_update = child
break
if thinking_branch_to_update:
@@ -860,6 +874,83 @@ To enable tracing, do any one of these:
self.print_panel(error_content, "LLM Error", "red")
def handle_llm_stream_chunk(
self,
chunk: str,
accumulated_text: str,
crew_tree: Tree | None,
call_type: Any = None,
) -> None:
"""Handle LLM stream chunk event - display streaming text in a panel.
Args:
chunk: The new chunk of text received.
accumulated_text: All text accumulated so far.
crew_tree: The current crew tree for rendering.
call_type: The type of LLM call (LLM_CALL or TOOL_CALL).
"""
if not self.verbose:
return
self._is_streaming = True
self._last_stream_call_type = call_type
if self._live:
self._live.stop()
self._live = None
display_text = accumulated_text
max_lines = 20
lines = display_text.split("\n")
if len(lines) > max_lines:
display_text = "\n".join(lines[-max_lines:])
display_text = "...\n" + display_text
content = Text()
from crewai.events.types.llm_events import LLMCallType
if call_type == LLMCallType.TOOL_CALL:
content.append(display_text, style="yellow")
title = "🔧 Tool Arguments"
border_style = "yellow"
else:
content.append(display_text, style="bright_green")
title = "✅ Agent Final Answer"
border_style = "green"
streaming_panel = Panel(
content,
title=title,
border_style=border_style,
padding=(1, 2),
)
if not self._streaming_live:
self._streaming_live = Live(
streaming_panel, console=self.console, refresh_per_second=10
)
self._streaming_live.start()
else:
self._streaming_live.update(streaming_panel, refresh=True)
def handle_llm_stream_completed(self) -> None:
"""Handle completion of LLM streaming - stop the streaming live display."""
self._is_streaming = False
from crewai.events.types.llm_events import LLMCallType
if self._last_stream_call_type == LLMCallType.LLM_CALL:
self._just_streamed_final_answer = True
else:
self._just_streamed_final_answer = False
self._last_stream_call_type = None
if self._streaming_live:
self._streaming_live.stop()
self._streaming_live = None
def handle_crew_test_started(
self, crew_name: str, source_id: str, n_iterations: int
) -> Tree | None:
@@ -1528,6 +1619,10 @@ To enable tracing, do any one of these:
self.print()
elif isinstance(formatted_answer, AgentFinish):
if self._just_streamed_final_answer:
self._just_streamed_final_answer = False
return
is_a2a_delegation = False
try:
output_data = json.loads(formatted_answer.output)
@@ -1866,7 +1961,7 @@ To enable tracing, do any one of these:
agent_id: str,
is_multiturn: bool = False,
turn_number: int = 1,
) -> None:
) -> Tree | None:
"""Handle A2A delegation started event.
Args:
@@ -1979,7 +2074,7 @@ To enable tracing, do any one of these:
if status == "input_required" and error:
pass
elif status == "completed":
if has_tree:
if has_tree and isinstance(self.current_a2a_conversation_branch, Tree):
final_turn = self.current_a2a_conversation_branch.add("")
self.update_tree_label(
final_turn,
@@ -1995,7 +2090,7 @@ To enable tracing, do any one of these:
self.current_a2a_conversation_branch = None
self.current_a2a_turn_count = 0
elif status == "failed":
if has_tree:
if has_tree and isinstance(self.current_a2a_conversation_branch, Tree):
error_turn = self.current_a2a_conversation_branch.add("")
error_msg = (
error[:150] + "..." if error and len(error) > 150 else error

View File

@@ -70,7 +70,16 @@ from crewai.flow.utils import (
is_simple_flow_condition,
)
from crewai.flow.visualization import build_flow_structure, render_interactive
from crewai.types.streaming import CrewStreamingOutput, FlowStreamingOutput
from crewai.utilities.printer import Printer, PrinterColor
from crewai.utilities.streaming import (
TaskInfo,
create_async_chunk_generator,
create_chunk_generator,
create_streaming_state,
signal_end,
signal_error,
)
logger = logging.getLogger(__name__)
@@ -456,6 +465,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
initial_state: type[T] | T | None = None
name: str | None = None
tracing: bool | None = None
stream: bool = False
def __class_getitem__(cls: type[Flow[T]], item: type[T]) -> type[Flow[T]]:
class _FlowGeneric(cls): # type: ignore
@@ -822,20 +832,56 @@ class Flow(Generic[T], metaclass=FlowMeta):
if hasattr(self._state, key):
object.__setattr__(self._state, key, value)
def kickoff(self, inputs: dict[str, Any] | None = None) -> Any:
def kickoff(
self, inputs: dict[str, Any] | None = None
) -> Any | FlowStreamingOutput:
"""
Start the flow execution in a synchronous context.
This method wraps kickoff_async so that all state initialization and event
emission is handled in the asynchronous method.
"""
if self.stream:
result_holder: list[Any] = []
current_task_info: TaskInfo = {
"index": 0,
"name": "",
"id": "",
"agent_role": "",
"agent_id": "",
}
state = create_streaming_state(
current_task_info, result_holder, use_async=False
)
output_holder: list[CrewStreamingOutput | FlowStreamingOutput] = []
def run_flow() -> None:
try:
self.stream = False
result = self.kickoff(inputs=inputs)
result_holder.append(result)
except Exception as e:
signal_error(state, e)
finally:
self.stream = True
signal_end(state)
streaming_output = FlowStreamingOutput(
sync_iterator=create_chunk_generator(state, run_flow, output_holder)
)
output_holder.append(streaming_output)
return streaming_output
async def _run_flow() -> Any:
return await self.kickoff_async(inputs)
return asyncio.run(_run_flow())
async def kickoff_async(self, inputs: dict[str, Any] | None = None) -> Any:
async def kickoff_async(
self, inputs: dict[str, Any] | None = None
) -> Any | FlowStreamingOutput:
"""
Start the flow execution asynchronously.
@@ -850,6 +896,41 @@ class Flow(Generic[T], metaclass=FlowMeta):
Returns:
The final output from the flow, which is the result of the last executed method.
"""
if self.stream:
result_holder: list[Any] = []
current_task_info: TaskInfo = {
"index": 0,
"name": "",
"id": "",
"agent_role": "",
"agent_id": "",
}
state = create_streaming_state(
current_task_info, result_holder, use_async=True
)
output_holder: list[CrewStreamingOutput | FlowStreamingOutput] = []
async def run_flow() -> None:
try:
self.stream = False
result = await self.kickoff_async(inputs=inputs)
result_holder.append(result)
except Exception as e:
signal_error(state, e, is_async=True)
finally:
self.stream = True
signal_end(state, is_async=True)
streaming_output = FlowStreamingOutput(
async_iterator=create_async_chunk_generator(
state, run_flow, output_holder
)
)
output_holder.append(streaming_output)
return streaming_output
ctx = baggage.set_baggage("flow_inputs", inputs or {})
flow_token = attach(ctx)

View File

@@ -386,9 +386,10 @@ class LLM(BaseLLM):
if native_class and not is_litellm and provider in SUPPORTED_NATIVE_PROVIDERS:
try:
# Remove 'provider' from kwargs if it exists to avoid duplicate keyword argument
kwargs_copy = {k: v for k, v in kwargs.items() if k != 'provider'}
kwargs_copy = {k: v for k, v in kwargs.items() if k != "provider"}
return cast(
Self, native_class(model=model_string, provider=provider, **kwargs_copy)
Self,
native_class(model=model_string, provider=provider, **kwargs_copy),
)
except NotImplementedError:
raise
@@ -757,6 +758,7 @@ class LLM(BaseLLM):
chunk=chunk_content,
from_task=from_task,
from_agent=from_agent,
call_type=LLMCallType.LLM_CALL,
),
)
# --- 4) Fallback to non-streaming if no content received
@@ -958,6 +960,7 @@ class LLM(BaseLLM):
chunk=tool_call.function.arguments,
from_task=from_task,
from_agent=from_agent,
call_type=LLMCallType.TOOL_CALL,
),
)

View File

@@ -0,0 +1,361 @@
"""Streaming output types for crew and flow execution."""
from __future__ import annotations
from collections.abc import AsyncIterator, Iterator
from enum import Enum
from typing import TYPE_CHECKING, Any, Generic, TypeVar
from pydantic import BaseModel, Field
if TYPE_CHECKING:
from crewai.crews.crew_output import CrewOutput
T = TypeVar("T")
class StreamChunkType(Enum):
"""Type of streaming chunk."""
TEXT = "text"
TOOL_CALL = "tool_call"
class ToolCallChunk(BaseModel):
"""Tool call information in a streaming chunk.
Attributes:
tool_id: Unique identifier for the tool call
tool_name: Name of the tool being called
arguments: JSON string of tool arguments
index: Index of the tool call in the response
"""
tool_id: str | None = None
tool_name: str | None = None
arguments: str = ""
index: int = 0
class StreamChunk(BaseModel):
"""Base streaming chunk with full context.
Attributes:
content: The streaming content (text or partial content)
chunk_type: Type of the chunk (text, tool_call, etc.)
task_index: Index of the current task (0-based)
task_name: Name or description of the current task
task_id: Unique identifier of the task
agent_role: Role of the agent executing the task
agent_id: Unique identifier of the agent
tool_call: Tool call information if chunk_type is TOOL_CALL
"""
content: str = Field(description="The streaming content")
chunk_type: StreamChunkType = Field(
default=StreamChunkType.TEXT, description="Type of the chunk"
)
task_index: int = Field(default=0, description="Index of the current task")
task_name: str = Field(default="", description="Name of the current task")
task_id: str = Field(default="", description="Unique identifier of the task")
agent_role: str = Field(default="", description="Role of the agent")
agent_id: str = Field(default="", description="Unique identifier of the agent")
tool_call: ToolCallChunk | None = Field(
default=None, description="Tool call information"
)
def __str__(self) -> str:
"""Return the chunk content as a string."""
return self.content
class StreamingOutputBase(Generic[T]):
"""Base class for streaming output with result access.
Provides iteration over stream chunks and access to final result
via the .result property after streaming completes.
"""
def __init__(self) -> None:
"""Initialize streaming output base."""
self._result: T | None = None
self._completed: bool = False
self._chunks: list[StreamChunk] = []
self._error: Exception | None = None
@property
def result(self) -> T:
"""Get the final result after streaming completes.
Returns:
The final output (CrewOutput for crews, Any for flows).
Raises:
RuntimeError: If streaming has not completed yet.
Exception: If streaming failed with an error.
"""
if not self._completed:
raise RuntimeError(
"Streaming has not completed yet. "
"Iterate over all chunks before accessing result."
)
if self._error is not None:
raise self._error
if self._result is None:
raise RuntimeError("No result available")
return self._result
@property
def is_completed(self) -> bool:
"""Check if streaming has completed."""
return self._completed
@property
def chunks(self) -> list[StreamChunk]:
"""Get all collected chunks so far."""
return self._chunks.copy()
def get_full_text(self) -> str:
"""Get all streamed text content concatenated.
Returns:
All text chunks concatenated together.
"""
return "".join(
chunk.content
for chunk in self._chunks
if chunk.chunk_type == StreamChunkType.TEXT
)
class CrewStreamingOutput(StreamingOutputBase["CrewOutput"]):
"""Streaming output wrapper for crew execution.
Provides both sync and async iteration over stream chunks,
with access to the final CrewOutput via the .result property.
For kickoff_for_each_async with streaming, use .results to get list of outputs.
Example:
```python
# Single crew
streaming = crew.kickoff(inputs={"topic": "AI"})
for chunk in streaming:
print(chunk.content, end="", flush=True)
result = streaming.result
# Multiple crews (kickoff_for_each_async)
streaming = await crew.kickoff_for_each_async(
[{"topic": "AI"}, {"topic": "ML"}]
)
async for chunk in streaming:
print(chunk.content, end="", flush=True)
results = streaming.results # List of CrewOutput
```
"""
def __init__(
self,
sync_iterator: Iterator[StreamChunk] | None = None,
async_iterator: AsyncIterator[StreamChunk] | None = None,
) -> None:
"""Initialize crew streaming output.
Args:
sync_iterator: Synchronous iterator for chunks.
async_iterator: Asynchronous iterator for chunks.
"""
super().__init__()
self._sync_iterator = sync_iterator
self._async_iterator = async_iterator
self._results: list[CrewOutput] | None = None
@property
def results(self) -> list[CrewOutput]:
"""Get all results for kickoff_for_each_async.
Returns:
List of CrewOutput from all crews.
Raises:
RuntimeError: If streaming has not completed or results not available.
"""
if not self._completed:
raise RuntimeError(
"Streaming has not completed yet. "
"Iterate over all chunks before accessing results."
)
if self._error is not None:
raise self._error
if self._results is not None:
return self._results
if self._result is not None:
return [self._result]
raise RuntimeError("No results available")
def _set_results(self, results: list[CrewOutput]) -> None:
"""Set multiple results for kickoff_for_each_async.
Args:
results: List of CrewOutput from all crews.
"""
self._results = results
self._completed = True
def __iter__(self) -> Iterator[StreamChunk]:
"""Iterate over stream chunks synchronously.
Yields:
StreamChunk objects as they arrive.
Raises:
RuntimeError: If sync iterator not available.
"""
if self._sync_iterator is None:
raise RuntimeError("Sync iterator not available")
try:
for chunk in self._sync_iterator:
self._chunks.append(chunk)
yield chunk
except Exception as e:
self._error = e
raise
finally:
self._completed = True
def __aiter__(self) -> AsyncIterator[StreamChunk]:
"""Return async iterator for stream chunks.
Returns:
Async iterator for StreamChunk objects.
"""
return self._async_iterate()
async def _async_iterate(self) -> AsyncIterator[StreamChunk]:
"""Iterate over stream chunks asynchronously.
Yields:
StreamChunk objects as they arrive.
Raises:
RuntimeError: If async iterator not available.
"""
if self._async_iterator is None:
raise RuntimeError("Async iterator not available")
try:
async for chunk in self._async_iterator:
self._chunks.append(chunk)
yield chunk
except Exception as e:
self._error = e
raise
finally:
self._completed = True
def _set_result(self, result: CrewOutput) -> None:
"""Set the final result after streaming completes.
Args:
result: The final CrewOutput.
"""
self._result = result
self._completed = True
class FlowStreamingOutput(StreamingOutputBase[Any]):
"""Streaming output wrapper for flow execution.
Provides both sync and async iteration over stream chunks,
with access to the final flow output via the .result property.
Example:
```python
# Sync usage
streaming = flow.kickoff()
for chunk in streaming:
print(chunk.content, end="", flush=True)
result = streaming.result
# Async usage
streaming = await flow.kickoff_async()
async for chunk in streaming:
print(chunk.content, end="", flush=True)
result = streaming.result
```
"""
def __init__(
self,
sync_iterator: Iterator[StreamChunk] | None = None,
async_iterator: AsyncIterator[StreamChunk] | None = None,
) -> None:
"""Initialize flow streaming output.
Args:
sync_iterator: Synchronous iterator for chunks.
async_iterator: Asynchronous iterator for chunks.
"""
super().__init__()
self._sync_iterator = sync_iterator
self._async_iterator = async_iterator
def __iter__(self) -> Iterator[StreamChunk]:
"""Iterate over stream chunks synchronously.
Yields:
StreamChunk objects as they arrive.
Raises:
RuntimeError: If sync iterator not available.
"""
if self._sync_iterator is None:
raise RuntimeError("Sync iterator not available")
try:
for chunk in self._sync_iterator:
self._chunks.append(chunk)
yield chunk
except Exception as e:
self._error = e
raise
finally:
self._completed = True
def __aiter__(self) -> AsyncIterator[StreamChunk]:
"""Return async iterator for stream chunks.
Returns:
Async iterator for StreamChunk objects.
"""
return self._async_iterate()
async def _async_iterate(self) -> AsyncIterator[StreamChunk]:
"""Iterate over stream chunks asynchronously.
Yields:
StreamChunk objects as they arrive.
Raises:
RuntimeError: If async iterator not available.
"""
if self._async_iterator is None:
raise RuntimeError("Async iterator not available")
try:
async for chunk in self._async_iterator:
self._chunks.append(chunk)
yield chunk
except Exception as e:
self._error = e
raise
finally:
self._completed = True
def _set_result(self, result: Any) -> None:
"""Set the final result after streaming completes.
Args:
result: The final flow output.
"""
self._result = result
self._completed = True

View File

@@ -0,0 +1,296 @@
"""Streaming utilities for crew and flow execution."""
import asyncio
from collections.abc import AsyncIterator, Callable, Iterator
import queue
import threading
from typing import Any, NamedTuple
from typing_extensions import TypedDict
from crewai.events.base_events import BaseEvent
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.llm_events import LLMStreamChunkEvent
from crewai.types.streaming import (
CrewStreamingOutput,
FlowStreamingOutput,
StreamChunk,
StreamChunkType,
ToolCallChunk,
)
class TaskInfo(TypedDict):
"""Task context information for streaming."""
index: int
name: str
id: str
agent_role: str
agent_id: str
class StreamingState(NamedTuple):
"""Immutable state for streaming execution."""
current_task_info: TaskInfo
result_holder: list[Any]
sync_queue: queue.Queue[StreamChunk | None | Exception]
async_queue: asyncio.Queue[StreamChunk | None | Exception] | None
loop: asyncio.AbstractEventLoop | None
handler: Callable[[Any, BaseEvent], None]
def _extract_tool_call_info(
event: LLMStreamChunkEvent,
) -> tuple[StreamChunkType, ToolCallChunk | None]:
"""Extract tool call information from an LLM stream chunk event.
Args:
event: The LLM stream chunk event to process.
Returns:
A tuple of (chunk_type, tool_call_chunk) where tool_call_chunk is None
if the event is not a tool call.
"""
if event.tool_call:
return (
StreamChunkType.TOOL_CALL,
ToolCallChunk(
tool_id=event.tool_call.id,
tool_name=event.tool_call.function.name,
arguments=event.tool_call.function.arguments,
index=event.tool_call.index,
),
)
return StreamChunkType.TEXT, None
def _create_stream_chunk(
event: LLMStreamChunkEvent,
current_task_info: TaskInfo,
) -> StreamChunk:
"""Create a StreamChunk from an LLM stream chunk event.
Args:
event: The LLM stream chunk event to process.
current_task_info: Task context info.
Returns:
A StreamChunk populated with event and task info.
"""
chunk_type, tool_call_chunk = _extract_tool_call_info(event)
return StreamChunk(
content=event.chunk,
chunk_type=chunk_type,
task_index=current_task_info["index"],
task_name=current_task_info["name"],
task_id=current_task_info["id"],
agent_role=event.agent_role or current_task_info["agent_role"],
agent_id=event.agent_id or current_task_info["agent_id"],
tool_call=tool_call_chunk,
)
def _create_stream_handler(
current_task_info: TaskInfo,
sync_queue: queue.Queue[StreamChunk | None | Exception],
async_queue: asyncio.Queue[StreamChunk | None | Exception] | None = None,
loop: asyncio.AbstractEventLoop | None = None,
) -> Callable[[Any, BaseEvent], None]:
"""Create a stream handler function.
Args:
current_task_info: Task context info.
sync_queue: Synchronous queue for chunks.
async_queue: Optional async queue for chunks.
loop: Optional event loop for async operations.
Returns:
Handler function that can be registered with the event bus.
"""
def stream_handler(_: Any, event: BaseEvent) -> None:
"""Handle LLM stream chunk events and enqueue them.
Args:
_: Event source (unused).
event: The event to process.
"""
if not isinstance(event, LLMStreamChunkEvent):
return
chunk = _create_stream_chunk(event, current_task_info)
if async_queue is not None and loop is not None:
loop.call_soon_threadsafe(async_queue.put_nowait, chunk)
else:
sync_queue.put(chunk)
return stream_handler
def _unregister_handler(handler: Callable[[Any, BaseEvent], None]) -> None:
"""Unregister a stream handler from the event bus.
Args:
handler: The handler function to unregister.
"""
with crewai_event_bus._rwlock.w_locked():
handlers: frozenset[Callable[[Any, BaseEvent], None]] = (
crewai_event_bus._sync_handlers.get(LLMStreamChunkEvent, frozenset())
)
crewai_event_bus._sync_handlers[LLMStreamChunkEvent] = handlers - {handler}
def _finalize_streaming(
state: StreamingState,
streaming_output: CrewStreamingOutput | FlowStreamingOutput,
) -> None:
"""Finalize streaming by unregistering handler and setting result.
Args:
state: The streaming state to finalize.
streaming_output: The streaming output to set the result on.
"""
_unregister_handler(state.handler)
if state.result_holder:
streaming_output._set_result(state.result_holder[0])
def create_streaming_state(
current_task_info: TaskInfo,
result_holder: list[Any],
use_async: bool = False,
) -> StreamingState:
"""Create and register streaming state.
Args:
current_task_info: Task context info.
result_holder: List to hold the final result.
use_async: Whether to use async queue.
Returns:
Initialized StreamingState with registered handler.
"""
sync_queue: queue.Queue[StreamChunk | None | Exception] = queue.Queue()
async_queue: asyncio.Queue[StreamChunk | None | Exception] | None = None
loop: asyncio.AbstractEventLoop | None = None
if use_async:
async_queue = asyncio.Queue()
loop = asyncio.get_event_loop()
handler = _create_stream_handler(current_task_info, sync_queue, async_queue, loop)
crewai_event_bus.register_handler(LLMStreamChunkEvent, handler)
return StreamingState(
current_task_info=current_task_info,
result_holder=result_holder,
sync_queue=sync_queue,
async_queue=async_queue,
loop=loop,
handler=handler,
)
def signal_end(state: StreamingState, is_async: bool = False) -> None:
"""Signal end of stream.
Args:
state: The streaming state.
is_async: Whether this is an async stream.
"""
if is_async and state.async_queue is not None and state.loop is not None:
state.loop.call_soon_threadsafe(state.async_queue.put_nowait, None)
else:
state.sync_queue.put(None)
def signal_error(
state: StreamingState, error: Exception, is_async: bool = False
) -> None:
"""Signal an error in the stream.
Args:
state: The streaming state.
error: The exception to signal.
is_async: Whether this is an async stream.
"""
if is_async and state.async_queue is not None and state.loop is not None:
state.loop.call_soon_threadsafe(state.async_queue.put_nowait, error)
else:
state.sync_queue.put(error)
def create_chunk_generator(
state: StreamingState,
run_func: Callable[[], None],
output_holder: list[CrewStreamingOutput | FlowStreamingOutput],
) -> Iterator[StreamChunk]:
"""Create a chunk generator that uses a holder to access streaming output.
Args:
state: The streaming state.
run_func: Function to run in a separate thread.
output_holder: Single-element list that will contain the streaming output.
Yields:
StreamChunk objects as they arrive.
"""
thread = threading.Thread(target=run_func, daemon=True)
thread.start()
try:
while True:
item = state.sync_queue.get()
if item is None:
break
if isinstance(item, Exception):
raise item
yield item
finally:
thread.join()
if output_holder:
_finalize_streaming(state, output_holder[0])
else:
_unregister_handler(state.handler)
async def create_async_chunk_generator(
state: StreamingState,
run_coro: Callable[[], Any],
output_holder: list[CrewStreamingOutput | FlowStreamingOutput],
) -> AsyncIterator[StreamChunk]:
"""Create an async chunk generator that uses a holder to access streaming output.
Args:
state: The streaming state.
run_coro: Coroutine function to run as a task.
output_holder: Single-element list that will contain the streaming output.
Yields:
StreamChunk objects as they arrive.
"""
if state.async_queue is None:
raise RuntimeError(
"Async queue not initialized. Use create_streaming_state(use_async=True)."
)
task = asyncio.create_task(run_coro())
try:
while True:
item = await state.async_queue.get()
if item is None:
break
if isinstance(item, Exception):
raise item
yield item
finally:
await task
if output_holder:
_finalize_streaming(state, output_holder[0])
else:
_unregister_handler(state.handler)
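
Taken together, these helpers implement a producer/consumer handoff: execution runs on a worker thread (or asyncio task), the registered event handler feeds `StreamChunk`s into a queue, and the generator drains that queue until `signal_end` or `signal_error` is called. Below is a minimal sketch of the synchronous wiring, assuming a hypothetical `kickoff_fn` callable and illustrative `TaskInfo` values; the real wiring lives in the crew's kickoff path.

```python
def stream_sync(kickoff_fn) -> CrewStreamingOutput:
    # Illustrative task context; real values come from the executing task.
    task_info: TaskInfo = {
        "index": 0,
        "name": "Example task",
        "id": "task-0",
        "agent_role": "Researcher",
        "agent_id": "agent-0",
    }
    result_holder: list = []
    state = create_streaming_state(task_info, result_holder)

    def run() -> None:
        try:
            # kickoff_fn is expected to emit LLMStreamChunkEvents as it runs.
            result_holder.append(kickoff_fn())
            signal_end(state)
        except Exception as e:
            signal_error(state, e)  # re-raised in the consumer loop

    output_holder: list = []
    streaming = CrewStreamingOutput(
        sync_iterator=create_chunk_generator(state, run, output_holder)
    )
    # The generator finalizes via this holder once iteration completes;
    # nothing runs until the caller starts iterating (generators are lazy).
    output_holder.append(streaming)
    return streaming
```

The async path has the same shape, with `create_streaming_state(..., use_async=True)`, `create_async_chunk_generator`, and `signal_end(state, is_async=True)`.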

View File

@@ -0,0 +1,717 @@
"""Tests for streaming output functionality in crews and flows."""
import asyncio
from collections.abc import AsyncIterator, Generator
from typing import Any
from unittest.mock import MagicMock, patch
import pytest
from crewai import Agent, Crew, Task
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.llm_events import LLMStreamChunkEvent, ToolCall, FunctionCall
from crewai.flow.flow import Flow, start
from crewai.types.streaming import (
CrewStreamingOutput,
FlowStreamingOutput,
StreamChunk,
StreamChunkType,
ToolCallChunk,
)
@pytest.fixture
def researcher() -> Agent:
"""Create a researcher agent for testing."""
return Agent(
role="Researcher",
goal="Research and analyze topics thoroughly",
backstory="You are an expert researcher with deep analytical skills.",
allow_delegation=False,
)
@pytest.fixture
def simple_task(researcher: Agent) -> Task:
"""Create a simple task for testing."""
return Task(
description="Write a brief analysis of AI trends",
expected_output="A concise analysis of current AI trends",
agent=researcher,
)
@pytest.fixture
def simple_crew(researcher: Agent, simple_task: Task) -> Crew:
"""Create a simple crew with one agent and one task."""
return Crew(
agents=[researcher],
tasks=[simple_task],
verbose=False,
)
@pytest.fixture
def streaming_crew(researcher: Agent, simple_task: Task) -> Crew:
"""Create a streaming crew with one agent and one task."""
return Crew(
agents=[researcher],
tasks=[simple_task],
verbose=False,
stream=True,
)
class TestStreamChunk:
"""Tests for StreamChunk model."""
def test_stream_chunk_creation(self) -> None:
"""Test creating a basic stream chunk."""
chunk = StreamChunk(
content="Hello, world!",
chunk_type=StreamChunkType.TEXT,
task_index=0,
task_name="Test Task",
task_id="task-123",
agent_role="Researcher",
agent_id="agent-456",
)
assert chunk.content == "Hello, world!"
assert chunk.chunk_type == StreamChunkType.TEXT
assert chunk.task_index == 0
assert chunk.task_name == "Test Task"
assert str(chunk) == "Hello, world!"
def test_stream_chunk_with_tool_call(self) -> None:
"""Test creating a stream chunk with tool call information."""
tool_call = ToolCallChunk(
tool_id="call-123",
tool_name="search",
arguments='{"query": "AI trends"}',
index=0,
)
chunk = StreamChunk(
content="",
chunk_type=StreamChunkType.TOOL_CALL,
tool_call=tool_call,
)
assert chunk.chunk_type == StreamChunkType.TOOL_CALL
assert chunk.tool_call is not None
assert chunk.tool_call.tool_name == "search"
class TestCrewStreamingOutput:
"""Tests for CrewStreamingOutput functionality."""
def test_result_before_iteration_raises_error(self) -> None:
"""Test that accessing result before iteration raises error."""
        def unconsumed_gen() -> Generator[StreamChunk, None, None]:
            yield StreamChunk(content="test")

        streaming = CrewStreamingOutput(sync_iterator=unconsumed_gen())
with pytest.raises(RuntimeError, match="Streaming has not completed yet"):
_ = streaming.result
def test_is_completed_property(self) -> None:
"""Test the is_completed property."""
def simple_gen() -> Generator[StreamChunk, None, None]:
yield StreamChunk(content="test")
streaming = CrewStreamingOutput(sync_iterator=simple_gen())
assert streaming.is_completed is False
list(streaming)
assert streaming.is_completed is True
def test_get_full_text(self) -> None:
"""Test getting full text from chunks."""
def gen() -> Generator[StreamChunk, None, None]:
yield StreamChunk(content="Hello ")
yield StreamChunk(content="World!")
yield StreamChunk(content="", chunk_type=StreamChunkType.TOOL_CALL)
streaming = CrewStreamingOutput(sync_iterator=gen())
list(streaming)
assert streaming.get_full_text() == "Hello World!"
def test_chunks_property(self) -> None:
"""Test accessing collected chunks."""
def gen() -> Generator[StreamChunk, None, None]:
yield StreamChunk(content="chunk1")
yield StreamChunk(content="chunk2")
streaming = CrewStreamingOutput(sync_iterator=gen())
list(streaming)
assert len(streaming.chunks) == 2
assert streaming.chunks[0].content == "chunk1"
class TestFlowStreamingOutput:
"""Tests for FlowStreamingOutput functionality."""
def test_result_before_iteration_raises_error(self) -> None:
"""Test that accessing result before iteration raises error."""
        def unconsumed_gen() -> Generator[StreamChunk, None, None]:
            yield StreamChunk(content="test")

        streaming = FlowStreamingOutput(sync_iterator=unconsumed_gen())
with pytest.raises(RuntimeError, match="Streaming has not completed yet"):
_ = streaming.result
def test_is_completed_property(self) -> None:
"""Test the is_completed property."""
def simple_gen() -> Generator[StreamChunk, None, None]:
yield StreamChunk(content="test")
streaming = FlowStreamingOutput(sync_iterator=simple_gen())
assert streaming.is_completed is False
list(streaming)
assert streaming.is_completed is True
class TestCrewKickoffStreaming:
"""Tests for Crew(stream=True).kickoff() method."""
def test_kickoff_streaming_returns_streaming_output(self, streaming_crew: Crew) -> None:
"""Test that kickoff with stream=True returns CrewStreamingOutput."""
        mock_output = MagicMock()
        mock_output.raw = "Test output"
        original_kickoff = Crew.kickoff
        call_count = [0]

        def mock_kickoff_fn(self: Any, inputs: Any = None) -> Any:
            call_count[0] += 1
            if call_count[0] == 1:
                return original_kickoff(self, inputs)
            return mock_output

        with patch.object(Crew, "kickoff", mock_kickoff_fn):
            streaming = streaming_crew.kickoff()
            assert isinstance(streaming, CrewStreamingOutput)
def test_kickoff_streaming_captures_chunks(self, researcher: Agent, simple_task: Task) -> None:
"""Test that streaming captures LLM chunks."""
crew = Crew(
agents=[researcher],
tasks=[simple_task],
verbose=False,
stream=True,
)
mock_output = MagicMock()
mock_output.raw = "Test output"
original_kickoff = Crew.kickoff
call_count = [0]
def mock_kickoff_fn(self: Any, inputs: Any = None) -> Any:
call_count[0] += 1
if call_count[0] == 1:
return original_kickoff(self, inputs)
else:
crewai_event_bus.emit(
crew,
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="Hello ",
),
)
crewai_event_bus.emit(
crew,
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="World!",
),
)
return mock_output
with patch.object(Crew, "kickoff", mock_kickoff_fn):
streaming = crew.kickoff()
assert isinstance(streaming, CrewStreamingOutput)
chunks = list(streaming)
assert len(chunks) >= 2
contents = [c.content for c in chunks]
assert "Hello " in contents
assert "World!" in contents
def test_kickoff_streaming_result_available_after_iteration(
self, researcher: Agent, simple_task: Task
) -> None:
"""Test that result is available after iterating all chunks."""
mock_output = MagicMock()
mock_output.raw = "Final result"
def gen() -> Generator[StreamChunk, None, None]:
yield StreamChunk(content="test chunk")
streaming = CrewStreamingOutput(sync_iterator=gen())
# Iterate all chunks
_ = list(streaming)
# Simulate what _finalize_streaming does
streaming._set_result(mock_output)
result = streaming.result
assert result.raw == "Final result"
def test_kickoff_streaming_handles_tool_calls(self, researcher: Agent, simple_task: Task) -> None:
"""Test that streaming handles tool call chunks correctly."""
crew = Crew(
agents=[researcher],
tasks=[simple_task],
verbose=False,
stream=True,
)
mock_output = MagicMock()
mock_output.raw = "Test output"
original_kickoff = Crew.kickoff
call_count = [0]
def mock_kickoff_fn(self: Any, inputs: Any = None) -> Any:
call_count[0] += 1
if call_count[0] == 1:
return original_kickoff(self, inputs)
else:
crewai_event_bus.emit(
crew,
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="",
tool_call=ToolCall(
id="call-123",
function=FunctionCall(
name="search",
arguments='{"query": "test"}',
),
type="function",
index=0,
),
),
)
return mock_output
with patch.object(Crew, "kickoff", mock_kickoff_fn):
streaming = crew.kickoff()
assert isinstance(streaming, CrewStreamingOutput)
chunks = list(streaming)
tool_chunks = [c for c in chunks if c.chunk_type == StreamChunkType.TOOL_CALL]
assert len(tool_chunks) >= 1
assert tool_chunks[0].tool_call is not None
assert tool_chunks[0].tool_call.tool_name == "search"
class TestCrewKickoffStreamingAsync:
"""Tests for Crew(stream=True).kickoff_async() method."""
@pytest.mark.asyncio
async def test_kickoff_streaming_async_returns_streaming_output(
self, researcher: Agent, simple_task: Task
) -> None:
"""Test that kickoff_async with stream=True returns CrewStreamingOutput."""
crew = Crew(
agents=[researcher],
tasks=[simple_task],
verbose=False,
stream=True,
)
mock_output = MagicMock()
mock_output.raw = "Test output"
original_kickoff = Crew.kickoff
call_count = [0]
def mock_kickoff_fn(self: Any, inputs: Any = None) -> Any:
call_count[0] += 1
if call_count[0] == 1:
return original_kickoff(self, inputs)
else:
return mock_output
with patch.object(Crew, "kickoff", mock_kickoff_fn):
streaming = await crew.kickoff_async()
assert isinstance(streaming, CrewStreamingOutput)
@pytest.mark.asyncio
async def test_kickoff_streaming_async_captures_chunks(
self, researcher: Agent, simple_task: Task
) -> None:
"""Test that async streaming captures LLM chunks."""
crew = Crew(
agents=[researcher],
tasks=[simple_task],
verbose=False,
stream=True,
)
mock_output = MagicMock()
mock_output.raw = "Test output"
def mock_kickoff_fn(self: Any, inputs: Any = None) -> Any:
crewai_event_bus.emit(
crew,
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="Async ",
),
)
crewai_event_bus.emit(
crew,
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="Stream!",
),
)
return mock_output
with patch.object(Crew, "kickoff", mock_kickoff_fn):
streaming = await crew.kickoff_async()
assert isinstance(streaming, CrewStreamingOutput)
chunks: list[StreamChunk] = []
async for chunk in streaming:
chunks.append(chunk)
assert len(chunks) >= 2
contents = [c.content for c in chunks]
assert "Async " in contents
assert "Stream!" in contents
@pytest.mark.asyncio
async def test_kickoff_streaming_async_result_available_after_iteration(
self, researcher: Agent, simple_task: Task
) -> None:
"""Test that result is available after async iteration."""
mock_output = MagicMock()
mock_output.raw = "Async result"
async def async_gen() -> AsyncIterator[StreamChunk]:
yield StreamChunk(content="test chunk")
streaming = CrewStreamingOutput(async_iterator=async_gen())
# Iterate all chunks
async for _ in streaming:
pass
# Simulate what _finalize_streaming does
streaming._set_result(mock_output)
result = streaming.result
assert result.raw == "Async result"
class TestFlowKickoffStreaming:
"""Tests for Flow(stream=True).kickoff() method."""
def test_kickoff_streaming_returns_streaming_output(self) -> None:
"""Test that flow kickoff with stream=True returns FlowStreamingOutput."""
class SimpleFlow(Flow[dict[str, Any]]):
@start()
def generate(self) -> str:
return "result"
flow = SimpleFlow()
flow.stream = True
streaming = flow.kickoff()
assert isinstance(streaming, FlowStreamingOutput)
def test_flow_kickoff_streaming_captures_chunks(self) -> None:
"""Test that flow streaming captures LLM chunks from crew execution."""
class TestFlow(Flow[dict[str, Any]]):
@start()
def run_crew(self) -> str:
return "done"
flow = TestFlow()
flow.stream = True
original_kickoff = Flow.kickoff
call_count = [0]
def mock_kickoff_fn(self: Any, inputs: Any = None) -> Any:
call_count[0] += 1
if call_count[0] == 1:
return original_kickoff(self, inputs)
else:
crewai_event_bus.emit(
flow,
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="Flow ",
),
)
crewai_event_bus.emit(
flow,
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="output!",
),
)
return "done"
with patch.object(Flow, "kickoff", mock_kickoff_fn):
streaming = flow.kickoff()
assert isinstance(streaming, FlowStreamingOutput)
chunks = list(streaming)
assert len(chunks) >= 2
contents = [c.content for c in chunks]
assert "Flow " in contents
assert "output!" in contents
def test_flow_kickoff_streaming_result_available(self) -> None:
"""Test that flow result is available after iteration."""
class TestFlow(Flow[dict[str, Any]]):
@start()
def generate(self) -> str:
return "flow result"
flow = TestFlow()
flow.stream = True
original_kickoff = Flow.kickoff
call_count = [0]
def mock_kickoff_fn(self: Any, inputs: Any = None) -> Any:
call_count[0] += 1
if call_count[0] == 1:
return original_kickoff(self, inputs)
else:
return "flow result"
with patch.object(Flow, "kickoff", mock_kickoff_fn):
streaming = flow.kickoff()
assert isinstance(streaming, FlowStreamingOutput)
_ = list(streaming)
result = streaming.result
assert result == "flow result"
class TestFlowKickoffStreamingAsync:
"""Tests for Flow(stream=True).kickoff_async() method."""
@pytest.mark.asyncio
async def test_kickoff_streaming_async_returns_streaming_output(self) -> None:
"""Test that flow kickoff_async with stream=True returns FlowStreamingOutput."""
class SimpleFlow(Flow[dict[str, Any]]):
@start()
async def generate(self) -> str:
return "async result"
flow = SimpleFlow()
flow.stream = True
streaming = await flow.kickoff_async()
assert isinstance(streaming, FlowStreamingOutput)
@pytest.mark.asyncio
async def test_flow_kickoff_streaming_async_captures_chunks(self) -> None:
"""Test that async flow streaming captures LLM chunks."""
class TestFlow(Flow[dict[str, Any]]):
@start()
async def run_crew(self) -> str:
return "done"
flow = TestFlow()
flow.stream = True
original_kickoff = Flow.kickoff_async
call_count = [0]
async def mock_kickoff_fn(self: Any, inputs: Any = None) -> Any:
call_count[0] += 1
if call_count[0] == 1:
return await original_kickoff(self, inputs)
else:
await asyncio.sleep(0.01)
crewai_event_bus.emit(
flow,
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="Async flow ",
),
)
await asyncio.sleep(0.01)
crewai_event_bus.emit(
flow,
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="stream!",
),
)
await asyncio.sleep(0.01)
return "done"
with patch.object(Flow, "kickoff_async", mock_kickoff_fn):
streaming = await flow.kickoff_async()
assert isinstance(streaming, FlowStreamingOutput)
chunks: list[StreamChunk] = []
async for chunk in streaming:
chunks.append(chunk)
assert len(chunks) >= 2
contents = [c.content for c in chunks]
assert "Async flow " in contents
assert "stream!" in contents
@pytest.mark.asyncio
async def test_flow_kickoff_streaming_async_result_available(self) -> None:
"""Test that async flow result is available after iteration."""
class TestFlow(Flow[dict[str, Any]]):
@start()
async def generate(self) -> str:
return "async flow result"
flow = TestFlow()
flow.stream = True
original_kickoff = Flow.kickoff_async
call_count = [0]
async def mock_kickoff_fn(self: Any, inputs: Any = None) -> Any:
call_count[0] += 1
if call_count[0] == 1:
return await original_kickoff(self, inputs)
else:
return "async flow result"
with patch.object(Flow, "kickoff_async", mock_kickoff_fn):
streaming = await flow.kickoff_async()
assert isinstance(streaming, FlowStreamingOutput)
async for _ in streaming:
pass
result = streaming.result
assert result == "async flow result"
class TestStreamingEdgeCases:
"""Tests for edge cases in streaming functionality."""
def test_streaming_handles_exceptions(self, researcher: Agent, simple_task: Task) -> None:
"""Test that streaming properly propagates exceptions."""
crew = Crew(
agents=[researcher],
tasks=[simple_task],
verbose=False,
stream=True,
)
original_kickoff = Crew.kickoff
call_count = [0]
def mock_kickoff_fn(self: Any, inputs: Any = None) -> Any:
call_count[0] += 1
if call_count[0] == 1:
return original_kickoff(self, inputs)
else:
raise ValueError("Test error")
with patch.object(Crew, "kickoff", mock_kickoff_fn):
streaming = crew.kickoff()
with pytest.raises(ValueError, match="Test error"):
list(streaming)
def test_streaming_with_empty_content_chunks(self) -> None:
"""Test streaming when LLM chunks have empty content."""
mock_output = MagicMock()
mock_output.raw = "No streaming"
def gen() -> Generator[StreamChunk, None, None]:
yield StreamChunk(content="")
streaming = CrewStreamingOutput(sync_iterator=gen())
chunks = list(streaming)
assert streaming.is_completed
assert len(chunks) == 1
assert chunks[0].content == ""
# Simulate what _finalize_streaming does
streaming._set_result(mock_output)
result = streaming.result
assert result.raw == "No streaming"
def test_streaming_with_multiple_tasks(self, researcher: Agent) -> None:
"""Test streaming with multiple tasks tracks task context."""
task1 = Task(
description="First task",
expected_output="First output",
agent=researcher,
)
task2 = Task(
description="Second task",
expected_output="Second output",
agent=researcher,
)
crew = Crew(
agents=[researcher],
tasks=[task1, task2],
verbose=False,
stream=True,
)
mock_output = MagicMock()
mock_output.raw = "Multi-task output"
original_kickoff = Crew.kickoff
call_count = [0]
def mock_kickoff_fn(self: Any, inputs: Any = None) -> Any:
call_count[0] += 1
if call_count[0] == 1:
return original_kickoff(self, inputs)
else:
crewai_event_bus.emit(
crew,
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="Task 1",
task_name="First task",
),
)
return mock_output
with patch.object(Crew, "kickoff", mock_kickoff_fn):
streaming = crew.kickoff()
assert isinstance(streaming, CrewStreamingOutput)
chunks = list(streaming)
assert len(chunks) >= 1
assert streaming.is_completed
class TestStreamingImports:
"""Tests for correct imports of streaming types."""
def test_streaming_types_importable_from_types_module(self) -> None:
"""Test that streaming types can be imported from crewai.types.streaming."""
from crewai.types.streaming import (
CrewStreamingOutput,
FlowStreamingOutput,
StreamChunk,
StreamChunkType,
ToolCallChunk,
)
assert CrewStreamingOutput is not None
assert FlowStreamingOutput is not None
assert StreamChunk is not None
assert StreamChunkType is not None
assert ToolCallChunk is not None
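
A pattern worth calling out: many of the tests above patch `Crew.kickoff` (or `Flow.kickoff`) with a counting stub. This works because, with `stream=True`, the first call builds the streaming wrapper and the actual execution re-invokes the patched method from the worker thread, so the stub can delegate once and then intercept. A distilled sketch, with illustrative names:

```python
from typing import Any
from unittest.mock import patch

from crewai import Crew

def make_reentrant_stub(final_output: Any):
    """Pass the first call through to the real streaming wrapper;
    serve final_output to the worker-thread re-invocation."""
    original_kickoff = Crew.kickoff  # capture before patching
    calls = [0]

    def stub(self: Any, inputs: Any = None) -> Any:
        calls[0] += 1
        if calls[0] == 1:
            return original_kickoff(self, inputs)  # builds the streaming output
        return final_output  # second call: skip real execution
    return stub

# usage (the stub factory runs before patch.object swaps the attribute):
# with patch.object(Crew, "kickoff", make_reentrant_stub(mock_output)):
#     streaming = crew.kickoff()
#     chunks = list(streaming)
```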

View File

@@ -0,0 +1,290 @@
"""Integration tests for streaming with real LLM interactions using cassettes."""
import pytest
from crewai import Agent, Crew, Task
from crewai.flow.flow import Flow, start
from crewai.types.streaming import CrewStreamingOutput, FlowStreamingOutput
@pytest.fixture
def researcher() -> Agent:
"""Create a researcher agent for testing."""
return Agent(
role="Research Analyst",
goal="Gather comprehensive information on topics",
backstory="You are an experienced researcher with excellent analytical skills.",
allow_delegation=False,
)
@pytest.fixture
def simple_task(researcher: Agent) -> Task:
"""Create a simple research task."""
return Task(
description="Research the latest developments in {topic}",
expected_output="A brief summary of recent developments",
agent=researcher,
)
class TestStreamingCrewIntegration:
"""Integration tests for crew streaming that match documentation examples."""
@pytest.mark.vcr(filter_headers=["authorization"])
def test_basic_crew_streaming_from_docs(
self, researcher: Agent, simple_task: Task
) -> None:
"""Test basic streaming example from documentation."""
crew = Crew(
agents=[researcher],
tasks=[simple_task],
stream=True,
verbose=False,
)
streaming = crew.kickoff(inputs={"topic": "artificial intelligence"})
assert isinstance(streaming, CrewStreamingOutput)
chunks = []
for chunk in streaming:
chunks.append(chunk.content)
assert len(chunks) > 0
result = streaming.result
assert result.raw is not None
assert len(result.raw) > 0
@pytest.mark.vcr(filter_headers=["authorization"])
def test_streaming_with_chunk_context_from_docs(
self, researcher: Agent, simple_task: Task
) -> None:
"""Test streaming with chunk context example from documentation."""
crew = Crew(
agents=[researcher],
tasks=[simple_task],
stream=True,
verbose=False,
)
streaming = crew.kickoff(inputs={"topic": "AI"})
chunk_contexts = []
for chunk in streaming:
chunk_contexts.append(
{
"task_name": chunk.task_name,
"task_index": chunk.task_index,
"agent_role": chunk.agent_role,
"content": chunk.content,
"type": chunk.chunk_type,
}
)
assert len(chunk_contexts) > 0
assert all("agent_role" in ctx for ctx in chunk_contexts)
result = streaming.result
assert result is not None
@pytest.mark.vcr(filter_headers=["authorization"])
def test_streaming_properties_from_docs(
self, researcher: Agent, simple_task: Task
) -> None:
"""Test streaming properties example from documentation."""
crew = Crew(
agents=[researcher],
tasks=[simple_task],
stream=True,
verbose=False,
)
streaming = crew.kickoff(inputs={"topic": "AI"})
for _ in streaming:
pass
assert streaming.is_completed is True
full_text = streaming.get_full_text()
assert len(full_text) > 0
assert len(streaming.chunks) > 0
result = streaming.result
assert result.raw is not None
@pytest.mark.vcr(filter_headers=["authorization"])
@pytest.mark.asyncio
async def test_async_streaming_from_docs(
self, researcher: Agent, simple_task: Task
) -> None:
"""Test async streaming example from documentation."""
crew = Crew(
agents=[researcher],
tasks=[simple_task],
stream=True,
verbose=False,
)
streaming = await crew.kickoff_async(inputs={"topic": "AI"})
assert isinstance(streaming, CrewStreamingOutput)
chunks = []
async for chunk in streaming:
chunks.append(chunk.content)
assert len(chunks) > 0
result = streaming.result
assert result.raw is not None
@pytest.mark.vcr(filter_headers=["authorization"])
def test_kickoff_for_each_streaming_from_docs(
self, researcher: Agent, simple_task: Task
) -> None:
"""Test kickoff_for_each streaming example from documentation."""
crew = Crew(
agents=[researcher],
tasks=[simple_task],
stream=True,
verbose=False,
)
inputs_list = [{"topic": "AI in healthcare"}, {"topic": "AI in finance"}]
streaming_outputs = crew.kickoff_for_each(inputs=inputs_list)
assert len(streaming_outputs) == 2
assert all(isinstance(s, CrewStreamingOutput) for s in streaming_outputs)
results = []
for streaming in streaming_outputs:
for _ in streaming:
pass
result = streaming.result
results.append(result)
assert len(results) == 2
assert all(r.raw is not None for r in results)
class TestStreamingFlowIntegration:
"""Integration tests for flow streaming that match documentation examples."""
@pytest.mark.vcr(filter_headers=["authorization"])
def test_basic_flow_streaming_from_docs(self) -> None:
"""Test basic flow streaming example from documentation."""
class ResearchFlow(Flow):
stream = True
@start()
def research_topic(self) -> str:
researcher = Agent(
role="Research Analyst",
goal="Research topics thoroughly",
backstory="Expert researcher with analytical skills",
allow_delegation=False,
)
task = Task(
description="Research AI trends and provide insights",
expected_output="Detailed research findings",
agent=researcher,
)
crew = Crew(
agents=[researcher],
tasks=[task],
stream=True,
verbose=False,
)
streaming = crew.kickoff()
for _ in streaming:
pass
return streaming.result.raw
flow = ResearchFlow()
streaming = flow.kickoff()
assert isinstance(streaming, FlowStreamingOutput)
chunks = []
for chunk in streaming:
chunks.append(chunk.content)
assert len(chunks) > 0
result = streaming.result
assert result is not None
@pytest.mark.vcr(filter_headers=["authorization"])
def test_flow_streaming_properties_from_docs(self) -> None:
"""Test flow streaming properties example from documentation."""
class SimpleFlow(Flow):
stream = True
@start()
def execute(self) -> str:
return "Flow result"
flow = SimpleFlow()
streaming = flow.kickoff()
for _ in streaming:
pass
assert streaming.is_completed is True
        full_text = streaming.get_full_text()
        assert isinstance(full_text, str)
        assert len(streaming.chunks) >= 0
result = streaming.result
assert result is not None
@pytest.mark.vcr(filter_headers=["authorization"])
@pytest.mark.asyncio
async def test_async_flow_streaming_from_docs(self) -> None:
"""Test async flow streaming example from documentation."""
class AsyncResearchFlow(Flow):
stream = True
@start()
def research(self) -> str:
researcher = Agent(
role="Researcher",
goal="Research topics",
backstory="Expert researcher",
allow_delegation=False,
)
task = Task(
description="Research AI",
expected_output="Research findings",
agent=researcher,
)
crew = Crew(agents=[researcher], tasks=[task], stream=True, verbose=False)
streaming = crew.kickoff()
for _ in streaming:
pass
return streaming.result.raw
flow = AsyncResearchFlow()
streaming = await flow.kickoff_async()
assert isinstance(streaming, FlowStreamingOutput)
chunks = []
async for chunk in streaming:
chunks.append(chunk.content)
result = streaming.result
assert result is not None
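
As a closing illustration of the consumer side these integration tests exercise, here is a sketch that routes text and tool-call chunks differently; the field names are the `StreamChunk` attributes used above, and the agent, task, and topic values are illustrative:

```python
from crewai import Agent, Crew, Task
from crewai.types.streaming import StreamChunkType

researcher = Agent(
    role="Research Analyst",
    goal="Gather comprehensive information on topics",
    backstory="You are an experienced researcher.",
    allow_delegation=False,
)
task = Task(
    description="Research the latest developments in {topic}",
    expected_output="A brief summary of recent developments",
    agent=researcher,
)
crew = Crew(agents=[researcher], tasks=[task], stream=True, verbose=False)

streaming = crew.kickoff(inputs={"topic": "AI"})
for chunk in streaming:
    if chunk.chunk_type == StreamChunkType.TOOL_CALL and chunk.tool_call:
        # Tool invocations arrive as structured chunks rather than text.
        print(f"\n[{chunk.agent_role}] -> {chunk.tool_call.tool_name}")
    else:
        print(chunk.content, end="", flush=True)

result = streaming.result  # populated once iteration completes
```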