Files
crewAI/docs/en/concepts/flows.mdx
2026-02-19 11:33:49 -08:00

941 lines
33 KiB
Plaintext

---
title: Flows
description: Learn how to create and manage AI workflows using CrewAI Flows.
icon: arrow-progress
mode: "wide"
---
## Overview
CrewAI Flows is a powerful feature designed to streamline the creation and management of AI workflows. Flows allow developers to combine and coordinate coding tasks and Crews efficiently, providing a robust framework for building sophisticated AI automations.
Flows allow you to create structured, event-driven workflows. They provide a seamless way to connect multiple tasks, manage state, and control the flow of execution in your AI applications. With Flows, you can easily design and implement multi-step processes that leverage the full potential of CrewAI's capabilities.
1. **Simplified Workflow Creation**: Easily chain together multiple Crews and tasks to create complex AI workflows.
2. **State Management**: Flows make it super easy to manage and share state between different tasks in your workflow.
3. **Event-Driven Architecture**: Built on an event-driven model, allowing for dynamic and responsive workflows.
4. **Flexible Control Flow**: Implement conditional logic, loops, and branching within your workflows.
## When to Use Flows
- You need deterministic orchestration and branching logic.
- You need explicit state transitions across multiple steps.
- You need resumable workflows with persistence.
- You need to combine crews, direct model calls, and Python logic in one runtime.
## When Not to Use Flows
- A single prompt/response call is sufficient.
- A single crew kickoff with no orchestration logic is sufficient.
- You do not need stateful multi-step execution.
## Getting Started
The example below shows a realistic Flow for support-ticket triage. It demonstrates features teams use in production: typed state, routing, memory access, and persistence.
```python Code
from crewai.flow.flow import Flow, listen, router, start
from crewai.flow.persistence import persist
from pydantic import BaseModel, Field
class SupportTriageState(BaseModel):
ticket_id: str = ""
customer_tier: str = "standard" # standard | enterprise
issue: str = ""
urgency: str = "normal"
route: str = ""
draft_reply: str = ""
internal_notes: list[str] = Field(default_factory=list)
@persist()
class SupportTriageFlow(Flow[SupportTriageState]):
@start()
def ingest_ticket(self):
# kickoff(inputs={...}) is merged into typed state fields
print(f"Flow State ID: {self.state.id}")
self.remember(
f"Ticket {self.state.ticket_id}: {self.state.issue}",
scope=f"/support/{self.state.ticket_id}",
)
issue = self.state.issue.lower()
if "security" in issue or "breach" in issue:
self.state.urgency = "critical"
elif self.state.customer_tier == "enterprise":
self.state.urgency = "high"
else:
self.state.urgency = "normal"
return self.state.issue
@router(ingest_ticket)
def route_ticket(self):
issue = self.state.issue.lower()
if "security" in issue or "breach" in issue:
self.state.route = "security"
return "security_review"
if self.state.customer_tier == "enterprise" or self.state.urgency == "high":
self.state.route = "priority"
return "priority_queue"
self.state.route = "standard"
return "standard_queue"
@listen("security_review")
def handle_security(self):
self.state.internal_notes.append("Escalated to Security Incident Response")
self.state.draft_reply = (
"We have escalated your case to our security team and will update you shortly."
)
return self.state.draft_reply
@listen("priority_queue")
def handle_priority(self):
history = self.recall("SLA commitments for enterprise support", limit=2)
self.state.internal_notes.append(
f"Loaded {len(history)} memory hits for priority handling"
)
self.state.draft_reply = (
"Your ticket has been prioritized and assigned to a senior support engineer."
)
return self.state.draft_reply
@listen("standard_queue")
def handle_standard(self):
self.state.internal_notes.append("Routed to standard support queue")
self.state.draft_reply = "Thanks for reporting this. Our team will follow up soon."
return self.state.draft_reply
flow = SupportTriageFlow()
flow.plot("support_triage_flow")
result = flow.kickoff(
inputs={
"ticket_id": "TCK-1024",
"customer_tier": "enterprise",
"issue": "Cannot access SSO after enabling new policy",
}
)
print("Final reply:", result)
print("Route:", flow.state.route)
print("Notes:", flow.state.internal_notes)
```
![Flow Visual image](/images/crewai-flow-1.png)
In this example, one flow demonstrates several core features together:
1. `@start()` initializes and normalizes state for downstream steps.
2. `@router()` performs deterministic branching into labeled routes.
3. Route listeners implement lane-specific behavior (`security`, `priority`, `standard`).
4. `@persist()` keeps the flow state recoverable between runs.
5. Built-in memory methods (`remember`, `recall`) add durable context beyond a single method call.
This pattern mirrors typical production workflows where request classification, policy-aware routing, and auditable state all happen in one orchestrated flow.
### @start()
The `@start()` decorator marks entry points for a Flow. You can:
- Declare multiple unconditional starts: `@start()`
- Gate a start on a prior method or router label: `@start("method_or_label")`
- Provide a callable condition to control when a start should fire
All satisfied `@start()` methods will execute (often in parallel) when the Flow begins or resumes.
### @listen()
The `@listen()` decorator is used to mark a method as a listener for the output of another task in the Flow. The method decorated with `@listen()` will be executed when the specified task emits an output. The method can access the output of the task it is listening to as an argument.
#### Usage
The `@listen()` decorator can be used in several ways:
1. **Listening to a Method by Name**: You can pass the name of the method you want to listen to as a string. When that method completes, the listener method will be triggered.
```python Code
@listen("upstream_method")
def downstream_method(self, upstream_result):
# Implementation
```
2. **Listening to a Method Directly**: You can pass the method itself. When that method completes, the listener method will be triggered.
```python Code
@listen(upstream_method)
def downstream_method(self, upstream_result):
# Implementation
```
### Flow Output
Accessing and handling the output of a Flow is essential for integrating your AI workflows into larger applications or systems. CrewAI Flows provide straightforward mechanisms to retrieve the final output, access intermediate results, and manage the overall state of your Flow.
#### Retrieving the Final Output
When you run a Flow, the final output is determined by the last method that completes. The `kickoff()` method returns the output of this final method.
Here's how you can access the final output:
<CodeGroup>
```python Code
from crewai.flow.flow import Flow, listen, start
class OutputExampleFlow(Flow):
@start()
def first_method(self):
return "Output from first_method"
@listen(first_method)
def second_method(self, first_output):
return f"Second method received: {first_output}"
flow = OutputExampleFlow()
flow.plot("my_flow_plot")
final_output = flow.kickoff()
print("---- Final Output ----")
print(final_output)
```
```text Output
---- Final Output ----
Second method received: Output from first_method
```
</CodeGroup>
![Flow Visual image](/images/crewai-flow-2.png)
In this example, the `second_method` is the last method to complete, so its output will be the final output of the Flow.
The `kickoff()` method will return the final output, which is then printed to the console. The `plot()` method will generate the HTML file, which will help you understand the flow.
#### Accessing and Updating State
In addition to retrieving the final output, you can also access and update the state within your Flow. The state can be used to store and share data between different methods in the Flow. After the Flow has run, you can access the state to retrieve any information that was added or updated during the execution.
Here's an example of how to update and access the state:
<CodeGroup>
```python Code
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel
class ExampleState(BaseModel):
counter: int = 0
message: str = ""
class StateExampleFlow(Flow[ExampleState]):
@start()
def first_method(self):
self.state.message = "Hello from first_method"
self.state.counter += 1
@listen(first_method)
def second_method(self):
self.state.message += " - updated by second_method"
self.state.counter += 1
return self.state.message
flow = StateExampleFlow()
flow.plot("my_flow_plot")
final_output = flow.kickoff()
print(f"Final Output: {final_output}")
print("Final State:")
print(flow.state)
```
```text Output
Final Output: Hello from first_method - updated by second_method
Final State:
counter=2 message='Hello from first_method - updated by second_method'
```
</CodeGroup>
![Flow Visual image](/images/crewai-flow-2.png)
In this example, the state is updated by both `first_method` and `second_method`.
After the Flow has run, you can access the final state to see the updates made by these methods.
By ensuring that the final method's output is returned and providing access to the state, CrewAI Flows make it easy to integrate the results of your AI workflows into larger applications or systems,
while also maintaining and accessing the state throughout the Flow's execution.
## Flow State Management
Managing state effectively is crucial for building reliable and maintainable AI workflows. CrewAI Flows provides robust mechanisms for both unstructured and structured state management,
allowing developers to choose the approach that best fits their application's needs.
### Unstructured State Management
In unstructured state management, all state is stored in the `state` attribute of the `Flow` class.
This approach offers flexibility, enabling developers to add or modify state attributes on the fly without defining a strict schema.
Even with unstructured states, CrewAI Flows automatically generates and maintains a unique identifier (UUID) for each state instance.
```python Code
from crewai.flow.flow import Flow, listen, start
class UnstructuredExampleFlow(Flow):
@start()
def first_method(self):
# The state automatically includes an 'id' field
print(f"State ID: {self.state['id']}")
self.state['counter'] = 0
self.state['message'] = "Hello from structured flow"
@listen(first_method)
def second_method(self):
self.state['counter'] += 1
self.state['message'] += " - updated"
@listen(second_method)
def third_method(self):
self.state['counter'] += 1
self.state['message'] += " - updated again"
print(f"State after third_method: {self.state}")
flow = UnstructuredExampleFlow()
flow.plot("my_flow_plot")
flow.kickoff()
```
![Flow Visual image](/images/crewai-flow-3.png)
**Note:** The `id` field is automatically generated and preserved throughout the flow's execution. You don't need to manage or set it manually, and it will be maintained even when updating the state with new data.
**Key Points:**
- **Flexibility:** You can dynamically add attributes to `self.state` without predefined constraints.
- **Simplicity:** Ideal for straightforward workflows where state structure is minimal or varies significantly.
### Structured State Management
Structured state management leverages predefined schemas to ensure consistency and type safety across the workflow.
By using models like Pydantic's `BaseModel`, developers can define the exact shape of the state, enabling better validation and auto-completion in development environments.
Each state in CrewAI Flows automatically receives a unique identifier (UUID) to help track and manage state instances. This ID is automatically generated and managed by the Flow system.
```python Code
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel
class ExampleState(BaseModel):
# Note: 'id' field is automatically added to all states
counter: int = 0
message: str = ""
class StructuredExampleFlow(Flow[ExampleState]):
@start()
def first_method(self):
# Access the auto-generated ID if needed
print(f"State ID: {self.state.id}")
self.state.message = "Hello from structured flow"
@listen(first_method)
def second_method(self):
self.state.counter += 1
self.state.message += " - updated"
@listen(second_method)
def third_method(self):
self.state.counter += 1
self.state.message += " - updated again"
print(f"State after third_method: {self.state}")
flow = StructuredExampleFlow()
flow.kickoff()
```
![Flow Visual image](/images/crewai-flow-3.png)
**Key Points:**
- **Defined Schema:** `ExampleState` clearly outlines the state structure, enhancing code readability and maintainability.
- **Type Safety:** Leveraging Pydantic ensures that state attributes adhere to the specified types, reducing runtime errors.
- **Auto-Completion:** IDEs can provide better auto-completion and error checking based on the defined state model.
### Choosing Between Unstructured and Structured State Management
- **Use Unstructured State Management when:**
- The workflow's state is simple or highly dynamic.
- Flexibility is prioritized over strict state definitions.
- Rapid prototyping is required without the overhead of defining schemas.
- **Use Structured State Management when:**
- The workflow requires a well-defined and consistent state structure.
- Type safety and validation are important for your application's reliability.
- You want to leverage IDE features like auto-completion and type checking for better developer experience.
By providing both unstructured and structured state management options, CrewAI Flows empowers developers to build AI workflows that are both flexible and robust, catering to a wide range of application requirements.
## Flow Persistence
The @persist decorator enables automatic state persistence in CrewAI Flows, allowing you to maintain flow state across restarts or different workflow executions. This decorator can be applied at either the class level or method level, providing flexibility in how you manage state persistence.
### Class-Level Persistence
When applied at the class level, the @persist decorator automatically persists all flow method states:
```python
@persist # Using SQLiteFlowPersistence by default
class MyFlow(Flow[MyState]):
@start()
def initialize_flow(self):
# This method will automatically have its state persisted
self.state.counter = 1
print("Initialized flow. State ID:", self.state.id)
@listen(initialize_flow)
def next_step(self):
# The state (including self.state.id) is automatically reloaded
self.state.counter += 1
print("Flow state is persisted. Counter:", self.state.counter)
```
### Method-Level Persistence
For more granular control, you can apply @persist to specific methods:
```python
class AnotherFlow(Flow[dict]):
@persist # Persists only this method's state
@start()
def begin(self):
if "runs" not in self.state:
self.state["runs"] = 0
self.state["runs"] += 1
print("Method-level persisted runs:", self.state["runs"])
```
### How It Works
1. **Unique State Identification**
- Each flow state automatically receives a unique UUID
- The ID is preserved across state updates and method calls
- Supports both structured (Pydantic BaseModel) and unstructured (dictionary) states
2. **Default SQLite Backend**
- SQLiteFlowPersistence is the default storage backend
- States are automatically saved to a local SQLite database
- Robust error handling ensures clear messages if database operations fail
3. **Error Handling**
- Comprehensive error messages for database operations
- Automatic state validation during save and load
- Clear feedback when persistence operations encounter issues
### Important Considerations
- **State Types**: Both structured (Pydantic BaseModel) and unstructured (dictionary) states are supported
- **Automatic ID**: The `id` field is automatically added if not present
- **State Recovery**: Failed or restarted flows can automatically reload their previous state
- **Custom Implementation**: You can provide your own FlowPersistence implementation for specialized storage needs
### Technical Advantages
1. **Precise Control Through Low-Level Access**
- Direct access to persistence operations for advanced use cases
- Fine-grained control via method-level persistence decorators
- Built-in state inspection and debugging capabilities
- Full visibility into state changes and persistence operations
2. **Enhanced Reliability**
- Automatic state recovery after system failures or restarts
- Transaction-based state updates for data integrity
- Comprehensive error handling with clear error messages
- Robust validation during state save and load operations
3. **Extensible Architecture**
- Customizable persistence backend through FlowPersistence interface
- Support for specialized storage solutions beyond SQLite
- Compatible with both structured (Pydantic) and unstructured (dict) states
- Seamless integration with existing CrewAI flow patterns
The persistence system's architecture emphasizes technical precision and customization options, allowing developers to maintain full control over state management while benefiting from built-in reliability features.
## Flow Control
### Conditional Logic: `or`
The `or_` function in Flows allows you to listen to multiple methods and trigger the listener method when any of the specified methods emit an output.
<CodeGroup>
```python Code
from crewai.flow.flow import Flow, listen, or_, start
class OrExampleFlow(Flow):
@start()
def start_method(self):
return "Hello from the start method"
@listen(start_method)
def second_method(self):
return "Hello from the second method"
@listen(or_(start_method, second_method))
def logger(self, result):
print(f"Logger: {result}")
flow = OrExampleFlow()
flow.plot("my_flow_plot")
flow.kickoff()
```
```text Output
Logger: Hello from the start method
Logger: Hello from the second method
```
</CodeGroup>
![Flow Visual image](/images/crewai-flow-4.png)
When you run this Flow, the `logger` method will be triggered by the output of either the `start_method` or the `second_method`.
The `or_` function is used to listen to multiple methods and trigger the listener method when any of the specified methods emit an output.
### Conditional Logic: `and`
The `and_` function in Flows allows you to listen to multiple methods and trigger the listener method only when all the specified methods emit an output.
<CodeGroup>
```python Code
from crewai.flow.flow import Flow, and_, listen, start
class AndExampleFlow(Flow):
@start()
def start_method(self):
self.state["greeting"] = "Hello from the start method"
@listen(start_method)
def second_method(self):
self.state["joke"] = "What do computers eat? Microchips."
@listen(and_(start_method, second_method))
def logger(self):
print("---- Logger ----")
print(self.state)
flow = AndExampleFlow()
flow.plot()
flow.kickoff()
```
```text Output
---- Logger ----
{'greeting': 'Hello from the start method', 'joke': 'What do computers eat? Microchips.'}
```
</CodeGroup>
![Flow Visual image](/images/crewai-flow-5.png)
When you run this Flow, the `logger` method will be triggered only when both the `start_method` and the `second_method` emit an output.
The `and_` function is used to listen to multiple methods and trigger the listener method only when all the specified methods emit an output.
### Router
The `@router()` decorator in Flows allows you to define conditional routing logic based on the output of a method.
You can specify different routes based on the output of the method, allowing you to control the flow of execution dynamically.
<CodeGroup>
```python Code
import random
from crewai.flow.flow import Flow, listen, router, start
from pydantic import BaseModel
class ExampleState(BaseModel):
success_flag: bool = False
class RouterFlow(Flow[ExampleState]):
@start()
def start_method(self):
print("Starting the structured flow")
random_boolean = random.choice([True, False])
self.state.success_flag = random_boolean
@router(start_method)
def second_method(self):
if self.state.success_flag:
return "success"
else:
return "failed"
@listen("success")
def third_method(self):
print("Third method running")
@listen("failed")
def fourth_method(self):
print("Fourth method running")
flow = RouterFlow()
flow.plot("my_flow_plot")
flow.kickoff()
```
```text Output
Starting the structured flow
Third method running
Fourth method running
```
</CodeGroup>
![Flow Visual image](/images/crewai-flow-6.png)
In the above example, the `start_method` generates a random boolean value and sets it in the state.
The `second_method` uses the `@router()` decorator to define conditional routing logic based on the value of the boolean.
If the boolean is `True`, the method returns `"success"`, and if it is `False`, the method returns `"failed"`.
The `third_method` and `fourth_method` listen to the output of the `second_method` and execute based on the returned value.
When you run this Flow, the output will change based on the random boolean value generated by the `start_method`.
### Human in the Loop (human feedback)
<Note>
The `@human_feedback` decorator requires **CrewAI version 1.8.0 or higher**.
</Note>
The `@human_feedback` decorator enables human-in-the-loop workflows by pausing flow execution to collect feedback from a human. This is useful for approval gates, quality review, and decision points that require human judgment.
```python Code
from crewai.flow.flow import Flow, start, listen
from crewai.flow.human_feedback import human_feedback, HumanFeedbackResult
class ReviewFlow(Flow):
@start()
@human_feedback(
message="Do you approve this content?",
emit=["approved", "rejected", "needs_revision"],
llm="gpt-4o-mini",
default_outcome="needs_revision",
)
def generate_content(self):
return "Content to be reviewed..."
@listen("approved")
def on_approval(self, result: HumanFeedbackResult):
print(f"Approved! Feedback: {result.feedback}")
@listen("rejected")
def on_rejection(self, result: HumanFeedbackResult):
print(f"Rejected. Reason: {result.feedback}")
```
When `emit` is specified, the human's free-form feedback is interpreted by an LLM and collapsed into one of the specified outcomes, which then triggers the corresponding `@listen` decorator.
You can also use `@human_feedback` without routing to simply collect feedback:
```python Code
@start()
@human_feedback(message="Any comments on this output?")
def my_method(self):
return "Output for review"
@listen(my_method)
def next_step(self, result: HumanFeedbackResult):
# Access feedback via result.feedback
# Access original output via result.output
pass
```
Access all feedback collected during a flow via `self.last_human_feedback` (most recent) or `self.human_feedback_history` (all feedback as a list).
For a complete guide on human feedback in flows, including **async/non-blocking feedback** with custom providers (Slack, webhooks, etc.), see [Human Feedback in Flows](/en/learn/human-feedback-in-flows).
## Adding Agents to Flows
Agents can be seamlessly integrated into your flows, providing a lightweight alternative to full Crews when you need simpler, focused task execution. Here's an example of how to use an Agent within a flow to perform market research:
```python
import asyncio
from typing import Any, Dict, List
from crewai_tools import SerperDevTool
from pydantic import BaseModel, Field
from crewai.agent import Agent
from crewai.flow.flow import Flow, listen, start
# Define a structured output format
class MarketAnalysis(BaseModel):
key_trends: List[str] = Field(description="List of identified market trends")
market_size: str = Field(description="Estimated market size")
competitors: List[str] = Field(description="Major competitors in the space")
# Define flow state
class MarketResearchState(BaseModel):
product: str = ""
analysis: MarketAnalysis | None = None
# Create a flow class
class MarketResearchFlow(Flow[MarketResearchState]):
@start()
def initialize_research(self) -> Dict[str, Any]:
print(f"Starting market research for {self.state.product}")
return {"product": self.state.product}
@listen(initialize_research)
async def analyze_market(self) -> Dict[str, Any]:
# Create an Agent for market research
analyst = Agent(
role="Market Research Analyst",
goal=f"Analyze the market for {self.state.product}",
backstory="You are an experienced market analyst with expertise in "
"identifying market trends and opportunities.",
tools=[SerperDevTool()],
verbose=True,
)
# Define the research query
query = f"""
Research the market for {self.state.product}. Include:
1. Key market trends
2. Market size
3. Major competitors
Format your response according to the specified structure.
"""
# Execute the analysis with structured output format
result = await analyst.kickoff_async(query, response_format=MarketAnalysis)
if result.pydantic:
print("result", result.pydantic)
else:
print("result", result)
# Return the analysis to update the state
return {"analysis": result.pydantic}
@listen(analyze_market)
def present_results(self, analysis) -> None:
print("\nMarket Analysis Results")
print("=====================")
if isinstance(analysis, dict):
# If we got a dict with 'analysis' key, extract the actual analysis object
market_analysis = analysis.get("analysis")
else:
market_analysis = analysis
if market_analysis and isinstance(market_analysis, MarketAnalysis):
print("\nKey Market Trends:")
for trend in market_analysis.key_trends:
print(f"- {trend}")
print(f"\nMarket Size: {market_analysis.market_size}")
print("\nMajor Competitors:")
for competitor in market_analysis.competitors:
print(f"- {competitor}")
else:
print("No structured analysis data available.")
print("Raw analysis:", analysis)
# Usage example
async def run_flow():
flow = MarketResearchFlow()
flow.plot("MarketResearchFlowPlot")
result = await flow.kickoff_async(inputs={"product": "AI-powered chatbots"})
return result
# Run the flow
if __name__ == "__main__":
asyncio.run(run_flow())
```
![Flow Visual image](/images/crewai-flow-7.png)
This example demonstrates several key features of using Agents in flows:
1. **Structured Output**: Using Pydantic models to define the expected output format (`MarketAnalysis`) ensures type safety and structured data throughout the flow.
2. **State Management**: The flow state (`MarketResearchState`) maintains context between steps and stores both inputs and outputs.
3. **Tool Integration**: Agents can use tools (like `WebsiteSearchTool`) to enhance their capabilities.
## Multi-Crew Flows and Plotting
Detailed build walkthroughs and project scaffolding are documented in guide pages to keep this concepts page focused.
- Build your first flow: [/en/guides/flows/first-flow](/en/guides/flows/first-flow)
- Master state and persistence: [/en/guides/flows/mastering-flow-state](/en/guides/flows/mastering-flow-state)
- Real-world chat-state pattern: [/en/learn/flowstate-chat-history](/en/learn/flowstate-chat-history)
For visualization:
- Use `flow.plot("my_flow_plot")` in code, or
- Use `crewai flow plot` in CLI projects.
## Running Flows
There are two ways to run a flow:
### Using the Flow API
You can run a flow programmatically by creating an instance of your flow class and calling the `kickoff()` method:
```python
flow = SupportTriageFlow()
result = flow.kickoff()
```
### Streaming Flow Execution
For real-time visibility into flow execution, you can enable streaming to receive output as it's generated:
```python
class StreamingFlow(Flow):
stream = True # Enable streaming
@start()
def research(self):
# Your flow implementation
pass
# Iterate over streaming output
flow = StreamingFlow()
streaming = flow.kickoff()
for chunk in streaming:
print(chunk.content, end="", flush=True)
# Access final result
result = streaming.result
```
Learn more about streaming in the [Streaming Flow Execution](/en/learn/streaming-flow-execution) guide.
## Memory in Flows
Every Flow automatically has access to CrewAI's unified [Memory](/concepts/memory) system. You can store, recall, and extract memories directly inside any flow method using three built-in convenience methods.
### Built-in Methods
| Method | Description |
| :--- | :--- |
| `self.remember(content, **kwargs)` | Store content in memory. Accepts optional `scope`, `categories`, `metadata`, `importance`. |
| `self.recall(query, **kwargs)` | Retrieve relevant memories. Accepts optional `scope`, `categories`, `limit`, `depth`. |
| `self.extract_memories(content)` | Break raw text into discrete, self-contained memory statements. |
A default `Memory()` instance is created automatically when the Flow initializes. You can also pass a custom one:
```python
from crewai.flow.flow import Flow
from crewai import Memory
custom_memory = Memory(
recency_weight=0.5,
recency_half_life_days=7,
embedder={"provider": "ollama", "config": {"model_name": "mxbai-embed-large"}},
)
flow = MyFlow(memory=custom_memory)
```
### Example: Research and Analyze Flow
```python
from crewai.flow.flow import Flow, listen, start
class ResearchAnalysisFlow(Flow):
@start()
def gather_data(self):
# Simulate research findings
findings = (
"PostgreSQL handles 10k concurrent connections with connection pooling. "
"MySQL caps at around 5k. MongoDB scales horizontally but adds complexity."
)
# Extract atomic facts and remember each one
memories = self.extract_memories(findings)
for mem in memories:
self.remember(mem, scope="/research/databases")
return findings
@listen(gather_data)
def analyze(self, raw_findings):
# Recall relevant past research (from this run or previous runs)
past = self.recall("database performance and scaling", limit=10, depth="shallow")
context_lines = [f"- {m.record.content}" for m in past]
context = "\n".join(context_lines) if context_lines else "No prior context."
return {
"new_findings": raw_findings,
"prior_context": context,
"total_memories": len(past),
}
flow = ResearchAnalysisFlow()
result = flow.kickoff()
print(result)
```
Because memory persists across runs (backed by LanceDB on disk), the `analyze` step will recall findings from previous executions too -- enabling flows that learn and accumulate knowledge over time.
See the [Memory documentation](/concepts/memory) for details on scopes, slices, composite scoring, embedder configuration, and more.
### Using the CLI
Starting from version 0.103.0, you can run flows using the `crewai run` command:
```shell
crewai run
```
This command automatically detects if your project is a flow (based on the `type = "flow"` setting in your pyproject.toml) and runs it accordingly. This is the recommended way to run flows from the command line.
For backward compatibility, you can also use:
```shell
crewai flow kickoff
```
However, the `crewai run` command is now the preferred method as it works for both crews and flows.
## Common Failure Modes
### Router branch not firing
- Cause: returned label does not match a `@listen("label")` value.
- Fix: align router return strings with listener labels exactly.
### State fields missing at runtime
- Cause: untyped dynamic fields or missing kickoff inputs.
- Fix: use typed state and validate required fields in `@start()`.
### Prompt/token growth over time
- Cause: appending unbounded message history in state.
- Fix: apply sliding-window state and summary compaction patterns.
### Non-idempotent retries
- Cause: side effects executed on retried steps.
- Fix: add idempotency keys/markers to state and guard external writes.