crewAI/docs/en/concepts/production-architecture.mdx
Tiago Freire cd2b9ee38a feat(flow): add restore_from_state_id kickoff parameter (#5674)
## Summary

- Reverts `b0e2fda` ("fix(flow): add execution_id separate from state.id", COR-48): removes `Flow.execution_id` and points `current_flow_id` / `current_flow_request_id` back at `flow_id` (i.e. `state.id`). The separate per-run tracking id was no longer the right abstraction once `restore_from_state_id` reshapes how `state.id` is assigned.

- Adds an optional `restore_from_state_id` kwarg to `Flow.kickoff` / `Flow.kickoff_async` that hydrates state from a previously persisted flow's latest snapshot.

- Reassigns `state.id` to a fresh value (or `inputs["id"]` if pinned) so the new run's `@persist` writes don't extend the source's history.

- Existing `inputs["id"]` resume, `@persist`, and `from_checkpoint` paths are unchanged.

## Problem
`@persist` only supports *resume* today: `kickoff(inputs={"id": <uuid>})` hydrates state and continues writing under the same `flow_uuid`. There's no way to **fork** — hydrate from a snapshot but persist under a separate key, leaving the source's history intact. This PR adds that.

| Mode | `state.id` after kickoff | `@persist` writes land under |
|---|---|---|
| `inputs["id"]` (resume) | supplied id | supplied id (extends history) |
| `restore_from_state_id` (fork) | fresh id, or `inputs["id"]` if pinned | new id (source preserved) |

## Behavior

| `inputs.id` | `restore_from_state_id` | Effect |
|---|---|---|
| — | — | Fresh kickoff |
| set | — | Existing resume |
| — | UUID | Fork — new `state.id`, hydrated from source |
| set | UUID | Fork into a pinned `state.id`, hydrated from source |

- Source not found → silent fallback (mirrors existing resume)
- Both `from_checkpoint` and `restore_from_state_id` set → `ValueError`
- `restore_from_state_id=None` → byte-identical to current main

## Design
Fork hydration runs before the existing `inputs` block in `kickoff_async`. On a hit, it calls the same `_restore_state` primitive used by resume, then overwrites `state.id` with a fresh UUID (or `inputs["id"]`). A `fork_succeeded` flag gates the existing `inputs["id"]` path so we don't double-load. `_completed_methods` / `_is_execution_resuming` are intentionally untouched — skip-completed-methods remains the territory of `apply_checkpoint` and `from_pending`.
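The behavior matrix and the `fork_succeeded` gating can be modeled in a few lines of plain Python. This is a hypothetical sketch, not CrewAI's implementation: `resolve_kickoff`, `persist_snapshot`, and the dict-of-lists `store` are illustrative names, input keys other than `id` are ignored, and the "unknown pinned id starts fresh" rule is an assumption.

```python
import uuid
from typing import Any, Optional

# Hypothetical persistence model: state id -> snapshots, oldest first.
Store = dict[str, list[dict[str, Any]]]


def resolve_kickoff(
    store: Store,
    inputs: Optional[dict[str, Any]] = None,
    restore_from_state_id: Optional[str] = None,
    from_checkpoint: Optional[str] = None,
) -> dict[str, Any]:
    """Return the initial state (including "id") for a new run."""
    inputs = inputs or {}
    if from_checkpoint is not None and restore_from_state_id is not None:
        raise ValueError("pass either from_checkpoint or restore_from_state_id")

    state: dict[str, Any] = {}
    fork_succeeded = False

    # Fork: runs first; a missing source falls through silently.
    if restore_from_state_id is not None:
        history = store.get(restore_from_state_id)
        if history:
            state = dict(history[-1])  # latest snapshot of the source
            state["id"] = inputs.get("id") or str(uuid.uuid4())
            fork_succeeded = True

    # Resume: only when the fork did not already hydrate state.
    if not fork_succeeded and "id" in inputs:
        history = store.get(inputs["id"])
        # Assumption: an unknown pinned id starts fresh under that id.
        state = dict(history[-1]) if history else {}
        state["id"] = inputs["id"]

    state.setdefault("id", str(uuid.uuid4()))
    return state


def persist_snapshot(store: Store, state: dict[str, Any]) -> None:
    # Writes always land under the run's own state id.
    store.setdefault(state["id"], []).append(dict(state))
```

Persisting a forked state with `persist_snapshot` adds a new key and leaves the source's snapshot list untouched, which is the "source preserved" column of the first table.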

## Test plan
- [ ] `pytest tests/test_flow_persistence.py` — 5 new tests (four-row matrix, not-found fallback, default no-op, conflict raise) + 6 existing as regression
- [ ] `pytest tests/test_flow.py` — broader flow suite
- [ ] Manual end-to-end against an HITL `@persist` flow
2026-05-01 11:46:07 -04:00


---
title: Production Architecture
description: Best practices for building production-ready AI applications with CrewAI
icon: server
mode: "wide"
---
# The Flow-First Mindset
When building production AI applications with CrewAI, **we recommend starting with a Flow**.
While it's possible to run individual Crews or Agents, wrapping them in a Flow provides the necessary structure for a robust, scalable application.
## Why Flows?
1. **State Management**: Flows provide a built-in way to manage state across different steps of your application. This is crucial for passing data between Crews, maintaining context, and handling user inputs.
2. **Control**: Flows allow you to define precise execution paths, including loops, conditionals, and branching logic. This is essential for handling edge cases and ensuring your application behaves predictably.
3. **Observability**: Flows provide a clear structure that makes it easier to trace execution, debug issues, and monitor performance. We recommend using [CrewAI Tracing](/en/observability/tracing) for detailed insights. Simply run `crewai login` to enable free observability features.
## The Architecture
A typical production CrewAI application looks like this:
```mermaid
graph TD
Start((Start)) --> Flow[Flow Orchestrator]
Flow --> State{State Management}
State --> Step1[Step 1: Data Gathering]
Step1 --> Crew1[Research Crew]
Crew1 --> State
State --> Step2{Condition Check}
Step2 -- "Valid" --> Step3[Step 3: Execution]
Step3 --> Crew2[Action Crew]
Step2 -- "Invalid" --> End((End))
Crew2 --> End
```
### 1. The Flow Class
Your `Flow` class is the entry point. It defines the state schema and the methods that execute your logic.
```python
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel


class AppState(BaseModel):
    user_input: str = ""
    research_results: str = ""
    final_report: str = ""


class ProductionFlow(Flow[AppState]):
    @start()
    def gather_input(self):
        # ... logic to get input ...
        pass

    @listen(gather_input)
    def run_research_crew(self):
        # ... trigger a Crew ...
        pass
```
### 2. State Management
Use Pydantic models to define your state. This ensures type safety and makes it clear what data is available at each step.
- **Keep it minimal**: Store only what you need to persist between steps.
- **Use structured data**: Avoid unstructured dictionaries when possible.
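Both guidelines can be illustrated with plain pydantic v2, independent of any CrewAI-specific API: nested models replace free-form `dict`s, and malformed data is rejected when the model is built. The `Source` model and `retry_count` field are hypothetical.

```python
from pydantic import BaseModel, Field, ValidationError


class Source(BaseModel):
    url: str
    title: str = ""


class AppState(BaseModel):
    user_input: str = ""
    # Typed nested models instead of list[dict]: keys and types are checked
    sources: list[Source] = Field(default_factory=list)
    retry_count: int = 0


# Raw data (e.g. from an API request) is validated when the model is built
state = AppState.model_validate(
    {"user_input": "edge AI", "sources": [{"url": "https://example.com"}]}
)

try:
    AppState.model_validate({"retry_count": "three"})
    rejected = False
except ValidationError:
    rejected = True
```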
### 3. Crews as Units of Work
Delegate complex tasks to Crews. A Crew should be focused on a specific goal (e.g., "Research a topic", "Write a blog post").
- **Don't over-engineer Crews**: Keep them focused.
- **Pass state explicitly**: Pass the necessary data from the Flow state to the Crew inputs.
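```python
@listen(gather_input)
def run_research_crew(self):
    # ResearchCrew is your @CrewBase class; .crew() builds the Crew to run
    result = ResearchCrew().crew().kickoff(inputs={"topic": self.state.user_input})
    # CrewOutput.raw is the final task's text output
    self.state.research_results = result.raw
```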
## Control Primitives
Leverage CrewAI's control primitives to add robustness and control to your Crews.
### 1. Task Guardrails
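Use [Task Guardrails](/en/concepts/tasks#task-guardrails) to validate task outputs before they are accepted. When a guardrail rejects an output, its feedback is sent back to the agent for another attempt, so low-quality results are caught before they reach downstream steps.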
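```python
from typing import Any, Tuple

from crewai import Task
from crewai.tasks.task_output import TaskOutput


def validate_content(result: TaskOutput) -> Tuple[bool, Any]:
    # Reject with feedback; the agent retries using this message
    if len(result.raw) < 100:
        return (False, "Content is too short. Please expand.")
    # Accept the output
    return (True, result.raw)


task = Task(
    description="Write a short article about the topic.",
    expected_output="An article of at least 100 characters.",
    guardrail=validate_content,
)
```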
### 2. Structured Outputs
Use structured outputs (`output_pydantic` or `output_json`) whenever you pass data between tasks or to your application. Validating against a schema catches malformed responses early and gives downstream code typed fields instead of raw strings.
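```python
from typing import List

from crewai import Task
from pydantic import BaseModel


class ResearchResult(BaseModel):
    summary: str
    sources: List[str]


task = Task(
    description="Research the topic and list your sources.",
    expected_output="A summary plus the URLs you relied on.",
    output_pydantic=ResearchResult,
)
```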
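To see the kind of failure a declared schema catches, here is the validation step in isolation using pydantic v2 directly. This is a sketch of the idea, not CrewAI's internal parsing code; the two raw strings are hypothetical LLM replies.

```python
from typing import List

from pydantic import BaseModel, ValidationError


class ResearchResult(BaseModel):
    summary: str
    sources: List[str]


# Hypothetical raw replies: one well-formed, one missing "sources"
good_raw = '{"summary": "Agents need guardrails.", "sources": ["https://example.com/a"]}'
bad_raw = '{"summary": "No sources given."}'

result = ResearchResult.model_validate_json(good_raw)

error_fields: list = []
try:
    ResearchResult.model_validate_json(bad_raw)
    bad_accepted = True
except ValidationError as exc:
    bad_accepted = False
    # Each error's "loc" names the offending field
    error_fields = [err["loc"][0] for err in exc.errors()]
```

The malformed reply fails loudly with the missing field named, instead of surfacing later as a `KeyError` deep in application code.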
### 3. LLM Hooks
Use [LLM Hooks](/en/learn/llm-hooks) to inspect or modify messages before they are sent to the LLM, or to sanitize responses.
```python
from crewai.hooks import before_llm_call


@before_llm_call
def log_request(context):
    # Runs before each LLM call an agent makes
    print(f"Agent {context.agent.role} is calling the LLM...")
```
## Deployment Patterns
When deploying your Flow, consider the following:
### CrewAI Enterprise
The easiest way to deploy your Flow is using CrewAI Enterprise. It handles the infrastructure, authentication, and monitoring for you.
Check out the [Deployment Guide](/en/enterprise/guides/deploy-crew) to get started.
```bash
crewai deploy create
```
### Async Execution
For long-running tasks, use `kickoff_async` to avoid blocking your API.
### Persistence
Use the `@persist` decorator to save your Flow's state to a database (SQLite by default). This lets you resume execution if the process crashes or while you wait for human input.
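```python
from crewai.flow.flow import Flow
from crewai.flow.persistence import persist


@persist()  # class-level: state is saved after each flow method finishes
class ProductionFlow(Flow[AppState]):
    ...  # your @start / @listen methods go here
```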
By default, `@persist` resumes a flow when `kickoff(inputs={"id": <uuid>})` is supplied, extending the same `flow_uuid` history. To **fork** a persisted flow into a new lineage — hydrate state from a previous run but write under a fresh `state.id` — pass `restore_from_state_id`:
```python
flow.kickoff(restore_from_state_id="<previous-run-state-id>")
```
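The new run gets a fresh `state.id` (auto-generated, or `inputs["id"]` if pinned), so its `@persist` writes don't extend the source's history. If no persisted state exists for the source id, kickoff falls back silently, as resume does for an unknown id. Combining `restore_from_state_id` with `from_checkpoint` raises a `ValueError`; pick one hydration source.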
## Summary
- **Start with a Flow.**
- **Define a clear State.**
- **Use Crews for complex tasks.**
- **Deploy with an API and persistence.**