--- title: Streaming Flow Execution description: Stream real-time output from your CrewAI flow execution icon: wave-pulse mode: "wide" --- ## Introduction CrewAI Flows support streaming output, allowing you to receive real-time updates as your flow executes. This feature enables you to build responsive applications that display results incrementally, provide live progress updates, and create better user experiences for long-running workflows. ## How Flow Streaming Works When streaming is enabled on a Flow, CrewAI captures and streams output from any crews, LLM calls, tools, and lifecycle events within the flow. The stream delivers ordered `StreamFrame` items with printable content plus structured event data as execution progresses. ## Enabling Streaming To enable streaming, set the `stream` attribute to `True` on your Flow class: ```python Code from crewai.flow.flow import Flow, listen, start from crewai import Agent, Crew, Task class ResearchFlow(Flow): stream = True # Enable streaming for the entire flow @start() def initialize(self): return {"topic": "AI trends"} @listen(initialize) def research_topic(self, data): researcher = Agent( role="Research Analyst", goal="Research topics thoroughly", backstory="Expert researcher with analytical skills", ) task = Task( description="Research {topic} and provide insights", expected_output="Detailed research findings", agent=researcher, ) crew = Crew( agents=[researcher], tasks=[task], ) return crew.kickoff(inputs=data) ``` ## Synchronous Streaming When you call `kickoff()` on a flow with streaming enabled, it returns a stream session that yields ordered `StreamFrame` items: ```python Code flow = ResearchFlow() # Start streaming execution streaming = flow.kickoff() # Iterate over stream items as they arrive for item in streaming: print(item.content, end="", flush=True) # Access the final result after streaming completes result = streaming.result print(f"\n\nFinal output: {result}") ``` ### Stream Item Information Each item provides both printable content and structured event data: ```python Code streaming = flow.kickoff() for item in streaming: print(f"Channel: {item.channel}") print(f"Type: {item.type}") print(f"Content: {item.content}") print(f"Event payload: {item.event}") ``` ### Accessing Streaming Properties The stream session provides useful properties and methods: ```python Code streaming = flow.kickoff() # Iterate and collect items for item in streaming: print(item.content, end="", flush=True) # After iteration completes print(f"\nCompleted: {streaming.is_completed}") print(f"Total frames: {len(streaming.frames)}") print(f"Final result: {streaming.result}") ``` ## Asynchronous Streaming For async applications, use `kickoff_async()` with async iteration: ```python Code import asyncio async def stream_flow(): flow = ResearchFlow() # Start async streaming streaming = await flow.kickoff_async() # Async iteration over stream items async for item in streaming: print(item.content, end="", flush=True) # Access final result result = streaming.result print(f"\n\nFinal output: {result}") asyncio.run(stream_flow()) ``` ## Streaming with Multi-Step Flows Streaming works seamlessly across multiple flow steps, including flows that execute multiple crews: ```python Code from crewai.flow.flow import Flow, listen, start from crewai import Agent, Crew, Task class MultiStepFlow(Flow): stream = True @start() def research_phase(self): """First crew: Research the topic.""" researcher = Agent( role="Research Analyst", goal="Gather comprehensive information", backstory="Expert at finding relevant information", ) task = Task( description="Research AI developments in healthcare", expected_output="Research findings on AI in healthcare", agent=researcher, ) crew = Crew(agents=[researcher], tasks=[task]) result = crew.kickoff() self.state["research"] = result.raw return result.raw @listen(research_phase) def analysis_phase(self, research_data): """Second crew: Analyze the research.""" analyst = Agent( role="Data Analyst", goal="Analyze information and extract insights", backstory="Expert at identifying patterns and trends", ) task = Task( description="Analyze this research: {research}", expected_output="Key insights and trends", agent=analyst, ) crew = Crew(agents=[analyst], tasks=[task]) return crew.kickoff(inputs={"research": research_data}) # Stream across both phases flow = MultiStepFlow() streaming = flow.kickoff() current_step = "" for item in streaming: # Track which flow step is executing step_name = item.event.get("method_name") or item.event.get("task_name") if step_name and step_name != current_step: current_step = step_name print(f"\n\n=== {step_name} ===\n") print(item.content, end="", flush=True) result = streaming.result print(f"\n\nFinal analysis: {result}") ``` ## Practical Example: Progress Dashboard Here's a complete example showing how to build a progress dashboard with streaming: ```python Code import asyncio from crewai.flow.flow import Flow, listen, start from crewai import Agent, Crew, Task class ResearchPipeline(Flow): stream = True @start() def gather_data(self): researcher = Agent( role="Data Gatherer", goal="Collect relevant information", backstory="Skilled at finding quality sources", ) task = Task( description="Gather data on renewable energy trends", expected_output="Collection of relevant data points", agent=researcher, ) crew = Crew(agents=[researcher], tasks=[task]) result = crew.kickoff() self.state["data"] = result.raw return result.raw @listen(gather_data) def analyze_data(self, data): analyst = Agent( role="Data Analyst", goal="Extract meaningful insights", backstory="Expert at data analysis", ) task = Task( description="Analyze: {data}", expected_output="Key insights and trends", agent=analyst, ) crew = Crew(agents=[analyst], tasks=[task]) return crew.kickoff(inputs={"data": data}) async def run_with_dashboard(): flow = ResearchPipeline() print("="*60) print("RESEARCH PIPELINE DASHBOARD") print("="*60) streaming = await flow.kickoff_async() current_agent = "" current_task = "" frame_count = 0 async for item in streaming: frame_count += 1 # Display phase transitions task_name = item.event.get("task_name", "") agent_role = item.event.get("agent_role", "") if task_name and task_name != current_task: current_task = task_name current_agent = agent_role print(f"\n\nšŸ“‹ Phase: {current_task}") print(f"šŸ‘¤ Agent: {current_agent}") print("-" * 60) # Display text output if item.content: print(item.content, end="", flush=True) # Display tool usage elif item.channel == "tools": print(f"\nšŸ”§ Tool event: {item.type}") # Show completion summary result = streaming.result print(f"\n\n{'='*60}") print("PIPELINE COMPLETE") print(f"{'='*60}") print(f"Total frames: {frame_count}") print(f"Final output length: {len(str(result))} characters") asyncio.run(run_with_dashboard()) ``` ## Streaming with State Management Streaming works naturally with Flow state management: ```python Code from pydantic import BaseModel class AnalysisState(BaseModel): topic: str = "" research: str = "" insights: str = "" class StatefulStreamingFlow(Flow[AnalysisState]): stream = True @start() def research(self): # State is available during streaming topic = self.state.topic print(f"Researching: {topic}") researcher = Agent( role="Researcher", goal="Research topics thoroughly", backstory="Expert researcher", ) task = Task( description=f"Research {topic}", expected_output="Research findings", agent=researcher, ) crew = Crew(agents=[researcher], tasks=[task]) result = crew.kickoff() self.state.research = result.raw return result.raw @listen(research) def analyze(self, research): # Access updated state print(f"Analyzing {len(self.state.research)} chars of research") analyst = Agent( role="Analyst", goal="Extract insights", backstory="Expert analyst", ) task = Task( description="Analyze: {research}", expected_output="Key insights", agent=analyst, ) crew = Crew(agents=[analyst], tasks=[task]) result = crew.kickoff(inputs={"research": research}) self.state.insights = result.raw return result.raw # Run with streaming flow = StatefulStreamingFlow() streaming = flow.kickoff(inputs={"topic": "quantum computing"}) for item in streaming: print(item.content, end="", flush=True) result = streaming.result print(f"\n\nFinal state:") print(f"Topic: {flow.state.topic}") print(f"Research length: {len(flow.state.research)}") print(f"Insights length: {len(flow.state.insights)}") ``` ## Use Cases Flow streaming is particularly valuable for: - **Multi-Stage Workflows**: Show progress across research, analysis, and synthesis phases - **Complex Pipelines**: Provide visibility into long-running data processing flows - **Interactive Applications**: Build responsive UIs that display intermediate results - **Monitoring and Debugging**: Observe flow execution and crew interactions in real-time - **Progress Tracking**: Show users which stage of the workflow is currently executing - **Live Dashboards**: Create monitoring interfaces for production flows ## Stream Frame Channels Flow streaming yields `StreamFrame` items across several channels: ### LLM Frames Standard text content from LLM responses: ```python Code for item in streaming: if item.channel == "llm" and item.content: print(item.content, end="", flush=True) ``` ### Tool Frames Information about tool calls within the flow: ```python Code for item in streaming: if item.channel == "tools": print(f"\nTool event: {item.type}") print(f"Payload: {item.event}") ``` ## Error Handling Handle errors gracefully during streaming: ```python Code flow = ResearchFlow() streaming = flow.kickoff() try: for item in streaming: print(item.content, end="", flush=True) result = streaming.result print(f"\nSuccess! Result: {result}") except Exception as e: print(f"\nError during flow execution: {e}") if streaming.is_completed: print("Streaming completed but flow encountered an error") ``` ## Cancellation and Resource Cleanup The stream session supports graceful cancellation so that in-flight work stops promptly when the consumer disconnects. ### Async Context Manager ```python Code streaming = await flow.kickoff_async() async with streaming: async for item in streaming: print(item.content, end="", flush=True) ``` ### Explicit Cancellation ```python Code streaming = await flow.kickoff_async() try: async for item in streaming: print(item.content, end="", flush=True) finally: await streaming.aclose() # async # streaming.close() # sync equivalent ``` After cancellation, `streaming.is_cancelled` and `streaming.is_completed` are both `True`. Both `aclose()` and `close()` are idempotent. ## Important Notes - Streaming automatically enables LLM streaming for any crews used within the flow - You must iterate through all stream items before accessing the `.result` property - Streaming works with both structured and unstructured flow state - Flow streaming captures output from all crews and LLM calls in the flow - Each frame includes structured event context such as channel, type, namespace, and payload - Streaming adds minimal overhead to flow execution ## Combining with Flow Visualization You can combine streaming with flow visualization to provide a complete picture: ```python Code # Generate flow visualization flow = ResearchFlow() flow.plot("research_flow") # Creates HTML visualization # Run with streaming streaming = flow.kickoff() for item in streaming: print(item.content, end="", flush=True) result = streaming.result print(f"\nFlow complete! View structure at: research_flow.html") ``` By leveraging flow streaming, you can build sophisticated, responsive applications that provide users with real-time visibility into complex multi-stage workflows, making your AI automations more transparent and engaging.