Compare commits

..

2 Commits

Author SHA1 Message Date
Greyson LaLonde
948c90c52e Merge branch 'main' into chore/mypy-strict-cleanup 2026-05-22 03:45:34 +08:00
Greyson LaLonde
b7cf1f0148 chore: tighten mypy strict mode and remove dead code
Enable warn_unreachable, extra_checks, local_partial_types in pyproject.
Remove dead defensive branches and AI-slop union members; replace narrow
band-aid type:ignore with proper signature widening or targeted ignores
for genuine runtime-defensive paths (double-checked locking, hook misuse,
unfollowed-import boundaries).
2026-05-22 03:38:39 +08:00
60 changed files with 1237 additions and 2189 deletions

View File

@@ -23,7 +23,7 @@ jobs:
steps:
- name: Checkout repository
uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4.3.1
uses: actions/checkout@v4
- name: Install uv
uses: astral-sh/setup-uv@d0cc045d04ccac9d8b7881df0226f9e82c39688e # v6
@@ -39,7 +39,7 @@ jobs:
echo "Cache populated successfully"
- name: Save uv caches
uses: actions/cache/save@0057852bfaa89a56745cba8c7296529d2fc39830 # v4.3.0
uses: actions/cache/save@v4
with:
path: |
~/.cache/uv

View File

@@ -59,7 +59,7 @@ jobs:
# your codebase is analyzed, see https://docs.github.com/en/code-security/code-scanning/creating-an-advanced-setup-for-code-scanning/codeql-code-scanning-for-compiled-languages
steps:
- name: Checkout repository
uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4.3.1
uses: actions/checkout@v4
# Add any setup steps before running the `github/codeql-action/init` action.
# This includes steps like installing compilers or runtimes (`actions/setup-node`
@@ -69,7 +69,7 @@ jobs:
# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@9e0d7b8d25671d64c341c19c0152d693099fb5ba # v4.35.5
uses: github/codeql-action/init@v4
with:
languages: ${{ matrix.language }}
build-mode: ${{ matrix.build-mode }}
@@ -98,6 +98,6 @@ jobs:
exit 1
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@9e0d7b8d25671d64c341c19c0152d693099fb5ba # v4.35.5
uses: github/codeql-action/analyze@v4
with:
category: "/language:${{matrix.language}}"

View File

@@ -18,10 +18,10 @@ jobs:
name: Check broken links
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4.3.1
- uses: actions/checkout@v4
- name: Set up Node
uses: actions/setup-node@49933ea5288caeca8642d1e84afbd3f7d6820020 # v4.4.0
uses: actions/setup-node@v4
with:
node-version: "22"

View File

@@ -28,7 +28,7 @@ jobs:
private-key: ${{ secrets.CREWAI_TOOL_SPECS_PRIVATE_KEY }}
- name: Checkout code
uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4.3.1
uses: actions/checkout@v4
with:
ref: ${{ github.head_ref }}
token: ${{ steps.app-token.outputs.token }}

View File

@@ -12,7 +12,7 @@ jobs:
outputs:
code: ${{ steps.filter.outputs.code }}
steps:
- uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4.3.1
- uses: actions/checkout@v4
- uses: dorny/paths-filter@d1c1ffe0248fe513906c8e24db8ea791d46f8590 # v3
id: filter
with:
@@ -26,11 +26,11 @@ jobs:
if: needs.changes.outputs.code == 'true'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4.3.1
- uses: actions/checkout@v4
- name: Restore global uv cache
id: cache-restore
uses: actions/cache/restore@0057852bfaa89a56745cba8c7296529d2fc39830 # v4.3.0
uses: actions/cache/restore@v4
with:
path: |
~/.cache/uv
@@ -58,7 +58,7 @@ jobs:
- name: Save uv caches
if: steps.cache-restore.outputs.cache-hit != 'true'
uses: actions/cache/save@0057852bfaa89a56745cba8c7296529d2fc39830 # v4.3.0
uses: actions/cache/save@v4
with:
path: |
~/.cache/uv

View File

@@ -18,7 +18,7 @@ jobs:
outputs:
has_changes: ${{ steps.check.outputs.has_changes }}
steps:
- uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4.3.1
- uses: actions/checkout@v4
with:
fetch-depth: 0
@@ -41,7 +41,7 @@ jobs:
permissions:
contents: read
steps:
- uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4.3.1
- uses: actions/checkout@v4
- name: Install uv
uses: astral-sh/setup-uv@d0cc045d04ccac9d8b7881df0226f9e82c39688e # v6
@@ -87,7 +87,7 @@ jobs:
rm dist/.gitignore
- name: Upload artifacts
uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2
uses: actions/upload-artifact@v4
with:
name: dist
path: dist/
@@ -110,7 +110,7 @@ jobs:
enable-cache: false
- name: Download artifacts
uses: actions/download-artifact@d3f86a106a0bac45b974a628896c90dbdf5c8093 # v4.3.0
uses: actions/download-artifact@v4
with:
name: dist
path: dist

View File

@@ -24,12 +24,12 @@ jobs:
echo "tag=" >> $GITHUB_OUTPUT
fi
- uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4.3.1
- uses: actions/checkout@v4
with:
ref: ${{ steps.release.outputs.tag || github.ref }}
- name: Set up Python
uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
uses: actions/setup-python@v5
with:
python-version: "3.12"
@@ -42,7 +42,7 @@ jobs:
rm dist/.gitignore
- name: Upload artifacts
uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2
uses: actions/upload-artifact@v4
with:
name: dist
path: dist/
@@ -58,7 +58,7 @@ jobs:
id-token: write
contents: read
steps:
- uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4.3.1
- uses: actions/checkout@v4
with:
ref: ${{ inputs.release_tag || github.ref }}
@@ -70,7 +70,7 @@ jobs:
enable-cache: false
- name: Download artifacts
uses: actions/download-artifact@d3f86a106a0bac45b974a628896c90dbdf5c8093 # v4.3.0
uses: actions/download-artifact@v4
with:
name: dist
path: dist

View File

@@ -14,7 +14,7 @@ jobs:
stale:
runs-on: ubuntu-latest
steps:
- uses: actions/stale@5bef64f19d7facfb25b37b414482c7164d639639 # v9.1.0
- uses: actions/stale@v9
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
stale-issue-label: 'no-issue-activity'

View File

@@ -12,7 +12,7 @@ jobs:
outputs:
code: ${{ steps.filter.outputs.code }}
steps:
- uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4.3.1
- uses: actions/checkout@v4
- uses: dorny/paths-filter@d1c1ffe0248fe513906c8e24db8ea791d46f8590 # v3
id: filter
with:
@@ -34,13 +34,13 @@ jobs:
group: [1, 2, 3, 4, 5, 6, 7, 8]
steps:
- name: Checkout code
uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4.3.1
uses: actions/checkout@v4
with:
fetch-depth: 0 # Fetch all history for proper diff
- name: Restore global uv cache
id: cache-restore
uses: actions/cache/restore@0057852bfaa89a56745cba8c7296529d2fc39830 # v4.3.0
uses: actions/cache/restore@v4
with:
path: |
~/.cache/uv
@@ -61,7 +61,7 @@ jobs:
run: uv sync --all-groups --all-extras
- name: Restore test durations
uses: actions/cache/restore@0057852bfaa89a56745cba8c7296529d2fc39830 # v4.3.0
uses: actions/cache/restore@v4
with:
path: .test_durations_py*
key: test-durations-py${{ matrix.python-version }}
@@ -108,7 +108,7 @@ jobs:
- name: Save uv caches
if: steps.cache-restore.outputs.cache-hit != 'true'
uses: actions/cache/save@0057852bfaa89a56745cba8c7296529d2fc39830 # v4.3.0
uses: actions/cache/save@v4
with:
path: |
~/.cache/uv

View File

@@ -12,7 +12,7 @@ jobs:
outputs:
code: ${{ steps.filter.outputs.code }}
steps:
- uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4.3.1
- uses: actions/checkout@v4
- uses: dorny/paths-filter@d1c1ffe0248fe513906c8e24db8ea791d46f8590 # v3
id: filter
with:
@@ -33,11 +33,11 @@ jobs:
steps:
- name: Checkout code
uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4.3.1
uses: actions/checkout@v4
- name: Restore global uv cache
id: cache-restore
uses: actions/cache/restore@0057852bfaa89a56745cba8c7296529d2fc39830 # v4.3.0
uses: actions/cache/restore@v4
with:
path: |
~/.cache/uv
@@ -62,7 +62,7 @@ jobs:
- name: Save uv caches
if: steps.cache-restore.outputs.cache-hit != 'true'
uses: actions/cache/save@0057852bfaa89a56745cba8c7296529d2fc39830 # v4.3.0
uses: actions/cache/save@v4
with:
path: |
~/.cache/uv

View File

@@ -23,11 +23,11 @@ jobs:
steps:
- name: Checkout repository
uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4.3.1
uses: actions/checkout@v4
- name: Restore global uv cache
id: cache-restore
uses: actions/cache/restore@0057852bfaa89a56745cba8c7296529d2fc39830 # v4.3.0
uses: actions/cache/restore@v4
with:
path: |
~/.cache/uv
@@ -55,14 +55,14 @@ jobs:
- name: Save durations to cache
if: always()
uses: actions/cache/save@0057852bfaa89a56745cba8c7296529d2fc39830 # v4.3.0
uses: actions/cache/save@v4
with:
path: .test_durations_py*
key: test-durations-py${{ matrix.python-version }}
- name: Save uv caches
if: steps.cache-restore.outputs.cache-hit != 'true'
uses: actions/cache/save@0057852bfaa89a56745cba8c7296529d2fc39830 # v4.3.0
uses: actions/cache/save@v4
with:
path: |
~/.cache/uv

View File

@@ -16,13 +16,11 @@ jobs:
name: pip-audit
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4.3.1
with:
persist-credentials: false
- uses: actions/checkout@v4
- name: Restore global uv cache
id: cache-restore
uses: actions/cache/restore@0057852bfaa89a56745cba8c7296529d2fc39830 # v4.3.0
uses: actions/cache/restore@v4
with:
path: |
~/.cache/uv
@@ -112,14 +110,14 @@ jobs:
- name: Upload pip-audit report
if: always()
uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2
uses: actions/upload-artifact@v4
with:
name: pip-audit-report
path: pip-audit-report.json
- name: Save uv caches
if: steps.cache-restore.outputs.cache-hit != 'true'
uses: actions/cache/save@0057852bfaa89a56745cba8c7296529d2fc39830 # v4.3.0
uses: actions/cache/save@v4
with:
path: |
~/.cache/uv

View File

@@ -5,386 +5,225 @@ icon: floppy-disk
mode: "wide"
---
الـ Checkpointing يحفظ لقطة من حالة التنفيذ أثناء التشغيل بحيث يمكن لطاقم أو تدفق أو وكيل الاستئناف بعد الفشل أو التفرع إلى فرع بديل.
<CardGroup cols={2}>
<Card title="الشرح" icon="lightbulb" href="#الشرح">
كيف يعمل الـ Checkpointing: الأحداث والتخزين والوراثة.
</Card>
<Card title="درس تطبيقي" icon="graduation-cap" href="#درس-تطبيقي-استئناف-طاقم-فاشل">
دليل 5 دقائق: تشغيل، إيقاف، استئناف.
</Card>
<Card title="ادلة عملية" icon="screwdriver-wrench" href="#ادلة-عملية">
وصفات مركزة على المهام لسير العمل الشائع.
</Card>
<Card title="المرجع" icon="book" href="#المرجع">
`CheckpointConfig` والأحداث والمزودات وسطر الأوامر.
</Card>
</CardGroup>
## الشرح
### ما هي نقطة الحفظ
تلتقط نقطة الحفظ كل ما يحتاجه CrewAI لإعادة إنشاء تشغيل أثناء سيره: الحالة الكاملة للطاقم أو التدفق أو الوكيل — التكوين، وذاكرة الوكلاء ومصادر المعرفة، وتقدم المهام، والمخرجات الوسيطة — إلى جانب مدخلات الـ kickoff، وسجل الأحداث حتى تلك النقطة، ومعرف نسب يربط نقطة الحفظ بالتشغيل الذي جاءت منه.
الاستعادة تعيد بناء تلك الحالة وتستمر. تتخطى المهام المكتملة، وتعاد ترطيب الذاكرة والمعرفة، ويعمل العمل التابع على نفس المخرجات التي أنتجها التشغيل الأصلي. التفرع يجري نفس الاستعادة تحت نسب جديد، بحيث يكتب الفرع الجديد والتشغيل الأصلي نقاط الحفظ جنبا إلى جنب دون أن يطمس أحدهما الآخر.
### متى تكتب نقاط الحفظ
الـ Checkpointing مدفوع بالأحداث. يشترك وقت التشغيل في الأحداث التي تحددها عبر `on_events` ويكتب نقطة حفظ عند إطلاق أحدها. الافتراضي `task_completed` ينتج نقطة حفظ لكل مهمة منتهية — توازن معقول بين الدقة واستخدام القرص. الأحداث عالية التردد مثل `llm_call_completed` متاحة للاستعادة الدقيقة لكنها تكتب ملفات أكثر بكثير.
### التخزين
يتضمن CrewAI مزودين:
- `JsonProvider` يكتب ملفا لكل نقطة حفظ. قابل للقراءة وسهل التفقد.
- `SqliteProvider` يكتب إلى قاعدة بيانات SQLite واحدة. أفضل لنقاط الحفظ عالية التردد.
كلاهما يحذف أقدم نقاط الحفظ عند تحديد `max_checkpoints`.
<Note>
كتابة نقاط الحفظ بأفضل جهد. فشل نقطة حفظ يسجل لكنه لا يقاطع التشغيل.
</Note>
### نموذج الوراثة
`Crew` و`Flow` و`Agent` كلها تقبل وسيط `checkpoint`. يرث الأبناء من الأب ما لم يحددوا قيمتهم الخاصة أو يمرروا `False` للانسحاب. فعل الـ Checkpointing مرة واحدة على الطاقم وتشارك كل الوكلاء، أو استبعد وكيلا واحدا بشكل انتقائي.
## درس تطبيقي: استئناف طاقم فاشل
هذا الدليل يستغرق حوالي 5 دقائق. ستشغل طاقما بمهمتين، توقفه في المنتصف، ثم تستأنف من نقطة الحفظ المحفوظة.
<Steps>
<Step title="أنشئ الطاقم مع تفعيل الـ Checkpointing">
```python
from crewai import Agent, Crew, Task
researcher = Agent(role="Researcher", goal="Research", backstory="Expert")
writer = Agent(role="Writer", goal="Write", backstory="Expert")
crew = Crew(
agents=[researcher, writer],
tasks=[
Task(description="Research AI trends", agent=researcher, expected_output="bullets"),
Task(description="Write a summary", agent=writer, expected_output="paragraph"),
],
checkpoint=True,
)
```
</Step>
<Step title="شغله وأوقفه بعد المهمة الأولى">
```python
result = crew.kickoff()
```
اضغط `Ctrl+C` بعد انتهاء المهمة الأولى. في `./.checkpoints/`، الملف بصيغة `<timestamp>_<uuid>.json` هو نقطة الحفظ.
</Step>
<Step title="استأنف من نقطة الحفظ">
```python
from crewai import CheckpointConfig
result = crew.kickoff(
from_checkpoint=CheckpointConfig(
restore_from="./.checkpoints/<timestamp>_<uuid>.json",
),
)
```
يتم تخطي مهمة البحث، ويعمل الكاتب على مخرجات البحث المحفوظة، وينتهي الطاقم.
</Step>
</Steps>
## ادلة عملية
<AccordionGroup>
<Accordion title="تفعيل الـ Checkpointing بالإعدادات الافتراضية" icon="play">
```python
crew = Crew(agents=[...], tasks=[...], checkpoint=True)
```
يكتب إلى `./.checkpoints/` عند كل `task_completed`.
</Accordion>
<Accordion title="تخصيص التخزين والتردد" icon="sliders">
```python
from crewai import Crew, CheckpointConfig
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./my_checkpoints",
on_events=["task_completed", "crew_kickoff_completed"],
max_checkpoints=5,
),
)
```
</Accordion>
<Accordion title="اختيار مزود التخزين" icon="database">
<CodeGroup>
```python JsonProvider
from crewai import Crew, CheckpointConfig
from crewai.state import JsonProvider
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./my_checkpoints",
provider=JsonProvider(),
max_checkpoints=5,
),
)
```
```python SqliteProvider
from crewai import Crew, CheckpointConfig
from crewai.state import SqliteProvider
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./.checkpoints.db",
provider=SqliteProvider(),
max_checkpoints=50,
),
)
```
</CodeGroup>
<Tip>
SQLite يفعل وضع journal WAL للقراءات المتزامنة. يفضل لنقاط الحفظ عالية التردد.
</Tip>
</Accordion>
<Accordion title="استبعاد وكيل واحد" icon="user-slash">
```python
crew = Crew(
agents=[
Agent(role="Researcher", ...),
Agent(role="Writer", ..., checkpoint=False),
],
tasks=[...],
checkpoint=True,
)
```
</Accordion>
<Accordion title="التفرع إلى فرع جديد" icon="code-branch">
`fork()` يستعيد نقطة حفظ تحت نسب جديد بحيث لا يتصادم التشغيل الجديد مع الأصلي.
```python
config = CheckpointConfig(restore_from="./my_checkpoints/<file>.json")
crew = Crew.fork(config, branch="experiment-a")
result = crew.kickoff(inputs={"strategy": "aggressive"})
```
تسمية `branch` اختيارية؛ يتم إنشاء واحدة إذا أغفلت.
</Accordion>
<Accordion title="Checkpointing لـ Crew أو Flow أو Agent" icon="cubes">
<Tabs>
<Tab title="Crew">
```python
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, write_task, review_task],
checkpoint=CheckpointConfig(location="./crew_cp"),
)
```
المشغل الافتراضي: `task_completed`.
</Tab>
<Tab title="Flow">
```python
from crewai.flow.flow import Flow, start, listen
from crewai import CheckpointConfig
class MyFlow(Flow):
@start()
def step_one(self):
return "data"
@listen(step_one)
def step_two(self, data):
return process(data)
flow = MyFlow(
checkpoint=CheckpointConfig(
location="./flow_cp",
on_events=["method_execution_finished"],
),
)
result = flow.kickoff()
```
</Tab>
<Tab title="Agent">
```python
agent = Agent(
role="Researcher",
goal="Research topics",
backstory="Expert researcher",
checkpoint=CheckpointConfig(
location="./agent_cp",
on_events=["lite_agent_execution_completed"],
),
)
result = agent.kickoff(messages=[{"role": "user", "content": "Research AI trends"}])
```
</Tab>
</Tabs>
</Accordion>
<Accordion title="كتابة نقطة حفظ يدويا" icon="code">
سجل معالجا على أي حدث واستدع `state.checkpoint()`.
<CodeGroup>
```python Sync
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.llm_events import LLMCallCompletedEvent
@crewai_event_bus.on(LLMCallCompletedEvent)
def on_llm_done(source, event, state):
path = state.checkpoint("./my_checkpoints")
print(f"تم حفظ نقطة الحفظ: {path}")
```
```python Async
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.llm_events import LLMCallCompletedEvent
@crewai_event_bus.on(LLMCallCompletedEvent)
async def on_llm_done_async(source, event, state):
path = await state.acheckpoint("./my_checkpoints")
print(f"تم حفظ نقطة الحفظ: {path}")
```
</CodeGroup>
يتم تمرير وسيط `state` تلقائيا عندما يقبل المعالج ثلاثة معاملات. راجع [Event Listeners](/ar/concepts/event-listener) لقائمة الأحداث الكاملة.
</Accordion>
<Accordion title="التصفح والاستئناف والتفرع من سطر الأوامر" icon="terminal">
```bash
crewai checkpoint # كشف تلقائي لـ .checkpoints/ أو .checkpoints.db
crewai checkpoint --location ./my_checkpoints
crewai checkpoint --location ./.checkpoints.db
```
<Frame>
<img src="/images/checkpointing.png" alt="Checkpoint TUI" />
</Frame>
اللوحة اليسرى تجمع نقاط الحفظ حسب الفرع؛ التفرعات تتداخل تحت أبيها. اختيار نقطة حفظ يعرض بياناتها الوصفية وحالة الكيان وتقدم المهام. **Resume** يكمل التشغيل؛ **Fork** يبدأ فرعا جديدا.
لوحة التفاصيل تعرض منطقتين قابلتين للتحرير:
- **Inputs** — مدخلات الـ kickoff الأصلية، معبأة مسبقا وقابلة للتحرير.
- **مخرجات المهام** — مخرجات المهام المكتملة. تحرير مخرج والضغط على **Fork** يبطل المهام التابعة لتعاد بالسياق المعدل.
<Tip>
مفيد لاستكشاف "ماذا لو": تفرع، عدل، راقب.
</Tip>
</Accordion>
<Accordion title="تفقد نقاط الحفظ بدون TUI" icon="magnifying-glass">
```bash
crewai checkpoint list ./my_checkpoints
crewai checkpoint info ./my_checkpoints/<file>.json
crewai checkpoint info ./.checkpoints.db
```
</Accordion>
</AccordionGroup>
## المرجع
### `CheckpointConfig`
<ParamField path="location" type="str" default='"./.checkpoints"'>
وجهة التخزين. مجلد لـ `JsonProvider`، مسار ملف قاعدة بيانات لـ `SqliteProvider`.
</ParamField>
<ParamField path="on_events" type="list[CheckpointEventType]" default='["task_completed"]'>
أنواع الأحداث التي تطلق نقطة حفظ. `CheckpointEventType` هو `Literal` — مدقق الأنواع يكمل تلقائيا ويرفض القيم غير المدعومة. راجع [أنواع الأحداث](#أنواع-الأحداث) للقائمة الكاملة.
</ParamField>
<ParamField path="provider" type="BaseProvider" default="JsonProvider()">
واجهة التخزين. `JsonProvider` أو `SqliteProvider`.
</ParamField>
<ParamField path="max_checkpoints" type="int | None" default="None">
الحد الاقصى لنقاط الحفظ المحتفظ بها. الأقدم تحذف بعد كل كتابة.
</ParamField>
<ParamField path="restore_from" type="Path | str | None" default="None">
نقطة الحفظ المراد استعادتها عند تمريرها عبر `from_checkpoint`.
</ParamField>
### قيم حقل `checkpoint`
مقبولة في `Crew` و`Flow` و`Agent`.
<ParamField path="None" type="افتراضي">
يرث من الأب.
</ParamField>
<ParamField path="True" type="bool">
تفعيل بالإعدادات الافتراضية.
</ParamField>
<ParamField path="False" type="bool">
انسحاب صريح. يوقف الوراثة.
</ParamField>
<ParamField path="CheckpointConfig(...)" type="CheckpointConfig">
إعدادات مخصصة.
</ParamField>
### أنواع الأحداث
يقبل `on_events` أي مجموعة من قيم `CheckpointEventType`. الافتراضي `["task_completed"]` يكتب نقطة حفظ لكل مهمة منتهية، و`["*"]` يطابق جميع الأحداث.
<Warning>
`["*"]` والأحداث عالية التردد مثل `llm_call_completed` تكتب نقاط حفظ كثيرة وقد تضر بالاداء. استخدمها مع `max_checkpoints`.
الـ Checkpointing في اصدار مبكر. قد تتغير واجهات البرمجة في الاصدارات المستقبلية.
</Warning>
<Expandable title="جميع الأحداث المدعومة">
## نظرة عامة
- **Task** — `task_started`, `task_completed`, `task_failed`, `task_evaluation`
- **Crew** — `crew_kickoff_started`, `crew_kickoff_completed`, `crew_kickoff_failed`, `crew_train_started`, `crew_train_completed`, `crew_train_failed`, `crew_test_started`, `crew_test_completed`, `crew_test_failed`, `crew_test_result`
- **Agent** — `agent_execution_started`, `agent_execution_completed`, `agent_execution_error`, `lite_agent_execution_started`, `lite_agent_execution_completed`, `lite_agent_execution_error`, `agent_evaluation_started`, `agent_evaluation_completed`, `agent_evaluation_failed`
- **Flow** — `flow_created`, `flow_started`, `flow_finished`, `flow_paused`, `method_execution_started`, `method_execution_finished`, `method_execution_failed`, `method_execution_paused`, `human_feedback_requested`, `human_feedback_received`, `flow_input_requested`, `flow_input_received`
- **LLM** — `llm_call_started`, `llm_call_completed`, `llm_call_failed`, `llm_stream_chunk`, `llm_thinking_chunk`
- **LLM Guardrail** — `llm_guardrail_started`, `llm_guardrail_completed`, `llm_guardrail_failed`
- **Tool** — `tool_usage_started`, `tool_usage_finished`, `tool_usage_error`, `tool_validate_input_error`, `tool_selection_error`, `tool_execution_error`
- **Memory** — `memory_save_started`, `memory_save_completed`, `memory_save_failed`, `memory_query_started`, `memory_query_completed`, `memory_query_failed`, `memory_retrieval_started`, `memory_retrieval_completed`, `memory_retrieval_failed`
- **Knowledge** — `knowledge_search_query_started`, `knowledge_search_query_completed`, `knowledge_query_started`, `knowledge_query_completed`, `knowledge_query_failed`, `knowledge_search_query_failed`
- **Reasoning** — `agent_reasoning_started`, `agent_reasoning_completed`, `agent_reasoning_failed`
- **MCP** — `mcp_connection_started`, `mcp_connection_completed`, `mcp_connection_failed`, `mcp_tool_execution_started`, `mcp_tool_execution_completed`, `mcp_tool_execution_failed`, `mcp_config_fetch_failed`
- **Observation** — `step_observation_started`, `step_observation_completed`, `step_observation_failed`, `plan_refinement`, `plan_replan_triggered`, `goal_achieved_early`
- **Skill** — `skill_discovery_started`, `skill_discovery_completed`, `skill_loaded`, `skill_activated`, `skill_load_failed`
- **Logging** — `agent_logs_started`, `agent_logs_execution`
- **A2A** — `a2a_delegation_started`, `a2a_delegation_completed`, `a2a_conversation_started`, `a2a_conversation_completed`, `a2a_message_sent`, `a2a_response_received`, `a2a_polling_started`, `a2a_polling_status`, `a2a_push_notification_registered`, `a2a_push_notification_received`, `a2a_push_notification_sent`, `a2a_push_notification_timeout`, `a2a_streaming_started`, `a2a_streaming_chunk`, `a2a_agent_card_fetched`, `a2a_authentication_failed`, `a2a_artifact_received`, `a2a_connection_error`, `a2a_server_task_started`, `a2a_server_task_completed`, `a2a_server_task_canceled`, `a2a_server_task_failed`, `a2a_parallel_delegation_started`, `a2a_parallel_delegation_completed`, `a2a_transport_negotiated`, `a2a_content_type_negotiated`, `a2a_context_created`, `a2a_context_expired`, `a2a_context_idle`, `a2a_context_completed`, `a2a_context_pruned`
- **إشارات النظام** — `SIGTERM`, `SIGINT`, `SIGHUP`, `SIGTSTP`, `SIGCONT`
- **حرف بدل** — `"*"` يطابق جميع الأحداث.
يقوم الـ Checkpointing بحفظ حالة التنفيذ تلقائيا اثناء التشغيل. اذا فشل طاقم او تدفق او وكيل اثناء التنفيذ، يمكنك الاستعادة من اخر نقطة حفظ والاستئناف دون اعادة تنفيذ العمل المكتمل.
</Expandable>
## البداية السريعة
### مزودات التخزين
```python
from crewai import Crew, CheckpointConfig
<ParamField path="JsonProvider" type="provider">
ملف واحد لكل نقطة حفظ بصيغة `<timestamp>_<uuid>.json` داخل `location`.
</ParamField>
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=True, # يستخدم الافتراضيات: ./.checkpoints, عند task_completed
)
result = crew.kickoff()
```
<ParamField path="SqliteProvider" type="provider">
ملف قاعدة بيانات واحد في `location` مع journaling WAL.
</ParamField>
تتم كتابة ملفات نقاط الحفظ في `./.checkpoints/` بعد اكتمال كل مهمة.
### سطر الأوامر
## التكوين
| الامر | الغرض |
|:------|:------|
| `crewai checkpoint` | تشغيل TUI؛ كشف التخزين تلقائيا. |
| `crewai checkpoint --location <path>` | تشغيل TUI على موقع محدد. |
| `crewai checkpoint list <path>` | سرد نقاط الحفظ. |
| `crewai checkpoint info <path>` | تفقد ملف نقطة حفظ أو آخر مدخل في قاعدة بيانات SQLite. |
استخدم `CheckpointConfig` للتحكم الكامل:
```python
from crewai import Crew, CheckpointConfig
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./my_checkpoints",
on_events=["task_completed", "crew_kickoff_completed"],
max_checkpoints=5,
),
)
```
### حقول CheckpointConfig
| الحقل | النوع | الافتراضي | الوصف |
|:------|:------|:----------|:------|
| `location` | `str` | `"./.checkpoints"` | مسار ملفات نقاط الحفظ |
| `on_events` | `list[str]` | `["task_completed"]` | انواع الاحداث التي تطلق نقطة حفظ |
| `provider` | `BaseProvider` | `JsonProvider()` | واجهة التخزين |
| `max_checkpoints` | `int \| None` | `None` | الحد الاقصى للملفات؛ يتم حذف الاقدم اولا |
### الوراثة والانسحاب
يقبل حقل `checkpoint` في Crew و Flow و Agent قيم `CheckpointConfig` او `True` او `False` او `None`:
| القيمة | السلوك |
|:-------|:-------|
| `None` (افتراضي) | يرث من الاصل. الوكيل يرث اعدادات الطاقم. |
| `True` | تفعيل بالاعدادات الافتراضية. |
| `False` | انسحاب صريح. يوقف الوراثة من الاصل. |
| `CheckpointConfig(...)` | اعدادات مخصصة. |
```python
crew = Crew(
agents=[
Agent(role="Researcher", ...), # يرث checkpoint من الطاقم
Agent(role="Writer", ..., checkpoint=False), # منسحب، بدون نقاط حفظ
],
tasks=[...],
checkpoint=True,
)
```
## الاستئناف من نقطة حفظ
```python
# استعادة واستئناف
crew = Crew.from_checkpoint("./my_checkpoints/20260407T120000_abc123.json")
result = crew.kickoff() # يستأنف من اخر مهمة مكتملة
```
يتخطى الطاقم المستعاد المهام المكتملة ويستأنف من اول مهمة غير مكتملة.
## يعمل على Crew و Flow و Agent
### Crew
```python
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, write_task, review_task],
checkpoint=CheckpointConfig(location="./crew_cp"),
)
```
المشغل الافتراضي: `task_completed` (نقطة حفظ واحدة لكل مهمة مكتملة).
### Flow
```python
from crewai.flow.flow import Flow, start, listen
from crewai import CheckpointConfig
class MyFlow(Flow):
@start()
def step_one(self):
return "data"
@listen(step_one)
def step_two(self, data):
return process(data)
flow = MyFlow(
checkpoint=CheckpointConfig(
location="./flow_cp",
on_events=["method_execution_finished"],
),
)
result = flow.kickoff()
# استئناف
flow = MyFlow.from_checkpoint("./flow_cp/20260407T120000_abc123.json")
result = flow.kickoff()
```
### Agent
```python
agent = Agent(
role="Researcher",
goal="Research topics",
backstory="Expert researcher",
checkpoint=CheckpointConfig(
location="./agent_cp",
on_events=["lite_agent_execution_completed"],
),
)
result = agent.kickoff(messages=[{"role": "user", "content": "Research AI trends"}])
```
## مزودات التخزين
يتضمن CrewAI مزودي تخزين لنقاط الحفظ.
### JsonProvider (افتراضي)
يكتب كل نقطة حفظ كملف JSON منفصل.
```python
from crewai import Crew, CheckpointConfig
from crewai.state import JsonProvider
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./my_checkpoints",
provider=JsonProvider(),
max_checkpoints=5,
),
)
```
### SqliteProvider
يخزن جميع نقاط الحفظ في ملف قاعدة بيانات SQLite واحد.
```python
from crewai import Crew, CheckpointConfig
from crewai.state import SqliteProvider
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./.checkpoints.db",
provider=SqliteProvider(),
),
)
```
## انواع الاحداث
يقبل حقل `on_events` اي مجموعة من سلاسل انواع الاحداث. الخيارات الشائعة:
| حالة الاستخدام | الاحداث |
|:---------------|:--------|
| بعد كل مهمة (Crew) | `["task_completed"]` |
| بعد كل طريقة في التدفق | `["method_execution_finished"]` |
| بعد تنفيذ الوكيل | `["agent_execution_completed"]`, `["lite_agent_execution_completed"]` |
| عند اكتمال الطاقم فقط | `["crew_kickoff_completed"]` |
| بعد كل استدعاء LLM | `["llm_call_completed"]` |
| على كل شيء | `["*"]` |
<Warning>
استخدام `["*"]` او احداث عالية التردد مثل `llm_call_completed` سيكتب العديد من ملفات نقاط الحفظ وقد يؤثر على الاداء. استخدم `max_checkpoints` للحد من استخدام المساحة.
</Warning>
## نقاط الحفظ اليدوية
للتحكم الكامل، سجل معالج الاحداث الخاص بك واستدع `state.checkpoint()` مباشرة:
```python
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.llm_events import LLMCallCompletedEvent
# معالج متزامن
@crewai_event_bus.on(LLMCallCompletedEvent)
def on_llm_done(source, event, state):
path = state.checkpoint("./my_checkpoints")
print(f"تم حفظ نقطة الحفظ: {path}")
# معالج غير متزامن
@crewai_event_bus.on(LLMCallCompletedEvent)
async def on_llm_done_async(source, event, state):
path = await state.acheckpoint("./my_checkpoints")
print(f"تم حفظ نقطة الحفظ: {path}")
```
وسيط `state` هو `RuntimeState` الذي يتم تمريره تلقائيا بواسطة ناقل الاحداث عندما يقبل المعالج 3 معاملات. يمكنك تسجيل معالجات على اي نوع حدث مدرج في وثائق [Event Listeners](/ar/concepts/event-listener).
الـ Checkpointing يعمل بافضل جهد: اذا فشلت كتابة نقطة حفظ، يتم تسجيل الخطأ ولكن التنفيذ يستمر دون انقطاع.

View File

@@ -5,386 +5,301 @@ icon: floppy-disk
mode: "wide"
---
Checkpointing saves a snapshot of execution state during a run so a crew, flow, or agent can resume after a failure or be forked into an alternate branch.
<CardGroup cols={2}>
<Card title="Explanation" icon="lightbulb" href="#explanation">
How checkpointing works: events, storage, and inheritance.
</Card>
<Card title="Tutorial" icon="graduation-cap" href="#tutorial-resume-a-failing-crew">
A 5-minute walkthrough: run, interrupt, resume.
</Card>
<Card title="How-to guides" icon="screwdriver-wrench" href="#how-to-guides">
Task-focused recipes for common workflows.
</Card>
<Card title="Reference" icon="book" href="#reference">
`CheckpointConfig`, events, providers, and CLI.
</Card>
</CardGroup>
## Explanation
### What a checkpoint is
A checkpoint captures everything CrewAI needs to recreate a run mid-flight: the full state of the crew, flow, or agent — configuration, agent memory and knowledge sources, task progress, intermediate outputs — alongside the kickoff inputs, the event history up to that point, and a lineage ID that ties the checkpoint to the run it came from.
Restoring rebuilds that state and continues. Completed tasks are skipped, memory and knowledge are rehydrated, and downstream work runs against the same outputs the original run produced. Forking does the same restore under a new lineage, so the new branch and the original run can write checkpoints side by side without overwriting each other.
### When checkpoints are written
Checkpointing is event-driven. The runtime subscribes to events you select via `on_events` and writes a checkpoint each time one fires. The default `task_completed` produces one checkpoint per finished task — a sensible tradeoff between granularity and disk use. Higher-frequency events like `llm_call_completed` are available for fine-grained recovery but write far more files.
### Storage
Two providers ship with CrewAI:
- `JsonProvider` writes one file per checkpoint. Human-readable and easy to inspect.
- `SqliteProvider` writes to a single SQLite database. Better for high-frequency checkpointing.
Both prune oldest checkpoints when `max_checkpoints` is set.
<Note>
Checkpoint writes are best-effort. A failed checkpoint is logged but does not interrupt the run.
</Note>
### Inheritance model
`Crew`, `Flow`, and `Agent` all accept a `checkpoint` argument. Children inherit from their parent unless they set their own value or pass `False` to opt out. Enable checkpointing once on the crew and every agent participates, or selectively exclude one agent.
## Tutorial: Resume a failing crew
This walkthrough takes ~5 minutes. You will run a two-task crew, kill it midway, and resume from the saved checkpoint.
<Steps>
<Step title="Create the crew with checkpointing enabled">
```python
from crewai import Agent, Crew, Task
researcher = Agent(role="Researcher", goal="Research", backstory="Expert")
writer = Agent(role="Writer", goal="Write", backstory="Expert")
crew = Crew(
agents=[researcher, writer],
tasks=[
Task(description="Research AI trends", agent=researcher, expected_output="bullets"),
Task(description="Write a summary", agent=writer, expected_output="paragraph"),
],
checkpoint=True,
)
```
</Step>
<Step title="Run it and interrupt after the first task">
```python
result = crew.kickoff()
```
Press `Ctrl+C` after the first task finishes. Look in `./.checkpoints/` — a file named `<timestamp>_<uuid>.json` is the checkpoint.
</Step>
<Step title="Resume from the checkpoint">
```python
from crewai import CheckpointConfig
result = crew.kickoff(
from_checkpoint=CheckpointConfig(
restore_from="./.checkpoints/<timestamp>_<uuid>.json",
),
)
```
The research task is skipped, the writer runs against the saved research output, and the crew finishes.
</Step>
</Steps>
## How-to guides
<AccordionGroup>
<Accordion title="Enable checkpointing with defaults" icon="play">
```python
crew = Crew(agents=[...], tasks=[...], checkpoint=True)
```
Writes to `./.checkpoints/` on every `task_completed`.
</Accordion>
<Accordion title="Customize storage and frequency" icon="sliders">
```python
from crewai import Crew, CheckpointConfig
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./my_checkpoints",
on_events=["task_completed", "crew_kickoff_completed"],
max_checkpoints=5,
),
)
```
</Accordion>
<Accordion title="Choose a storage provider" icon="database">
<CodeGroup>
```python JsonProvider
from crewai import Crew, CheckpointConfig
from crewai.state import JsonProvider
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./my_checkpoints",
provider=JsonProvider(),
max_checkpoints=5,
),
)
```
```python SqliteProvider
from crewai import Crew, CheckpointConfig
from crewai.state import SqliteProvider
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./.checkpoints.db",
provider=SqliteProvider(),
max_checkpoints=50,
),
)
```
</CodeGroup>
<Tip>
SQLite enables WAL journal mode for concurrent reads. Prefer it for high-frequency checkpointing.
</Tip>
</Accordion>
<Accordion title="Opt one agent out" icon="user-slash">
```python
crew = Crew(
agents=[
Agent(role="Researcher", ...),
Agent(role="Writer", ..., checkpoint=False),
],
tasks=[...],
checkpoint=True,
)
```
</Accordion>
<Accordion title="Fork into a new branch" icon="code-branch">
`fork()` restores a checkpoint under a fresh lineage so the new run does not collide with the original.
```python
config = CheckpointConfig(restore_from="./my_checkpoints/<file>.json")
crew = Crew.fork(config, branch="experiment-a")
result = crew.kickoff(inputs={"strategy": "aggressive"})
```
The `branch` label is optional; one is generated if omitted.
</Accordion>
<Accordion title="Checkpoint a Crew, Flow, or Agent" icon="cubes">
<Tabs>
<Tab title="Crew">
```python
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, write_task, review_task],
checkpoint=CheckpointConfig(location="./crew_cp"),
)
```
Default trigger: `task_completed`.
</Tab>
<Tab title="Flow">
```python
from crewai.flow.flow import Flow, start, listen
from crewai import CheckpointConfig
class MyFlow(Flow):
@start()
def step_one(self):
return "data"
@listen(step_one)
def step_two(self, data):
return process(data)
flow = MyFlow(
checkpoint=CheckpointConfig(
location="./flow_cp",
on_events=["method_execution_finished"],
),
)
result = flow.kickoff()
```
</Tab>
<Tab title="Agent">
```python
agent = Agent(
role="Researcher",
goal="Research topics",
backstory="Expert researcher",
checkpoint=CheckpointConfig(
location="./agent_cp",
on_events=["lite_agent_execution_completed"],
),
)
result = agent.kickoff(messages=[{"role": "user", "content": "Research AI trends"}])
```
</Tab>
</Tabs>
</Accordion>
<Accordion title="Write a checkpoint manually" icon="code">
Register a handler on any event and call `state.checkpoint()`.
<CodeGroup>
```python Sync
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.llm_events import LLMCallCompletedEvent
@crewai_event_bus.on(LLMCallCompletedEvent)
def on_llm_done(source, event, state):
path = state.checkpoint("./my_checkpoints")
print(f"Saved checkpoint: {path}")
```
```python Async
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.llm_events import LLMCallCompletedEvent
@crewai_event_bus.on(LLMCallCompletedEvent)
async def on_llm_done_async(source, event, state):
path = await state.acheckpoint("./my_checkpoints")
print(f"Saved checkpoint: {path}")
```
</CodeGroup>
A `state` argument is supplied automatically when the handler takes three parameters. See [Event Listeners](/en/concepts/event-listener) for the full event catalog.
</Accordion>
<Accordion title="Browse, resume, and fork from the CLI" icon="terminal">
```bash
crewai checkpoint # auto-detects .checkpoints/ or .checkpoints.db
crewai checkpoint --location ./my_checkpoints
crewai checkpoint --location ./.checkpoints.db
```
<Frame>
<img src="/images/checkpointing.png" alt="Checkpoint TUI" />
</Frame>
The left panel groups checkpoints by branch; forks nest under their parent. Selecting a checkpoint shows its metadata, entity state, and task progress. **Resume** continues the run; **Fork** starts a new branch.
The detail panel exposes two editable areas:
- **Inputs** — original kickoff inputs, pre-filled and editable.
- **Task outputs** — outputs of completed tasks. Editing an output and hitting **Fork** invalidates downstream tasks so they re-run against the modified context.
<Tip>
Useful for "what if" exploration: fork, tweak, observe.
</Tip>
</Accordion>
<Accordion title="Inspect checkpoints without the TUI" icon="magnifying-glass">
```bash
crewai checkpoint list ./my_checkpoints
crewai checkpoint info ./my_checkpoints/<file>.json
crewai checkpoint info ./.checkpoints.db
```
</Accordion>
</AccordionGroup>
## Reference
### `CheckpointConfig`
<ParamField path="location" type="str" default='"./.checkpoints"'>
Storage destination. A directory for `JsonProvider`, a database file path for `SqliteProvider`.
</ParamField>
<ParamField path="on_events" type="list[CheckpointEventType]" default='["task_completed"]'>
Event types that trigger a checkpoint. `CheckpointEventType` is a `Literal` — your type checker will autocomplete and reject unsupported values. See [event types](#event-types) for the full list.
</ParamField>
<ParamField path="provider" type="BaseProvider" default="JsonProvider()">
Storage backend. Either `JsonProvider` or `SqliteProvider`.
</ParamField>
<ParamField path="max_checkpoints" type="int | None" default="None">
Maximum checkpoints to retain. Oldest are pruned after each write.
</ParamField>
<ParamField path="restore_from" type="Path | str | None" default="None">
Checkpoint to restore from when passed via `from_checkpoint`.
</ParamField>
### `checkpoint` field values
Accepted by `Crew`, `Flow`, and `Agent`.
<ParamField path="None" type="default">
Inherit from parent.
</ParamField>
<ParamField path="True" type="bool">
Enable with defaults.
</ParamField>
<ParamField path="False" type="bool">
Explicit opt-out. Stops inheritance.
</ParamField>
<ParamField path="CheckpointConfig(...)" type="CheckpointConfig">
Custom configuration.
</ParamField>
### Event types
`on_events` accepts any combination of `CheckpointEventType` values. The default `["task_completed"]` writes one checkpoint per finished task; `["*"]` matches every event.
<Warning>
`["*"]` and high-frequency events like `llm_call_completed` write many checkpoints and can degrade performance. Pair them with `max_checkpoints`.
Checkpointing is in early release. APIs may change in future versions.
</Warning>
<Expandable title="All supported events">
## Overview
- **Task** — `task_started`, `task_completed`, `task_failed`, `task_evaluation`
- **Crew** — `crew_kickoff_started`, `crew_kickoff_completed`, `crew_kickoff_failed`, `crew_train_started`, `crew_train_completed`, `crew_train_failed`, `crew_test_started`, `crew_test_completed`, `crew_test_failed`, `crew_test_result`
- **Agent** — `agent_execution_started`, `agent_execution_completed`, `agent_execution_error`, `lite_agent_execution_started`, `lite_agent_execution_completed`, `lite_agent_execution_error`, `agent_evaluation_started`, `agent_evaluation_completed`, `agent_evaluation_failed`
- **Flow** — `flow_created`, `flow_started`, `flow_finished`, `flow_paused`, `method_execution_started`, `method_execution_finished`, `method_execution_failed`, `method_execution_paused`, `human_feedback_requested`, `human_feedback_received`, `flow_input_requested`, `flow_input_received`
- **LLM** — `llm_call_started`, `llm_call_completed`, `llm_call_failed`, `llm_stream_chunk`, `llm_thinking_chunk`
- **LLM Guardrail** — `llm_guardrail_started`, `llm_guardrail_completed`, `llm_guardrail_failed`
- **Tool** — `tool_usage_started`, `tool_usage_finished`, `tool_usage_error`, `tool_validate_input_error`, `tool_selection_error`, `tool_execution_error`
- **Memory** — `memory_save_started`, `memory_save_completed`, `memory_save_failed`, `memory_query_started`, `memory_query_completed`, `memory_query_failed`, `memory_retrieval_started`, `memory_retrieval_completed`, `memory_retrieval_failed`
- **Knowledge** — `knowledge_search_query_started`, `knowledge_search_query_completed`, `knowledge_query_started`, `knowledge_query_completed`, `knowledge_query_failed`, `knowledge_search_query_failed`
- **Reasoning** — `agent_reasoning_started`, `agent_reasoning_completed`, `agent_reasoning_failed`
- **MCP** — `mcp_connection_started`, `mcp_connection_completed`, `mcp_connection_failed`, `mcp_tool_execution_started`, `mcp_tool_execution_completed`, `mcp_tool_execution_failed`, `mcp_config_fetch_failed`
- **Observation** — `step_observation_started`, `step_observation_completed`, `step_observation_failed`, `plan_refinement`, `plan_replan_triggered`, `goal_achieved_early`
- **Skill** — `skill_discovery_started`, `skill_discovery_completed`, `skill_loaded`, `skill_activated`, `skill_load_failed`
- **Logging** — `agent_logs_started`, `agent_logs_execution`
- **A2A** — `a2a_delegation_started`, `a2a_delegation_completed`, `a2a_conversation_started`, `a2a_conversation_completed`, `a2a_message_sent`, `a2a_response_received`, `a2a_polling_started`, `a2a_polling_status`, `a2a_push_notification_registered`, `a2a_push_notification_received`, `a2a_push_notification_sent`, `a2a_push_notification_timeout`, `a2a_streaming_started`, `a2a_streaming_chunk`, `a2a_agent_card_fetched`, `a2a_authentication_failed`, `a2a_artifact_received`, `a2a_connection_error`, `a2a_server_task_started`, `a2a_server_task_completed`, `a2a_server_task_canceled`, `a2a_server_task_failed`, `a2a_parallel_delegation_started`, `a2a_parallel_delegation_completed`, `a2a_transport_negotiated`, `a2a_content_type_negotiated`, `a2a_context_created`, `a2a_context_expired`, `a2a_context_idle`, `a2a_context_completed`, `a2a_context_pruned`
- **System signals** — `SIGTERM`, `SIGINT`, `SIGHUP`, `SIGTSTP`, `SIGCONT`
- **Wildcard** — `"*"` matches every event.
Checkpointing automatically saves execution state during a run. If a crew, flow, or agent fails mid-execution, you can restore from the last checkpoint and resume without re-running completed work.
</Expandable>
## Quick Start
### Storage providers
```python
from crewai import Crew, CheckpointConfig
<ParamField path="JsonProvider" type="provider">
One file per checkpoint, named `<timestamp>_<uuid>.json` inside `location`.
</ParamField>
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=True, # uses defaults: ./.checkpoints, on task_completed
)
result = crew.kickoff()
```
<ParamField path="SqliteProvider" type="provider">
Single database file at `location` with WAL journaling.
</ParamField>
Checkpoint files are written to `./.checkpoints/` after each completed task.
### CLI
## Configuration
| Command | Purpose |
|:--------|:--------|
| `crewai checkpoint` | Launch the TUI; auto-detect storage. |
| `crewai checkpoint --location <path>` | Launch the TUI against a specific location. |
| `crewai checkpoint list <path>` | List checkpoints. |
| `crewai checkpoint info <path>` | Inspect a checkpoint file or the latest entry in a SQLite database. |
Use `CheckpointConfig` for full control:
```python
from crewai import Crew, CheckpointConfig
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./my_checkpoints",
on_events=["task_completed", "crew_kickoff_completed"],
max_checkpoints=5,
),
)
```
### CheckpointConfig Fields
| Field | Type | Default | Description |
|:------|:-----|:--------|:------------|
| `location` | `str` | `"./.checkpoints"` | Storage destination — a directory for `JsonProvider`, a database file path for `SqliteProvider` |
| `on_events` | `list[str]` | `["task_completed"]` | Event types that trigger a checkpoint |
| `provider` | `BaseProvider` | `JsonProvider()` | Storage backend |
| `max_checkpoints` | `int \| None` | `None` | Max checkpoints to keep. Oldest are pruned after each write. Pruning is handled by the provider. |
| `restore_from` | `Path \| str \| None` | `None` | Path to a checkpoint to restore from. Used when passing config via a kickoff method's `from_checkpoint` parameter. |
### Inheritance and Opt-Out
The `checkpoint` field on Crew, Flow, and Agent accepts `CheckpointConfig`, `True`, `False`, or `None`:
| Value | Behavior |
|:------|:---------|
| `None` (default) | Inherit from parent. An agent inherits its crew's config. |
| `True` | Enable with defaults. |
| `False` | Explicit opt-out. Stops inheritance from parent. |
| `CheckpointConfig(...)` | Custom configuration. |
```python
crew = Crew(
agents=[
Agent(role="Researcher", ...), # inherits crew's checkpoint
Agent(role="Writer", ..., checkpoint=False), # opted out, no checkpoints
],
tasks=[...],
checkpoint=True,
)
```
## Resuming from a Checkpoint
Pass a `CheckpointConfig` with `restore_from` to any kickoff method. The crew restores from that checkpoint, skips completed tasks, and resumes.
```python
from crewai import Crew, CheckpointConfig
crew = Crew(agents=[...], tasks=[...])
result = crew.kickoff(
from_checkpoint=CheckpointConfig(
restore_from="./my_checkpoints/20260407T120000_abc123.json",
),
)
```
Remaining `CheckpointConfig` fields apply to the new run, so checkpointing continues after the restore.
You can also use the classmethod directly:
```python
config = CheckpointConfig(restore_from="./my_checkpoints/20260407T120000_abc123.json")
crew = Crew.from_checkpoint(config)
result = crew.kickoff()
```
## Forking from a Checkpoint
`fork()` restores a checkpoint and starts a new execution branch. Useful for exploring alternative paths from the same point.
```python
from crewai import Crew, CheckpointConfig
config = CheckpointConfig(restore_from="./my_checkpoints/20260407T120000_abc123.json")
crew = Crew.fork(config, branch="experiment-a")
result = crew.kickoff(inputs={"strategy": "aggressive"})
```
Each fork gets a unique lineage ID so checkpoints from different branches don't collide. The `branch` label is optional and auto-generated if omitted.
## Works on Crew, Flow, and Agent
### Crew
```python
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, write_task, review_task],
checkpoint=CheckpointConfig(location="./crew_cp"),
)
```
Default trigger: `task_completed` (one checkpoint per finished task).
### Flow
```python
from crewai.flow.flow import Flow, start, listen
from crewai import CheckpointConfig
class MyFlow(Flow):
@start()
def step_one(self):
return "data"
@listen(step_one)
def step_two(self, data):
return process(data)
flow = MyFlow(
checkpoint=CheckpointConfig(
location="./flow_cp",
on_events=["method_execution_finished"],
),
)
result = flow.kickoff()
# Resume
config = CheckpointConfig(restore_from="./flow_cp/20260407T120000_abc123.json")
flow = MyFlow.from_checkpoint(config)
result = flow.kickoff()
```
### Agent
```python
agent = Agent(
role="Researcher",
goal="Research topics",
backstory="Expert researcher",
checkpoint=CheckpointConfig(
location="./agent_cp",
on_events=["lite_agent_execution_completed"],
),
)
result = agent.kickoff(messages=[{"role": "user", "content": "Research AI trends"}])
```
## Storage Providers
CrewAI ships with two checkpoint storage providers.
### JsonProvider (default)
Writes each checkpoint as a separate JSON file. Simple, human-readable, easy to inspect.
```python
from crewai import Crew, CheckpointConfig
from crewai.state import JsonProvider
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./my_checkpoints",
provider=JsonProvider(), # this is the default
max_checkpoints=5, # prunes oldest files
),
)
```
Files are named `<timestamp>_<uuid>.json` inside the location directory.
### SqliteProvider
Stores all checkpoints in a single SQLite database file. Better for high-frequency checkpointing and avoids many small files.
```python
from crewai import Crew, CheckpointConfig
from crewai.state import SqliteProvider
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./.checkpoints.db",
provider=SqliteProvider(),
max_checkpoints=50,
),
)
```
WAL journal mode is enabled for concurrent read access.
## Event Types
The `on_events` field accepts any combination of event type strings. Common choices:
| Use Case | Events |
|:---------|:-------|
| After each task (Crew) | `["task_completed"]` |
| After each flow method | `["method_execution_finished"]` |
| After agent execution | `["agent_execution_completed"]`, `["lite_agent_execution_completed"]` |
| On crew completion only | `["crew_kickoff_completed"]` |
| After every LLM call | `["llm_call_completed"]` |
| On everything | `["*"]` |
<Warning>
Using `["*"]` or high-frequency events like `llm_call_completed` will write many checkpoint files and may impact performance. Use `max_checkpoints` to limit disk usage.
</Warning>
## Manual Checkpointing
For full control, register your own event handler and call `state.checkpoint()` directly:
```python
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.llm_events import LLMCallCompletedEvent
# Sync handler
@crewai_event_bus.on(LLMCallCompletedEvent)
def on_llm_done(source, event, state):
path = state.checkpoint("./my_checkpoints")
print(f"Saved checkpoint: {path}")
# Async handler
@crewai_event_bus.on(LLMCallCompletedEvent)
async def on_llm_done_async(source, event, state):
path = await state.acheckpoint("./my_checkpoints")
print(f"Saved checkpoint: {path}")
```
The `state` argument is the `RuntimeState` passed automatically by the event bus when your handler accepts 3 parameters. You can register handlers on any event type listed in the [Event Listeners](/en/concepts/event-listener) documentation.
Checkpointing is best-effort: if a checkpoint write fails, the error is logged but execution continues uninterrupted.
## CLI
The `crewai checkpoint` command gives you a TUI for browsing, inspecting, resuming, and forking checkpoints. It auto-detects whether your checkpoints are JSON files or a SQLite database.
```bash
# Launch the TUI — auto-detects .checkpoints/ or .checkpoints.db
crewai checkpoint
# Point at a specific location
crewai checkpoint --location ./my_checkpoints
crewai checkpoint --location ./.checkpoints.db
```
<Frame>
<img src="/images/checkpointing.png" alt="Checkpoint TUI" />
</Frame>
The left panel is a tree view. Checkpoints are grouped by branch, and forks nest under the checkpoint they diverged from. Select a checkpoint to see its metadata, entity state, and task progress in the detail panel. Hit **Resume** to pick up where it left off, or **Fork** to start a new branch from that point.
### Editing inputs and task outputs
When a checkpoint is selected, the detail panel shows:
- **Inputs** — if the original kickoff had inputs (e.g. `{topic}`), they appear as editable fields pre-filled with the original values. Change them before resuming or forking.
- **Task outputs** — completed tasks show their output in editable text areas. Edit a task's output to change the context that downstream tasks receive. When you modify a task output and hit Fork, all subsequent tasks are invalidated and re-run with the new context.
This is useful for "what if" exploration — fork from a checkpoint, tweak a task's result, and see how it changes downstream behavior.
### Subcommands
```bash
# List all checkpoints
crewai checkpoint list ./my_checkpoints
# Inspect a specific checkpoint
crewai checkpoint info ./my_checkpoints/20260407T120000_abc123.json
# Inspect latest in a SQLite database
crewai checkpoint info ./.checkpoints.db
```

View File

@@ -5,386 +5,225 @@ icon: floppy-disk
mode: "wide"
---
체크포인팅은 실행 중 실행 상태의 스냅샷을 저장하여 크루, 플로우, 에이전트가 실패 후 재개하거나 대체 브랜치로 분기될 수 있도록 합니다.
<CardGroup cols={2}>
<Card title="설명" icon="lightbulb" href="#설명">
체크포인팅의 작동 방식: 이벤트, 스토리지, 상속.
</Card>
<Card title="튜토리얼" icon="graduation-cap" href="#튜토리얼-실패한-크루-재개하기">
5분 가이드: 실행, 중단, 재개.
</Card>
<Card title="사용 방법" icon="screwdriver-wrench" href="#사용-방법">
일반적인 워크플로우를 위한 작업 중심 레시피.
</Card>
<Card title="레퍼런스" icon="book" href="#레퍼런스">
`CheckpointConfig`, 이벤트, 프로바이더, CLI.
</Card>
</CardGroup>
## 설명
### 체크포인트란
체크포인트는 실행 중인 작업을 재현하기 위해 CrewAI가 필요한 모든 것을 캡처합니다: 크루, 플로우 또는 에이전트의 전체 상태 — 구성, 에이전트의 메모리 및 지식 소스, 태스크 진행 상황, 중간 출력값 — 그리고 kickoff 입력, 해당 시점까지의 이벤트 기록, 그리고 체크포인트를 원본 실행에 연결하는 lineage ID를 포함합니다.
복원하면 해당 상태를 재구성하고 계속 진행합니다. 완료된 태스크는 건너뛰고, 메모리와 지식은 재수화되며, 다운스트림 작업은 원본 실행이 생성한 동일한 출력을 기반으로 실행됩니다. 포크하면 새 lineage 아래에서 동일한 복원을 수행하여 새 브랜치와 원본 실행이 서로 덮어쓰지 않고 나란히 체크포인트를 기록할 수 있습니다.
### 체크포인트가 기록되는 시점
체크포인팅은 이벤트 기반입니다. 런타임은 `on_events`로 선택한 이벤트를 구독하고, 이벤트가 발생할 때마다 체크포인트를 기록합니다. 기본값 `task_completed`는 완료된 태스크당 하나의 체크포인트를 생성합니다 — 세분화와 디스크 사용의 합리적인 균형입니다. `llm_call_completed`와 같은 고빈도 이벤트는 더 세밀한 복구를 위해 사용 가능하지만 훨씬 많은 파일을 기록합니다.
### 스토리지
CrewAI에는 두 가지 프로바이더가 포함되어 있습니다:
- `JsonProvider`는 체크포인트당 하나의 파일을 기록합니다. 사람이 읽기 쉽고 검사하기 편리합니다.
- `SqliteProvider`는 단일 SQLite 데이터베이스에 기록합니다. 고빈도 체크포인팅에 적합합니다.
`max_checkpoints`가 설정되면 두 프로바이더 모두 가장 오래된 체크포인트를 자동으로 제거합니다.
<Note>
체크포인트 기록은 best-effort 방식입니다. 실패한 체크포인트는 로그에 기록되지만 실행을 중단시키지 않습니다.
</Note>
### 상속 모델
`Crew`, `Flow`, `Agent` 모두 `checkpoint` 인수를 받습니다. 자식은 자체 값을 설정하거나 `False`를 전달하여 옵트아웃하지 않는 한 부모로부터 상속합니다. 크루에서 체크포인팅을 한 번 활성화하면 모든 에이전트가 참여하거나, 특정 에이전트만 선택적으로 제외할 수 있습니다.
## 튜토리얼: 실패한 크루 재개하기
이 가이드는 약 5분이 소요됩니다. 두 개의 태스크가 있는 크루를 실행하고 중간에 종료한 다음, 저장된 체크포인트에서 재개합니다.
<Steps>
<Step title="체크포인팅이 활성화된 크루를 생성합니다">
```python
from crewai import Agent, Crew, Task
researcher = Agent(role="Researcher", goal="Research", backstory="Expert")
writer = Agent(role="Writer", goal="Write", backstory="Expert")
crew = Crew(
agents=[researcher, writer],
tasks=[
Task(description="Research AI trends", agent=researcher, expected_output="bullets"),
Task(description="Write a summary", agent=writer, expected_output="paragraph"),
],
checkpoint=True,
)
```
</Step>
<Step title="실행하고 첫 번째 태스크 후에 중단합니다">
```python
result = crew.kickoff()
```
첫 번째 태스크가 완료된 후 `Ctrl+C`를 누릅니다. `./.checkpoints/` 디렉토리에서 `<timestamp>_<uuid>.json` 형식의 파일이 체크포인트입니다.
</Step>
<Step title="체크포인트에서 재개합니다">
```python
from crewai import CheckpointConfig
result = crew.kickoff(
from_checkpoint=CheckpointConfig(
restore_from="./.checkpoints/<timestamp>_<uuid>.json",
),
)
```
연구 태스크는 건너뛰고, 작성자는 저장된 연구 출력에 대해 실행되며, 크루가 완료됩니다.
</Step>
</Steps>
## 사용 방법
<AccordionGroup>
<Accordion title="기본값으로 체크포인팅 활성화" icon="play">
```python
crew = Crew(agents=[...], tasks=[...], checkpoint=True)
```
`task_completed` 이벤트마다 `./.checkpoints/`에 기록합니다.
</Accordion>
<Accordion title="스토리지와 빈도 사용자 정의" icon="sliders">
```python
from crewai import Crew, CheckpointConfig
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./my_checkpoints",
on_events=["task_completed", "crew_kickoff_completed"],
max_checkpoints=5,
),
)
```
</Accordion>
<Accordion title="스토리지 프로바이더 선택" icon="database">
<CodeGroup>
```python JsonProvider
from crewai import Crew, CheckpointConfig
from crewai.state import JsonProvider
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./my_checkpoints",
provider=JsonProvider(),
max_checkpoints=5,
),
)
```
```python SqliteProvider
from crewai import Crew, CheckpointConfig
from crewai.state import SqliteProvider
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./.checkpoints.db",
provider=SqliteProvider(),
max_checkpoints=50,
),
)
```
</CodeGroup>
<Tip>
SQLite는 동시 읽기를 위해 WAL 저널 모드를 활성화합니다. 고빈도 체크포인팅에는 SQLite를 선호하세요.
</Tip>
</Accordion>
<Accordion title="특정 에이전트 옵트아웃" icon="user-slash">
```python
crew = Crew(
agents=[
Agent(role="Researcher", ...),
Agent(role="Writer", ..., checkpoint=False),
],
tasks=[...],
checkpoint=True,
)
```
</Accordion>
<Accordion title="새 브랜치로 포크" icon="code-branch">
`fork()`는 새 lineage 아래에 체크포인트를 복원하여 새 실행이 원본과 충돌하지 않도록 합니다.
```python
config = CheckpointConfig(restore_from="./my_checkpoints/<file>.json")
crew = Crew.fork(config, branch="experiment-a")
result = crew.kickoff(inputs={"strategy": "aggressive"})
```
`branch` 레이블은 선택 사항이며, 생략하면 자동 생성됩니다.
</Accordion>
<Accordion title="Crew, Flow, Agent 체크포인트" icon="cubes">
<Tabs>
<Tab title="Crew">
```python
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, write_task, review_task],
checkpoint=CheckpointConfig(location="./crew_cp"),
)
```
기본 트리거: `task_completed`.
</Tab>
<Tab title="Flow">
```python
from crewai.flow.flow import Flow, start, listen
from crewai import CheckpointConfig
class MyFlow(Flow):
@start()
def step_one(self):
return "data"
@listen(step_one)
def step_two(self, data):
return process(data)
flow = MyFlow(
checkpoint=CheckpointConfig(
location="./flow_cp",
on_events=["method_execution_finished"],
),
)
result = flow.kickoff()
```
</Tab>
<Tab title="Agent">
```python
agent = Agent(
role="Researcher",
goal="Research topics",
backstory="Expert researcher",
checkpoint=CheckpointConfig(
location="./agent_cp",
on_events=["lite_agent_execution_completed"],
),
)
result = agent.kickoff(messages=[{"role": "user", "content": "Research AI trends"}])
```
</Tab>
</Tabs>
</Accordion>
<Accordion title="수동으로 체크포인트 기록" icon="code">
모든 이벤트에 핸들러를 등록하고 `state.checkpoint()`를 호출합니다.
<CodeGroup>
```python Sync
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.llm_events import LLMCallCompletedEvent
@crewai_event_bus.on(LLMCallCompletedEvent)
def on_llm_done(source, event, state):
path = state.checkpoint("./my_checkpoints")
print(f"체크포인트 저장: {path}")
```
```python Async
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.llm_events import LLMCallCompletedEvent
@crewai_event_bus.on(LLMCallCompletedEvent)
async def on_llm_done_async(source, event, state):
path = await state.acheckpoint("./my_checkpoints")
print(f"체크포인트 저장: {path}")
```
</CodeGroup>
핸들러가 세 개의 매개변수를 받을 때 `state` 인수가 자동으로 제공됩니다. 전체 이벤트 카탈로그는 [Event Listeners](/ko/concepts/event-listener) 문서를 참조하세요.
</Accordion>
<Accordion title="CLI에서 탐색, 재개, 포크" icon="terminal">
```bash
crewai checkpoint # .checkpoints/ 또는 .checkpoints.db 자동 감지
crewai checkpoint --location ./my_checkpoints
crewai checkpoint --location ./.checkpoints.db
```
<Frame>
<img src="/images/checkpointing.png" alt="Checkpoint TUI" />
</Frame>
왼쪽 패널은 체크포인트를 브랜치별로 그룹화하며, 포크는 부모 아래에 중첩됩니다. 체크포인트를 선택하면 메타데이터, 엔티티 상태, 태스크 진행 상황이 표시됩니다. **Resume**은 실행을 계속하고, **Fork**는 새 브랜치를 시작합니다.
세부 정보 패널에는 두 개의 편집 가능한 영역이 있습니다:
- **Inputs** — 원래 kickoff의 입력으로, 미리 채워져 있으며 편집 가능합니다.
- **태스크 출력** — 완료된 태스크의 출력. 출력을 편집하고 **Fork**를 누르면 다운스트림 태스크가 무효화되어 수정된 컨텍스트로 다시 실행됩니다.
<Tip>
"what if" 탐색에 유용합니다: 포크, 조정, 관찰.
</Tip>
</Accordion>
<Accordion title="TUI 없이 체크포인트 검사" icon="magnifying-glass">
```bash
crewai checkpoint list ./my_checkpoints
crewai checkpoint info ./my_checkpoints/<file>.json
crewai checkpoint info ./.checkpoints.db
```
</Accordion>
</AccordionGroup>
## 레퍼런스
### `CheckpointConfig`
<ParamField path="location" type="str" default='"./.checkpoints"'>
스토리지 대상. `JsonProvider`는 디렉토리, `SqliteProvider`는 데이터베이스 파일 경로.
</ParamField>
<ParamField path="on_events" type="list[CheckpointEventType]" default='["task_completed"]'>
체크포인트를 트리거하는 이벤트 타입. `CheckpointEventType`은 `Literal`이므로 타입 체커가 자동 완성하고 지원되지 않는 값을 거부합니다. 전체 목록은 [이벤트 타입](#이벤트-타입) 참조.
</ParamField>
<ParamField path="provider" type="BaseProvider" default="JsonProvider()">
스토리지 백엔드. `JsonProvider` 또는 `SqliteProvider`.
</ParamField>
<ParamField path="max_checkpoints" type="int | None" default="None">
보관할 최대 체크포인트 수. 각 기록 후 가장 오래된 것이 제거됩니다.
</ParamField>
<ParamField path="restore_from" type="Path | str | None" default="None">
`from_checkpoint`를 통해 전달될 때 복원할 체크포인트.
</ParamField>
### `checkpoint` 필드 값
`Crew`, `Flow`, `Agent`에서 사용 가능.
<ParamField path="None" type="기본값">
부모에서 상속.
</ParamField>
<ParamField path="True" type="bool">
기본값으로 활성화.
</ParamField>
<ParamField path="False" type="bool">
명시적 옵트아웃. 상속을 중단합니다.
</ParamField>
<ParamField path="CheckpointConfig(...)" type="CheckpointConfig">
사용자 정의 설정.
</ParamField>
### 이벤트 타입
`on_events`는 `CheckpointEventType` 값의 임의 조합을 받습니다. 기본값 `["task_completed"]`는 완료된 태스크당 하나의 체크포인트를 기록하며, `["*"]`는 모든 이벤트와 일치합니다.
<Warning>
`["*"]` 및 `llm_call_completed`와 같은 고빈도 이벤트는 많은 체크포인트를 기록하고 성능을 저하시킬 수 있습니다. `max_checkpoints`와 함께 사용하세요.
체크포인팅은 초기 릴리스 단계입니다. API는 향후 버전에서 변경될 수 있습니다.
</Warning>
<Expandable title="지원되는 모든 이벤트">
## 개요
- **Task** — `task_started`, `task_completed`, `task_failed`, `task_evaluation`
- **Crew** — `crew_kickoff_started`, `crew_kickoff_completed`, `crew_kickoff_failed`, `crew_train_started`, `crew_train_completed`, `crew_train_failed`, `crew_test_started`, `crew_test_completed`, `crew_test_failed`, `crew_test_result`
- **Agent** — `agent_execution_started`, `agent_execution_completed`, `agent_execution_error`, `lite_agent_execution_started`, `lite_agent_execution_completed`, `lite_agent_execution_error`, `agent_evaluation_started`, `agent_evaluation_completed`, `agent_evaluation_failed`
- **Flow** — `flow_created`, `flow_started`, `flow_finished`, `flow_paused`, `method_execution_started`, `method_execution_finished`, `method_execution_failed`, `method_execution_paused`, `human_feedback_requested`, `human_feedback_received`, `flow_input_requested`, `flow_input_received`
- **LLM** — `llm_call_started`, `llm_call_completed`, `llm_call_failed`, `llm_stream_chunk`, `llm_thinking_chunk`
- **LLM Guardrail** — `llm_guardrail_started`, `llm_guardrail_completed`, `llm_guardrail_failed`
- **Tool** — `tool_usage_started`, `tool_usage_finished`, `tool_usage_error`, `tool_validate_input_error`, `tool_selection_error`, `tool_execution_error`
- **Memory** — `memory_save_started`, `memory_save_completed`, `memory_save_failed`, `memory_query_started`, `memory_query_completed`, `memory_query_failed`, `memory_retrieval_started`, `memory_retrieval_completed`, `memory_retrieval_failed`
- **Knowledge** — `knowledge_search_query_started`, `knowledge_search_query_completed`, `knowledge_query_started`, `knowledge_query_completed`, `knowledge_query_failed`, `knowledge_search_query_failed`
- **Reasoning** — `agent_reasoning_started`, `agent_reasoning_completed`, `agent_reasoning_failed`
- **MCP** — `mcp_connection_started`, `mcp_connection_completed`, `mcp_connection_failed`, `mcp_tool_execution_started`, `mcp_tool_execution_completed`, `mcp_tool_execution_failed`, `mcp_config_fetch_failed`
- **Observation** — `step_observation_started`, `step_observation_completed`, `step_observation_failed`, `plan_refinement`, `plan_replan_triggered`, `goal_achieved_early`
- **Skill** — `skill_discovery_started`, `skill_discovery_completed`, `skill_loaded`, `skill_activated`, `skill_load_failed`
- **Logging** — `agent_logs_started`, `agent_logs_execution`
- **A2A** — `a2a_delegation_started`, `a2a_delegation_completed`, `a2a_conversation_started`, `a2a_conversation_completed`, `a2a_message_sent`, `a2a_response_received`, `a2a_polling_started`, `a2a_polling_status`, `a2a_push_notification_registered`, `a2a_push_notification_received`, `a2a_push_notification_sent`, `a2a_push_notification_timeout`, `a2a_streaming_started`, `a2a_streaming_chunk`, `a2a_agent_card_fetched`, `a2a_authentication_failed`, `a2a_artifact_received`, `a2a_connection_error`, `a2a_server_task_started`, `a2a_server_task_completed`, `a2a_server_task_canceled`, `a2a_server_task_failed`, `a2a_parallel_delegation_started`, `a2a_parallel_delegation_completed`, `a2a_transport_negotiated`, `a2a_content_type_negotiated`, `a2a_context_created`, `a2a_context_expired`, `a2a_context_idle`, `a2a_context_completed`, `a2a_context_pruned`
- **시스템 시그널** — `SIGTERM`, `SIGINT`, `SIGHUP`, `SIGTSTP`, `SIGCONT`
- **와일드카드** — `"*"`는 모든 이벤트와 일치합니다.
체크포인팅은 실행 중 자동으로 실행 상태를 저장합니다. 크루, 플로우 또는 에이전트가 실행 도중 실패하면 마지막 체크포인트에서 복원하여 이미 완료된 작업을 다시 실행하지 않고 재개할 수 있습니다.
</Expandable>
## 빠른 시작
### 스토리지 프로바이더
```python
from crewai import Crew, CheckpointConfig
<ParamField path="JsonProvider" type="provider">
체크포인트당 하나의 파일, `location` 내부에 `<timestamp>_<uuid>.json` 형식으로 명명.
</ParamField>
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=True, # 기본값 사용: ./.checkpoints, task_completed 이벤트
)
result = crew.kickoff()
```
<ParamField path="SqliteProvider" type="provider">
WAL 저널링이 있는 `location`의 단일 데이터베이스 파일.
</ParamField>
각 태스크가 완료된 후 `./.checkpoints/`에 체크포인트 파일이 기록됩니다.
### CLI
## 설정
| 명령 | 목적 |
|:-----|:-----|
| `crewai checkpoint` | TUI 실행; 스토리지 자동 감지. |
| `crewai checkpoint --location <path>` | 특정 위치에 대해 TUI 실행. |
| `crewai checkpoint list <path>` | 체크포인트 나열. |
| `crewai checkpoint info <path>` | 체크포인트 파일 또는 SQLite 데이터베이스의 최신 항목 검사. |
`CheckpointConfig`를 사용하여 세부 설정을 제어합니다:
```python
from crewai import Crew, CheckpointConfig
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./my_checkpoints",
on_events=["task_completed", "crew_kickoff_completed"],
max_checkpoints=5,
),
)
```
### CheckpointConfig 필드
| 필드 | 타입 | 기본값 | 설명 |
|:-----|:-----|:-------|:-----|
| `location` | `str` | `"./.checkpoints"` | 체크포인트 파일 경로 |
| `on_events` | `list[str]` | `["task_completed"]` | 체크포인트를 트리거하는 이벤트 타입 |
| `provider` | `BaseProvider` | `JsonProvider()` | 스토리지 백엔드 |
| `max_checkpoints` | `int \| None` | `None` | 보관할 최대 파일 수; 오래된 것부터 삭제 |
### 상속 및 옵트아웃
Crew, Flow, Agent의 `checkpoint` 필드는 `CheckpointConfig`, `True`, `False`, `None`을 받습니다:
| 값 | 동작 |
|:---|:-----|
| `None` (기본값) | 부모에서 상속. 에이전트는 크루의 설정을 상속합니다. |
| `True` | 기본값으로 활성화. |
| `False` | 명시적 옵트아웃. 부모 상속을 중단합니다. |
| `CheckpointConfig(...)` | 사용자 정의 설정. |
```python
crew = Crew(
agents=[
Agent(role="Researcher", ...), # 크루의 checkpoint 상속
Agent(role="Writer", ..., checkpoint=False), # 옵트아웃, 체크포인트 없음
],
tasks=[...],
checkpoint=True,
)
```
## 체크포인트에서 재개
```python
# 복원 및 재개
crew = Crew.from_checkpoint("./my_checkpoints/20260407T120000_abc123.json")
result = crew.kickoff() # 마지막으로 완료된 태스크부터 재개
```
복원된 크루는 이미 완료된 태스크를 건너뛰고 첫 번째 미완료 태스크부터 재개합니다.
## Crew, Flow, Agent에서 사용 가능
### Crew
```python
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, write_task, review_task],
checkpoint=CheckpointConfig(location="./crew_cp"),
)
```
기본 트리거: `task_completed` (완료된 태스크당 하나의 체크포인트).
### Flow
```python
from crewai.flow.flow import Flow, start, listen
from crewai import CheckpointConfig
class MyFlow(Flow):
@start()
def step_one(self):
return "data"
@listen(step_one)
def step_two(self, data):
return process(data)
flow = MyFlow(
checkpoint=CheckpointConfig(
location="./flow_cp",
on_events=["method_execution_finished"],
),
)
result = flow.kickoff()
# 재개
flow = MyFlow.from_checkpoint("./flow_cp/20260407T120000_abc123.json")
result = flow.kickoff()
```
### Agent
```python
agent = Agent(
role="Researcher",
goal="Research topics",
backstory="Expert researcher",
checkpoint=CheckpointConfig(
location="./agent_cp",
on_events=["lite_agent_execution_completed"],
),
)
result = agent.kickoff(messages=[{"role": "user", "content": "Research AI trends"}])
```
## 스토리지 프로바이더
CrewAI는 두 가지 체크포인트 스토리지 프로바이더를 제공합니다.
### JsonProvider (기본값)
각 체크포인트를 별도의 JSON 파일로 저장합니다.
```python
from crewai import Crew, CheckpointConfig
from crewai.state import JsonProvider
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./my_checkpoints",
provider=JsonProvider(),
max_checkpoints=5,
),
)
```
### SqliteProvider
모든 체크포인트를 단일 SQLite 데이터베이스 파일에 저장합니다.
```python
from crewai import Crew, CheckpointConfig
from crewai.state import SqliteProvider
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./.checkpoints.db",
provider=SqliteProvider(),
),
)
```
## 이벤트 타입
`on_events` 필드는 이벤트 타입 문자열의 조합을 받습니다. 일반적인 선택:
| 사용 사례 | 이벤트 |
|:----------|:-------|
| 각 태스크 완료 후 (Crew) | `["task_completed"]` |
| 각 플로우 메서드 완료 후 | `["method_execution_finished"]` |
| 에이전트 실행 완료 후 | `["agent_execution_completed"]`, `["lite_agent_execution_completed"]` |
| 크루 완료 시에만 | `["crew_kickoff_completed"]` |
| 모든 LLM 호출 후 | `["llm_call_completed"]` |
| 모든 이벤트 | `["*"]` |
<Warning>
`["*"]` 또는 `llm_call_completed`와 같은 고빈도 이벤트를 사용하면 많은 체크포인트 파일이 생성되어 성능에 영향을 줄 수 있습니다. `max_checkpoints`를 사용하여 디스크 사용량을 제한하세요.
</Warning>
## 수동 체크포인팅
완전한 제어를 위해 자체 이벤트 핸들러를 등록하고 `state.checkpoint()`를 직접 호출할 수 있습니다:
```python
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.llm_events import LLMCallCompletedEvent
# 동기 핸들러
@crewai_event_bus.on(LLMCallCompletedEvent)
def on_llm_done(source, event, state):
path = state.checkpoint("./my_checkpoints")
print(f"체크포인트 저장: {path}")
# 비동기 핸들러
@crewai_event_bus.on(LLMCallCompletedEvent)
async def on_llm_done_async(source, event, state):
path = await state.acheckpoint("./my_checkpoints")
print(f"체크포인트 저장: {path}")
```
`state` 인수는 핸들러가 3개의 매개변수를 받을 때 이벤트 버스가 자동으로 전달하는 `RuntimeState`입니다. [Event Listeners](/ko/concepts/event-listener) 문서에 나열된 모든 이벤트 타입에 핸들러를 등록할 수 있습니다.
체크포인팅은 best-effort입니다: 체크포인트 기록이 실패하면 오류가 로그에 기록되지만 실행은 중단 없이 계속됩니다.

View File

@@ -5,386 +5,225 @@ icon: floppy-disk
mode: "wide"
---
O checkpointing salva um snapshot do estado de execucao durante uma execucao para que uma crew, flow ou agente possa retomar apos uma falha ou ser bifurcado em uma branch alternativa.
<CardGroup cols={2}>
<Card title="Explicacao" icon="lightbulb" href="#explicacao">
Como o checkpointing funciona: eventos, armazenamento e heranca.
</Card>
<Card title="Tutorial" icon="graduation-cap" href="#tutorial-retomar-uma-crew-com-falha">
Um passo a passo de 5 minutos: executar, interromper, retomar.
</Card>
<Card title="Guias de uso" icon="screwdriver-wrench" href="#guias-de-uso">
Receitas focadas em tarefas para fluxos comuns.
</Card>
<Card title="Referencia" icon="book" href="#referencia">
`CheckpointConfig`, eventos, provedores e CLI.
</Card>
</CardGroup>
## Explicacao
### O que e um checkpoint
Um checkpoint captura tudo o que o CrewAI precisa para recriar uma execucao em andamento: o estado completo da crew, flow ou agente — configuracao, memoria e fontes de conhecimento dos agentes, progresso das tarefas, saidas intermediarias — junto com os inputs do kickoff, o historico de eventos ate aquele ponto e um ID de linhagem que liga o checkpoint a execucao de origem.
Restaurar reconstroi esse estado e continua. Tarefas concluidas sao puladas, memoria e conhecimento sao reidratados, e o trabalho downstream roda contra as mesmas saidas que a execucao original produziu. Fazer fork executa a mesma restauracao sob uma nova linhagem, para que a nova branch e a execucao original gravem checkpoints lado a lado sem sobrescrever uma a outra.
### Quando os checkpoints sao gravados
O checkpointing e orientado a eventos. O runtime se inscreve nos eventos selecionados em `on_events` e grava um checkpoint sempre que um e disparado. O padrao `task_completed` produz um checkpoint por tarefa finalizada — um equilibrio razoavel entre granularidade e uso de disco. Eventos de alta frequencia como `llm_call_completed` estao disponiveis para recuperacao mais granular, mas gravam muito mais arquivos.
### Armazenamento
Dois provedores acompanham o CrewAI:
- `JsonProvider` grava um arquivo por checkpoint. Legivel e facil de inspecionar.
- `SqliteProvider` grava em um unico banco SQLite. Melhor para checkpointing de alta frequencia.
Ambos removem os checkpoints mais antigos quando `max_checkpoints` esta definido.
<Note>
As gravacoes de checkpoint sao best-effort. Um checkpoint que falha e registrado em log, mas nao interrompe a execucao.
</Note>
### Modelo de heranca
`Crew`, `Flow` e `Agent` aceitam um argumento `checkpoint`. Filhos herdam do pai a menos que definam seu proprio valor ou passem `False` para desativar. Ative o checkpointing uma vez na crew e todos os agentes participam, ou exclua um agente seletivamente.
## Tutorial: Retomar uma crew com falha
Este passo a passo leva cerca de 5 minutos. Voce executara uma crew de duas tarefas, a interrompera no meio e a retomara a partir do checkpoint salvo.
<Steps>
<Step title="Crie a crew com checkpointing ativado">
```python
from crewai import Agent, Crew, Task
researcher = Agent(role="Researcher", goal="Research", backstory="Expert")
writer = Agent(role="Writer", goal="Write", backstory="Expert")
crew = Crew(
agents=[researcher, writer],
tasks=[
Task(description="Research AI trends", agent=researcher, expected_output="bullets"),
Task(description="Write a summary", agent=writer, expected_output="paragraph"),
],
checkpoint=True,
)
```
</Step>
<Step title="Execute e interrompa apos a primeira tarefa">
```python
result = crew.kickoff()
```
Pressione `Ctrl+C` apos a primeira tarefa concluir. Em `./.checkpoints/`, um arquivo `<timestamp>_<uuid>.json` e o checkpoint.
</Step>
<Step title="Retome a partir do checkpoint">
```python
from crewai import CheckpointConfig
result = crew.kickoff(
from_checkpoint=CheckpointConfig(
restore_from="./.checkpoints/<timestamp>_<uuid>.json",
),
)
```
A tarefa de pesquisa e pulada, o escritor executa contra a saida de pesquisa salva e a crew finaliza.
</Step>
</Steps>
## Guias de uso
<AccordionGroup>
<Accordion title="Ativar checkpointing com padroes" icon="play">
```python
crew = Crew(agents=[...], tasks=[...], checkpoint=True)
```
Grava em `./.checkpoints/` em cada `task_completed`.
</Accordion>
<Accordion title="Personalizar armazenamento e frequencia" icon="sliders">
```python
from crewai import Crew, CheckpointConfig
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./my_checkpoints",
on_events=["task_completed", "crew_kickoff_completed"],
max_checkpoints=5,
),
)
```
</Accordion>
<Accordion title="Escolher um provedor de armazenamento" icon="database">
<CodeGroup>
```python JsonProvider
from crewai import Crew, CheckpointConfig
from crewai.state import JsonProvider
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./my_checkpoints",
provider=JsonProvider(),
max_checkpoints=5,
),
)
```
```python SqliteProvider
from crewai import Crew, CheckpointConfig
from crewai.state import SqliteProvider
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./.checkpoints.db",
provider=SqliteProvider(),
max_checkpoints=50,
),
)
```
</CodeGroup>
<Tip>
O SQLite ativa o modo journal WAL para leituras concorrentes. Prefira-o para checkpointing de alta frequencia.
</Tip>
</Accordion>
<Accordion title="Desativar um agente especifico" icon="user-slash">
```python
crew = Crew(
agents=[
Agent(role="Researcher", ...),
Agent(role="Writer", ..., checkpoint=False),
],
tasks=[...],
checkpoint=True,
)
```
</Accordion>
<Accordion title="Fazer fork em uma nova branch" icon="code-branch">
`fork()` restaura um checkpoint sob uma nova linhagem para que a nova execucao nao colida com a original.
```python
config = CheckpointConfig(restore_from="./my_checkpoints/<file>.json")
crew = Crew.fork(config, branch="experiment-a")
result = crew.kickoff(inputs={"strategy": "aggressive"})
```
O label `branch` e opcional; um e gerado se omitido.
</Accordion>
<Accordion title="Checkpoint em Crew, Flow ou Agent" icon="cubes">
<Tabs>
<Tab title="Crew">
```python
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, write_task, review_task],
checkpoint=CheckpointConfig(location="./crew_cp"),
)
```
Gatilho padrao: `task_completed`.
</Tab>
<Tab title="Flow">
```python
from crewai.flow.flow import Flow, start, listen
from crewai import CheckpointConfig
class MyFlow(Flow):
@start()
def step_one(self):
return "data"
@listen(step_one)
def step_two(self, data):
return process(data)
flow = MyFlow(
checkpoint=CheckpointConfig(
location="./flow_cp",
on_events=["method_execution_finished"],
),
)
result = flow.kickoff()
```
</Tab>
<Tab title="Agent">
```python
agent = Agent(
role="Researcher",
goal="Research topics",
backstory="Expert researcher",
checkpoint=CheckpointConfig(
location="./agent_cp",
on_events=["lite_agent_execution_completed"],
),
)
result = agent.kickoff(messages=[{"role": "user", "content": "Research AI trends"}])
```
</Tab>
</Tabs>
</Accordion>
<Accordion title="Gravar um checkpoint manualmente" icon="code">
Registre um handler em qualquer evento e chame `state.checkpoint()`.
<CodeGroup>
```python Sync
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.llm_events import LLMCallCompletedEvent
@crewai_event_bus.on(LLMCallCompletedEvent)
def on_llm_done(source, event, state):
path = state.checkpoint("./my_checkpoints")
print(f"Checkpoint salvo: {path}")
```
```python Async
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.llm_events import LLMCallCompletedEvent
@crewai_event_bus.on(LLMCallCompletedEvent)
async def on_llm_done_async(source, event, state):
path = await state.acheckpoint("./my_checkpoints")
print(f"Checkpoint salvo: {path}")
```
</CodeGroup>
Um argumento `state` e fornecido automaticamente quando o handler recebe tres parametros. Veja [Event Listeners](/pt-BR/concepts/event-listener) para o catalogo completo de eventos.
</Accordion>
<Accordion title="Navegar, retomar e fazer fork pela CLI" icon="terminal">
```bash
crewai checkpoint # detecta automaticamente .checkpoints/ ou .checkpoints.db
crewai checkpoint --location ./my_checkpoints
crewai checkpoint --location ./.checkpoints.db
```
<Frame>
<img src="/images/checkpointing.png" alt="Checkpoint TUI" />
</Frame>
O painel esquerdo agrupa checkpoints por branch; forks aninham sob seu pai. Selecionar um checkpoint mostra seus metadados, estado da entidade e progresso das tarefas. **Resume** continua a execucao; **Fork** inicia uma nova branch.
O painel de detalhes expoe duas areas editaveis:
- **Inputs** — os inputs originais do kickoff, preenchidos e editaveis.
- **Saidas das tarefas** — saidas das tarefas concluidas. Editar uma saida e pressionar **Fork** invalida tarefas downstream para que sejam reexecutadas com o contexto modificado.
<Tip>
Util para exploracao de cenarios: fork, ajuste, observe.
</Tip>
</Accordion>
<Accordion title="Inspecionar checkpoints sem a TUI" icon="magnifying-glass">
```bash
crewai checkpoint list ./my_checkpoints
crewai checkpoint info ./my_checkpoints/<file>.json
crewai checkpoint info ./.checkpoints.db
```
</Accordion>
</AccordionGroup>
## Referencia
### `CheckpointConfig`
<ParamField path="location" type="str" default='"./.checkpoints"'>
Destino do armazenamento. Diretorio para `JsonProvider`, caminho de arquivo de banco para `SqliteProvider`.
</ParamField>
<ParamField path="on_events" type="list[CheckpointEventType]" default='["task_completed"]'>
Tipos de evento que disparam um checkpoint. `CheckpointEventType` e um `Literal` — seu type checker autocompleta e rejeita valores nao suportados. Veja [tipos de evento](#tipos-de-evento) para a lista completa.
</ParamField>
<ParamField path="provider" type="BaseProvider" default="JsonProvider()">
Backend de armazenamento. `JsonProvider` ou `SqliteProvider`.
</ParamField>
<ParamField path="max_checkpoints" type="int | None" default="None">
Maximo de checkpoints a reter. Os mais antigos sao removidos apos cada gravacao.
</ParamField>
<ParamField path="restore_from" type="Path | str | None" default="None">
Checkpoint a restaurar quando passado via `from_checkpoint`.
</ParamField>
### Valores do campo `checkpoint`
Aceito por `Crew`, `Flow` e `Agent`.
<ParamField path="None" type="padrao">
Herda do pai.
</ParamField>
<ParamField path="True" type="bool">
Ativa com padroes.
</ParamField>
<ParamField path="False" type="bool">
Desativacao explicita. Interrompe a heranca.
</ParamField>
<ParamField path="CheckpointConfig(...)" type="CheckpointConfig">
Configuracao personalizada.
</ParamField>
### Tipos de evento
`on_events` aceita qualquer combinacao de valores `CheckpointEventType`. O padrao `["task_completed"]` grava um checkpoint por tarefa finalizada; `["*"]` corresponde a todos os eventos.
<Warning>
`["*"]` e eventos de alta frequencia como `llm_call_completed` gravam muitos checkpoints e podem degradar o desempenho. Combine com `max_checkpoints`.
O checkpointing esta em versao inicial. As APIs podem mudar em versoes futuras.
</Warning>
<Expandable title="Todos os eventos suportados">
## Visao Geral
- **Task** — `task_started`, `task_completed`, `task_failed`, `task_evaluation`
- **Crew** — `crew_kickoff_started`, `crew_kickoff_completed`, `crew_kickoff_failed`, `crew_train_started`, `crew_train_completed`, `crew_train_failed`, `crew_test_started`, `crew_test_completed`, `crew_test_failed`, `crew_test_result`
- **Agent** — `agent_execution_started`, `agent_execution_completed`, `agent_execution_error`, `lite_agent_execution_started`, `lite_agent_execution_completed`, `lite_agent_execution_error`, `agent_evaluation_started`, `agent_evaluation_completed`, `agent_evaluation_failed`
- **Flow** — `flow_created`, `flow_started`, `flow_finished`, `flow_paused`, `method_execution_started`, `method_execution_finished`, `method_execution_failed`, `method_execution_paused`, `human_feedback_requested`, `human_feedback_received`, `flow_input_requested`, `flow_input_received`
- **LLM** — `llm_call_started`, `llm_call_completed`, `llm_call_failed`, `llm_stream_chunk`, `llm_thinking_chunk`
- **LLM Guardrail** — `llm_guardrail_started`, `llm_guardrail_completed`, `llm_guardrail_failed`
- **Tool** — `tool_usage_started`, `tool_usage_finished`, `tool_usage_error`, `tool_validate_input_error`, `tool_selection_error`, `tool_execution_error`
- **Memory** — `memory_save_started`, `memory_save_completed`, `memory_save_failed`, `memory_query_started`, `memory_query_completed`, `memory_query_failed`, `memory_retrieval_started`, `memory_retrieval_completed`, `memory_retrieval_failed`
- **Knowledge** — `knowledge_search_query_started`, `knowledge_search_query_completed`, `knowledge_query_started`, `knowledge_query_completed`, `knowledge_query_failed`, `knowledge_search_query_failed`
- **Reasoning** — `agent_reasoning_started`, `agent_reasoning_completed`, `agent_reasoning_failed`
- **MCP** — `mcp_connection_started`, `mcp_connection_completed`, `mcp_connection_failed`, `mcp_tool_execution_started`, `mcp_tool_execution_completed`, `mcp_tool_execution_failed`, `mcp_config_fetch_failed`
- **Observation** — `step_observation_started`, `step_observation_completed`, `step_observation_failed`, `plan_refinement`, `plan_replan_triggered`, `goal_achieved_early`
- **Skill** — `skill_discovery_started`, `skill_discovery_completed`, `skill_loaded`, `skill_activated`, `skill_load_failed`
- **Logging** — `agent_logs_started`, `agent_logs_execution`
- **A2A** — `a2a_delegation_started`, `a2a_delegation_completed`, `a2a_conversation_started`, `a2a_conversation_completed`, `a2a_message_sent`, `a2a_response_received`, `a2a_polling_started`, `a2a_polling_status`, `a2a_push_notification_registered`, `a2a_push_notification_received`, `a2a_push_notification_sent`, `a2a_push_notification_timeout`, `a2a_streaming_started`, `a2a_streaming_chunk`, `a2a_agent_card_fetched`, `a2a_authentication_failed`, `a2a_artifact_received`, `a2a_connection_error`, `a2a_server_task_started`, `a2a_server_task_completed`, `a2a_server_task_canceled`, `a2a_server_task_failed`, `a2a_parallel_delegation_started`, `a2a_parallel_delegation_completed`, `a2a_transport_negotiated`, `a2a_content_type_negotiated`, `a2a_context_created`, `a2a_context_expired`, `a2a_context_idle`, `a2a_context_completed`, `a2a_context_pruned`
- **Sinais de sistema** — `SIGTERM`, `SIGINT`, `SIGHUP`, `SIGTSTP`, `SIGCONT`
- **Wildcard** — `"*"` corresponde a todos os eventos.
O checkpointing salva automaticamente o estado de execucao durante uma execucao. Se uma crew, flow ou agente falhar no meio da execucao, voce pode restaurar a partir do ultimo checkpoint e retomar sem reexecutar o trabalho ja concluido.
</Expandable>
## Inicio Rapido
### Provedores de armazenamento
```python
from crewai import Crew, CheckpointConfig
<ParamField path="JsonProvider" type="provider">
Um arquivo por checkpoint, nomeado `<timestamp>_<uuid>.json` dentro de `location`.
</ParamField>
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=True, # usa padroes: ./.checkpoints, em task_completed
)
result = crew.kickoff()
```
<ParamField path="SqliteProvider" type="provider">
Arquivo de banco unico em `location` com journaling WAL.
</ParamField>
Os arquivos de checkpoint sao gravados em `./.checkpoints/` apos cada tarefa concluida.
### CLI
## Configuracao
| Comando | Proposito |
|:--------|:----------|
| `crewai checkpoint` | Inicia a TUI; detecta o armazenamento automaticamente. |
| `crewai checkpoint --location <path>` | Inicia a TUI em uma localizacao especifica. |
| `crewai checkpoint list <path>` | Lista checkpoints. |
| `crewai checkpoint info <path>` | Inspeciona um arquivo de checkpoint ou a entrada mais recente em um banco SQLite. |
Use `CheckpointConfig` para controle total:
```python
from crewai import Crew, CheckpointConfig
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./my_checkpoints",
on_events=["task_completed", "crew_kickoff_completed"],
max_checkpoints=5,
),
)
```
### Campos do CheckpointConfig
| Campo | Tipo | Padrao | Descricao |
|:------|:-----|:-------|:----------|
| `location` | `str` | `"./.checkpoints"` | Caminho para os arquivos de checkpoint |
| `on_events` | `list[str]` | `["task_completed"]` | Tipos de evento que acionam um checkpoint |
| `provider` | `BaseProvider` | `JsonProvider()` | Backend de armazenamento |
| `max_checkpoints` | `int \| None` | `None` | Maximo de arquivos a manter; os mais antigos sao removidos primeiro |
### Heranca e Desativacao
O campo `checkpoint` em Crew, Flow e Agent aceita `CheckpointConfig`, `True`, `False` ou `None`:
| Valor | Comportamento |
|:------|:--------------|
| `None` (padrao) | Herda do pai. Um agente herda a configuracao da crew. |
| `True` | Ativa com padroes. |
| `False` | Desativacao explicita. Interrompe a heranca do pai. |
| `CheckpointConfig(...)` | Configuracao personalizada. |
```python
crew = Crew(
agents=[
Agent(role="Researcher", ...), # herda checkpoint da crew
Agent(role="Writer", ..., checkpoint=False), # desativado, sem checkpoints
],
tasks=[...],
checkpoint=True,
)
```
## Retomando a partir de um Checkpoint
```python
# Restaurar e retomar
crew = Crew.from_checkpoint("./my_checkpoints/20260407T120000_abc123.json")
result = crew.kickoff() # retoma a partir da ultima tarefa concluida
```
A crew restaurada pula tarefas ja concluidas e retoma a partir da primeira incompleta.
## Funciona em Crew, Flow e Agent
### Crew
```python
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, write_task, review_task],
checkpoint=CheckpointConfig(location="./crew_cp"),
)
```
Gatilho padrao: `task_completed` (um checkpoint por tarefa finalizada).
### Flow
```python
from crewai.flow.flow import Flow, start, listen
from crewai import CheckpointConfig
class MyFlow(Flow):
@start()
def step_one(self):
return "data"
@listen(step_one)
def step_two(self, data):
return process(data)
flow = MyFlow(
checkpoint=CheckpointConfig(
location="./flow_cp",
on_events=["method_execution_finished"],
),
)
result = flow.kickoff()
# Retomar
flow = MyFlow.from_checkpoint("./flow_cp/20260407T120000_abc123.json")
result = flow.kickoff()
```
### Agent
```python
agent = Agent(
role="Researcher",
goal="Research topics",
backstory="Expert researcher",
checkpoint=CheckpointConfig(
location="./agent_cp",
on_events=["lite_agent_execution_completed"],
),
)
result = agent.kickoff(messages=[{"role": "user", "content": "Research AI trends"}])
```
## Provedores de Armazenamento
O CrewAI inclui dois provedores de armazenamento para checkpoints.
### JsonProvider (padrao)
Grava cada checkpoint como um arquivo JSON separado.
```python
from crewai import Crew, CheckpointConfig
from crewai.state import JsonProvider
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./my_checkpoints",
provider=JsonProvider(),
max_checkpoints=5,
),
)
```
### SqliteProvider
Armazena todos os checkpoints em um unico arquivo SQLite.
```python
from crewai import Crew, CheckpointConfig
from crewai.state import SqliteProvider
crew = Crew(
agents=[...],
tasks=[...],
checkpoint=CheckpointConfig(
location="./.checkpoints.db",
provider=SqliteProvider(),
),
)
```
## Tipos de Evento
O campo `on_events` aceita qualquer combinacao de strings de tipo de evento. Escolhas comuns:
| Caso de Uso | Eventos |
|:------------|:--------|
| Apos cada tarefa (Crew) | `["task_completed"]` |
| Apos cada metodo do flow | `["method_execution_finished"]` |
| Apos execucao do agente | `["agent_execution_completed"]`, `["lite_agent_execution_completed"]` |
| Apenas na conclusao da crew | `["crew_kickoff_completed"]` |
| Apos cada chamada LLM | `["llm_call_completed"]` |
| Em tudo | `["*"]` |
<Warning>
Usar `["*"]` ou eventos de alta frequencia como `llm_call_completed` gravara muitos arquivos de checkpoint e pode impactar o desempenho. Use `max_checkpoints` para limitar o uso de disco.
</Warning>
## Checkpointing Manual
Para controle total, registre seu proprio handler de evento e chame `state.checkpoint()` diretamente:
```python
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.llm_events import LLMCallCompletedEvent
# Handler sincrono
@crewai_event_bus.on(LLMCallCompletedEvent)
def on_llm_done(source, event, state):
path = state.checkpoint("./my_checkpoints")
print(f"Checkpoint salvo: {path}")
# Handler assincrono
@crewai_event_bus.on(LLMCallCompletedEvent)
async def on_llm_done_async(source, event, state):
path = await state.acheckpoint("./my_checkpoints")
print(f"Checkpoint salvo: {path}")
```
O argumento `state` e o `RuntimeState` passado automaticamente pelo barramento de eventos quando seu handler aceita 3 parametros. Voce pode registrar handlers em qualquer tipo de evento listado na documentacao de [Event Listeners](/pt-BR/concepts/event-listener).
O checkpointing e best-effort: se uma gravacao de checkpoint falhar, o erro e registrado no log, mas a execucao continua sem interrupcao.

View File

@@ -232,9 +232,6 @@ class A2UIClientExtension:
continue
data = root.data
if not isinstance(data, dict):
continue
surface_id = _get_surface_id(data)
if not surface_id:
continue

View File

@@ -258,8 +258,6 @@ def validate_catalog_components_v09(message: A2UIMessageV09) -> None:
errors: list[Any] = []
for entry in message.update_components.components:
if not isinstance(entry, dict):
continue
type_name = entry.get("component")
if not isinstance(type_name, str):
continue

View File

@@ -178,7 +178,7 @@ class StreamingHandler:
is_final=is_final_update,
)
elif isinstance(event, Message):
elif isinstance(event, Message): # type: ignore[unreachable]
new_messages.append(event)
result_parts.extend(
part.root.text

View File

@@ -317,9 +317,7 @@ def get_part_content_type(part: Part) -> str:
if mime == APPLICATION_A2UI_JSON:
return APPLICATION_A2UI_JSON
return APPLICATION_JSON
if root.kind == "file":
return root.file.mime_type or APPLICATION_OCTET_STREAM
return APPLICATION_OCTET_STREAM
return root.file.mime_type or APPLICATION_OCTET_STREAM
def validate_message_parts(

View File

@@ -1109,14 +1109,9 @@ class Agent(BaseAgent):
"""
if self.agent_executor is None:
raise RuntimeError("Agent executor is not initialized.")
if not isinstance(self.llm, BaseLLM):
raise RuntimeError(
"LLM must be resolved before updating agent executor parameters."
)
if task is not None:
self.agent_executor.task = task
self.agent_executor.llm = self.llm
self.agent_executor.tools = tools
self.agent_executor.original_tools = raw_tools
self.agent_executor.prompt = prompt
@@ -1416,11 +1411,6 @@ class Agent(BaseAgent):
if _is_resuming_agent_executor(self.agent_executor):
executor = self.agent_executor
if not isinstance(self.llm, BaseLLM):
raise RuntimeError(
"LLM must be resolved before resuming agent executor."
)
executor.llm = self.llm
executor.tools = parsed_tools
executor.tools_names = get_tool_names(parsed_tools)
executor.tools_description = render_text_description_and_args(parsed_tools)

View File

@@ -112,9 +112,6 @@ class BaseConverterAdapter(ABC):
Returns:
Extracted JSON string if found and valid, otherwise the original result.
"""
if not isinstance(result, str):
return str(result)
if valid := BaseConverterAdapter._validate_json(result):
return valid

View File

@@ -46,8 +46,8 @@ class LangGraphToolAdapter(BaseToolAdapter):
else:
all_tools = tools
for tool in all_tools:
if isinstance(tool, LangChainBaseTool):
converted_tools.append(tool)
if isinstance(tool, LangChainBaseTool): # type: ignore[unreachable]
converted_tools.append(tool) # type: ignore[unreachable]
continue
sanitized_name: str = self.sanitize_tool_name(tool.name)

View File

@@ -4,7 +4,6 @@ This module contains the OpenAIAgentToolAdapter class that converts CrewAI tools
to OpenAI Assistant-compatible format using the agents library.
"""
from collections.abc import Awaitable
import inspect
import json
from typing import Any, cast
@@ -114,12 +113,8 @@ class OpenAIAgentToolAdapter(BaseToolAdapter):
else:
args_dict = {param_name: str(arguments)}
output: Any | Awaitable[Any] = tool._run(**args_dict)
if inspect.isawaitable(output):
result: Any = await output
else:
result = output
output: Any = tool._run(**args_dict)
result: Any = await output if inspect.isawaitable(output) else output
if isinstance(result, (dict, list, str, int, float, bool, type(None))):
return result

View File

@@ -569,9 +569,6 @@ class BaseAgent(BaseModel, ABC, metaclass=AgentMeta):
if not self._token_process:
self._token_process = TokenProcess()
if self.security_config is None:
self.security_config = SecurityConfig()
return self
@field_validator("id", mode="before")
@@ -621,7 +618,7 @@ class BaseAgent(BaseModel, ABC, metaclass=AgentMeta):
task: Any,
context: str | None = None,
tools: list[BaseTool] | None = None,
) -> str:
) -> str | BaseModel:
pass
@abstractmethod
@@ -630,7 +627,7 @@ class BaseAgent(BaseModel, ABC, metaclass=AgentMeta):
task: Any,
context: str | None = None,
tools: list[BaseTool] | None = None,
) -> str:
) -> str | BaseModel:
"""Execute a task asynchronously."""
@abstractmethod

View File

@@ -334,7 +334,7 @@ class CrewAgentExecutor(BaseAgentExecutor):
Returns:
Final answer from the agent.
"""
formatted_answer = None
formatted_answer: AgentAction | AgentFinish | None = None
while not isinstance(formatted_answer, AgentFinish):
try:
if has_reached_max_iterations(self.iterations, self.max_iter):
@@ -385,12 +385,12 @@ class CrewAgentExecutor(BaseAgentExecutor):
)
formatted_answer = process_llm_response(
answer_str, self.use_stop_words
) # type: ignore[assignment]
)
else:
answer_str = str(answer) if not isinstance(answer, str) else answer
formatted_answer = process_llm_response(
answer_str, self.use_stop_words
) # type: ignore[assignment]
)
if isinstance(formatted_answer, AgentAction):
fingerprint_context = {}
@@ -425,7 +425,7 @@ class CrewAgentExecutor(BaseAgentExecutor):
self._append_message(formatted_answer.text)
except OutputParserError as e:
formatted_answer = handle_output_parser_exception( # type: ignore[assignment]
formatted_answer = handle_output_parser_exception(
e=e,
messages=self.messages,
iterations=self.iterations,
@@ -1145,7 +1145,7 @@ class CrewAgentExecutor(BaseAgentExecutor):
Returns:
Final answer from the agent.
"""
formatted_answer = None
formatted_answer: AgentAction | AgentFinish | None = None
while not isinstance(formatted_answer, AgentFinish):
try:
if has_reached_max_iterations(self.iterations, self.max_iter):
@@ -1197,12 +1197,12 @@ class CrewAgentExecutor(BaseAgentExecutor):
)
formatted_answer = process_llm_response(
answer_str, self.use_stop_words
) # type: ignore[assignment]
)
else:
answer_str = str(answer) if not isinstance(answer, str) else answer
formatted_answer = process_llm_response(
answer_str, self.use_stop_words
) # type: ignore[assignment]
)
if isinstance(formatted_answer, AgentAction):
fingerprint_context = {}
@@ -1237,7 +1237,7 @@ class CrewAgentExecutor(BaseAgentExecutor):
self._append_message(formatted_answer.text)
except OutputParserError as e:
formatted_answer = handle_output_parser_exception( # type: ignore[assignment]
formatted_answer = handle_output_parser_exception(
e=e,
messages=self.messages,
iterations=self.iterations,

View File

@@ -308,15 +308,11 @@ class StepExecutor:
if isinstance(formatted, AgentFinish):
return str(formatted.output)
if isinstance(formatted, AgentAction):
tool_calls_made.append(formatted.tool)
tool_result = self._execute_text_tool_with_events(formatted)
last_tool_result = tool_result
messages.append({"role": "assistant", "content": answer_str})
messages.append(self._build_observation_message(tool_result))
continue
return answer_str
tool_calls_made.append(formatted.tool)
tool_result = self._execute_text_tool_with_events(formatted)
last_tool_result = tool_result
messages.append({"role": "assistant", "content": answer_str})
messages.append(self._build_observation_message(tool_result))
return last_tool_result

View File

@@ -39,10 +39,7 @@ class ToolsHandler(BaseModel):
if self.cache and should_cache and calling.tool_name != CacheTools().name:
input_str = ""
if calling.arguments:
if isinstance(calling.arguments, dict):
input_str = json.dumps(calling.arguments)
else:
input_str = str(calling.arguments)
input_str = json.dumps(calling.arguments)
self.cache.add(
tool=calling.tool_name,

View File

@@ -443,20 +443,16 @@ class Crew(FlowTrackable, BaseModel):
if node.event.type == "task_started" and node.event.task_id:
started_task_ids.add(node.event.task_id)
is_hierarchical = self.process == Process.hierarchical
resuming_task_agent_roles: set[str] = set()
for task in self.tasks:
if task.output is not None or str(task.id) not in started_task_ids:
continue
executing_agent = self.manager_agent if is_hierarchical else task.agent
if executing_agent is not None:
resuming_task_agent_roles.add(executing_agent.role)
if (
task.output is None
and task.agent is not None
and str(task.id) in started_task_ids
):
resuming_task_agent_roles.add(task.agent.role)
candidate_agents: list[BaseAgent] = list(self.agents)
if self.manager_agent is not None:
candidate_agents.append(self.manager_agent)
for agent in candidate_agents:
for agent in self.agents:
agent.crew = self
executor = agent.agent_executor
if (
@@ -471,7 +467,7 @@ class Crew(FlowTrackable, BaseModel):
agent.agent_executor = None
for task in self.tasks:
if task.agent is not None:
for agent in candidate_agents:
for agent in self.agents:
if agent.role == task.agent.role:
task.agent = agent
if agent.agent_executor is not None and task.output is None:
@@ -540,9 +536,25 @@ class Crew(FlowTrackable, BaseModel):
if state is None:
return
# Restore crew scope and the in-progress task scope. Inner scopes
# (agent, llm, tool) are re-created by the executor on resume.
stack: list[tuple[str, str]] = []
if self._kickoff_event_id:
stack.append((self._kickoff_event_id, "crew_kickoff_started"))
# Find the task_started event for the in-progress task (skipped on resume)
for task in self.tasks:
if task.output is None:
task_id_str = str(task.id)
for node in state.event_record.nodes.values():
if (
node.event.type == "task_started"
and node.event.task_id == task_id_str
):
stack.append((node.event.event_id, "task_started"))
break
break
restore_event_scope(tuple(stack))
# Restore last_event_id and emission counter from the record

View File

@@ -166,7 +166,7 @@ class CrewAIEventsBus:
with self._instance_lock:
if self._executor_initialized:
return
return # type: ignore[unreachable]
self._sync_executor = ThreadPoolExecutor(
max_workers=10,
@@ -304,7 +304,7 @@ class CrewAIEventsBus:
from crewai import RuntimeState
if RuntimeState is None:
logger.warning(
logger.warning( # type: ignore[unreachable]
"RuntimeState unavailable; skipping entity registration."
)
return
@@ -428,7 +428,7 @@ class CrewAIEventsBus:
if cached_plan is None:
with self._rwlock.w_locked():
if self._shutting_down:
return
return # type: ignore[unreachable]
cached_plan = self._execution_plan_cache.get(event_type)
if cached_plan is None:
sync_handlers = self._sync_handlers.get(event_type, frozenset())

View File

@@ -138,36 +138,6 @@ def restore_event_scope(stack: tuple[tuple[str, str], ...]) -> None:
_event_id_stack.set(stack)
def resume_task_scope(task_id: str) -> bool:
"""Push the latest recorded ``task_started`` scope for a task.
Args:
task_id: The task identifier to look up in the active event record.
Returns:
``True`` if a prior scope was pushed; ``False`` otherwise.
"""
from crewai.events.event_bus import crewai_event_bus
state = crewai_event_bus._runtime_state
if state is None:
return False
latest_event_id: str | None = None
latest_seq = -1
for node in list(state.event_record.nodes.values()):
ev = node.event
if ev.type != "task_started" or ev.task_id != task_id:
continue
seq = ev.emission_sequence or 0
if seq > latest_seq:
latest_seq = seq
latest_event_id = ev.event_id
if latest_event_id is None:
return False
push_event_scope(latest_event_id, "task_started")
return True
def push_event_scope(event_id: str, event_type: str = "") -> None:
"""Push an event ID and type onto the scope stack."""
config = _event_context_config.get() or _default_config

View File

@@ -291,7 +291,9 @@ class TraceBatchManager:
)
if response is None:
logger.warning("Failed to send trace events. Events will be lost.")
logger.warning( # type: ignore[unreachable]
"Failed to send trace events. Events will be lost."
)
return 500
if response.status_code in [200, 201]:

View File

@@ -173,10 +173,8 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
executor_type: Literal["experimental"] = "experimental"
suppress_flow_events: bool = True # always suppress for executor
llm: BaseLLM | None = Field(default=None, exclude=True)
prompt: SystemPromptResult | StandardPromptResult | None = Field(
default=None, exclude=True
)
llm: BaseLLM = Field(exclude=True)
prompt: SystemPromptResult | StandardPromptResult = Field(exclude=True)
max_iter: int = Field(default=25, exclude=True)
tools: list[CrewStructuredTool] = Field(default_factory=list, exclude=True)
tools_names: str = Field(default="", exclude=True)
@@ -2587,11 +2585,6 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
self._kickoff_input = inputs.get("input", "")
if self.llm is None or self.prompt is None:
raise RuntimeError(
"AgentExecutor.llm or .prompt is unset; the executor was "
"not fully restored or initialized before execution."
)
if "system" in self.prompt:
from crewai.llms.cache import mark_cache_breakpoint
@@ -2623,7 +2616,9 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
with _llm_stop_words_applied(self.llm, self):
self.kickoff()
formatted_answer = self.state.current_answer
formatted_answer: AgentAction | AgentFinish | None = (
self.state.current_answer
)
if not isinstance(formatted_answer, AgentFinish):
raise RuntimeError(
@@ -2693,11 +2688,6 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
self._kickoff_input = inputs.get("input", "")
if self.llm is None or self.prompt is None:
raise RuntimeError(
"AgentExecutor.llm or .prompt is unset; the executor was "
"not fully restored or initialized before execution."
)
if "system" in self.prompt:
from crewai.llms.cache import mark_cache_breakpoint
@@ -2729,7 +2719,9 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
with _llm_stop_words_applied(self.llm, self):
await self.kickoff_async()
formatted_answer = self.state.current_answer
formatted_answer: AgentAction | AgentFinish | None = (
self.state.current_answer
)
if not isinstance(formatted_answer, AgentFinish):
raise RuntimeError(

View File

@@ -962,7 +962,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
model_config = ConfigDict(
arbitrary_types_allowed=True,
ignored_types=(StartMethod, ListenMethod, RouterMethod),
ignored_types=(FlowMethod,),
revalidate_instances="never",
)
__hash__ = object.__hash__
@@ -3009,8 +3009,6 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
self._pending_and_listeners.pop(pending_key, None)
return True
return False
return False
def _find_triggered_methods(

View File

@@ -28,7 +28,7 @@ import inspect
import logging
import re
import textwrap
from typing import Any, TypedDict, get_args, get_origin
from typing import Any, Literal, TypeAlias, TypedDict, get_args, get_origin
from pydantic import BaseModel
from pydantic_core import PydanticUndefined
@@ -44,6 +44,8 @@ from crewai.flow.flow_wrappers import (
logger = logging.getLogger(__name__)
MethodType: TypeAlias = Literal["start", "listen", "router", "start_router"]
class MethodInfo(TypedDict, total=False):
"""Information about a single flow method.
@@ -59,7 +61,7 @@ class MethodInfo(TypedDict, total=False):
"""
name: str
type: str
type: MethodType
trigger_methods: list[str]
condition_type: str | None
router_paths: list[str]
@@ -132,7 +134,7 @@ def _get_method_type(
method: Any,
start_methods: list[str],
routers: set[str],
) -> str:
) -> MethodType:
"""Determine the type of a flow method.
Args:
@@ -191,7 +193,6 @@ def _detect_crew_reference(method: Any) -> bool:
True if crew reference detected, False otherwise.
"""
try:
# Get the underlying function from wrapper
func = method
if hasattr(method, "_meth"):
func = method._meth
@@ -201,12 +202,11 @@ def _detect_crew_reference(method: Any) -> bool:
source = inspect.getsource(func)
source = textwrap.dedent(source)
# Patterns that indicate Crew usage
crew_patterns = [
r"\.crew\(\)", # .crew() method call
r"Crew\s*\(", # Crew( instantiation
r":\s*Crew\b", # Type hint with Crew
r"->.*Crew", # Return type hint with Crew
r"\.crew\(\)",
r"Crew\s*\(",
r":\s*Crew\b",
r"->.*Crew",
]
for pattern in crew_patterns:
@@ -215,7 +215,6 @@ def _detect_crew_reference(method: Any) -> bool:
return False
except (OSError, TypeError):
# Can't get source code - assume no crew reference
return False
@@ -231,11 +230,9 @@ def _extract_trigger_methods(method: Any) -> tuple[list[str], str | None]:
trigger_methods: list[str] = []
condition_type: str | None = None
# First try __trigger_methods__ (populated for simple conditions)
if hasattr(method, "__trigger_methods__") and method.__trigger_methods__:
trigger_methods = [str(m) for m in method.__trigger_methods__]
# For complex conditions (or_/and_ combinators), extract from __trigger_condition__
if (
not trigger_methods
and hasattr(method, "__trigger_condition__")
@@ -264,11 +261,9 @@ def _extract_router_paths(
"""
method_name = getattr(method, "__name__", "")
# First check if there are __router_paths__ on the method itself
if hasattr(method, "__router_paths__") and method.__router_paths__:
return [str(p) for p in method.__router_paths__]
# Then check the class-level registry
if method_name in router_paths_registry:
return [str(p) for p in router_paths_registry[method_name]]
@@ -276,39 +271,15 @@ def _extract_router_paths(
def _extract_all_methods_from_condition(
condition: str | FlowCondition | dict[str, Any] | list[Any],
condition: str | FlowCondition,
) -> list[str]:
"""Extract all method names from a condition tree recursively.
Args:
condition: Can be a string, FlowCondition tuple, dict, or list.
Returns:
List of all method names found in the condition.
"""
"""Extract all method names from a condition tree recursively."""
if isinstance(condition, str):
return [condition]
if isinstance(condition, tuple) and len(condition) == 2:
# FlowCondition: (condition_type, methods_list)
_, methods = condition
if isinstance(methods, list):
result: list[str] = []
for m in methods:
result.extend(_extract_all_methods_from_condition(m))
return result
return []
if isinstance(condition, dict):
conditions_list = condition.get("conditions", [])
dict_methods: list[str] = []
for sub_cond in conditions_list:
dict_methods.extend(_extract_all_methods_from_condition(sub_cond))
return dict_methods
if isinstance(condition, list):
list_methods: list[str] = []
for item in condition:
list_methods.extend(_extract_all_methods_from_condition(item))
return list_methods
return []
methods: list[str] = []
for sub_cond in condition.get("conditions", []):
methods.extend(_extract_all_methods_from_condition(sub_cond))
return methods
def _generate_edges(
@@ -330,7 +301,6 @@ def _generate_edges(
"""
edges: list[EdgeInfo] = []
# Generate edges from listeners (listen edges)
for listener_name, condition_data in listeners.items():
trigger_methods: list[str] = []
@@ -340,7 +310,6 @@ def _generate_edges(
elif isinstance(condition_data, dict):
trigger_methods = _extract_all_methods_from_condition(condition_data)
# Create edges from each trigger to the listener
edges.extend(
EdgeInfo(
from_method=trigger,
@@ -352,10 +321,8 @@ def _generate_edges(
if trigger in all_methods
)
# Generate edges from routers (route edges)
for router_name, paths in router_paths.items():
for path in paths:
# Find listeners that listen to this path
for listener_name, condition_data in listeners.items():
path_triggers: list[str] = []
@@ -393,11 +360,9 @@ def _extract_state_schema(flow_class: type) -> StateSchemaInfo | None:
"""
state_type: type | None = None
# Check for _initial_state_t set by __class_getitem__
if hasattr(flow_class, "_initial_state_t"):
state_type = flow_class._initial_state_t
# Check initial_state class attribute
if state_type is None and hasattr(flow_class, "initial_state"):
initial_state = flow_class.initial_state
if isinstance(initial_state, type) and issubclass(initial_state, BaseModel):
@@ -405,7 +370,6 @@ def _extract_state_schema(flow_class: type) -> StateSchemaInfo | None:
elif isinstance(initial_state, BaseModel):
state_type = type(initial_state)
# Check __orig_bases__ for generic parameters
if state_type is None and hasattr(flow_class, "__orig_bases__"):
for base in flow_class.__orig_bases__:
origin = get_origin(base)
@@ -420,7 +384,6 @@ def _extract_state_schema(flow_class: type) -> StateSchemaInfo | None:
if state_type is None or not issubclass(state_type, BaseModel):
return None
# Extract fields from the Pydantic model
fields: list[StateFieldInfo] = []
try:
model_fields = state_type.model_fields
@@ -428,7 +391,6 @@ def _extract_state_schema(flow_class: type) -> StateSchemaInfo | None:
field_type_str = "Any"
if field_info.annotation is not None:
field_type_str = str(field_info.annotation)
# Clean up the type string
field_type_str = field_type_str.replace("typing.", "")
field_type_str = field_type_str.replace("<class '", "").replace(
"'>", ""
@@ -441,7 +403,6 @@ def _extract_state_schema(flow_class: type) -> StateSchemaInfo | None:
and not callable(field_info.default)
):
try:
# Try to serialize the default value
default_value = field_info.default
except Exception:
default_value = str(field_info.default)
@@ -474,7 +435,6 @@ def _detect_flow_inputs(flow_class: type) -> list[str]:
"""
inputs: list[str] = []
# Check for inputs in __init__ signature beyond standard Flow params
try:
init_method = flow_class.__init__ # type: ignore[misc]
init_sig = inspect.signature(init_method)
@@ -533,7 +493,6 @@ def flow_structure(flow_class: type) -> FlowStructureInfo:
f"Got {type(flow_class).__name__}"
)
# Get class-level metadata set by FlowMeta
start_methods: list[str] = getattr(flow_class, "_start_methods", [])
listeners: dict[str, Any] = getattr(flow_class, "_listeners", {})
routers: set[str] = getattr(flow_class, "_routers", set())
@@ -541,7 +500,6 @@ def flow_structure(flow_class: type) -> FlowStructureInfo:
flow_class, "_router_paths", {}
)
# Collect all flow methods
methods: list[MethodInfo] = []
all_method_names: set[str] = set()
@@ -554,7 +512,6 @@ def flow_structure(flow_class: type) -> FlowStructureInfo:
except AttributeError:
continue
# Check if it's a flow method
is_flow_method = (
isinstance(attr, (FlowMethod, StartMethod, ListenMethod, RouterMethod))
or hasattr(attr, "__is_flow_method__")
@@ -568,21 +525,16 @@ def flow_structure(flow_class: type) -> FlowStructureInfo:
all_method_names.add(attr_name)
# Get method type
method_type = _get_method_type(attr_name, attr, start_methods, routers)
# Get trigger methods and condition type
trigger_methods, condition_type = _extract_trigger_methods(attr)
# Get router paths if applicable
router_paths_list: list[str] = []
if method_type in ("router", "start_router"):
router_paths_list = _extract_router_paths(attr, router_paths_registry)
# Check for human feedback
has_hf = _has_human_feedback(attr)
# Check for crew reference
has_crew = _detect_crew_reference(attr)
method_info = MethodInfo(
@@ -596,16 +548,10 @@ def flow_structure(flow_class: type) -> FlowStructureInfo:
)
methods.append(method_info)
# Generate edges
edges = _generate_edges(listeners, routers, router_paths_registry, all_method_names)
# Extract state schema
state_schema = _extract_state_schema(flow_class)
# Detect inputs
inputs = _detect_flow_inputs(flow_class)
# Get flow description from docstring
description: str | None = None
if flow_class.__doc__:
description = flow_class.__doc__.strip()

View File

@@ -46,6 +46,8 @@ class FlowMethod(Generic[P, R]):
both bound (instance) and unbound (class) method states.
"""
__is_flow_method__: bool = True
def __init__(self, meth: Callable[P, R], instance: Any = None) -> None:
"""Initialize the flow method wrapper.
@@ -53,9 +55,9 @@ class FlowMethod(Generic[P, R]):
meth: The method to wrap.
instance: The instance to bind to (None for unbound).
"""
functools.update_wrapper(self, meth)
self._meth = meth
self._instance = instance
functools.update_wrapper(self, meth, updated=[])
self.__name__: FlowMethodName = FlowMethodName(self.__name__)
self.__signature__ = inspect.signature(meth)
@@ -70,16 +72,6 @@ class FlowMethod(Generic[P, R]):
self._is_coroutine = asyncio.coroutines._is_coroutine # type: ignore[attr-defined]
# Preserve flow-related attributes from wrapped method (e.g., from @human_feedback)
for attr in [
"__is_router__",
"__router_paths__",
"__human_feedback_config__",
"_hf_llm", # Live LLM object for HITL resume
]:
if hasattr(meth, attr):
setattr(self, attr, getattr(meth, attr))
def __call__(self, *args: P.args, **kwargs: P.kwargs) -> R:
"""Call the wrapped method.
@@ -102,6 +94,19 @@ class FlowMethod(Generic[P, R]):
"""
return self._meth
def __getattr__(self, name: str) -> Any:
"""Delegate missing attributes to the wrapped method.
Lets flow flags like ``__is_start_method__`` defined on the wrapped
method's class flow through transparently when this wrapper itself
wraps another FlowMethod.
"""
try:
meth = object.__getattribute__(self, "_meth")
except AttributeError:
raise AttributeError(name) from None
return getattr(meth, name)
def __get__(self, instance: Any, owner: type | None = None) -> Self:
"""Support the descriptor protocol for method binding.

View File

@@ -118,13 +118,11 @@ def _deserialize_llm_from_context(
if isinstance(llm_data, str):
return LLM(model=llm_data)
if isinstance(llm_data, dict):
data = dict(llm_data)
model = data.pop("model", None)
if not model:
return None
return LLM(model=model, **data)
return None
data = dict(llm_data)
model = data.pop("model", None)
if not model:
return None
return LLM(model=model, **data)
@dataclass
@@ -706,6 +704,6 @@ def human_feedback(
# instead of creating a bare LLM from just the model string.
wrapper._hf_llm = llm
return wrapper # type: ignore[no-any-return]
return HumanFeedbackMethod(wrapper) # type: ignore[return-value]
return decorator

View File

@@ -24,15 +24,16 @@ Example:
from __future__ import annotations
import asyncio
from collections.abc import Callable
from collections.abc import Awaitable, Callable
import functools
import inspect
import logging
from typing import TYPE_CHECKING, Any, Final, TypeVar, cast
from typing import TYPE_CHECKING, Any, Final, ParamSpec, TypeVar, cast
from crewai_core.printer import PRINTER
from pydantic import BaseModel
from crewai.flow.flow_wrappers import FlowMethod
from crewai.flow.persistence.base import FlowPersistence
from crewai.flow.persistence.sqlite import SQLiteFlowPersistence
@@ -42,6 +43,8 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
P = ParamSpec("P")
R = TypeVar("R")
T = TypeVar("T")
# Constants for log messages
@@ -134,9 +137,71 @@ class PersistenceDecorator:
raise ValueError(error_msg) from e
class PersistedFlowMethod(FlowMethod[P, R]):
"""FlowMethod variant that persists state after each invocation.
Wrapping the original method directly (rather than copying its attributes
onto a closure) lets ``FlowMethod.__getattr__`` delegate flow flags like
``__is_start_method__`` to the wrapped object transparently.
For async wrapped methods, ``R`` is the ``Coroutine`` returned by calling
them, so ``__call__``'s declared return type stays accurate in both cases.
"""
def __init__(
self,
meth: Callable[P, R],
instance: Any = None,
*,
persistence: FlowPersistence | None = None,
verbose: bool = False,
) -> None:
super().__init__(meth, instance)
self._persistence = persistence
self._verbose = verbose
def _resolve_flow_instance(self, args: tuple[Any, ...]) -> Any:
return (
self._instance
if self._instance is not None
else (args[0] if args else None)
)
def __call__(self, *args: P.args, **kwargs: P.kwargs) -> R:
if inspect.iscoroutinefunction(self._meth):
return cast(R, self._call_async(*args, **kwargs))
flow_instance = self._resolve_flow_instance(args)
result = super().__call__(*args, **kwargs)
PersistenceDecorator.persist_state(
flow_instance,
self.__name__,
cast(FlowPersistence, self._persistence),
self._verbose,
)
return result
async def _call_async(self, *args: Any, **kwargs: Any) -> Any:
flow_instance = self._resolve_flow_instance(args)
meth = cast(Callable[..., Awaitable[Any]], self._meth)
if self._instance is not None:
result = await meth(self._instance, *args, **kwargs)
else:
result = await meth(*args, **kwargs)
PersistenceDecorator.persist_state(
flow_instance,
self.__name__,
cast(FlowPersistence, self._persistence),
self._verbose,
)
return result
def persist(
persistence: FlowPersistence | None = None, verbose: bool = False
) -> Callable[[type | Callable[..., T]], type | Callable[..., T]]:
) -> Callable[
[type[Flow[Any]] | Callable[..., T]],
type[Flow[Any]] | Callable[..., T],
]:
"""Decorator to persist flow state.
This decorator can be applied at either the class level or method level.
@@ -164,150 +229,41 @@ def persist(
pass
"""
def decorator(target: type | Callable[..., T]) -> type | Callable[..., T]:
def decorator(
target: type[Flow[Any]] | Callable[..., T],
) -> type[Flow[Any]] | Callable[..., T]:
"""Decorator that handles both class and method decoration."""
actual_persistence = persistence or SQLiteFlowPersistence()
if isinstance(target, type):
# Class decoration
original_init = target.__init__ # type: ignore[misc]
@functools.wraps(original_init)
def new_init(self: Any, *args: Any, **kwargs: Any) -> None:
def new_init(self: Flow[Any], *args: Any, **kwargs: Any) -> None:
if "persistence" not in kwargs:
kwargs["persistence"] = actual_persistence
original_init(self, *args, **kwargs)
target.__init__ = new_init # type: ignore[misc]
# Store original methods to preserve their decorators
original_methods = {
name: method
for name, method in target.__dict__.items()
if callable(method)
and (
hasattr(method, "__is_start_method__")
or hasattr(method, "__trigger_methods__")
or hasattr(method, "__condition_type__")
or hasattr(method, "__is_flow_method__")
or hasattr(method, "__is_router__")
for name, method in list(target.__dict__.items()):
if not isinstance(method, FlowMethod):
continue
setattr(
target,
name,
PersistedFlowMethod(
method, persistence=actual_persistence, verbose=verbose
),
)
}
# Create wrapped versions of the methods that include persistence
for name, method in original_methods.items():
if asyncio.iscoroutinefunction(method):
# Create a closure to capture the current name and method
def create_async_wrapper(
method_name: str, original_method: Callable[..., Any]
) -> Callable[..., Any]:
@functools.wraps(original_method)
async def method_wrapper(
self: Any, *args: Any, **kwargs: Any
) -> Any:
result = await original_method(self, *args, **kwargs)
PersistenceDecorator.persist_state(
self, method_name, actual_persistence, verbose
)
return result
return method_wrapper
wrapped = create_async_wrapper(name, method)
# Preserve all original decorators and attributes
for attr in [
"__is_start_method__",
"__trigger_methods__",
"__condition_type__",
"__is_router__",
]:
if hasattr(method, attr):
setattr(wrapped, attr, getattr(method, attr))
wrapped.__is_flow_method__ = True # type: ignore[attr-defined]
# Update the class with the wrapped method
setattr(target, name, wrapped)
else:
# Create a closure to capture the current name and method
def create_sync_wrapper(
method_name: str, original_method: Callable[..., Any]
) -> Callable[..., Any]:
@functools.wraps(original_method)
def method_wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
result = original_method(self, *args, **kwargs)
PersistenceDecorator.persist_state(
self, method_name, actual_persistence, verbose
)
return result
return method_wrapper
wrapped = create_sync_wrapper(name, method)
# Preserve all original decorators and attributes
for attr in [
"__is_start_method__",
"__trigger_methods__",
"__condition_type__",
"__is_router__",
]:
if hasattr(method, attr):
setattr(wrapped, attr, getattr(method, attr))
wrapped.__is_flow_method__ = True # type: ignore[attr-defined]
# Update the class with the wrapped method
setattr(target, name, wrapped)
return target
# Method decoration
method = target
method.__is_flow_method__ = True # type: ignore[attr-defined]
if asyncio.iscoroutinefunction(method):
@functools.wraps(method)
async def method_async_wrapper(
flow_instance: Any, *args: Any, **kwargs: Any
) -> T:
method_coro = method(flow_instance, *args, **kwargs)
if asyncio.iscoroutine(method_coro):
result = await method_coro
else:
result = method_coro
PersistenceDecorator.persist_state(
flow_instance, method.__name__, actual_persistence, verbose
)
return cast(T, result)
for attr in [
"__is_start_method__",
"__trigger_methods__",
"__condition_type__",
"__is_router__",
]:
if hasattr(method, attr):
setattr(method_async_wrapper, attr, getattr(method, attr))
method_async_wrapper.__is_flow_method__ = True # type: ignore[attr-defined]
return cast(Callable[..., T], method_async_wrapper)
@functools.wraps(method)
def method_sync_wrapper(flow_instance: Any, *args: Any, **kwargs: Any) -> T:
result = method(flow_instance, *args, **kwargs)
PersistenceDecorator.persist_state(
flow_instance, method.__name__, actual_persistence, verbose
)
return result
for attr in [
"__is_start_method__",
"__trigger_methods__",
"__condition_type__",
"__is_router__",
]:
if hasattr(method, attr):
setattr(method_sync_wrapper, attr, getattr(method, attr))
method_sync_wrapper.__is_flow_method__ = True # type: ignore[attr-defined]
return cast(Callable[..., T], method_sync_wrapper)
return cast(
Callable[..., T],
PersistedFlowMethod(
target, persistence=actual_persistence, verbose=verbose
),
)
return decorator

View File

@@ -26,7 +26,7 @@ if TYPE_CHECKING:
def _extract_direct_or_triggers(
condition: str | dict[str, Any] | list[Any] | FlowCondition,
condition: str | FlowCondition,
) -> list[str]:
"""Extract direct OR-level trigger strings from a condition.
@@ -39,36 +39,22 @@ def _extract_direct_or_triggers(
- and_("a", "b") -> [] (neither are direct triggers, both required)
- or_(and_("a", "b"), "c") -> ["c"] (only "c" is a direct trigger)
Args:
condition: Can be a string, dict, or list.
Returns:
List of direct OR-level trigger strings.
"""
if isinstance(condition, str):
return [condition]
if isinstance(condition, dict):
cond_type = condition.get("type", OR_CONDITION)
conditions_list = condition.get("conditions", [])
if cond_type == OR_CONDITION:
strings = []
for sub_cond in conditions_list:
strings.extend(_extract_direct_or_triggers(sub_cond))
return strings
cond_type = condition.get("type", OR_CONDITION)
if cond_type != OR_CONDITION:
return []
if isinstance(condition, list):
strings = []
for item in condition:
strings.extend(_extract_direct_or_triggers(item))
return strings
if callable(condition) and hasattr(condition, "__name__"):
return [condition.__name__]
return []
strings: list[str] = []
for sub_cond in condition.get("conditions", []):
strings.extend(_extract_direct_or_triggers(sub_cond))
return strings
def _extract_all_trigger_names(
condition: str | dict[str, Any] | list[Any] | FlowCondition,
condition: str | FlowCondition,
) -> list[str]:
"""Extract ALL trigger names from a condition for display purposes.
@@ -81,50 +67,26 @@ def _extract_all_trigger_names(
- and_("a", "b") -> ["a", "b"]
- or_(and_("a", method_6), method_4) -> ["a", "method_6", "method_4"]
Args:
condition: Can be a string, dict, or list.
Returns:
List of all trigger names found in the condition.
"""
if isinstance(condition, str):
return [condition]
if isinstance(condition, dict):
conditions_list = condition.get("conditions", [])
strings = []
for sub_cond in conditions_list:
strings.extend(_extract_all_trigger_names(sub_cond))
return strings
if isinstance(condition, list):
strings = []
for item in condition:
strings.extend(_extract_all_trigger_names(item))
return strings
if callable(condition) and hasattr(condition, "__name__"):
return [condition.__name__]
return []
strings: list[str] = []
for sub_cond in condition.get("conditions", []):
strings.extend(_extract_all_trigger_names(sub_cond))
return strings
def _create_edges_from_condition(
condition: str | dict[str, Any] | list[Any] | FlowCondition,
condition: str | FlowCondition,
target: str,
nodes: dict[str, NodeMetadata],
) -> list[StructureEdge]:
"""Create edges from a condition tree, preserving AND/OR semantics.
This function recursively processes the condition tree and creates edges
with the appropriate condition_type for each trigger.
For AND conditions, all triggers get edges with condition_type="AND".
For OR conditions, triggers get edges with condition_type="OR".
Args:
condition: The condition tree (string, dict, or list).
target: The target node name.
nodes: Dictionary of all nodes for validation.
Returns:
List of StructureEdge objects representing the condition.
"""
edges: list[StructureEdge] = []
@@ -138,39 +100,24 @@ def _create_edges_from_condition(
is_router_path=False,
)
)
elif callable(condition) and hasattr(condition, "__name__"):
method_name = condition.__name__
if method_name in nodes:
edges.append(
StructureEdge(
source=method_name,
target=target,
condition_type=OR_CONDITION,
is_router_path=False,
)
)
elif isinstance(condition, dict):
cond_type = condition.get("type", OR_CONDITION)
conditions_list = condition.get("conditions", [])
return edges
if cond_type == AND_CONDITION:
triggers = _extract_all_trigger_names(condition)
edges.extend(
StructureEdge(
source=trigger,
target=target,
condition_type=AND_CONDITION,
is_router_path=False,
)
for trigger in triggers
if trigger in nodes
cond_type = condition.get("type", OR_CONDITION)
if cond_type == AND_CONDITION:
triggers = _extract_all_trigger_names(condition)
edges.extend(
StructureEdge(
source=trigger,
target=target,
condition_type=AND_CONDITION,
is_router_path=False,
)
else:
for sub_cond in conditions_list:
edges.extend(_create_edges_from_condition(sub_cond, target, nodes))
elif isinstance(condition, list):
for item in condition:
edges.extend(_create_edges_from_condition(item, target, nodes))
for trigger in triggers
if trigger in nodes
)
else:
for sub_cond in condition.get("conditions", []):
edges.extend(_create_edges_from_condition(sub_cond, target, nodes))
return edges

View File

@@ -92,8 +92,6 @@ class CrewDoclingSource(BaseKnowledgeSource):
raise e
def add(self) -> None:
if self.content is None:
return
for doc in self.content:
new_chunks_iterable = self._chunk_doc(doc)
self.chunks.extend(list(new_chunks_iterable))
@@ -101,8 +99,6 @@ class CrewDoclingSource(BaseKnowledgeSource):
async def aadd(self) -> None:
"""Add docling content asynchronously."""
if self.content is None:
return
for doc in self.content:
new_chunks_iterable = self._chunk_doc(doc)
self.chunks.extend(list(new_chunks_iterable))

View File

@@ -155,11 +155,8 @@ class ExcelKnowledgeSource(BaseKnowledgeSource):
# Updated to account for .xlsx workbooks with multiple tabs/sheets
content_str = ""
for value in self.content.values():
if isinstance(value, dict):
for sheet_value in value.values():
content_str += str(sheet_value) + "\n"
else:
content_str += str(value) + "\n"
for sheet_value in value.values():
content_str += str(sheet_value) + "\n"
new_chunks = self._chunk_text(content_str)
self.chunks.extend(new_chunks)
@@ -169,11 +166,8 @@ class ExcelKnowledgeSource(BaseKnowledgeSource):
"""Add Excel file content asynchronously."""
content_str = ""
for value in self.content.values():
if isinstance(value, dict):
for sheet_value in value.values():
content_str += str(sheet_value) + "\n"
else:
content_str += str(value) + "\n"
for sheet_value in value.values():
content_str += str(sheet_value) + "\n"
new_chunks = self._chunk_text(content_str)
self.chunks.extend(new_chunks)

View File

@@ -484,10 +484,7 @@ class AnthropicCompletion(BaseLLM):
params["tool_choice"] = {"type": "tool", "name": tool_name}
if self.thinking:
if isinstance(self.thinking, AnthropicThinkingConfig):
params["thinking"] = self.thinking.model_dump()
else:
params["thinking"] = self.thinking
params["thinking"] = self.thinking.model_dump()
return params

View File

@@ -582,19 +582,16 @@ class GeminiCompletion(BaseLLM):
parts: list[types.Part] = []
if isinstance(content, list):
for item in content:
if isinstance(item, dict):
if "text" in item:
parts.append(types.Part.from_text(text=str(item["text"])))
elif "inlineData" in item:
inline = item["inlineData"]
parts.append(
types.Part.from_bytes(
data=base64.b64decode(inline["data"]),
mime_type=inline["mimeType"],
)
if "text" in item:
parts.append(types.Part.from_text(text=str(item["text"])))
elif "inlineData" in item:
inline = item["inlineData"]
parts.append(
types.Part.from_bytes(
data=base64.b64decode(inline["data"]),
mime_type=inline["mimeType"],
)
else:
parts.append(types.Part.from_text(text=str(item)))
)
else:
parts.append(types.Part.from_text(text=str(content) if content else ""))

View File

@@ -798,10 +798,7 @@ class OpenAICompletion(BaseLLM):
}
if parameters:
if isinstance(parameters, dict):
responses_tool["parameters"] = parameters
else:
responses_tool["parameters"] = dict(parameters)
responses_tool["parameters"] = parameters
responses_tools.append(responses_tool)

View File

@@ -383,22 +383,19 @@ class MCPToolResolver:
if mcp_config.tool_filter:
filtered_tools = []
for tool in tools_list:
if callable(mcp_config.tool_filter):
try:
from crewai.mcp.filters import ToolFilterContext
try:
from crewai.mcp.filters import ToolFilterContext
context = ToolFilterContext(
agent=self._agent,
server_name=server_name,
run_context=None,
)
if mcp_config.tool_filter(context, tool): # type: ignore[call-arg, arg-type]
filtered_tools.append(tool)
except (TypeError, AttributeError):
if mcp_config.tool_filter(tool): # type: ignore[call-arg, arg-type]
filtered_tools.append(tool)
else:
filtered_tools.append(tool)
context = ToolFilterContext(
agent=self._agent,
server_name=server_name,
run_context=None,
)
if mcp_config.tool_filter(context, tool): # type: ignore[call-arg, arg-type]
filtered_tools.append(tool)
except (TypeError, AttributeError): # noqa: PERF203
if mcp_config.tool_filter(tool): # type: ignore[call-arg, arg-type]
filtered_tools.append(tool)
tools_list = filtered_tools
if not tools_list:

View File

@@ -194,9 +194,6 @@ class GoogleGenAIVertexEmbeddingFunction(EmbeddingFunction[Documents]):
Returns:
List of embedding vectors.
"""
if isinstance(input, str):
input = [input]
if self._use_legacy:
return self._call_legacy(input)
return self._call_genai(input)

View File

@@ -54,9 +54,6 @@ class WatsonXEmbeddingFunction(EmbeddingFunction[Documents]):
"Install it with: uv add ibm-watsonx-ai"
) from e
if isinstance(input, str):
input = [input]
embeddings_config: dict[str, Any] = {
"model_id": self._config["model_id"],
}

View File

@@ -47,9 +47,6 @@ class VoyageAIEmbeddingFunction(EmbeddingFunction[Documents]):
List of embedding vectors.
"""
if isinstance(input, str):
input = [input]
result = self._client.embed(
texts=input,
model=self._config.get("model", "voyage-2"),

View File

@@ -47,7 +47,7 @@ def _ensure_handlers_registered() -> None:
return
with _register_lock:
if _handlers_registered:
return
return # type: ignore[unreachable]
_register_all_handlers(crewai_event_bus)
_handlers_registered = True

View File

@@ -40,7 +40,6 @@ from crewai.agents.agent_builder.base_agent import BaseAgent, _resolve_agent
from crewai.context import reset_current_task_id, set_current_task_id
from crewai.core.providers.content_processor import process_content
from crewai.events.event_bus import crewai_event_bus
from crewai.events.event_context import resume_task_scope
from crewai.events.types.task_events import (
TaskCompletedEvent,
TaskFailedEvent,
@@ -662,10 +661,7 @@ class Task(BaseModel):
tools = tools or self.tools or []
self.processed_by_agents.add(agent.role)
executor = agent.agent_executor
if not (
executor and executor._resuming and resume_task_scope(str(self.id))
):
if not (agent.agent_executor and agent.agent_executor._resuming):
crewai_event_bus.emit(
self, TaskStartedEvent(context=context, task=self)
)
@@ -787,10 +783,7 @@ class Task(BaseModel):
tools = tools or self.tools or []
self.processed_by_agents.add(agent.role)
executor = agent.agent_executor
if not (
executor and executor._resuming and resume_task_scope(str(self.id))
):
if not (agent.agent_executor and agent.agent_executor._resuming):
crewai_event_bus.emit(
self, TaskStartedEvent(context=context, task=self)
)
@@ -1166,12 +1159,10 @@ Follow these guidelines:
return model_output, None
if isinstance(model_output, dict):
return None, model_output
if isinstance(model_output, str):
try:
return None, json.loads(model_output)
except json.JSONDecodeError:
return None, None
return None, None
try:
return None, json.loads(model_output)
except json.JSONDecodeError:
return None, None
def _get_output_format(self) -> OutputFormat:
if self.output_json:

View File

@@ -97,7 +97,7 @@ class Telemetry:
provider: OpenTelemetry tracer provider.
"""
_instance = None
_instance: Self | None = None
_lock = threading.Lock()
def __new__(cls) -> Self:
@@ -937,9 +937,6 @@ class Telemetry:
value: The attribute value.
"""
if span is None:
return
def _operation() -> None:
return span.set_attribute(key, value)

View File

@@ -122,7 +122,8 @@ class BaseAgentTool(BaseTool):
logger.debug(
f"Created task for agent '{self.sanitize_agent_name(selected_agent.role)}': {task}"
)
return selected_agent.execute_task(task_with_assigned_agent, context)
result = selected_agent.execute_task(task_with_assigned_agent, context)
return result if isinstance(result, str) else result.model_dump_json()
except Exception as e:
# Handle task creation or execution errors
return I18N_DEFAULT.errors("agent_tool_execution_error").format(

View File

@@ -421,28 +421,10 @@ class BaseTool(BaseModel, ABC):
)
def _set_args_schema(self) -> None:
if self.args_schema is None:
run_sig = signature(self._run)
fields: dict[str, Any] = {}
"""No-op retained for backward compatibility.
for param_name, param in run_sig.parameters.items():
if param_name in ("self", "return"):
continue
if param.kind in (Parameter.VAR_POSITIONAL, Parameter.VAR_KEYWORD):
continue
annotation = (
param.annotation if param.annotation != param.empty else Any
)
if param.default is param.empty:
fields[param_name] = (annotation, ...)
else:
fields[param_name] = (annotation, param.default)
self.args_schema = create_model(
f"{self.__class__.__name__}Schema", **fields
)
Schema generation is performed by the ``args_schema`` field validator.
"""
def _generate_description(self) -> None:
"""Generate the tool description with a JSON schema for arguments."""

View File

@@ -274,10 +274,7 @@ class ToolUsage:
if self.tools_handler and self.tools_handler.cache:
input_str = ""
if calling.arguments:
if isinstance(calling.arguments, dict):
input_str = json.dumps(calling.arguments)
else:
input_str = str(calling.arguments)
input_str = json.dumps(calling.arguments)
result = self.tools_handler.cache.read(
tool=sanitize_tool_name(calling.tool_name), input=input_str
@@ -303,7 +300,7 @@ class ToolUsage:
result = self._format_result(result=result)
# Don't return early - fall through to finally block
elif result is None:
try:
try: # type: ignore[unreachable]
if sanitize_tool_name(calling.tool_name) in [
sanitize_tool_name("Delegate work to coworker"),
sanitize_tool_name("Ask question to coworker"),
@@ -507,10 +504,7 @@ class ToolUsage:
if self.tools_handler and self.tools_handler.cache:
input_str = ""
if calling.arguments:
if isinstance(calling.arguments, dict):
input_str = json.dumps(calling.arguments)
else:
input_str = str(calling.arguments)
input_str = json.dumps(calling.arguments)
result = self.tools_handler.cache.read(
tool=sanitize_tool_name(calling.tool_name), input=input_str
@@ -536,7 +530,7 @@ class ToolUsage:
result = self._format_result(result=result)
# Don't return early - fall through to finally block
elif result is None:
try:
try: # type: ignore[unreachable]
if sanitize_tool_name(calling.tool_name) in [
sanitize_tool_name("Delegate work to coworker"),
sanitize_tool_name("Ask question to coworker"),
@@ -826,11 +820,6 @@ class ToolUsage:
raise
return ToolUsageError(f"{I18N_DEFAULT.errors('tool_arguments_error')}")
if not isinstance(arguments, dict):
if raise_error:
raise
return ToolUsageError(f"{I18N_DEFAULT.errors('tool_arguments_error')}")
return ToolCalling(
tool_name=sanitize_tool_name(tool.name),
arguments=arguments,

View File

@@ -1679,7 +1679,7 @@ def _setup_before_llm_call_hooks(
)
if not isinstance(executor_context.messages, list):
if verbose:
if verbose: # type: ignore[unreachable]
printer.print(
content=(
"Warning: before_llm_call hook replaced messages with non-list. "
@@ -1742,7 +1742,7 @@ def _setup_after_llm_call_hooks(
)
if not isinstance(executor_context.messages, list):
if verbose:
if verbose: # type: ignore[unreachable]
printer.print(
content=(
"Warning: after_llm_call hook replaced messages with non-list. "

View File

@@ -161,15 +161,10 @@ def _llm_via_environment_or_fallback() -> LLM | None:
# Map environment variable names to recognized parameters
param_key = _normalize_key_name(key_name.lower())
llm_params[param_key] = env_value
elif isinstance(env_var, dict):
if env_var.get("default", False):
for key, value in env_var.items():
if key not in ["prompt", "key_name", "default"]:
llm_params[key.lower()] = value
else:
logger.debug(
f"Expected env_var to be a dictionary, but got {type(env_var)}"
)
elif env_var.get("default", False):
for key, value in env_var.items():
if key not in ["prompt", "key_name", "default"]:
llm_params[key.lower()] = value
llm_params = {k: v for k, v in llm_params.items() if v is not None}

View File

@@ -141,12 +141,12 @@ def resolve_refs(schema: dict[str, Any]) -> dict[str, Any]:
def add_key_in_dict_recursively(
d: dict[str, Any],
d: Any,
key: str,
value: Any,
criteria: Callable[[dict[str, Any]], bool],
_seen: set[int] | None = None,
) -> dict[str, Any]:
) -> Any:
"""Recursively adds a key/value pair to all nested dicts matching `criteria`.
Args:
@@ -338,9 +338,6 @@ def add_const_to_oneof_variants(schema: dict[str, Any]) -> dict[str, Any]:
def _process_oneof(node: dict[str, Any]) -> dict[str, Any]:
"""Process a single node that might contain a oneOf with discriminator."""
if not isinstance(node, dict):
return node
if "oneOf" in node and "discriminator" in node:
discriminator = node["discriminator"]
property_name = discriminator.get("propertyName")
@@ -606,8 +603,6 @@ def sanitize_tool_params_for_openai_strict(
params: dict[str, Any],
) -> dict[str, Any]:
"""Sanitize a JSON schema for OpenAI strict function calling."""
if not isinstance(params, dict):
return params
return cast(
dict[str, Any], strip_unsupported_formats(_common_strict_pipeline(params))
)
@@ -617,8 +612,6 @@ def sanitize_tool_params_for_anthropic_strict(
params: dict[str, Any],
) -> dict[str, Any]:
"""Sanitize a JSON schema for Anthropic strict tool use."""
if not isinstance(params, dict):
return params
sanitized = lift_top_level_anyof(_common_strict_pipeline(params))
sanitized = _strip_keys_recursive(sanitized, _CLAUDE_STRICT_UNSUPPORTED)
return cast(dict[str, Any], strip_unsupported_formats(sanitized))

View File

@@ -11,7 +11,6 @@ from crewai.events.event_context import (
MismatchBehavior,
StackDepthExceededError,
_event_context_config,
_event_id_stack,
EventContextConfig,
get_current_parent_id,
get_enclosing_parent_id,
@@ -22,7 +21,6 @@ from crewai.events.event_context import (
pop_event_scope,
push_event_scope,
reset_last_event_id,
resume_task_scope,
set_last_event_id,
set_triggering_event_id,
triggered_by_scope,
@@ -182,91 +180,6 @@ class TestTriggeredByScope:
assert get_triggering_event_id() is None
class TestResumeTaskScope:
"""Tests for the checkpoint-resume scope helper."""
@pytest.fixture(autouse=True)
def _reset_stack(self) -> None:
_event_id_stack.set(())
def _bind_runtime_state(self, *event_dicts: dict[str, object]):
from crewai.events import crewai_event_bus
from crewai.events.types.task_events import TaskStartedEvent
from crewai.state.event_record import EventRecord
from crewai.state.runtime import RuntimeState
record = EventRecord()
for spec in event_dicts:
ev = TaskStartedEvent(context=None, task=None)
ev.task_id = spec["task_id"] # type: ignore[assignment]
ev.event_id = spec["event_id"] # type: ignore[assignment]
ev.emission_sequence = spec["emission_sequence"] # type: ignore[assignment]
record.add(ev)
state = RuntimeState(root=[])
state._event_record = record
previous = crewai_event_bus._runtime_state
crewai_event_bus._runtime_state = state
return crewai_event_bus, previous
def test_returns_false_when_no_runtime_state(self) -> None:
from crewai.events import crewai_event_bus
previous = crewai_event_bus._runtime_state
crewai_event_bus._runtime_state = None
try:
assert resume_task_scope("any-task") is False
assert _event_id_stack.get() == ()
finally:
crewai_event_bus._runtime_state = previous
def test_returns_false_when_no_matching_event(self) -> None:
bus, previous = self._bind_runtime_state(
{"task_id": "other", "event_id": "e1", "emission_sequence": 1},
)
try:
assert resume_task_scope("missing") is False
assert _event_id_stack.get() == ()
finally:
bus._runtime_state = previous
def test_pushes_latest_event_for_task(self) -> None:
bus, previous = self._bind_runtime_state(
{"task_id": "t1", "event_id": "e1", "emission_sequence": 1},
{"task_id": "t1", "event_id": "e2", "emission_sequence": 5},
{"task_id": "t1", "event_id": "e3", "emission_sequence": 3},
{"task_id": "t2", "event_id": "x1", "emission_sequence": 9},
)
try:
assert resume_task_scope("t1") is True
assert _event_id_stack.get() == (("e2", "task_started"),)
finally:
bus._runtime_state = previous
def test_pairs_cleanly_with_task_completed(self) -> None:
"""The pushed scope must be popped by a matching task_completed."""
from crewai.events import crewai_event_bus
from crewai.events.types.task_events import TaskCompletedEvent
from crewai.tasks.task_output import TaskOutput
push_event_scope("kickoff-1", "crew_kickoff_started")
bus, previous = self._bind_runtime_state(
{"task_id": "t1", "event_id": "started-1", "emission_sequence": 1},
)
try:
assert resume_task_scope("t1") is True
output = TaskOutput(description="d", raw="r", agent="a")
completed = TaskCompletedEvent(output=output, task=None)
completed.task_id = "t1"
crewai_event_bus.emit(None, completed)
crewai_event_bus.flush()
assert _event_id_stack.get() == (("kickoff-1", "crew_kickoff_started"),)
assert completed.started_event_id == "started-1"
finally:
bus._runtime_state = previous
_event_id_stack.set(())
def test_agent_scope_preserved_after_tool_error_event() -> None:
from crewai.events import crewai_event_bus
from crewai.events.types.tool_usage_events import (

View File

@@ -124,8 +124,11 @@ disallow_any_unimported = true
no_implicit_optional = true
check_untyped_defs = true
warn_return_any = true
warn_unreachable = true
show_error_codes = true
warn_unused_ignores = true
local_partial_types = true
extra_checks = true
python_version = "3.12"
exclude = "(?x)(^lib/crewai/src/crewai/cli/templates/|^lib/cli/src/crewai_cli/templates/|^lib/crewai/tests/|^lib/crewai-tools/tests/|^lib/crewai-files/tests/|^lib/cli/tests/|^lib/devtools/tests/)"
plugins = ["pydantic.mypy"]
@@ -189,7 +192,6 @@ exclude-newer = "3 days"
# authlib <1.6.11 has GHSA-jj8c-mmj3-mmgv (CSRF bypass in cache-based state storage).
# pip <26.1.1 has GHSA-58qw-9mgm-455v (archive handling); OSV considers 26.1.1 unaffected.
# paramiko <5.0.0 has GHSA-r374-rxx8-8654 (SHA-1 in rsakey.py); OSV considers 5.0.0 unaffected. Transitive via composio-core.
# starlette <1.0.1 has PYSEC-2026-161 (missing Host header validation poisons request.url.path, bypassing path-based auth). Transitive via fastapi.
# litellm 1.83.8+ hard-pins openai==2.24.0, missing openai.types.responses used by crewai;
# override to >=2.30.0 (the version litellm 1.83.7 used) until upstream relaxes the pin.
override-dependencies = [
@@ -210,7 +212,6 @@ override-dependencies = [
"authlib>=1.6.11",
"pip>=26.1.1",
"paramiko>=5.0.0",
"starlette>=1.0.1",
]
[tool.uv.workspace]

12
uv.lock generated
View File

@@ -13,12 +13,9 @@ resolution-markers = [
]
[options]
exclude-newer = "2026-05-19T15:27:50.647689Z"
exclude-newer = "2026-05-17T14:20:01.778505Z"
exclude-newer-span = "P3D"
[options.exclude-newer-package]
starlette = "2026-05-22T16:00:00Z"
[manifest]
members = [
"crewai",
@@ -43,7 +40,6 @@ overrides = [
{ name = "pypdf", specifier = ">=6.10.2,<7" },
{ name = "python-multipart", specifier = ">=0.0.27,<1" },
{ name = "rich", specifier = ">=13.7.1" },
{ name = "starlette", specifier = ">=1.0.1" },
{ name = "transformers", marker = "python_full_version >= '3.10'", specifier = ">=5.4.0" },
{ name = "urllib3", specifier = ">=2.7.0" },
{ name = "uv", specifier = ">=0.11.6,<1" },
@@ -8532,15 +8528,15 @@ wheels = [
[[package]]
name = "starlette"
version = "1.0.1"
version = "1.0.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "anyio" },
{ name = "typing-extensions", marker = "python_full_version < '3.13'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/08/a3/84e821cc54b4ab50ae6dbc6ac3800a651b65ec35f045cc73785380654057/starlette-1.0.1.tar.gz", hash = "sha256:512399c5f1de7fac99c88572212ded9ddeddef2fb32afa82d724000e88b38f4f", size = 2659596, upload-time = "2026-05-21T21:58:58.433Z" }
sdist = { url = "https://files.pythonhosted.org/packages/81/69/17425771797c36cded50b7fe44e850315d039f28b15901ab44839e70b593/starlette-1.0.0.tar.gz", hash = "sha256:6a4beaf1f81bb472fd19ea9b918b50dc3a77a6f2e190a12954b25e6ed5eea149", size = 2655289, upload-time = "2026-03-22T18:29:46.779Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/ec/e1/b2df4bc09a1e51ff664c1e17018a4274b42e5e9352e4a478ea540512dc88/starlette-1.0.1-py3-none-any.whl", hash = "sha256:7c0e69b2ee1c848bd54669d908500117a3ee13de603a21427e5c6fc1adf98dcd", size = 72802, upload-time = "2026-05-21T21:58:56.551Z" },
{ url = "https://files.pythonhosted.org/packages/0b/c9/584bc9651441b4ba60cc4d557d8a547b5aff901af35bda3a4ee30c819b82/starlette-1.0.0-py3-none-any.whl", hash = "sha256:d3ec55e0bb321692d275455ddfd3df75fff145d009685eb40dc91fc66b03d38b", size = 72651, upload-time = "2026-03-22T18:29:45.111Z" },
]
[[package]]