Compare commits

..

3 Commits

Author SHA1 Message Date
Devin AI
7df2e54749 fix: resolve type-checker errors in cache preload code
- Use explicit type annotation for original_max_tokens in preload_probe
- Use self.__setattr__ to avoid type mismatch with subclass fields
- Replace hasattr checks with isinstance(agent.llm, BaseLLM) for proper
  type narrowing
- Ensure _get_agent_system_prompt returns str without Any leak

Co-Authored-By: João <joao@crewai.com>
2026-05-25 07:10:03 +00:00
Devin AI
2b60f3df16 style: apply ruff format fixes
Co-Authored-By: João <joao@crewai.com>
2026-05-25 07:03:42 +00:00
Devin AI
158d962ea9 feat: add session-start prompt-cache preload for crew kickoff (#5921)
Add opt-in cache_preload and cache_preload_strategy parameters to the
Crew class that fire lightweight 1-token cache-warming probes against
each agent's system prompt at kickoff time. This warms the provider's
prompt cache (Anthropic, OpenAI prefix caching, etc.) before the first
real task runs, reducing first-step latency and cache-write costs.

Implementation:
- BaseLLM.preload_probe(): sends max_tokens=1 completion with the
  agent's system prompt; failures are logged and never propagated
- Crew.cache_preload / Crew.cache_preload_strategy fields
- Crew._preload_caches() with three strategies:
  * parallel: concurrent probes via ThreadPoolExecutor
  * sequential: one-by-one in agent order
  * shared_prefix: warm common prefix once then per-agent suffixes;
    falls back to parallel when prefix < 1024 chars

The feature is opt-in (cache_preload=False by default) and only
activates for crews with 2+ agents.

Co-Authored-By: João <joao@crewai.com>
2026-05-25 07:01:12 +00:00
12 changed files with 1454 additions and 1561 deletions

View File

@@ -5,403 +5,225 @@ icon: floppy-disk
mode: "wide"
---
الـ Checkpointing يحفظ لقطة من حالة التنفيذ أثناء التشغيل بحيث يمكن لطاقم أو تدفق أو وكيل الاستئناف بعد الفشل أو التفرع إلى فرع بديل.
<CardGroup cols={2}>
<Card title="الشرح" icon="lightbulb" href="#الشرح">
كيف يعمل الـ Checkpointing: الأحداث والتخزين والوراثة.
</Card>
<Card title="درس تطبيقي" icon="graduation-cap" href="#درس-تطبيقي-استئناف-طاقم-فاشل">
دليل 5 دقائق: تشغيل، إيقاف، استئناف.
</Card>
<Card title="ادلة عملية" icon="screwdriver-wrench" href="#ادلة-عملية">
وصفات مركزة على المهام لسير العمل الشائع.
</Card>
<Card title="المرجع" icon="book" href="#المرجع">
`CheckpointConfig` والأحداث والمزودات وسطر الأوامر.
</Card>
</CardGroup>
## الشرح
### ما هي نقطة الحفظ
تلتقط نقطة الحفظ كل ما يحتاجه CrewAI لإعادة إنشاء تشغيل أثناء سيره: الحالة الكاملة للطاقم أو التدفق أو الوكيل — التكوين، وذاكرة الوكلاء ومصادر المعرفة، وتقدم المهام، والمخرجات الوسيطة — إلى جانب مدخلات الـ kickoff، وسجل الأحداث حتى تلك النقطة، ومعرف نسب يربط نقطة الحفظ بالتشغيل الذي جاءت منه.
الاستعادة تعيد بناء تلك الحالة وتستمر. تتخطى المهام المكتملة، وتعاد ترطيب الذاكرة والمعرفة، ويعمل العمل التابع على نفس المخرجات التي أنتجها التشغيل الأصلي. التفرع يجري نفس الاستعادة تحت نسب جديد، بحيث يكتب الفرع الجديد والتشغيل الأصلي نقاط الحفظ جنبا إلى جنب دون أن يطمس أحدهما الآخر.
### متى تكتب نقاط الحفظ
الـ Checkpointing مدفوع بالأحداث. يشترك وقت التشغيل في الأحداث التي تحددها عبر `on_events` ويكتب نقطة حفظ عند إطلاق أحدها. الافتراضي `task_completed` ينتج نقطة حفظ لكل مهمة منتهية — توازن معقول بين الدقة واستخدام القرص. الأحداث عالية التردد مثل `llm_call_completed` متاحة للاستعادة الدقيقة لكنها تكتب ملفات أكثر بكثير.
### التخزين
يتضمن CrewAI مزودين:
- `JsonProvider` يكتب ملفا لكل نقطة حفظ. قابل للقراءة وسهل التفقد.
- `SqliteProvider` يكتب إلى قاعدة بيانات SQLite واحدة. أفضل لنقاط الحفظ عالية التردد.
كلاهما يحذف أقدم نقاط الحفظ عند تحديد `max_checkpoints`.
<Note>
كتابة نقاط الحفظ بأفضل جهد. فشل نقطة حفظ يسجل لكنه لا يقاطع التشغيل.
</Note>
### نموذج الوراثة
`Crew` و`Flow` و`Agent` كلها تقبل وسيط `checkpoint`. يرث الأبناء من الأب ما لم يحددوا قيمتهم الخاصة أو يمرروا `False` للانسحاب. فعل الـ Checkpointing مرة واحدة على الطاقم وتشارك كل الوكلاء، أو استبعد وكيلا واحدا بشكل انتقائي.
## درس تطبيقي: استئناف طاقم فاشل
هذا الدليل يستغرق حوالي 5 دقائق. ستشغل طاقما بمهمتين، توقفه في المنتصف، ثم تستأنف من نقطة الحفظ المحفوظة.
<Steps>
<Step title="أنشئ الطاقم مع تفعيل الـ Checkpointing">
```python
from crewai import Agent, Crew, Task
researcher = Agent(role="Researcher", goal="Research", backstory="Expert")
writer = Agent(role="Writer", goal="Write", backstory="Expert")
crew = Crew(
agents=[researcher, writer],
tasks=[
Task(description="Research AI trends", agent=researcher, expected_output="bullets"),
Task(description="Write a summary", agent=writer, expected_output="paragraph"),
],
checkpoint=True,
)
```
</Step>
<Step title="شغله وأوقفه بعد المهمة الأولى">
```python
result = crew.kickoff()
```
اضغط `Ctrl+C` بعد انتهاء المهمة الأولى. في `./.checkpoints/`، الملف بصيغة `<timestamp>_<uuid>.json` هو نقطة الحفظ.
</Step>
<Step title="استأنف من نقطة الحفظ">
```python
from crewai import CheckpointConfig
result = crew.kickoff(
from_checkpoint=CheckpointConfig(
restore_from="./.checkpoints/<timestamp>_<uuid>.json",
),
)
```
يتم تخطي مهمة البحث، ويعمل الكاتب على مخرجات البحث المحفوظة، وينتهي الطاقم.
</Step>
</Steps>
## ادلة عملية
<AccordionGroup>
<Accordion title="تفعيل الـ Checkpointing بالإعدادات الافتراضية" icon="play">
```python
crew = Crew(agents=[...], tasks=[...], checkpoint=True)
```
يكتب إلى `./.checkpoints/` عند كل `task_completed`.
</Accordion>
<Accordion title="تخصيص التخزين والتردد" icon="sliders">
```python
from crewai import Crew, CheckpointConfig
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./my_checkpoints",
on_events=["task_completed", "crew_kickoff_completed"],
max_checkpoints=5,
),
)
```
</Accordion>
<Accordion title="اختيار مزود التخزين" icon="database">
<CodeGroup>
```python JsonProvider
from crewai import Crew, CheckpointConfig
from crewai.state import JsonProvider
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./my_checkpoints",
provider=JsonProvider(),
max_checkpoints=5,
),
)
```
```python SqliteProvider
from crewai import Crew, CheckpointConfig
from crewai.state import SqliteProvider
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./.checkpoints.db",
provider=SqliteProvider(),
max_checkpoints=50,
),
)
```
</CodeGroup>
<Tip>
SQLite يفعل وضع journal WAL للقراءات المتزامنة. يفضل لنقاط الحفظ عالية التردد.
</Tip>
</Accordion>
<Accordion title="استبعاد وكيل واحد" icon="user-slash">
```python
crew = Crew(
agents=[
Agent(role="Researcher", ...),
Agent(role="Writer", ..., checkpoint=False),
],
tasks=[...],
checkpoint=True,
)
```
</Accordion>
<Accordion title="التفرع إلى فرع جديد" icon="code-branch">
`fork()` يستعيد نقطة حفظ تحت نسب جديد بحيث لا يتصادم التشغيل الجديد مع الأصلي.
```python
config = CheckpointConfig(restore_from="./my_checkpoints/<file>.json")
crew = Crew.fork(config, branch="experiment-a")
result = crew.kickoff(inputs={"strategy": "aggressive"})
```
تسمية `branch` اختيارية؛ يتم إنشاء واحدة إذا أغفلت.
</Accordion>
<Accordion title="Checkpointing لـ Crew أو Flow أو Agent" icon="cubes">
<Tabs>
<Tab title="Crew">
```python
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, write_task, review_task],
checkpoint=CheckpointConfig(location="./crew_cp"),
)
```
المشغل الافتراضي: `task_completed`.
</Tab>
<Tab title="Flow">
```python
from crewai.flow.flow import Flow, start, listen
from crewai import CheckpointConfig
class MyFlow(Flow):
@start()
def step_one(self):
return "data"
@listen(step_one)
def step_two(self, data):
return process(data)
flow = MyFlow(
checkpoint=CheckpointConfig(
location="./flow_cp",
on_events=["method_execution_finished"],
),
)
result = flow.kickoff()
```
</Tab>
<Tab title="Agent">
```python
agent = Agent(
role="Researcher",
goal="Research topics",
backstory="Expert researcher",
checkpoint=CheckpointConfig(
location="./agent_cp",
on_events=["lite_agent_execution_completed"],
),
)
result = agent.kickoff(messages=[{"role": "user", "content": "Research AI trends"}])
```
</Tab>
</Tabs>
</Accordion>
<Accordion title="كتابة نقطة حفظ يدويا" icon="code">
سجل معالجا على أي حدث واستدع `state.checkpoint()`.
<CodeGroup>
```python Sync
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.llm_events import LLMCallCompletedEvent
@crewai_event_bus.on(LLMCallCompletedEvent)
def on_llm_done(source, event, state):
path = state.checkpoint("./my_checkpoints")
print(f"تم حفظ نقطة الحفظ: {path}")
```
```python Async
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.llm_events import LLMCallCompletedEvent
@crewai_event_bus.on(LLMCallCompletedEvent)
async def on_llm_done_async(source, event, state):
path = await state.acheckpoint("./my_checkpoints")
print(f"تم حفظ نقطة الحفظ: {path}")
```
</CodeGroup>
يتم تمرير وسيط `state` تلقائيا عندما يقبل المعالج ثلاثة معاملات. راجع [Event Listeners](/ar/concepts/event-listener) لقائمة الأحداث الكاملة.
</Accordion>
<Accordion title="التصفح والاستئناف والتفرع من سطر الأوامر" icon="terminal">
```bash
crewai checkpoint # كشف تلقائي لـ .checkpoints/ أو .checkpoints.db
crewai checkpoint --location ./my_checkpoints
crewai checkpoint --location ./.checkpoints.db
```
<Frame caption="شجرة نقاط الحفظ — الفروع والتفرعات تتداخل تحت أبيها.">
<img src="/images/checkpoint-tui-tree.png" alt="Checkpoint TUI tree view" />
</Frame>
اللوحة اليسرى تجمع نقاط الحفظ حسب الفرع؛ التفرعات تتداخل تحت أبيها. اختيار نقطة حفظ يفتح لوحة التفاصيل مع بياناتها الوصفية وحالة الكيان وتقدم المهام. **Resume** يكمل التشغيل؛ **Fork** يبدأ فرعا جديدا.
<Frame caption="تبويب النظرة العامة — البيانات الوصفية وحالة الكيان وملخص التشغيل.">
<img src="/images/checkpoint-tui-detail-overview.png" alt="Checkpoint detail overview tab" />
</Frame>
لوحة التفاصيل تعرض منطقتين قابلتين للتحرير:
- **Inputs** — مدخلات الـ kickoff الأصلية، معبأة مسبقا وقابلة للتحرير.
<Frame>
<img src="/images/checkpoint-tui-detail-inputs.png" alt="Editable kickoff inputs" />
</Frame>
- **مخرجات المهام** — مخرجات المهام المكتملة. تحرير مخرج والضغط على **Fork** يبطل المهام التابعة لتعاد بالسياق المعدل.
<Frame>
<img src="/images/checkpoint-tui-detail-tasks.png" alt="Editable task outputs" />
</Frame>
<Frame caption="عرض التفرع — تأكيد فرع جديد من نقطة الحفظ المختارة.">
<img src="/images/checkpoint-tui-details-fork.png" alt="Fork confirmation panel" />
</Frame>
<Tip>
مفيد لاستكشاف "ماذا لو": تفرع، عدل، راقب.
</Tip>
</Accordion>
<Accordion title="تفقد نقاط الحفظ بدون TUI" icon="magnifying-glass">
```bash
crewai checkpoint list ./my_checkpoints
crewai checkpoint info ./my_checkpoints/<file>.json
crewai checkpoint info ./.checkpoints.db
```
</Accordion>
</AccordionGroup>
## المرجع
### `CheckpointConfig`
<ParamField path="location" type="str" default='"./.checkpoints"'>
وجهة التخزين. مجلد لـ `JsonProvider`، مسار ملف قاعدة بيانات لـ `SqliteProvider`.
</ParamField>
<ParamField path="on_events" type="list[CheckpointEventType]" default='["task_completed"]'>
أنواع الأحداث التي تطلق نقطة حفظ. `CheckpointEventType` هو `Literal` — مدقق الأنواع يكمل تلقائيا ويرفض القيم غير المدعومة. راجع [أنواع الأحداث](#أنواع-الأحداث) للقائمة الكاملة.
</ParamField>
<ParamField path="provider" type="BaseProvider" default="JsonProvider()">
واجهة التخزين. `JsonProvider` أو `SqliteProvider`.
</ParamField>
<ParamField path="max_checkpoints" type="int | None" default="None">
الحد الاقصى لنقاط الحفظ المحتفظ بها. الأقدم تحذف بعد كل كتابة.
</ParamField>
<ParamField path="restore_from" type="Path | str | None" default="None">
نقطة الحفظ المراد استعادتها عند تمريرها عبر `from_checkpoint`.
</ParamField>
### قيم حقل `checkpoint`
مقبولة في `Crew` و`Flow` و`Agent`.
<ParamField path="None" type="افتراضي">
يرث من الأب.
</ParamField>
<ParamField path="True" type="bool">
تفعيل بالإعدادات الافتراضية.
</ParamField>
<ParamField path="False" type="bool">
انسحاب صريح. يوقف الوراثة.
</ParamField>
<ParamField path="CheckpointConfig(...)" type="CheckpointConfig">
إعدادات مخصصة.
</ParamField>
### أنواع الأحداث
يقبل `on_events` أي مجموعة من قيم `CheckpointEventType`. الافتراضي `["task_completed"]` يكتب نقطة حفظ لكل مهمة منتهية، و`["*"]` يطابق جميع الأحداث.
<Warning>
`["*"]` والأحداث عالية التردد مثل `llm_call_completed` تكتب نقاط حفظ كثيرة وقد تضر بالاداء. استخدمها مع `max_checkpoints`.
الـ Checkpointing في اصدار مبكر. قد تتغير واجهات البرمجة في الاصدارات المستقبلية.
</Warning>
<Expandable title="جميع الأحداث المدعومة">
## نظرة عامة
- **Task** — `task_started`, `task_completed`, `task_failed`, `task_evaluation`
- **Crew** — `crew_kickoff_started`, `crew_kickoff_completed`, `crew_kickoff_failed`, `crew_train_started`, `crew_train_completed`, `crew_train_failed`, `crew_test_started`, `crew_test_completed`, `crew_test_failed`, `crew_test_result`
- **Agent** — `agent_execution_started`, `agent_execution_completed`, `agent_execution_error`, `lite_agent_execution_started`, `lite_agent_execution_completed`, `lite_agent_execution_error`, `agent_evaluation_started`, `agent_evaluation_completed`, `agent_evaluation_failed`
- **Flow** — `flow_created`, `flow_started`, `flow_finished`, `flow_paused`, `method_execution_started`, `method_execution_finished`, `method_execution_failed`, `method_execution_paused`, `human_feedback_requested`, `human_feedback_received`, `flow_input_requested`, `flow_input_received`
- **LLM** — `llm_call_started`, `llm_call_completed`, `llm_call_failed`, `llm_stream_chunk`, `llm_thinking_chunk`
- **LLM Guardrail** — `llm_guardrail_started`, `llm_guardrail_completed`, `llm_guardrail_failed`
- **Tool** — `tool_usage_started`, `tool_usage_finished`, `tool_usage_error`, `tool_validate_input_error`, `tool_selection_error`, `tool_execution_error`
- **Memory** — `memory_save_started`, `memory_save_completed`, `memory_save_failed`, `memory_query_started`, `memory_query_completed`, `memory_query_failed`, `memory_retrieval_started`, `memory_retrieval_completed`, `memory_retrieval_failed`
- **Knowledge** — `knowledge_search_query_started`, `knowledge_search_query_completed`, `knowledge_query_started`, `knowledge_query_completed`, `knowledge_query_failed`, `knowledge_search_query_failed`
- **Reasoning** — `agent_reasoning_started`, `agent_reasoning_completed`, `agent_reasoning_failed`
- **MCP** — `mcp_connection_started`, `mcp_connection_completed`, `mcp_connection_failed`, `mcp_tool_execution_started`, `mcp_tool_execution_completed`, `mcp_tool_execution_failed`, `mcp_config_fetch_failed`
- **Observation** — `step_observation_started`, `step_observation_completed`, `step_observation_failed`, `plan_refinement`, `plan_replan_triggered`, `goal_achieved_early`
- **Skill** — `skill_discovery_started`, `skill_discovery_completed`, `skill_loaded`, `skill_activated`, `skill_load_failed`
- **Logging** — `agent_logs_started`, `agent_logs_execution`
- **A2A** — `a2a_delegation_started`, `a2a_delegation_completed`, `a2a_conversation_started`, `a2a_conversation_completed`, `a2a_message_sent`, `a2a_response_received`, `a2a_polling_started`, `a2a_polling_status`, `a2a_push_notification_registered`, `a2a_push_notification_received`, `a2a_push_notification_sent`, `a2a_push_notification_timeout`, `a2a_streaming_started`, `a2a_streaming_chunk`, `a2a_agent_card_fetched`, `a2a_authentication_failed`, `a2a_artifact_received`, `a2a_connection_error`, `a2a_server_task_started`, `a2a_server_task_completed`, `a2a_server_task_canceled`, `a2a_server_task_failed`, `a2a_parallel_delegation_started`, `a2a_parallel_delegation_completed`, `a2a_transport_negotiated`, `a2a_content_type_negotiated`, `a2a_context_created`, `a2a_context_expired`, `a2a_context_idle`, `a2a_context_completed`, `a2a_context_pruned`
- **إشارات النظام** — `SIGTERM`, `SIGINT`, `SIGHUP`, `SIGTSTP`, `SIGCONT`
- **حرف بدل** — `"*"` يطابق جميع الأحداث.
يقوم الـ Checkpointing بحفظ حالة التنفيذ تلقائيا اثناء التشغيل. اذا فشل طاقم او تدفق او وكيل اثناء التنفيذ، يمكنك الاستعادة من اخر نقطة حفظ والاستئناف دون اعادة تنفيذ العمل المكتمل.
</Expandable>
## البداية السريعة
### مزودات التخزين
```python
from crewai import Crew, CheckpointConfig
<ParamField path="JsonProvider" type="provider">
ملف واحد لكل نقطة حفظ بصيغة `<timestamp>_<uuid>.json` داخل `location`.
</ParamField>
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=True, # يستخدم الافتراضيات: ./.checkpoints, عند task_completed
)
result = crew.kickoff()
```
<ParamField path="SqliteProvider" type="provider">
ملف قاعدة بيانات واحد في `location` مع journaling WAL.
</ParamField>
تتم كتابة ملفات نقاط الحفظ في `./.checkpoints/` بعد اكتمال كل مهمة.
### سطر الأوامر
## التكوين
| الامر | الغرض |
|:------|:------|
| `crewai checkpoint` | تشغيل TUI؛ كشف التخزين تلقائيا. |
| `crewai checkpoint --location <path>` | تشغيل TUI على موقع محدد. |
| `crewai checkpoint list <path>` | سرد نقاط الحفظ. |
| `crewai checkpoint info <path>` | تفقد ملف نقطة حفظ أو آخر مدخل في قاعدة بيانات SQLite. |
استخدم `CheckpointConfig` للتحكم الكامل:
```python
from crewai import Crew, CheckpointConfig
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./my_checkpoints",
on_events=["task_completed", "crew_kickoff_completed"],
max_checkpoints=5,
),
)
```
### حقول CheckpointConfig
| الحقل | النوع | الافتراضي | الوصف |
|:------|:------|:----------|:------|
| `location` | `str` | `"./.checkpoints"` | مسار ملفات نقاط الحفظ |
| `on_events` | `list[str]` | `["task_completed"]` | انواع الاحداث التي تطلق نقطة حفظ |
| `provider` | `BaseProvider` | `JsonProvider()` | واجهة التخزين |
| `max_checkpoints` | `int \| None` | `None` | الحد الاقصى للملفات؛ يتم حذف الاقدم اولا |
### الوراثة والانسحاب
يقبل حقل `checkpoint` في Crew و Flow و Agent قيم `CheckpointConfig` او `True` او `False` او `None`:
| القيمة | السلوك |
|:-------|:-------|
| `None` (افتراضي) | يرث من الاصل. الوكيل يرث اعدادات الطاقم. |
| `True` | تفعيل بالاعدادات الافتراضية. |
| `False` | انسحاب صريح. يوقف الوراثة من الاصل. |
| `CheckpointConfig(...)` | اعدادات مخصصة. |
```python
crew = Crew(
agents=[
Agent(role="Researcher", ...), # يرث checkpoint من الطاقم
Agent(role="Writer", ..., checkpoint=False), # منسحب، بدون نقاط حفظ
],
tasks=[...],
checkpoint=True,
)
```
## الاستئناف من نقطة حفظ
```python
# استعادة واستئناف
crew = Crew.from_checkpoint("./my_checkpoints/20260407T120000_abc123.json")
result = crew.kickoff() # يستأنف من اخر مهمة مكتملة
```
يتخطى الطاقم المستعاد المهام المكتملة ويستأنف من اول مهمة غير مكتملة.
## يعمل على Crew و Flow و Agent
### Crew
```python
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, write_task, review_task],
checkpoint=CheckpointConfig(location="./crew_cp"),
)
```
المشغل الافتراضي: `task_completed` (نقطة حفظ واحدة لكل مهمة مكتملة).
### Flow
```python
from crewai.flow.flow import Flow, start, listen
from crewai import CheckpointConfig
class MyFlow(Flow):
@start()
def step_one(self):
return "data"
@listen(step_one)
def step_two(self, data):
return process(data)
flow = MyFlow(
checkpoint=CheckpointConfig(
location="./flow_cp",
on_events=["method_execution_finished"],
),
)
result = flow.kickoff()
# استئناف
flow = MyFlow.from_checkpoint("./flow_cp/20260407T120000_abc123.json")
result = flow.kickoff()
```
### Agent
```python
agent = Agent(
role="Researcher",
goal="Research topics",
backstory="Expert researcher",
checkpoint=CheckpointConfig(
location="./agent_cp",
on_events=["lite_agent_execution_completed"],
),
)
result = agent.kickoff(messages=[{"role": "user", "content": "Research AI trends"}])
```
## مزودات التخزين
يتضمن CrewAI مزودي تخزين لنقاط الحفظ.
### JsonProvider (افتراضي)
يكتب كل نقطة حفظ كملف JSON منفصل.
```python
from crewai import Crew, CheckpointConfig
from crewai.state import JsonProvider
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./my_checkpoints",
provider=JsonProvider(),
max_checkpoints=5,
),
)
```
### SqliteProvider
يخزن جميع نقاط الحفظ في ملف قاعدة بيانات SQLite واحد.
```python
from crewai import Crew, CheckpointConfig
from crewai.state import SqliteProvider
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./.checkpoints.db",
provider=SqliteProvider(),
),
)
```
## انواع الاحداث
يقبل حقل `on_events` اي مجموعة من سلاسل انواع الاحداث. الخيارات الشائعة:
| حالة الاستخدام | الاحداث |
|:---------------|:--------|
| بعد كل مهمة (Crew) | `["task_completed"]` |
| بعد كل طريقة في التدفق | `["method_execution_finished"]` |
| بعد تنفيذ الوكيل | `["agent_execution_completed"]`, `["lite_agent_execution_completed"]` |
| عند اكتمال الطاقم فقط | `["crew_kickoff_completed"]` |
| بعد كل استدعاء LLM | `["llm_call_completed"]` |
| على كل شيء | `["*"]` |
<Warning>
استخدام `["*"]` او احداث عالية التردد مثل `llm_call_completed` سيكتب العديد من ملفات نقاط الحفظ وقد يؤثر على الاداء. استخدم `max_checkpoints` للحد من استخدام المساحة.
</Warning>
## نقاط الحفظ اليدوية
للتحكم الكامل، سجل معالج الاحداث الخاص بك واستدع `state.checkpoint()` مباشرة:
```python
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.llm_events import LLMCallCompletedEvent
# معالج متزامن
@crewai_event_bus.on(LLMCallCompletedEvent)
def on_llm_done(source, event, state):
path = state.checkpoint("./my_checkpoints")
print(f"تم حفظ نقطة الحفظ: {path}")
# معالج غير متزامن
@crewai_event_bus.on(LLMCallCompletedEvent)
async def on_llm_done_async(source, event, state):
path = await state.acheckpoint("./my_checkpoints")
print(f"تم حفظ نقطة الحفظ: {path}")
```
وسيط `state` هو `RuntimeState` الذي يتم تمريره تلقائيا بواسطة ناقل الاحداث عندما يقبل المعالج 3 معاملات. يمكنك تسجيل معالجات على اي نوع حدث مدرج في وثائق [Event Listeners](/ar/concepts/event-listener).
الـ Checkpointing يعمل بافضل جهد: اذا فشلت كتابة نقطة حفظ، يتم تسجيل الخطأ ولكن التنفيذ يستمر دون انقطاع.

View File

@@ -5,403 +5,301 @@ icon: floppy-disk
mode: "wide"
---
Checkpointing saves a snapshot of execution state during a run so a crew, flow, or agent can resume after a failure or be forked into an alternate branch.
<CardGroup cols={2}>
<Card title="Explanation" icon="lightbulb" href="#explanation">
How checkpointing works: events, storage, and inheritance.
</Card>
<Card title="Tutorial" icon="graduation-cap" href="#tutorial-resume-a-failing-crew">
A 5-minute walkthrough: run, interrupt, resume.
</Card>
<Card title="How-to guides" icon="screwdriver-wrench" href="#how-to-guides">
Task-focused recipes for common workflows.
</Card>
<Card title="Reference" icon="book" href="#reference">
`CheckpointConfig`, events, providers, and CLI.
</Card>
</CardGroup>
## Explanation
### What a checkpoint is
A checkpoint captures everything CrewAI needs to recreate a run mid-flight: the full state of the crew, flow, or agent — configuration, agent memory and knowledge sources, task progress, intermediate outputs — alongside the kickoff inputs, the event history up to that point, and a lineage ID that ties the checkpoint to the run it came from.
Restoring rebuilds that state and continues. Completed tasks are skipped, memory and knowledge are rehydrated, and downstream work runs against the same outputs the original run produced. Forking does the same restore under a new lineage, so the new branch and the original run can write checkpoints side by side without overwriting each other.
### When checkpoints are written
Checkpointing is event-driven. The runtime subscribes to events you select via `on_events` and writes a checkpoint each time one fires. The default `task_completed` produces one checkpoint per finished task — a sensible tradeoff between granularity and disk use. Higher-frequency events like `llm_call_completed` are available for fine-grained recovery but write far more files.
### Storage
Two providers ship with CrewAI:
- `JsonProvider` writes one file per checkpoint. Human-readable and easy to inspect.
- `SqliteProvider` writes to a single SQLite database. Better for high-frequency checkpointing.
Both prune oldest checkpoints when `max_checkpoints` is set.
<Note>
Checkpoint writes are best-effort. A failed checkpoint is logged but does not interrupt the run.
</Note>
### Inheritance model
`Crew`, `Flow`, and `Agent` all accept a `checkpoint` argument. Children inherit from their parent unless they set their own value or pass `False` to opt out. Enable checkpointing once on the crew and every agent participates, or selectively exclude one agent.
## Tutorial: Resume a failing crew
This walkthrough takes ~5 minutes. You will run a two-task crew, kill it midway, and resume from the saved checkpoint.
<Steps>
<Step title="Create the crew with checkpointing enabled">
```python
from crewai import Agent, Crew, Task
researcher = Agent(role="Researcher", goal="Research", backstory="Expert")
writer = Agent(role="Writer", goal="Write", backstory="Expert")
crew = Crew(
agents=[researcher, writer],
tasks=[
Task(description="Research AI trends", agent=researcher, expected_output="bullets"),
Task(description="Write a summary", agent=writer, expected_output="paragraph"),
],
checkpoint=True,
)
```
</Step>
<Step title="Run it and interrupt after the first task">
```python
result = crew.kickoff()
```
Press `Ctrl+C` after the first task finishes. Look in `./.checkpoints/` — a file named `<timestamp>_<uuid>.json` is the checkpoint.
</Step>
<Step title="Resume from the checkpoint">
```python
from crewai import CheckpointConfig
result = crew.kickoff(
from_checkpoint=CheckpointConfig(
restore_from="./.checkpoints/<timestamp>_<uuid>.json",
),
)
```
The research task is skipped, the writer runs against the saved research output, and the crew finishes.
</Step>
</Steps>
## How-to guides
<AccordionGroup>
<Accordion title="Enable checkpointing with defaults" icon="play">
```python
crew = Crew(agents=[...], tasks=[...], checkpoint=True)
```
Writes to `./.checkpoints/` on every `task_completed`.
</Accordion>
<Accordion title="Customize storage and frequency" icon="sliders">
```python
from crewai import Crew, CheckpointConfig
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./my_checkpoints",
on_events=["task_completed", "crew_kickoff_completed"],
max_checkpoints=5,
),
)
```
</Accordion>
<Accordion title="Choose a storage provider" icon="database">
<CodeGroup>
```python JsonProvider
from crewai import Crew, CheckpointConfig
from crewai.state import JsonProvider
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./my_checkpoints",
provider=JsonProvider(),
max_checkpoints=5,
),
)
```
```python SqliteProvider
from crewai import Crew, CheckpointConfig
from crewai.state import SqliteProvider
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./.checkpoints.db",
provider=SqliteProvider(),
max_checkpoints=50,
),
)
```
</CodeGroup>
<Tip>
SQLite enables WAL journal mode for concurrent reads. Prefer it for high-frequency checkpointing.
</Tip>
</Accordion>
<Accordion title="Opt one agent out" icon="user-slash">
```python
crew = Crew(
agents=[
Agent(role="Researcher", ...),
Agent(role="Writer", ..., checkpoint=False),
],
tasks=[...],
checkpoint=True,
)
```
</Accordion>
<Accordion title="Fork into a new branch" icon="code-branch">
`fork()` restores a checkpoint under a fresh lineage so the new run does not collide with the original.
```python
config = CheckpointConfig(restore_from="./my_checkpoints/<file>.json")
crew = Crew.fork(config, branch="experiment-a")
result = crew.kickoff(inputs={"strategy": "aggressive"})
```
The `branch` label is optional; one is generated if omitted.
</Accordion>
<Accordion title="Checkpoint a Crew, Flow, or Agent" icon="cubes">
<Tabs>
<Tab title="Crew">
```python
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, write_task, review_task],
checkpoint=CheckpointConfig(location="./crew_cp"),
)
```
Default trigger: `task_completed`.
</Tab>
<Tab title="Flow">
```python
from crewai.flow.flow import Flow, start, listen
from crewai import CheckpointConfig
class MyFlow(Flow):
@start()
def step_one(self):
return "data"
@listen(step_one)
def step_two(self, data):
return process(data)
flow = MyFlow(
checkpoint=CheckpointConfig(
location="./flow_cp",
on_events=["method_execution_finished"],
),
)
result = flow.kickoff()
```
</Tab>
<Tab title="Agent">
```python
agent = Agent(
role="Researcher",
goal="Research topics",
backstory="Expert researcher",
checkpoint=CheckpointConfig(
location="./agent_cp",
on_events=["lite_agent_execution_completed"],
),
)
result = agent.kickoff(messages=[{"role": "user", "content": "Research AI trends"}])
```
</Tab>
</Tabs>
</Accordion>
<Accordion title="Write a checkpoint manually" icon="code">
Register a handler on any event and call `state.checkpoint()`.
<CodeGroup>
```python Sync
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.llm_events import LLMCallCompletedEvent
@crewai_event_bus.on(LLMCallCompletedEvent)
def on_llm_done(source, event, state):
path = state.checkpoint("./my_checkpoints")
print(f"Saved checkpoint: {path}")
```
```python Async
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.llm_events import LLMCallCompletedEvent
@crewai_event_bus.on(LLMCallCompletedEvent)
async def on_llm_done_async(source, event, state):
path = await state.acheckpoint("./my_checkpoints")
print(f"Saved checkpoint: {path}")
```
</CodeGroup>
A `state` argument is supplied automatically when the handler takes three parameters. See [Event Listeners](/en/concepts/event-listener) for the full event catalog.
</Accordion>
<Accordion title="Browse, resume, and fork from the CLI" icon="terminal">
```bash
crewai checkpoint # auto-detects .checkpoints/ or .checkpoints.db
crewai checkpoint --location ./my_checkpoints
crewai checkpoint --location ./.checkpoints.db
```
<Frame caption="Checkpoint tree — branches and forks nest under their parent.">
<img src="/images/checkpoint-tui-tree.png" alt="Checkpoint TUI tree view" />
</Frame>
The left panel groups checkpoints by branch; forks nest under their parent. Selecting a checkpoint opens the detail panel with metadata, entity state, and task progress. **Resume** continues the run; **Fork** starts a new branch.
<Frame caption="Overview tab — metadata, entity state, and run summary.">
<img src="/images/checkpoint-tui-detail-overview.png" alt="Checkpoint detail overview tab" />
</Frame>
The detail panel exposes two editable areas:
- **Inputs** — original kickoff inputs, pre-filled and editable.
<Frame>
<img src="/images/checkpoint-tui-detail-inputs.png" alt="Editable kickoff inputs" />
</Frame>
- **Task outputs** — outputs of completed tasks. Editing an output and hitting **Fork** invalidates downstream tasks so they re-run against the modified context.
<Frame>
<img src="/images/checkpoint-tui-detail-tasks.png" alt="Editable task outputs" />
</Frame>
<Frame caption="Fork view — confirm a new branch from the selected checkpoint.">
<img src="/images/checkpoint-tui-details-fork.png" alt="Fork confirmation panel" />
</Frame>
<Tip>
Useful for "what if" exploration: fork, tweak, observe.
</Tip>
</Accordion>
<Accordion title="Inspect checkpoints without the TUI" icon="magnifying-glass">
```bash
crewai checkpoint list ./my_checkpoints
crewai checkpoint info ./my_checkpoints/<file>.json
crewai checkpoint info ./.checkpoints.db
```
</Accordion>
</AccordionGroup>
## Reference
### `CheckpointConfig`
<ParamField path="location" type="str" default='"./.checkpoints"'>
Storage destination. A directory for `JsonProvider`, a database file path for `SqliteProvider`.
</ParamField>
<ParamField path="on_events" type="list[CheckpointEventType]" default='["task_completed"]'>
Event types that trigger a checkpoint. `CheckpointEventType` is a `Literal` — your type checker will autocomplete and reject unsupported values. See [event types](#event-types) for the full list.
</ParamField>
<ParamField path="provider" type="BaseProvider" default="JsonProvider()">
Storage backend. Either `JsonProvider` or `SqliteProvider`.
</ParamField>
<ParamField path="max_checkpoints" type="int | None" default="None">
Maximum checkpoints to retain. Oldest are pruned after each write.
</ParamField>
<ParamField path="restore_from" type="Path | str | None" default="None">
Checkpoint to restore from when passed via `from_checkpoint`.
</ParamField>
### `checkpoint` field values
Accepted by `Crew`, `Flow`, and `Agent`.
<ParamField path="None" type="default">
Inherit from parent.
</ParamField>
<ParamField path="True" type="bool">
Enable with defaults.
</ParamField>
<ParamField path="False" type="bool">
Explicit opt-out. Stops inheritance.
</ParamField>
<ParamField path="CheckpointConfig(...)" type="CheckpointConfig">
Custom configuration.
</ParamField>
### Event types
`on_events` accepts any combination of `CheckpointEventType` values. The default `["task_completed"]` writes one checkpoint per finished task; `["*"]` matches every event.
<Warning>
`["*"]` and high-frequency events like `llm_call_completed` write many checkpoints and can degrade performance. Pair them with `max_checkpoints`.
Checkpointing is in early release. APIs may change in future versions.
</Warning>
<Expandable title="All supported events">
## Overview
- **Task** — `task_started`, `task_completed`, `task_failed`, `task_evaluation`
- **Crew** — `crew_kickoff_started`, `crew_kickoff_completed`, `crew_kickoff_failed`, `crew_train_started`, `crew_train_completed`, `crew_train_failed`, `crew_test_started`, `crew_test_completed`, `crew_test_failed`, `crew_test_result`
- **Agent** — `agent_execution_started`, `agent_execution_completed`, `agent_execution_error`, `lite_agent_execution_started`, `lite_agent_execution_completed`, `lite_agent_execution_error`, `agent_evaluation_started`, `agent_evaluation_completed`, `agent_evaluation_failed`
- **Flow** — `flow_created`, `flow_started`, `flow_finished`, `flow_paused`, `method_execution_started`, `method_execution_finished`, `method_execution_failed`, `method_execution_paused`, `human_feedback_requested`, `human_feedback_received`, `flow_input_requested`, `flow_input_received`
- **LLM** — `llm_call_started`, `llm_call_completed`, `llm_call_failed`, `llm_stream_chunk`, `llm_thinking_chunk`
- **LLM Guardrail** — `llm_guardrail_started`, `llm_guardrail_completed`, `llm_guardrail_failed`
- **Tool** — `tool_usage_started`, `tool_usage_finished`, `tool_usage_error`, `tool_validate_input_error`, `tool_selection_error`, `tool_execution_error`
- **Memory** — `memory_save_started`, `memory_save_completed`, `memory_save_failed`, `memory_query_started`, `memory_query_completed`, `memory_query_failed`, `memory_retrieval_started`, `memory_retrieval_completed`, `memory_retrieval_failed`
- **Knowledge** — `knowledge_search_query_started`, `knowledge_search_query_completed`, `knowledge_query_started`, `knowledge_query_completed`, `knowledge_query_failed`, `knowledge_search_query_failed`
- **Reasoning** — `agent_reasoning_started`, `agent_reasoning_completed`, `agent_reasoning_failed`
- **MCP** — `mcp_connection_started`, `mcp_connection_completed`, `mcp_connection_failed`, `mcp_tool_execution_started`, `mcp_tool_execution_completed`, `mcp_tool_execution_failed`, `mcp_config_fetch_failed`
- **Observation** — `step_observation_started`, `step_observation_completed`, `step_observation_failed`, `plan_refinement`, `plan_replan_triggered`, `goal_achieved_early`
- **Skill** — `skill_discovery_started`, `skill_discovery_completed`, `skill_loaded`, `skill_activated`, `skill_load_failed`
- **Logging** — `agent_logs_started`, `agent_logs_execution`
- **A2A** — `a2a_delegation_started`, `a2a_delegation_completed`, `a2a_conversation_started`, `a2a_conversation_completed`, `a2a_message_sent`, `a2a_response_received`, `a2a_polling_started`, `a2a_polling_status`, `a2a_push_notification_registered`, `a2a_push_notification_received`, `a2a_push_notification_sent`, `a2a_push_notification_timeout`, `a2a_streaming_started`, `a2a_streaming_chunk`, `a2a_agent_card_fetched`, `a2a_authentication_failed`, `a2a_artifact_received`, `a2a_connection_error`, `a2a_server_task_started`, `a2a_server_task_completed`, `a2a_server_task_canceled`, `a2a_server_task_failed`, `a2a_parallel_delegation_started`, `a2a_parallel_delegation_completed`, `a2a_transport_negotiated`, `a2a_content_type_negotiated`, `a2a_context_created`, `a2a_context_expired`, `a2a_context_idle`, `a2a_context_completed`, `a2a_context_pruned`
- **System signals** — `SIGTERM`, `SIGINT`, `SIGHUP`, `SIGTSTP`, `SIGCONT`
- **Wildcard** — `"*"` matches every event.
Checkpointing automatically saves execution state during a run. If a crew, flow, or agent fails mid-execution, you can restore from the last checkpoint and resume without re-running completed work.
</Expandable>
## Quick Start
### Storage providers
```python
from crewai import Crew, CheckpointConfig
<ParamField path="JsonProvider" type="provider">
One file per checkpoint, named `<timestamp>_<uuid>.json` inside `location`.
</ParamField>
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=True, # uses defaults: ./.checkpoints, on task_completed
)
result = crew.kickoff()
```
<ParamField path="SqliteProvider" type="provider">
Single database file at `location` with WAL journaling.
</ParamField>
Checkpoint files are written to `./.checkpoints/` after each completed task.
### CLI
## Configuration
| Command | Purpose |
|:--------|:--------|
| `crewai checkpoint` | Launch the TUI; auto-detect storage. |
| `crewai checkpoint --location <path>` | Launch the TUI against a specific location. |
| `crewai checkpoint list <path>` | List checkpoints. |
| `crewai checkpoint info <path>` | Inspect a checkpoint file or the latest entry in a SQLite database. |
Use `CheckpointConfig` for full control:
```python
from crewai import Crew, CheckpointConfig
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./my_checkpoints",
on_events=["task_completed", "crew_kickoff_completed"],
max_checkpoints=5,
),
)
```
### CheckpointConfig Fields
| Field | Type | Default | Description |
|:------|:-----|:--------|:------------|
| `location` | `str` | `"./.checkpoints"` | Storage destination — a directory for `JsonProvider`, a database file path for `SqliteProvider` |
| `on_events` | `list[str]` | `["task_completed"]` | Event types that trigger a checkpoint |
| `provider` | `BaseProvider` | `JsonProvider()` | Storage backend |
| `max_checkpoints` | `int \| None` | `None` | Max checkpoints to keep. Oldest are pruned after each write. Pruning is handled by the provider. |
| `restore_from` | `Path \| str \| None` | `None` | Path to a checkpoint to restore from. Used when passing config via a kickoff method's `from_checkpoint` parameter. |
### Inheritance and Opt-Out
The `checkpoint` field on Crew, Flow, and Agent accepts `CheckpointConfig`, `True`, `False`, or `None`:
| Value | Behavior |
|:------|:---------|
| `None` (default) | Inherit from parent. An agent inherits its crew's config. |
| `True` | Enable with defaults. |
| `False` | Explicit opt-out. Stops inheritance from parent. |
| `CheckpointConfig(...)` | Custom configuration. |
```python
crew = Crew(
agents=[
Agent(role="Researcher", ...), # inherits crew's checkpoint
Agent(role="Writer", ..., checkpoint=False), # opted out, no checkpoints
],
tasks=[...],
checkpoint=True,
)
```
## Resuming from a Checkpoint
Pass a `CheckpointConfig` with `restore_from` to any kickoff method. The crew restores from that checkpoint, skips completed tasks, and resumes.
```python
from crewai import Crew, CheckpointConfig
crew = Crew(agents=[...], tasks=[...])
result = crew.kickoff(
from_checkpoint=CheckpointConfig(
restore_from="./my_checkpoints/20260407T120000_abc123.json",
),
)
```
Remaining `CheckpointConfig` fields apply to the new run, so checkpointing continues after the restore.
You can also use the classmethod directly:
```python
config = CheckpointConfig(restore_from="./my_checkpoints/20260407T120000_abc123.json")
crew = Crew.from_checkpoint(config)
result = crew.kickoff()
```
## Forking from a Checkpoint
`fork()` restores a checkpoint and starts a new execution branch. Useful for exploring alternative paths from the same point.
```python
from crewai import Crew, CheckpointConfig
config = CheckpointConfig(restore_from="./my_checkpoints/20260407T120000_abc123.json")
crew = Crew.fork(config, branch="experiment-a")
result = crew.kickoff(inputs={"strategy": "aggressive"})
```
Each fork gets a unique lineage ID so checkpoints from different branches don't collide. The `branch` label is optional and auto-generated if omitted.
## Works on Crew, Flow, and Agent
### Crew
```python
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, write_task, review_task],
checkpoint=CheckpointConfig(location="./crew_cp"),
)
```
Default trigger: `task_completed` (one checkpoint per finished task).
### Flow
```python
from crewai.flow.flow import Flow, start, listen
from crewai import CheckpointConfig
class MyFlow(Flow):
@start()
def step_one(self):
return "data"
@listen(step_one)
def step_two(self, data):
return process(data)
flow = MyFlow(
checkpoint=CheckpointConfig(
location="./flow_cp",
on_events=["method_execution_finished"],
),
)
result = flow.kickoff()
# Resume
config = CheckpointConfig(restore_from="./flow_cp/20260407T120000_abc123.json")
flow = MyFlow.from_checkpoint(config)
result = flow.kickoff()
```
### Agent
```python
agent = Agent(
role="Researcher",
goal="Research topics",
backstory="Expert researcher",
checkpoint=CheckpointConfig(
location="./agent_cp",
on_events=["lite_agent_execution_completed"],
),
)
result = agent.kickoff(messages=[{"role": "user", "content": "Research AI trends"}])
```
## Storage Providers
CrewAI ships with two checkpoint storage providers.
### JsonProvider (default)
Writes each checkpoint as a separate JSON file. Simple, human-readable, easy to inspect.
```python
from crewai import Crew, CheckpointConfig
from crewai.state import JsonProvider
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./my_checkpoints",
provider=JsonProvider(), # this is the default
max_checkpoints=5, # prunes oldest files
),
)
```
Files are named `<timestamp>_<uuid>.json` inside the location directory.
### SqliteProvider
Stores all checkpoints in a single SQLite database file. Better for high-frequency checkpointing and avoids many small files.
```python
from crewai import Crew, CheckpointConfig
from crewai.state import SqliteProvider
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./.checkpoints.db",
provider=SqliteProvider(),
max_checkpoints=50,
),
)
```
WAL journal mode is enabled for concurrent read access.
## Event Types
The `on_events` field accepts any combination of event type strings. Common choices:
| Use Case | Events |
|:---------|:-------|
| After each task (Crew) | `["task_completed"]` |
| After each flow method | `["method_execution_finished"]` |
| After agent execution | `["agent_execution_completed"]`, `["lite_agent_execution_completed"]` |
| On crew completion only | `["crew_kickoff_completed"]` |
| After every LLM call | `["llm_call_completed"]` |
| On everything | `["*"]` |
<Warning>
Using `["*"]` or high-frequency events like `llm_call_completed` will write many checkpoint files and may impact performance. Use `max_checkpoints` to limit disk usage.
</Warning>
## Manual Checkpointing
For full control, register your own event handler and call `state.checkpoint()` directly:
```python
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.llm_events import LLMCallCompletedEvent
# Sync handler
@crewai_event_bus.on(LLMCallCompletedEvent)
def on_llm_done(source, event, state):
path = state.checkpoint("./my_checkpoints")
print(f"Saved checkpoint: {path}")
# Async handler
@crewai_event_bus.on(LLMCallCompletedEvent)
async def on_llm_done_async(source, event, state):
path = await state.acheckpoint("./my_checkpoints")
print(f"Saved checkpoint: {path}")
```
The `state` argument is the `RuntimeState` passed automatically by the event bus when your handler accepts 3 parameters. You can register handlers on any event type listed in the [Event Listeners](/en/concepts/event-listener) documentation.
Checkpointing is best-effort: if a checkpoint write fails, the error is logged but execution continues uninterrupted.
## CLI
The `crewai checkpoint` command gives you a TUI for browsing, inspecting, resuming, and forking checkpoints. It auto-detects whether your checkpoints are JSON files or a SQLite database.
```bash
# Launch the TUI — auto-detects .checkpoints/ or .checkpoints.db
crewai checkpoint
# Point at a specific location
crewai checkpoint --location ./my_checkpoints
crewai checkpoint --location ./.checkpoints.db
```
<Frame>
<img src="/images/checkpointing.png" alt="Checkpoint TUI" />
</Frame>
The left panel is a tree view. Checkpoints are grouped by branch, and forks nest under the checkpoint they diverged from. Select a checkpoint to see its metadata, entity state, and task progress in the detail panel. Hit **Resume** to pick up where it left off, or **Fork** to start a new branch from that point.
### Editing inputs and task outputs
When a checkpoint is selected, the detail panel shows:
- **Inputs** — if the original kickoff had inputs (e.g. `{topic}`), they appear as editable fields pre-filled with the original values. Change them before resuming or forking.
- **Task outputs** — completed tasks show their output in editable text areas. Edit a task's output to change the context that downstream tasks receive. When you modify a task output and hit Fork, all subsequent tasks are invalidated and re-run with the new context.
This is useful for "what if" exploration — fork from a checkpoint, tweak a task's result, and see how it changes downstream behavior.
### Subcommands
```bash
# List all checkpoints
crewai checkpoint list ./my_checkpoints
# Inspect a specific checkpoint
crewai checkpoint info ./my_checkpoints/20260407T120000_abc123.json
# Inspect latest in a SQLite database
crewai checkpoint info ./.checkpoints.db
```

Binary file not shown.

Before

Width:  |  Height:  |  Size: 169 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 200 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 189 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 235 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 156 KiB

View File

@@ -5,403 +5,225 @@ icon: floppy-disk
mode: "wide"
---
체크포인팅은 실행 중 실행 상태의 스냅샷을 저장하여 크루, 플로우, 에이전트가 실패 후 재개하거나 대체 브랜치로 분기될 수 있도록 합니다.
<CardGroup cols={2}>
<Card title="설명" icon="lightbulb" href="#설명">
체크포인팅의 작동 방식: 이벤트, 스토리지, 상속.
</Card>
<Card title="튜토리얼" icon="graduation-cap" href="#튜토리얼-실패한-크루-재개하기">
5분 가이드: 실행, 중단, 재개.
</Card>
<Card title="사용 방법" icon="screwdriver-wrench" href="#사용-방법">
일반적인 워크플로우를 위한 작업 중심 레시피.
</Card>
<Card title="레퍼런스" icon="book" href="#레퍼런스">
`CheckpointConfig`, 이벤트, 프로바이더, CLI.
</Card>
</CardGroup>
## 설명
### 체크포인트란
체크포인트는 실행 중인 작업을 재현하기 위해 CrewAI가 필요한 모든 것을 캡처합니다: 크루, 플로우 또는 에이전트의 전체 상태 — 구성, 에이전트의 메모리 및 지식 소스, 태스크 진행 상황, 중간 출력값 — 그리고 kickoff 입력, 해당 시점까지의 이벤트 기록, 그리고 체크포인트를 원본 실행에 연결하는 lineage ID를 포함합니다.
복원하면 해당 상태를 재구성하고 계속 진행합니다. 완료된 태스크는 건너뛰고, 메모리와 지식은 재수화되며, 다운스트림 작업은 원본 실행이 생성한 동일한 출력을 기반으로 실행됩니다. 포크하면 새 lineage 아래에서 동일한 복원을 수행하여 새 브랜치와 원본 실행이 서로 덮어쓰지 않고 나란히 체크포인트를 기록할 수 있습니다.
### 체크포인트가 기록되는 시점
체크포인팅은 이벤트 기반입니다. 런타임은 `on_events`로 선택한 이벤트를 구독하고, 이벤트가 발생할 때마다 체크포인트를 기록합니다. 기본값 `task_completed`는 완료된 태스크당 하나의 체크포인트를 생성합니다 — 세분화와 디스크 사용의 합리적인 균형입니다. `llm_call_completed`와 같은 고빈도 이벤트는 더 세밀한 복구를 위해 사용 가능하지만 훨씬 많은 파일을 기록합니다.
### 스토리지
CrewAI에는 두 가지 프로바이더가 포함되어 있습니다:
- `JsonProvider`는 체크포인트당 하나의 파일을 기록합니다. 사람이 읽기 쉽고 검사하기 편리합니다.
- `SqliteProvider`는 단일 SQLite 데이터베이스에 기록합니다. 고빈도 체크포인팅에 적합합니다.
`max_checkpoints`가 설정되면 두 프로바이더 모두 가장 오래된 체크포인트를 자동으로 제거합니다.
<Note>
체크포인트 기록은 best-effort 방식입니다. 실패한 체크포인트는 로그에 기록되지만 실행을 중단시키지 않습니다.
</Note>
### 상속 모델
`Crew`, `Flow`, `Agent` 모두 `checkpoint` 인수를 받습니다. 자식은 자체 값을 설정하거나 `False`를 전달하여 옵트아웃하지 않는 한 부모로부터 상속합니다. 크루에서 체크포인팅을 한 번 활성화하면 모든 에이전트가 참여하거나, 특정 에이전트만 선택적으로 제외할 수 있습니다.
## 튜토리얼: 실패한 크루 재개하기
이 가이드는 약 5분이 소요됩니다. 두 개의 태스크가 있는 크루를 실행하고 중간에 종료한 다음, 저장된 체크포인트에서 재개합니다.
<Steps>
<Step title="체크포인팅이 활성화된 크루를 생성합니다">
```python
from crewai import Agent, Crew, Task
researcher = Agent(role="Researcher", goal="Research", backstory="Expert")
writer = Agent(role="Writer", goal="Write", backstory="Expert")
crew = Crew(
agents=[researcher, writer],
tasks=[
Task(description="Research AI trends", agent=researcher, expected_output="bullets"),
Task(description="Write a summary", agent=writer, expected_output="paragraph"),
],
checkpoint=True,
)
```
</Step>
<Step title="실행하고 첫 번째 태스크 후에 중단합니다">
```python
result = crew.kickoff()
```
첫 번째 태스크가 완료된 후 `Ctrl+C`를 누릅니다. `./.checkpoints/` 디렉토리에서 `<timestamp>_<uuid>.json` 형식의 파일이 체크포인트입니다.
</Step>
<Step title="체크포인트에서 재개합니다">
```python
from crewai import CheckpointConfig
result = crew.kickoff(
from_checkpoint=CheckpointConfig(
restore_from="./.checkpoints/<timestamp>_<uuid>.json",
),
)
```
연구 태스크는 건너뛰고, 작성자는 저장된 연구 출력에 대해 실행되며, 크루가 완료됩니다.
</Step>
</Steps>
## 사용 방법
<AccordionGroup>
<Accordion title="기본값으로 체크포인팅 활성화" icon="play">
```python
crew = Crew(agents=[...], tasks=[...], checkpoint=True)
```
`task_completed` 이벤트마다 `./.checkpoints/`에 기록합니다.
</Accordion>
<Accordion title="스토리지와 빈도 사용자 정의" icon="sliders">
```python
from crewai import Crew, CheckpointConfig
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./my_checkpoints",
on_events=["task_completed", "crew_kickoff_completed"],
max_checkpoints=5,
),
)
```
</Accordion>
<Accordion title="스토리지 프로바이더 선택" icon="database">
<CodeGroup>
```python JsonProvider
from crewai import Crew, CheckpointConfig
from crewai.state import JsonProvider
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./my_checkpoints",
provider=JsonProvider(),
max_checkpoints=5,
),
)
```
```python SqliteProvider
from crewai import Crew, CheckpointConfig
from crewai.state import SqliteProvider
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./.checkpoints.db",
provider=SqliteProvider(),
max_checkpoints=50,
),
)
```
</CodeGroup>
<Tip>
SQLite는 동시 읽기를 위해 WAL 저널 모드를 활성화합니다. 고빈도 체크포인팅에는 SQLite를 선호하세요.
</Tip>
</Accordion>
<Accordion title="특정 에이전트 옵트아웃" icon="user-slash">
```python
crew = Crew(
agents=[
Agent(role="Researcher", ...),
Agent(role="Writer", ..., checkpoint=False),
],
tasks=[...],
checkpoint=True,
)
```
</Accordion>
<Accordion title="새 브랜치로 포크" icon="code-branch">
`fork()`는 새 lineage 아래에 체크포인트를 복원하여 새 실행이 원본과 충돌하지 않도록 합니다.
```python
config = CheckpointConfig(restore_from="./my_checkpoints/<file>.json")
crew = Crew.fork(config, branch="experiment-a")
result = crew.kickoff(inputs={"strategy": "aggressive"})
```
`branch` 레이블은 선택 사항이며, 생략하면 자동 생성됩니다.
</Accordion>
<Accordion title="Crew, Flow, Agent 체크포인트" icon="cubes">
<Tabs>
<Tab title="Crew">
```python
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, write_task, review_task],
checkpoint=CheckpointConfig(location="./crew_cp"),
)
```
기본 트리거: `task_completed`.
</Tab>
<Tab title="Flow">
```python
from crewai.flow.flow import Flow, start, listen
from crewai import CheckpointConfig
class MyFlow(Flow):
@start()
def step_one(self):
return "data"
@listen(step_one)
def step_two(self, data):
return process(data)
flow = MyFlow(
checkpoint=CheckpointConfig(
location="./flow_cp",
on_events=["method_execution_finished"],
),
)
result = flow.kickoff()
```
</Tab>
<Tab title="Agent">
```python
agent = Agent(
role="Researcher",
goal="Research topics",
backstory="Expert researcher",
checkpoint=CheckpointConfig(
location="./agent_cp",
on_events=["lite_agent_execution_completed"],
),
)
result = agent.kickoff(messages=[{"role": "user", "content": "Research AI trends"}])
```
</Tab>
</Tabs>
</Accordion>
<Accordion title="수동으로 체크포인트 기록" icon="code">
모든 이벤트에 핸들러를 등록하고 `state.checkpoint()`를 호출합니다.
<CodeGroup>
```python Sync
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.llm_events import LLMCallCompletedEvent
@crewai_event_bus.on(LLMCallCompletedEvent)
def on_llm_done(source, event, state):
path = state.checkpoint("./my_checkpoints")
print(f"체크포인트 저장: {path}")
```
```python Async
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.llm_events import LLMCallCompletedEvent
@crewai_event_bus.on(LLMCallCompletedEvent)
async def on_llm_done_async(source, event, state):
path = await state.acheckpoint("./my_checkpoints")
print(f"체크포인트 저장: {path}")
```
</CodeGroup>
핸들러가 세 개의 매개변수를 받을 때 `state` 인수가 자동으로 제공됩니다. 전체 이벤트 카탈로그는 [Event Listeners](/ko/concepts/event-listener) 문서를 참조하세요.
</Accordion>
<Accordion title="CLI에서 탐색, 재개, 포크" icon="terminal">
```bash
crewai checkpoint # .checkpoints/ 또는 .checkpoints.db 자동 감지
crewai checkpoint --location ./my_checkpoints
crewai checkpoint --location ./.checkpoints.db
```
<Frame caption="체크포인트 트리 — 브랜치와 포크가 부모 아래에 중첩됩니다.">
<img src="/images/checkpoint-tui-tree.png" alt="Checkpoint TUI tree view" />
</Frame>
왼쪽 패널은 체크포인트를 브랜치별로 그룹화하며, 포크는 부모 아래에 중첩됩니다. 체크포인트를 선택하면 메타데이터, 엔티티 상태, 태스크 진행 상황이 있는 세부 정보 패널이 열립니다. **Resume**은 실행을 계속하고, **Fork**는 새 브랜치를 시작합니다.
<Frame caption="개요 탭 — 메타데이터, 엔티티 상태, 실행 요약.">
<img src="/images/checkpoint-tui-detail-overview.png" alt="Checkpoint detail overview tab" />
</Frame>
세부 정보 패널에는 두 개의 편집 가능한 영역이 있습니다:
- **Inputs** — 원래 kickoff의 입력으로, 미리 채워져 있으며 편집 가능합니다.
<Frame>
<img src="/images/checkpoint-tui-detail-inputs.png" alt="Editable kickoff inputs" />
</Frame>
- **태스크 출력** — 완료된 태스크의 출력. 출력을 편집하고 **Fork**를 누르면 다운스트림 태스크가 무효화되어 수정된 컨텍스트로 다시 실행됩니다.
<Frame>
<img src="/images/checkpoint-tui-detail-tasks.png" alt="Editable task outputs" />
</Frame>
<Frame caption="포크 화면 — 선택한 체크포인트에서 새 브랜치를 확인합니다.">
<img src="/images/checkpoint-tui-details-fork.png" alt="Fork confirmation panel" />
</Frame>
<Tip>
"what if" 탐색에 유용합니다: 포크, 조정, 관찰.
</Tip>
</Accordion>
<Accordion title="TUI 없이 체크포인트 검사" icon="magnifying-glass">
```bash
crewai checkpoint list ./my_checkpoints
crewai checkpoint info ./my_checkpoints/<file>.json
crewai checkpoint info ./.checkpoints.db
```
</Accordion>
</AccordionGroup>
## 레퍼런스
### `CheckpointConfig`
<ParamField path="location" type="str" default='"./.checkpoints"'>
스토리지 대상. `JsonProvider`는 디렉토리, `SqliteProvider`는 데이터베이스 파일 경로.
</ParamField>
<ParamField path="on_events" type="list[CheckpointEventType]" default='["task_completed"]'>
체크포인트를 트리거하는 이벤트 타입. `CheckpointEventType`은 `Literal`이므로 타입 체커가 자동 완성하고 지원되지 않는 값을 거부합니다. 전체 목록은 [이벤트 타입](#이벤트-타입) 참조.
</ParamField>
<ParamField path="provider" type="BaseProvider" default="JsonProvider()">
스토리지 백엔드. `JsonProvider` 또는 `SqliteProvider`.
</ParamField>
<ParamField path="max_checkpoints" type="int | None" default="None">
보관할 최대 체크포인트 수. 각 기록 후 가장 오래된 것이 제거됩니다.
</ParamField>
<ParamField path="restore_from" type="Path | str | None" default="None">
`from_checkpoint`를 통해 전달될 때 복원할 체크포인트.
</ParamField>
### `checkpoint` 필드 값
`Crew`, `Flow`, `Agent`에서 사용 가능.
<ParamField path="None" type="기본값">
부모에서 상속.
</ParamField>
<ParamField path="True" type="bool">
기본값으로 활성화.
</ParamField>
<ParamField path="False" type="bool">
명시적 옵트아웃. 상속을 중단합니다.
</ParamField>
<ParamField path="CheckpointConfig(...)" type="CheckpointConfig">
사용자 정의 설정.
</ParamField>
### 이벤트 타입
`on_events`는 `CheckpointEventType` 값의 임의 조합을 받습니다. 기본값 `["task_completed"]`는 완료된 태스크당 하나의 체크포인트를 기록하며, `["*"]`는 모든 이벤트와 일치합니다.
<Warning>
`["*"]` 및 `llm_call_completed`와 같은 고빈도 이벤트는 많은 체크포인트를 기록하고 성능을 저하시킬 수 있습니다. `max_checkpoints`와 함께 사용하세요.
체크포인팅은 초기 릴리스 단계입니다. API는 향후 버전에서 변경될 수 있습니다.
</Warning>
<Expandable title="지원되는 모든 이벤트">
## 개요
- **Task** — `task_started`, `task_completed`, `task_failed`, `task_evaluation`
- **Crew** — `crew_kickoff_started`, `crew_kickoff_completed`, `crew_kickoff_failed`, `crew_train_started`, `crew_train_completed`, `crew_train_failed`, `crew_test_started`, `crew_test_completed`, `crew_test_failed`, `crew_test_result`
- **Agent** — `agent_execution_started`, `agent_execution_completed`, `agent_execution_error`, `lite_agent_execution_started`, `lite_agent_execution_completed`, `lite_agent_execution_error`, `agent_evaluation_started`, `agent_evaluation_completed`, `agent_evaluation_failed`
- **Flow** — `flow_created`, `flow_started`, `flow_finished`, `flow_paused`, `method_execution_started`, `method_execution_finished`, `method_execution_failed`, `method_execution_paused`, `human_feedback_requested`, `human_feedback_received`, `flow_input_requested`, `flow_input_received`
- **LLM** — `llm_call_started`, `llm_call_completed`, `llm_call_failed`, `llm_stream_chunk`, `llm_thinking_chunk`
- **LLM Guardrail** — `llm_guardrail_started`, `llm_guardrail_completed`, `llm_guardrail_failed`
- **Tool** — `tool_usage_started`, `tool_usage_finished`, `tool_usage_error`, `tool_validate_input_error`, `tool_selection_error`, `tool_execution_error`
- **Memory** — `memory_save_started`, `memory_save_completed`, `memory_save_failed`, `memory_query_started`, `memory_query_completed`, `memory_query_failed`, `memory_retrieval_started`, `memory_retrieval_completed`, `memory_retrieval_failed`
- **Knowledge** — `knowledge_search_query_started`, `knowledge_search_query_completed`, `knowledge_query_started`, `knowledge_query_completed`, `knowledge_query_failed`, `knowledge_search_query_failed`
- **Reasoning** — `agent_reasoning_started`, `agent_reasoning_completed`, `agent_reasoning_failed`
- **MCP** — `mcp_connection_started`, `mcp_connection_completed`, `mcp_connection_failed`, `mcp_tool_execution_started`, `mcp_tool_execution_completed`, `mcp_tool_execution_failed`, `mcp_config_fetch_failed`
- **Observation** — `step_observation_started`, `step_observation_completed`, `step_observation_failed`, `plan_refinement`, `plan_replan_triggered`, `goal_achieved_early`
- **Skill** — `skill_discovery_started`, `skill_discovery_completed`, `skill_loaded`, `skill_activated`, `skill_load_failed`
- **Logging** — `agent_logs_started`, `agent_logs_execution`
- **A2A** — `a2a_delegation_started`, `a2a_delegation_completed`, `a2a_conversation_started`, `a2a_conversation_completed`, `a2a_message_sent`, `a2a_response_received`, `a2a_polling_started`, `a2a_polling_status`, `a2a_push_notification_registered`, `a2a_push_notification_received`, `a2a_push_notification_sent`, `a2a_push_notification_timeout`, `a2a_streaming_started`, `a2a_streaming_chunk`, `a2a_agent_card_fetched`, `a2a_authentication_failed`, `a2a_artifact_received`, `a2a_connection_error`, `a2a_server_task_started`, `a2a_server_task_completed`, `a2a_server_task_canceled`, `a2a_server_task_failed`, `a2a_parallel_delegation_started`, `a2a_parallel_delegation_completed`, `a2a_transport_negotiated`, `a2a_content_type_negotiated`, `a2a_context_created`, `a2a_context_expired`, `a2a_context_idle`, `a2a_context_completed`, `a2a_context_pruned`
- **시스템 시그널** — `SIGTERM`, `SIGINT`, `SIGHUP`, `SIGTSTP`, `SIGCONT`
- **와일드카드** — `"*"`는 모든 이벤트와 일치합니다.
체크포인팅은 실행 중 자동으로 실행 상태를 저장합니다. 크루, 플로우 또는 에이전트가 실행 도중 실패하면 마지막 체크포인트에서 복원하여 이미 완료된 작업을 다시 실행하지 않고 재개할 수 있습니다.
</Expandable>
## 빠른 시작
### 스토리지 프로바이더
```python
from crewai import Crew, CheckpointConfig
<ParamField path="JsonProvider" type="provider">
체크포인트당 하나의 파일, `location` 내부에 `<timestamp>_<uuid>.json` 형식으로 명명.
</ParamField>
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=True, # 기본값 사용: ./.checkpoints, task_completed 이벤트
)
result = crew.kickoff()
```
<ParamField path="SqliteProvider" type="provider">
WAL 저널링이 있는 `location`의 단일 데이터베이스 파일.
</ParamField>
각 태스크가 완료된 후 `./.checkpoints/`에 체크포인트 파일이 기록됩니다.
### CLI
## 설정
| 명령 | 목적 |
|:-----|:-----|
| `crewai checkpoint` | TUI 실행; 스토리지 자동 감지. |
| `crewai checkpoint --location <path>` | 특정 위치에 대해 TUI 실행. |
| `crewai checkpoint list <path>` | 체크포인트 나열. |
| `crewai checkpoint info <path>` | 체크포인트 파일 또는 SQLite 데이터베이스의 최신 항목 검사. |
`CheckpointConfig`를 사용하여 세부 설정을 제어합니다:
```python
from crewai import Crew, CheckpointConfig
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./my_checkpoints",
on_events=["task_completed", "crew_kickoff_completed"],
max_checkpoints=5,
),
)
```
### CheckpointConfig 필드
| 필드 | 타입 | 기본값 | 설명 |
|:-----|:-----|:-------|:-----|
| `location` | `str` | `"./.checkpoints"` | 체크포인트 파일 경로 |
| `on_events` | `list[str]` | `["task_completed"]` | 체크포인트를 트리거하는 이벤트 타입 |
| `provider` | `BaseProvider` | `JsonProvider()` | 스토리지 백엔드 |
| `max_checkpoints` | `int \| None` | `None` | 보관할 최대 파일 수; 오래된 것부터 삭제 |
### 상속 및 옵트아웃
Crew, Flow, Agent의 `checkpoint` 필드는 `CheckpointConfig`, `True`, `False`, `None`을 받습니다:
| 값 | 동작 |
|:---|:-----|
| `None` (기본값) | 부모에서 상속. 에이전트는 크루의 설정을 상속합니다. |
| `True` | 기본값으로 활성화. |
| `False` | 명시적 옵트아웃. 부모 상속을 중단합니다. |
| `CheckpointConfig(...)` | 사용자 정의 설정. |
```python
crew = Crew(
agents=[
Agent(role="Researcher", ...), # 크루의 checkpoint 상속
Agent(role="Writer", ..., checkpoint=False), # 옵트아웃, 체크포인트 없음
],
tasks=[...],
checkpoint=True,
)
```
## 체크포인트에서 재개
```python
# 복원 및 재개
crew = Crew.from_checkpoint("./my_checkpoints/20260407T120000_abc123.json")
result = crew.kickoff() # 마지막으로 완료된 태스크부터 재개
```
복원된 크루는 이미 완료된 태스크를 건너뛰고 첫 번째 미완료 태스크부터 재개합니다.
## Crew, Flow, Agent에서 사용 가능
### Crew
```python
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, write_task, review_task],
checkpoint=CheckpointConfig(location="./crew_cp"),
)
```
기본 트리거: `task_completed` (완료된 태스크당 하나의 체크포인트).
### Flow
```python
from crewai.flow.flow import Flow, start, listen
from crewai import CheckpointConfig
class MyFlow(Flow):
@start()
def step_one(self):
return "data"
@listen(step_one)
def step_two(self, data):
return process(data)
flow = MyFlow(
checkpoint=CheckpointConfig(
location="./flow_cp",
on_events=["method_execution_finished"],
),
)
result = flow.kickoff()
# 재개
flow = MyFlow.from_checkpoint("./flow_cp/20260407T120000_abc123.json")
result = flow.kickoff()
```
### Agent
```python
agent = Agent(
role="Researcher",
goal="Research topics",
backstory="Expert researcher",
checkpoint=CheckpointConfig(
location="./agent_cp",
on_events=["lite_agent_execution_completed"],
),
)
result = agent.kickoff(messages=[{"role": "user", "content": "Research AI trends"}])
```
## 스토리지 프로바이더
CrewAI는 두 가지 체크포인트 스토리지 프로바이더를 제공합니다.
### JsonProvider (기본값)
각 체크포인트를 별도의 JSON 파일로 저장합니다.
```python
from crewai import Crew, CheckpointConfig
from crewai.state import JsonProvider
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./my_checkpoints",
provider=JsonProvider(),
max_checkpoints=5,
),
)
```
### SqliteProvider
모든 체크포인트를 단일 SQLite 데이터베이스 파일에 저장합니다.
```python
from crewai import Crew, CheckpointConfig
from crewai.state import SqliteProvider
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./.checkpoints.db",
provider=SqliteProvider(),
),
)
```
## 이벤트 타입
`on_events` 필드는 이벤트 타입 문자열의 조합을 받습니다. 일반적인 선택:
| 사용 사례 | 이벤트 |
|:----------|:-------|
| 각 태스크 완료 후 (Crew) | `["task_completed"]` |
| 각 플로우 메서드 완료 후 | `["method_execution_finished"]` |
| 에이전트 실행 완료 후 | `["agent_execution_completed"]`, `["lite_agent_execution_completed"]` |
| 크루 완료 시에만 | `["crew_kickoff_completed"]` |
| 모든 LLM 호출 후 | `["llm_call_completed"]` |
| 모든 이벤트 | `["*"]` |
<Warning>
`["*"]` 또는 `llm_call_completed`와 같은 고빈도 이벤트를 사용하면 많은 체크포인트 파일이 생성되어 성능에 영향을 줄 수 있습니다. `max_checkpoints`를 사용하여 디스크 사용량을 제한하세요.
</Warning>
## 수동 체크포인팅
완전한 제어를 위해 자체 이벤트 핸들러를 등록하고 `state.checkpoint()`를 직접 호출할 수 있습니다:
```python
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.llm_events import LLMCallCompletedEvent
# 동기 핸들러
@crewai_event_bus.on(LLMCallCompletedEvent)
def on_llm_done(source, event, state):
path = state.checkpoint("./my_checkpoints")
print(f"체크포인트 저장: {path}")
# 비동기 핸들러
@crewai_event_bus.on(LLMCallCompletedEvent)
async def on_llm_done_async(source, event, state):
path = await state.acheckpoint("./my_checkpoints")
print(f"체크포인트 저장: {path}")
```
`state` 인수는 핸들러가 3개의 매개변수를 받을 때 이벤트 버스가 자동으로 전달하는 `RuntimeState`입니다. [Event Listeners](/ko/concepts/event-listener) 문서에 나열된 모든 이벤트 타입에 핸들러를 등록할 수 있습니다.
체크포인팅은 best-effort입니다: 체크포인트 기록이 실패하면 오류가 로그에 기록되지만 실행은 중단 없이 계속됩니다.

View File

@@ -5,403 +5,225 @@ icon: floppy-disk
mode: "wide"
---
O checkpointing salva um snapshot do estado de execucao durante uma execucao para que uma crew, flow ou agente possa retomar apos uma falha ou ser bifurcado em uma branch alternativa.
<CardGroup cols={2}>
<Card title="Explicacao" icon="lightbulb" href="#explicacao">
Como o checkpointing funciona: eventos, armazenamento e heranca.
</Card>
<Card title="Tutorial" icon="graduation-cap" href="#tutorial-retomar-uma-crew-com-falha">
Um passo a passo de 5 minutos: executar, interromper, retomar.
</Card>
<Card title="Guias de uso" icon="screwdriver-wrench" href="#guias-de-uso">
Receitas focadas em tarefas para fluxos comuns.
</Card>
<Card title="Referencia" icon="book" href="#referencia">
`CheckpointConfig`, eventos, provedores e CLI.
</Card>
</CardGroup>
## Explicacao
### O que e um checkpoint
Um checkpoint captura tudo o que o CrewAI precisa para recriar uma execucao em andamento: o estado completo da crew, flow ou agente — configuracao, memoria e fontes de conhecimento dos agentes, progresso das tarefas, saidas intermediarias — junto com os inputs do kickoff, o historico de eventos ate aquele ponto e um ID de linhagem que liga o checkpoint a execucao de origem.
Restaurar reconstroi esse estado e continua. Tarefas concluidas sao puladas, memoria e conhecimento sao reidratados, e o trabalho downstream roda contra as mesmas saidas que a execucao original produziu. Fazer fork executa a mesma restauracao sob uma nova linhagem, para que a nova branch e a execucao original gravem checkpoints lado a lado sem sobrescrever uma a outra.
### Quando os checkpoints sao gravados
O checkpointing e orientado a eventos. O runtime se inscreve nos eventos selecionados em `on_events` e grava um checkpoint sempre que um e disparado. O padrao `task_completed` produz um checkpoint por tarefa finalizada — um equilibrio razoavel entre granularidade e uso de disco. Eventos de alta frequencia como `llm_call_completed` estao disponiveis para recuperacao mais granular, mas gravam muito mais arquivos.
### Armazenamento
Dois provedores acompanham o CrewAI:
- `JsonProvider` grava um arquivo por checkpoint. Legivel e facil de inspecionar.
- `SqliteProvider` grava em um unico banco SQLite. Melhor para checkpointing de alta frequencia.
Ambos removem os checkpoints mais antigos quando `max_checkpoints` esta definido.
<Note>
As gravacoes de checkpoint sao best-effort. Um checkpoint que falha e registrado em log, mas nao interrompe a execucao.
</Note>
### Modelo de heranca
`Crew`, `Flow` e `Agent` aceitam um argumento `checkpoint`. Filhos herdam do pai a menos que definam seu proprio valor ou passem `False` para desativar. Ative o checkpointing uma vez na crew e todos os agentes participam, ou exclua um agente seletivamente.
## Tutorial: Retomar uma crew com falha
Este passo a passo leva cerca de 5 minutos. Voce executara uma crew de duas tarefas, a interrompera no meio e a retomara a partir do checkpoint salvo.
<Steps>
<Step title="Crie a crew com checkpointing ativado">
```python
from crewai import Agent, Crew, Task
researcher = Agent(role="Researcher", goal="Research", backstory="Expert")
writer = Agent(role="Writer", goal="Write", backstory="Expert")
crew = Crew(
agents=[researcher, writer],
tasks=[
Task(description="Research AI trends", agent=researcher, expected_output="bullets"),
Task(description="Write a summary", agent=writer, expected_output="paragraph"),
],
checkpoint=True,
)
```
</Step>
<Step title="Execute e interrompa apos a primeira tarefa">
```python
result = crew.kickoff()
```
Pressione `Ctrl+C` apos a primeira tarefa concluir. Em `./.checkpoints/`, um arquivo `<timestamp>_<uuid>.json` e o checkpoint.
</Step>
<Step title="Retome a partir do checkpoint">
```python
from crewai import CheckpointConfig
result = crew.kickoff(
from_checkpoint=CheckpointConfig(
restore_from="./.checkpoints/<timestamp>_<uuid>.json",
),
)
```
A tarefa de pesquisa e pulada, o escritor executa contra a saida de pesquisa salva e a crew finaliza.
</Step>
</Steps>
## Guias de uso
<AccordionGroup>
<Accordion title="Ativar checkpointing com padroes" icon="play">
```python
crew = Crew(agents=[...], tasks=[...], checkpoint=True)
```
Grava em `./.checkpoints/` em cada `task_completed`.
</Accordion>
<Accordion title="Personalizar armazenamento e frequencia" icon="sliders">
```python
from crewai import Crew, CheckpointConfig
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./my_checkpoints",
on_events=["task_completed", "crew_kickoff_completed"],
max_checkpoints=5,
),
)
```
</Accordion>
<Accordion title="Escolher um provedor de armazenamento" icon="database">
<CodeGroup>
```python JsonProvider
from crewai import Crew, CheckpointConfig
from crewai.state import JsonProvider
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./my_checkpoints",
provider=JsonProvider(),
max_checkpoints=5,
),
)
```
```python SqliteProvider
from crewai import Crew, CheckpointConfig
from crewai.state import SqliteProvider
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./.checkpoints.db",
provider=SqliteProvider(),
max_checkpoints=50,
),
)
```
</CodeGroup>
<Tip>
O SQLite ativa o modo journal WAL para leituras concorrentes. Prefira-o para checkpointing de alta frequencia.
</Tip>
</Accordion>
<Accordion title="Desativar um agente especifico" icon="user-slash">
```python
crew = Crew(
agents=[
Agent(role="Researcher", ...),
Agent(role="Writer", ..., checkpoint=False),
],
tasks=[...],
checkpoint=True,
)
```
</Accordion>
<Accordion title="Fazer fork em uma nova branch" icon="code-branch">
`fork()` restaura um checkpoint sob uma nova linhagem para que a nova execucao nao colida com a original.
```python
config = CheckpointConfig(restore_from="./my_checkpoints/<file>.json")
crew = Crew.fork(config, branch="experiment-a")
result = crew.kickoff(inputs={"strategy": "aggressive"})
```
O label `branch` e opcional; um e gerado se omitido.
</Accordion>
<Accordion title="Checkpoint em Crew, Flow ou Agent" icon="cubes">
<Tabs>
<Tab title="Crew">
```python
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, write_task, review_task],
checkpoint=CheckpointConfig(location="./crew_cp"),
)
```
Gatilho padrao: `task_completed`.
</Tab>
<Tab title="Flow">
```python
from crewai.flow.flow import Flow, start, listen
from crewai import CheckpointConfig
class MyFlow(Flow):
@start()
def step_one(self):
return "data"
@listen(step_one)
def step_two(self, data):
return process(data)
flow = MyFlow(
checkpoint=CheckpointConfig(
location="./flow_cp",
on_events=["method_execution_finished"],
),
)
result = flow.kickoff()
```
</Tab>
<Tab title="Agent">
```python
agent = Agent(
role="Researcher",
goal="Research topics",
backstory="Expert researcher",
checkpoint=CheckpointConfig(
location="./agent_cp",
on_events=["lite_agent_execution_completed"],
),
)
result = agent.kickoff(messages=[{"role": "user", "content": "Research AI trends"}])
```
</Tab>
</Tabs>
</Accordion>
<Accordion title="Gravar um checkpoint manualmente" icon="code">
Registre um handler em qualquer evento e chame `state.checkpoint()`.
<CodeGroup>
```python Sync
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.llm_events import LLMCallCompletedEvent
@crewai_event_bus.on(LLMCallCompletedEvent)
def on_llm_done(source, event, state):
path = state.checkpoint("./my_checkpoints")
print(f"Checkpoint salvo: {path}")
```
```python Async
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.llm_events import LLMCallCompletedEvent
@crewai_event_bus.on(LLMCallCompletedEvent)
async def on_llm_done_async(source, event, state):
path = await state.acheckpoint("./my_checkpoints")
print(f"Checkpoint salvo: {path}")
```
</CodeGroup>
Um argumento `state` e fornecido automaticamente quando o handler recebe tres parametros. Veja [Event Listeners](/pt-BR/concepts/event-listener) para o catalogo completo de eventos.
</Accordion>
<Accordion title="Navegar, retomar e fazer fork pela CLI" icon="terminal">
```bash
crewai checkpoint # detecta automaticamente .checkpoints/ ou .checkpoints.db
crewai checkpoint --location ./my_checkpoints
crewai checkpoint --location ./.checkpoints.db
```
<Frame caption="Arvore de checkpoints — branches e forks aninham sob seu pai.">
<img src="/images/checkpoint-tui-tree.png" alt="Checkpoint TUI tree view" />
</Frame>
O painel esquerdo agrupa checkpoints por branch; forks aninham sob seu pai. Selecionar um checkpoint abre o painel de detalhes com metadados, estado da entidade e progresso das tarefas. **Resume** continua a execucao; **Fork** inicia uma nova branch.
<Frame caption="Aba de visao geral — metadados, estado da entidade e resumo da execucao.">
<img src="/images/checkpoint-tui-detail-overview.png" alt="Checkpoint detail overview tab" />
</Frame>
O painel de detalhes expoe duas areas editaveis:
- **Inputs** — os inputs originais do kickoff, preenchidos e editaveis.
<Frame>
<img src="/images/checkpoint-tui-detail-inputs.png" alt="Editable kickoff inputs" />
</Frame>
- **Saidas das tarefas** — saidas das tarefas concluidas. Editar uma saida e pressionar **Fork** invalida tarefas downstream para que sejam reexecutadas com o contexto modificado.
<Frame>
<img src="/images/checkpoint-tui-detail-tasks.png" alt="Editable task outputs" />
</Frame>
<Frame caption="Tela de fork — confirme uma nova branch a partir do checkpoint selecionado.">
<img src="/images/checkpoint-tui-details-fork.png" alt="Fork confirmation panel" />
</Frame>
<Tip>
Util para exploracao de cenarios: fork, ajuste, observe.
</Tip>
</Accordion>
<Accordion title="Inspecionar checkpoints sem a TUI" icon="magnifying-glass">
```bash
crewai checkpoint list ./my_checkpoints
crewai checkpoint info ./my_checkpoints/<file>.json
crewai checkpoint info ./.checkpoints.db
```
</Accordion>
</AccordionGroup>
## Referencia
### `CheckpointConfig`
<ParamField path="location" type="str" default='"./.checkpoints"'>
Destino do armazenamento. Diretorio para `JsonProvider`, caminho de arquivo de banco para `SqliteProvider`.
</ParamField>
<ParamField path="on_events" type="list[CheckpointEventType]" default='["task_completed"]'>
Tipos de evento que disparam um checkpoint. `CheckpointEventType` e um `Literal` — seu type checker autocompleta e rejeita valores nao suportados. Veja [tipos de evento](#tipos-de-evento) para a lista completa.
</ParamField>
<ParamField path="provider" type="BaseProvider" default="JsonProvider()">
Backend de armazenamento. `JsonProvider` ou `SqliteProvider`.
</ParamField>
<ParamField path="max_checkpoints" type="int | None" default="None">
Maximo de checkpoints a reter. Os mais antigos sao removidos apos cada gravacao.
</ParamField>
<ParamField path="restore_from" type="Path | str | None" default="None">
Checkpoint a restaurar quando passado via `from_checkpoint`.
</ParamField>
### Valores do campo `checkpoint`
Aceito por `Crew`, `Flow` e `Agent`.
<ParamField path="None" type="padrao">
Herda do pai.
</ParamField>
<ParamField path="True" type="bool">
Ativa com padroes.
</ParamField>
<ParamField path="False" type="bool">
Desativacao explicita. Interrompe a heranca.
</ParamField>
<ParamField path="CheckpointConfig(...)" type="CheckpointConfig">
Configuracao personalizada.
</ParamField>
### Tipos de evento
`on_events` aceita qualquer combinacao de valores `CheckpointEventType`. O padrao `["task_completed"]` grava um checkpoint por tarefa finalizada; `["*"]` corresponde a todos os eventos.
<Warning>
`["*"]` e eventos de alta frequencia como `llm_call_completed` gravam muitos checkpoints e podem degradar o desempenho. Combine com `max_checkpoints`.
O checkpointing esta em versao inicial. As APIs podem mudar em versoes futuras.
</Warning>
<Expandable title="Todos os eventos suportados">
## Visao Geral
- **Task** — `task_started`, `task_completed`, `task_failed`, `task_evaluation`
- **Crew** — `crew_kickoff_started`, `crew_kickoff_completed`, `crew_kickoff_failed`, `crew_train_started`, `crew_train_completed`, `crew_train_failed`, `crew_test_started`, `crew_test_completed`, `crew_test_failed`, `crew_test_result`
- **Agent** — `agent_execution_started`, `agent_execution_completed`, `agent_execution_error`, `lite_agent_execution_started`, `lite_agent_execution_completed`, `lite_agent_execution_error`, `agent_evaluation_started`, `agent_evaluation_completed`, `agent_evaluation_failed`
- **Flow** — `flow_created`, `flow_started`, `flow_finished`, `flow_paused`, `method_execution_started`, `method_execution_finished`, `method_execution_failed`, `method_execution_paused`, `human_feedback_requested`, `human_feedback_received`, `flow_input_requested`, `flow_input_received`
- **LLM** — `llm_call_started`, `llm_call_completed`, `llm_call_failed`, `llm_stream_chunk`, `llm_thinking_chunk`
- **LLM Guardrail** — `llm_guardrail_started`, `llm_guardrail_completed`, `llm_guardrail_failed`
- **Tool** — `tool_usage_started`, `tool_usage_finished`, `tool_usage_error`, `tool_validate_input_error`, `tool_selection_error`, `tool_execution_error`
- **Memory** — `memory_save_started`, `memory_save_completed`, `memory_save_failed`, `memory_query_started`, `memory_query_completed`, `memory_query_failed`, `memory_retrieval_started`, `memory_retrieval_completed`, `memory_retrieval_failed`
- **Knowledge** — `knowledge_search_query_started`, `knowledge_search_query_completed`, `knowledge_query_started`, `knowledge_query_completed`, `knowledge_query_failed`, `knowledge_search_query_failed`
- **Reasoning** — `agent_reasoning_started`, `agent_reasoning_completed`, `agent_reasoning_failed`
- **MCP** — `mcp_connection_started`, `mcp_connection_completed`, `mcp_connection_failed`, `mcp_tool_execution_started`, `mcp_tool_execution_completed`, `mcp_tool_execution_failed`, `mcp_config_fetch_failed`
- **Observation** — `step_observation_started`, `step_observation_completed`, `step_observation_failed`, `plan_refinement`, `plan_replan_triggered`, `goal_achieved_early`
- **Skill** — `skill_discovery_started`, `skill_discovery_completed`, `skill_loaded`, `skill_activated`, `skill_load_failed`
- **Logging** — `agent_logs_started`, `agent_logs_execution`
- **A2A** — `a2a_delegation_started`, `a2a_delegation_completed`, `a2a_conversation_started`, `a2a_conversation_completed`, `a2a_message_sent`, `a2a_response_received`, `a2a_polling_started`, `a2a_polling_status`, `a2a_push_notification_registered`, `a2a_push_notification_received`, `a2a_push_notification_sent`, `a2a_push_notification_timeout`, `a2a_streaming_started`, `a2a_streaming_chunk`, `a2a_agent_card_fetched`, `a2a_authentication_failed`, `a2a_artifact_received`, `a2a_connection_error`, `a2a_server_task_started`, `a2a_server_task_completed`, `a2a_server_task_canceled`, `a2a_server_task_failed`, `a2a_parallel_delegation_started`, `a2a_parallel_delegation_completed`, `a2a_transport_negotiated`, `a2a_content_type_negotiated`, `a2a_context_created`, `a2a_context_expired`, `a2a_context_idle`, `a2a_context_completed`, `a2a_context_pruned`
- **Sinais de sistema** — `SIGTERM`, `SIGINT`, `SIGHUP`, `SIGTSTP`, `SIGCONT`
- **Wildcard** — `"*"` corresponde a todos os eventos.
O checkpointing salva automaticamente o estado de execucao durante uma execucao. Se uma crew, flow ou agente falhar no meio da execucao, voce pode restaurar a partir do ultimo checkpoint e retomar sem reexecutar o trabalho ja concluido.
</Expandable>
## Inicio Rapido
### Provedores de armazenamento
```python
from crewai import Crew, CheckpointConfig
<ParamField path="JsonProvider" type="provider">
Um arquivo por checkpoint, nomeado `<timestamp>_<uuid>.json` dentro de `location`.
</ParamField>
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=True, # usa padroes: ./.checkpoints, em task_completed
)
result = crew.kickoff()
```
<ParamField path="SqliteProvider" type="provider">
Arquivo de banco unico em `location` com journaling WAL.
</ParamField>
Os arquivos de checkpoint sao gravados em `./.checkpoints/` apos cada tarefa concluida.
### CLI
## Configuracao
| Comando | Proposito |
|:--------|:----------|
| `crewai checkpoint` | Inicia a TUI; detecta o armazenamento automaticamente. |
| `crewai checkpoint --location <path>` | Inicia a TUI em uma localizacao especifica. |
| `crewai checkpoint list <path>` | Lista checkpoints. |
| `crewai checkpoint info <path>` | Inspeciona um arquivo de checkpoint ou a entrada mais recente em um banco SQLite. |
Use `CheckpointConfig` para controle total:
```python
from crewai import Crew, CheckpointConfig
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./my_checkpoints",
on_events=["task_completed", "crew_kickoff_completed"],
max_checkpoints=5,
),
)
```
### Campos do CheckpointConfig
| Campo | Tipo | Padrao | Descricao |
|:------|:-----|:-------|:----------|
| `location` | `str` | `"./.checkpoints"` | Caminho para os arquivos de checkpoint |
| `on_events` | `list[str]` | `["task_completed"]` | Tipos de evento que acionam um checkpoint |
| `provider` | `BaseProvider` | `JsonProvider()` | Backend de armazenamento |
| `max_checkpoints` | `int \| None` | `None` | Maximo de arquivos a manter; os mais antigos sao removidos primeiro |
### Heranca e Desativacao
O campo `checkpoint` em Crew, Flow e Agent aceita `CheckpointConfig`, `True`, `False` ou `None`:
| Valor | Comportamento |
|:------|:--------------|
| `None` (padrao) | Herda do pai. Um agente herda a configuracao da crew. |
| `True` | Ativa com padroes. |
| `False` | Desativacao explicita. Interrompe a heranca do pai. |
| `CheckpointConfig(...)` | Configuracao personalizada. |
```python
crew = Crew(
agents=[
Agent(role="Researcher", ...), # herda checkpoint da crew
Agent(role="Writer", ..., checkpoint=False), # desativado, sem checkpoints
],
tasks=[...],
checkpoint=True,
)
```
## Retomando a partir de um Checkpoint
```python
# Restaurar e retomar
crew = Crew.from_checkpoint("./my_checkpoints/20260407T120000_abc123.json")
result = crew.kickoff() # retoma a partir da ultima tarefa concluida
```
A crew restaurada pula tarefas ja concluidas e retoma a partir da primeira incompleta.
## Funciona em Crew, Flow e Agent
### Crew
```python
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, write_task, review_task],
checkpoint=CheckpointConfig(location="./crew_cp"),
)
```
Gatilho padrao: `task_completed` (um checkpoint por tarefa finalizada).
### Flow
```python
from crewai.flow.flow import Flow, start, listen
from crewai import CheckpointConfig
class MyFlow(Flow):
@start()
def step_one(self):
return "data"
@listen(step_one)
def step_two(self, data):
return process(data)
flow = MyFlow(
checkpoint=CheckpointConfig(
location="./flow_cp",
on_events=["method_execution_finished"],
),
)
result = flow.kickoff()
# Retomar
flow = MyFlow.from_checkpoint("./flow_cp/20260407T120000_abc123.json")
result = flow.kickoff()
```
### Agent
```python
agent = Agent(
role="Researcher",
goal="Research topics",
backstory="Expert researcher",
checkpoint=CheckpointConfig(
location="./agent_cp",
on_events=["lite_agent_execution_completed"],
),
)
result = agent.kickoff(messages=[{"role": "user", "content": "Research AI trends"}])
```
## Provedores de Armazenamento
O CrewAI inclui dois provedores de armazenamento para checkpoints.
### JsonProvider (padrao)
Grava cada checkpoint como um arquivo JSON separado.
```python
from crewai import Crew, CheckpointConfig
from crewai.state import JsonProvider
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./my_checkpoints",
provider=JsonProvider(),
max_checkpoints=5,
),
)
```
### SqliteProvider
Armazena todos os checkpoints em um unico arquivo SQLite.
```python
from crewai import Crew, CheckpointConfig
from crewai.state import SqliteProvider
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./.checkpoints.db",
provider=SqliteProvider(),
),
)
```
## Tipos de Evento
O campo `on_events` aceita qualquer combinacao de strings de tipo de evento. Escolhas comuns:
| Caso de Uso | Eventos |
|:------------|:--------|
| Apos cada tarefa (Crew) | `["task_completed"]` |
| Apos cada metodo do flow | `["method_execution_finished"]` |
| Apos execucao do agente | `["agent_execution_completed"]`, `["lite_agent_execution_completed"]` |
| Apenas na conclusao da crew | `["crew_kickoff_completed"]` |
| Apos cada chamada LLM | `["llm_call_completed"]` |
| Em tudo | `["*"]` |
<Warning>
Usar `["*"]` ou eventos de alta frequencia como `llm_call_completed` gravara muitos arquivos de checkpoint e pode impactar o desempenho. Use `max_checkpoints` para limitar o uso de disco.
</Warning>
## Checkpointing Manual
Para controle total, registre seu proprio handler de evento e chame `state.checkpoint()` diretamente:
```python
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.llm_events import LLMCallCompletedEvent
# Handler sincrono
@crewai_event_bus.on(LLMCallCompletedEvent)
def on_llm_done(source, event, state):
path = state.checkpoint("./my_checkpoints")
print(f"Checkpoint salvo: {path}")
# Handler assincrono
@crewai_event_bus.on(LLMCallCompletedEvent)
async def on_llm_done_async(source, event, state):
path = await state.acheckpoint("./my_checkpoints")
print(f"Checkpoint salvo: {path}")
```
O argumento `state` e o `RuntimeState` passado automaticamente pelo barramento de eventos quando seu handler aceita 3 parametros. Voce pode registrar handlers em qualquer tipo de evento listado na documentacao de [Event Listeners](/pt-BR/concepts/event-listener).
O checkpointing e best-effort: se uma gravacao de checkpoint falhar, o erro e registrado no log, mas a execucao continua sem interrupcao.

View File

@@ -2,7 +2,7 @@ from __future__ import annotations
import asyncio
from collections.abc import Callable, Sequence
from concurrent.futures import Future
from concurrent.futures import Future, ThreadPoolExecutor
from copy import copy as shallow_copy
from hashlib import md5
import json
@@ -355,6 +355,24 @@ class Crew(FlowTrackable, BaseModel):
default=None,
description="Skill search paths, pre-loaded Skill objects, or '@org/name' registry refs applied to all agents in the crew.",
)
cache_preload: bool = Field(
default=False,
description=(
"When True, fire lightweight 1-token cache-warming probes for each "
"agent's system prompt at kickoff time so the provider's prompt cache "
"is warm before the first real task runs."
),
)
cache_preload_strategy: Literal["parallel", "sequential", "shared_prefix"] = Field(
default="parallel",
description=(
"Strategy for cache preloading: "
"'parallel' fires probes concurrently, "
"'sequential' fires them one by one, "
"'shared_prefix' detects the common system-prompt prefix across agents "
"and warms it once before per-agent suffixes."
),
)
security_config: SecurityConfig = Field(
default_factory=SecurityConfig,
@@ -1003,6 +1021,9 @@ class Crew(FlowTrackable, BaseModel):
try:
inputs = prepare_kickoff(self, inputs, input_files)
if self.cache_preload and len(self.agents) >= 2:
self._preload_caches()
if self.process == Process.sequential:
result = self._run_sequential_process()
elif self.process == Process.hierarchical:
@@ -1040,6 +1061,111 @@ class Crew(FlowTrackable, BaseModel):
def _post_kickoff(self, result: CrewOutput) -> CrewOutput:
return result
def _get_agent_system_prompt(self, agent: BaseAgent) -> str:
"""Build the system prompt that would be sent to the LLM for *agent*.
This mirrors how ``Agent.create_agent_executor`` constructs the prompt
via :class:`Prompts` so the cache-warming probe uses the exact same
bytes the provider will later see on the first real call.
"""
from crewai.utilities.prompts import Prompts
prompt_result = Prompts(
agent=agent,
has_tools=bool(agent.tools),
use_system_prompt=getattr(agent, "use_system_prompt", True),
system_template=getattr(agent, "system_template", None),
prompt_template=getattr(agent, "prompt_template", None),
response_template=getattr(agent, "response_template", None),
).task_execution()
system: str = prompt_result.get("system", "") or ""
if system:
return system
prompt: str = prompt_result.get("prompt", "") or ""
return prompt
@staticmethod
def _common_prefix(strings: list[str]) -> str:
"""Return the longest common character prefix of *strings*."""
if not strings:
return ""
shortest = min(strings, key=len)
for i, char in enumerate(shortest):
for s in strings:
if s[i] != char:
return shortest[:i]
return shortest
def _preload_caches(self) -> None:
"""Warm each agent's LLM prompt cache at kickoff time.
Fires lightweight 1-token completions so the provider's cache is
primed before the first real task runs. Supports three strategies:
* ``parallel`` -- probes fired concurrently via a thread-pool.
* ``sequential`` -- probes fired one-by-one in agent order.
* ``shared_prefix`` -- detects the common system-prompt prefix across
agents. If it is >= 1024 characters (the typical provider
cache-breakpoint threshold), it warms the shared prefix once,
then warms each per-agent suffix. Falls back to *parallel* when
no meaningful shared prefix exists.
"""
self._logger.log("info", "Cache preload: warming agent prompt caches")
agent_prompts: list[tuple[BaseAgent, str]] = []
for agent in self.agents:
prompt = self._get_agent_system_prompt(agent)
if prompt:
agent_prompts.append((agent, prompt))
if not agent_prompts:
return
strategy = self.cache_preload_strategy
if strategy == "shared_prefix":
prompts = [p for _, p in agent_prompts]
prefix = self._common_prefix(prompts)
min_prefix_len = 1024
if len(prefix) >= min_prefix_len:
self._logger.log(
"info",
f"Cache preload: shared prefix detected ({len(prefix)} chars), "
"warming shared prefix first",
)
first_agent, _ = agent_prompts[0]
if isinstance(first_agent.llm, BaseLLM):
first_agent.llm.preload_probe(prefix)
for agent, prompt in agent_prompts:
if isinstance(agent.llm, BaseLLM):
agent.llm.preload_probe(prompt)
return
self._logger.log(
"info",
f"Cache preload: shared prefix too short ({len(prefix)} chars), "
"falling back to parallel strategy",
)
strategy = "parallel"
if strategy == "parallel":
with ThreadPoolExecutor(max_workers=min(len(agent_prompts), 4)) as pool:
futures = []
for agent, prompt in agent_prompts:
if isinstance(agent.llm, BaseLLM):
futures.append(pool.submit(agent.llm.preload_probe, prompt))
for f in futures:
f.result()
elif strategy == "sequential":
for agent, prompt in agent_prompts:
if isinstance(agent.llm, BaseLLM):
agent.llm.preload_probe(prompt)
self._logger.log("info", "Cache preload: done")
def kickoff_for_each(
self,
inputs: list[dict[str, Any]],

View File

@@ -67,6 +67,8 @@ class JsonResponseFormat(TypedDict):
type: Literal["json_object"]
logger = logging.getLogger(__name__)
DEFAULT_CONTEXT_WINDOW_SIZE: Final[int] = 4096
DEFAULT_SUPPORTS_STOP_WORDS: Final[bool] = True
_JSON_EXTRACTION_PATTERN: Final[re.Pattern[str]] = re.compile(r"\{.*}", re.DOTALL)
@@ -373,6 +375,41 @@ class BaseLLM(BaseModel, ABC):
"""
return DEFAULT_SUPPORTS_STOP_WORDS
def preload_probe(self, system_prompt: str) -> None:
"""Fire a 1-token completion to warm the provider's prompt cache.
Sends the agent's system prompt with ``max_tokens=1`` so the provider
commits the prefix to its cache (e.g. Anthropic prompt caching,
OpenAI prefix caching). Subsequent calls within the TTL window get
cache-read pricing instead of the cold-write path.
The call is best-effort: failures are logged as warnings and never
propagated so they cannot break crew execution.
Args:
system_prompt: The full system prompt to warm.
"""
original_max_tokens: int | float | None = getattr(self, "max_tokens", None)
original_temperature = self.temperature
try:
# Temporarily override for the probe call. We go through the
# custom __setattr__ that BaseLLM already provides so that
# subclass fields (max_tokens, temperature) are set correctly
# even if they are not declared on BaseLLM itself.
self.__setattr__("max_tokens", 1)
self.temperature = 0
self.call(
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": "Ready."},
],
)
except Exception as exc:
logger.warning("Cache preload probe failed: %s", exc)
finally:
self.__setattr__("max_tokens", original_max_tokens)
self.temperature = original_temperature
def _supports_stop_words_implementation(self) -> bool:
"""Check if stop words are configured for this LLM instance.

View File

@@ -0,0 +1,366 @@
"""Tests for session-start prompt-cache preload feature (#5921).
Verifies that Crew.cache_preload and Crew.cache_preload_strategy
correctly fire lightweight 1-token probes to warm LLM prompt caches
at kickoff time.
"""
from unittest.mock import MagicMock, patch
import pytest
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.task import Task
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_agent(role: str, goal: str, backstory: str) -> Agent:
return Agent(role=role, goal=goal, backstory=backstory, allow_delegation=False)
def _make_task(description: str, agent: Agent) -> Task:
return Task(description=description, expected_output="output", agent=agent)
# ---------------------------------------------------------------------------
# BaseLLM.preload_probe
# ---------------------------------------------------------------------------
class TestBaseLLMPreloadProbe:
def test_preload_probe_fires_one_token_completion(self):
"""preload_probe should delegate to self.call with max_tokens=1."""
agent = _make_agent("R", "g", "b")
agent.llm.call = MagicMock(return_value="ok")
original_max_tokens = agent.llm.max_tokens
agent.llm.preload_probe("You are a helpful assistant.")
agent.llm.call.assert_called_once()
args, kwargs = agent.llm.call.call_args
# messages may be passed as positional or keyword arg
messages = args[0] if args else kwargs.get("messages")
assert messages[0]["role"] == "system"
assert messages[0]["content"] == "You are a helpful assistant."
# max_tokens should be restored after the call
assert agent.llm.max_tokens == original_max_tokens
def test_preload_probe_does_not_raise_on_failure(self):
"""preload_probe must not propagate exceptions."""
agent = _make_agent("R", "g", "b")
agent.llm.call = MagicMock(side_effect=RuntimeError("boom"))
# Should NOT raise
agent.llm.preload_probe("system prompt")
def test_preload_probe_uses_temperature_zero(self):
"""preload_probe should temporarily set temperature=0."""
agent = _make_agent("R", "g", "b")
captured_temp = []
def capture_call(*_args, **_kwargs):
captured_temp.append(agent.llm.temperature)
return "ok"
agent.llm.call = capture_call
agent.llm.temperature = 0.7
agent.llm.preload_probe("system prompt")
assert captured_temp[0] == 0
assert agent.llm.temperature == 0.7
# ---------------------------------------------------------------------------
# Crew fields
# ---------------------------------------------------------------------------
class TestCachePreloadFields:
def test_cache_preload_defaults_to_false(self):
a = _make_agent("R", "g", "b")
t = _make_task("do it", a)
crew = Crew(agents=[a], tasks=[t])
assert crew.cache_preload is False
def test_cache_preload_strategy_defaults_to_parallel(self):
a = _make_agent("R", "g", "b")
t = _make_task("do it", a)
crew = Crew(agents=[a], tasks=[t])
assert crew.cache_preload_strategy == "parallel"
def test_cache_preload_strategy_accepts_valid_values(self):
a = _make_agent("R", "g", "b")
t = _make_task("do it", a)
for strategy in ("parallel", "sequential", "shared_prefix"):
crew = Crew(
agents=[a],
tasks=[t],
cache_preload=True,
cache_preload_strategy=strategy,
)
assert crew.cache_preload_strategy == strategy
# ---------------------------------------------------------------------------
# Parallel strategy
# ---------------------------------------------------------------------------
class TestParallelStrategy:
def test_parallel_strategy_probes_all_agents(self):
a1 = _make_agent("Researcher", "research AI", "You research stuff.")
a2 = _make_agent("Writer", "write content", "You write stuff.")
t1 = _make_task("research task", a1)
t2 = _make_task("writing task", a2)
crew = Crew(
agents=[a1, a2],
tasks=[t1, t2],
cache_preload=True,
cache_preload_strategy="parallel",
)
a1.llm.preload_probe = MagicMock()
a2.llm.preload_probe = MagicMock()
crew._preload_caches()
a1.llm.preload_probe.assert_called_once()
a2.llm.preload_probe.assert_called_once()
def test_parallel_strategy_passes_system_prompt(self):
a1 = _make_agent("Researcher", "research AI", "You research stuff.")
t1 = _make_task("task", a1)
a2 = _make_agent("Writer", "write content", "You write stuff.")
t2 = _make_task("task2", a2)
crew = Crew(
agents=[a1, a2],
tasks=[t1, t2],
cache_preload=True,
cache_preload_strategy="parallel",
)
a1.llm.preload_probe = MagicMock()
a2.llm.preload_probe = MagicMock()
crew._preload_caches()
probe_arg = a1.llm.preload_probe.call_args[0][0]
assert isinstance(probe_arg, str)
assert len(probe_arg) > 0
# ---------------------------------------------------------------------------
# Sequential strategy
# ---------------------------------------------------------------------------
class TestSequentialStrategy:
def test_sequential_strategy_probes_all_agents(self):
a1 = _make_agent("Researcher", "research AI", "You research stuff.")
a2 = _make_agent("Writer", "write content", "You write stuff.")
t1 = _make_task("research task", a1)
t2 = _make_task("writing task", a2)
crew = Crew(
agents=[a1, a2],
tasks=[t1, t2],
cache_preload=True,
cache_preload_strategy="sequential",
)
a1.llm.preload_probe = MagicMock()
a2.llm.preload_probe = MagicMock()
crew._preload_caches()
a1.llm.preload_probe.assert_called_once()
a2.llm.preload_probe.assert_called_once()
# ---------------------------------------------------------------------------
# Shared-prefix strategy
# ---------------------------------------------------------------------------
class TestSharedPrefixStrategy:
def test_shared_prefix_strategy_with_long_common_prefix(self):
"""When agents share >= 1024 chars of prefix, shared prefix is warmed first."""
shared_backstory = "A" * 2000
a1 = _make_agent("SharedRole", "shared goal", shared_backstory + " agent1 specifics")
a2 = _make_agent("SharedRole", "shared goal", shared_backstory + " agent2 specifics")
t1 = _make_task("task 1", a1)
t2 = _make_task("task 2", a2)
crew = Crew(
agents=[a1, a2],
tasks=[t1, t2],
cache_preload=True,
cache_preload_strategy="shared_prefix",
)
# Verify the prompts actually share a long common prefix
p1 = crew._get_agent_system_prompt(a1)
p2 = crew._get_agent_system_prompt(a2)
prefix = Crew._common_prefix([p1, p2])
assert len(prefix) >= 1024, (
f"Expected common prefix >= 1024 chars, got {len(prefix)}"
)
a1.llm.preload_probe = MagicMock()
a2.llm.preload_probe = MagicMock()
crew._preload_caches()
# first_agent's LLM gets probed twice: once for shared prefix, once for full prompt
assert a1.llm.preload_probe.call_count == 2
# second agent gets probed once for its full prompt
assert a2.llm.preload_probe.call_count == 1
def test_shared_prefix_falls_back_to_parallel_when_prefix_short(self):
"""When the common prefix is < 1024 chars, falls back to parallel."""
a1 = _make_agent("Researcher", "research AI", "Short backstory for researcher.")
a2 = _make_agent("Writer", "write content", "Short backstory for writer.")
t1 = _make_task("task 1", a1)
t2 = _make_task("task 2", a2)
crew = Crew(
agents=[a1, a2],
tasks=[t1, t2],
cache_preload=True,
cache_preload_strategy="shared_prefix",
)
a1.llm.preload_probe = MagicMock()
a2.llm.preload_probe = MagicMock()
crew._preload_caches()
# Falls back to parallel: each agent probed exactly once
a1.llm.preload_probe.assert_called_once()
a2.llm.preload_probe.assert_called_once()
# ---------------------------------------------------------------------------
# Kickoff integration
# ---------------------------------------------------------------------------
class TestKickoffIntegration:
def test_kickoff_calls_preload_when_enabled(self):
a1 = _make_agent("Researcher", "research AI", "backstory")
a2 = _make_agent("Writer", "write content", "backstory")
t1 = _make_task("task 1", a1)
t2 = _make_task("task 2", a2)
crew = Crew(
agents=[a1, a2],
tasks=[t1, t2],
cache_preload=True,
)
with patch.object(crew, "_preload_caches") as mock_preload, \
patch.object(crew, "_run_sequential_process", return_value=MagicMock()):
try:
crew.kickoff()
except Exception:
pass
mock_preload.assert_called_once()
def test_kickoff_skips_preload_when_disabled(self):
a1 = _make_agent("Researcher", "research AI", "backstory")
a2 = _make_agent("Writer", "write content", "backstory")
t1 = _make_task("task 1", a1)
t2 = _make_task("task 2", a2)
crew = Crew(
agents=[a1, a2],
tasks=[t1, t2],
cache_preload=False,
)
with patch.object(crew, "_preload_caches") as mock_preload, \
patch.object(crew, "_run_sequential_process", return_value=MagicMock()):
try:
crew.kickoff()
except Exception:
pass
mock_preload.assert_not_called()
def test_kickoff_skips_preload_for_single_agent(self):
a1 = _make_agent("Researcher", "research AI", "backstory")
t1 = _make_task("task 1", a1)
crew = Crew(
agents=[a1],
tasks=[t1],
cache_preload=True,
)
with patch.object(crew, "_preload_caches") as mock_preload, \
patch.object(crew, "_run_sequential_process", return_value=MagicMock()):
try:
crew.kickoff()
except Exception:
pass
mock_preload.assert_not_called()
# ---------------------------------------------------------------------------
# Crew._common_prefix
# ---------------------------------------------------------------------------
class TestCommonPrefix:
def test_common_prefix_basic(self):
assert Crew._common_prefix(["abc", "abd", "abe"]) == "ab"
def test_common_prefix_empty_list(self):
assert Crew._common_prefix([]) == ""
def test_common_prefix_no_overlap(self):
assert Crew._common_prefix(["abc", "xyz"]) == ""
def test_common_prefix_identical_strings(self):
assert Crew._common_prefix(["hello", "hello"]) == "hello"
def test_common_prefix_single_string(self):
assert Crew._common_prefix(["only"]) == "only"
# ---------------------------------------------------------------------------
# Crew._get_agent_system_prompt
# ---------------------------------------------------------------------------
class TestGetAgentSystemPrompt:
def test_returns_nonempty_string(self):
a = _make_agent("Tester", "test things", "You test stuff.")
t = _make_task("task", a)
crew = Crew(agents=[a], tasks=[t])
prompt = crew._get_agent_system_prompt(a)
assert isinstance(prompt, str)
assert len(prompt) > 0
def test_prompt_contains_agent_role(self):
a = _make_agent("SpecialTester", "test things", "You test stuff.")
t = _make_task("task", a)
crew = Crew(agents=[a], tasks=[t])
prompt = crew._get_agent_system_prompt(a)
assert "SpecialTester" in prompt
def test_prompt_contains_agent_goal(self):
a = _make_agent("Tester", "verify correctness", "You test stuff.")
t = _make_task("task", a)
crew = Crew(agents=[a], tasks=[t])
prompt = crew._get_agent_system_prompt(a)
assert "verify correctness" in prompt