mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-04-14 15:02:37 +00:00
Compare commits
9 Commits
devin/1775
...
1.14.2a2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ea58f8d34d | ||
|
|
fe93333066 | ||
|
|
1293dee241 | ||
|
|
6efa142e22 | ||
|
|
fc6792d067 | ||
|
|
84b1b0a0b0 | ||
|
|
56cf8a4384 | ||
|
|
68c754883d | ||
|
|
ce56472fc3 |
@@ -4,6 +4,33 @@ description: "تحديثات المنتج والتحسينات وإصلاحات
|
||||
icon: "clock"
|
||||
mode: "wide"
|
||||
---
|
||||
<Update label="10 أبريل 2026">
|
||||
## v1.14.2a2
|
||||
|
||||
[عرض الإصدار على GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.2a2)
|
||||
|
||||
## ما الذي تغير
|
||||
|
||||
### الميزات
|
||||
- إضافة واجهة مستخدم نصية لنقطة التحقق مع عرض شجري، ودعم التفرع، ومدخلات/مخرجات قابلة للتعديل
|
||||
- إثراء تتبع رموز LLM مع رموز الاستدلال ورموز إنشاء التخزين المؤقت
|
||||
- إضافة معلمة `from_checkpoint` إلى طرق الانطلاق
|
||||
- تضمين `crewai_version` في نقاط التحقق مع إطار عمل الهجرة
|
||||
- إضافة تفرع نقاط التحقق مع تتبع السلالة
|
||||
|
||||
### إصلاحات الأخطاء
|
||||
- إصلاح توجيه الوضع الصارم إلى مزودي Anthropic وBedrock
|
||||
- تعزيز NL2SQLTool مع وضع القراءة فقط الافتراضي، والتحقق من الاستعلامات، والاستعلامات المعلمة
|
||||
|
||||
### الوثائق
|
||||
- تحديث سجل التغييرات والإصدار لـ v1.14.2a1
|
||||
|
||||
## المساهمون
|
||||
|
||||
@alex-clawd, @github-actions[bot], @greysonlalonde, @lucasgomide
|
||||
|
||||
</Update>
|
||||
|
||||
<Update label="9 أبريل 2026">
|
||||
## v1.14.2a1
|
||||
|
||||
|
||||
@@ -11,7 +11,7 @@ mode: "wide"
|
||||
|
||||
يتيح ذلك سير عمل متعددة مثل أن يقوم وكيل بالوصول إلى قاعدة البيانات واسترجاع المعلومات بناءً على الهدف ثم استخدام تلك المعلومات لتوليد استجابة أو تقرير أو أي مخرجات أخرى. بالإضافة إلى ذلك، يوفر القدرة للوكيل على تحديث قاعدة البيانات بناءً على هدفه.
|
||||
|
||||
**تنبيه**: تأكد من أن الوكيل لديه وصول إلى نسخة قراءة فقط أو أنه من المقبول أن يقوم الوكيل بتنفيذ استعلامات إدراج/تحديث على قاعدة البيانات.
|
||||
**تنبيه**: الأداة للقراءة فقط بشكل افتراضي (SELECT/SHOW/DESCRIBE/EXPLAIN فقط). تتطلب عمليات الكتابة تمرير `allow_dml=True` أو ضبط متغير البيئة `CREWAI_NL2SQL_ALLOW_DML=true`. عند تفعيل الكتابة، تأكد من أن الوكيل يستخدم مستخدم قاعدة بيانات محدود الصلاحيات أو نسخة قراءة كلما أمكن.
|
||||
|
||||
## نموذج الأمان
|
||||
|
||||
@@ -36,6 +36,74 @@ mode: "wide"
|
||||
- أضف خطافات `before_tool_call` لفرض أنماط الاستعلام المسموح بها
|
||||
- فعّل تسجيل الاستعلامات والتنبيهات للعبارات التدميرية
|
||||
|
||||
## وضع القراءة فقط وتهيئة DML
|
||||
|
||||
تعمل `NL2SQLTool` في **وضع القراءة فقط بشكل افتراضي**. لا يُسمح إلا بأنواع العبارات التالية دون تهيئة إضافية:
|
||||
|
||||
- `SELECT`
|
||||
- `SHOW`
|
||||
- `DESCRIBE`
|
||||
- `EXPLAIN`
|
||||
|
||||
أي محاولة لتنفيذ عملية كتابة (`INSERT`، `UPDATE`، `DELETE`، `DROP`، `CREATE`، `ALTER`، `TRUNCATE`، إلخ) ستُسبب خطأً ما لم يتم تفعيل DML صراحةً.
|
||||
|
||||
كما تُحظر الاستعلامات متعددة العبارات التي تحتوي على فاصلة منقوطة (مثل `SELECT 1; DROP TABLE users`) في وضع القراءة فقط لمنع هجمات الحقن.
|
||||
|
||||
### تفعيل عمليات الكتابة
|
||||
|
||||
يمكنك تفعيل DML (لغة معالجة البيانات) بطريقتين:
|
||||
|
||||
**الخيار الأول — معامل المُنشئ:**
|
||||
|
||||
```python
|
||||
from crewai_tools import NL2SQLTool
|
||||
|
||||
nl2sql = NL2SQLTool(
|
||||
db_uri="postgresql://example@localhost:5432/test_db",
|
||||
allow_dml=True,
|
||||
)
|
||||
```
|
||||
|
||||
**الخيار الثاني — متغير البيئة:**
|
||||
|
||||
```bash
|
||||
CREWAI_NL2SQL_ALLOW_DML=true
|
||||
```
|
||||
|
||||
```python
|
||||
from crewai_tools import NL2SQLTool
|
||||
|
||||
# DML مفعّل عبر متغير البيئة
|
||||
nl2sql = NL2SQLTool(db_uri="postgresql://example@localhost:5432/test_db")
|
||||
```
|
||||
|
||||
### أمثلة الاستخدام
|
||||
|
||||
**القراءة فقط (الافتراضي) — آمن للتحليلات والتقارير:**
|
||||
|
||||
```python
|
||||
from crewai_tools import NL2SQLTool
|
||||
|
||||
# يُسمح فقط بـ SELECT/SHOW/DESCRIBE/EXPLAIN
|
||||
nl2sql = NL2SQLTool(db_uri="postgresql://example@localhost:5432/test_db")
|
||||
```
|
||||
|
||||
**مع تفعيل DML — مطلوب لأعباء عمل الكتابة:**
|
||||
|
||||
```python
|
||||
from crewai_tools import NL2SQLTool
|
||||
|
||||
# يُسمح بـ INSERT وUPDATE وDELETE وDROP وغيرها
|
||||
nl2sql = NL2SQLTool(
|
||||
db_uri="postgresql://example@localhost:5432/test_db",
|
||||
allow_dml=True,
|
||||
)
|
||||
```
|
||||
|
||||
<Warning>
|
||||
يمنح تفعيل DML للوكيل القدرة على تعديل البيانات أو حذفها. لا تفعّله إلا عندما يتطلب حالة الاستخدام صراحةً وصولاً للكتابة، وتأكد من أن بيانات اعتماد قاعدة البيانات محدودة بالحد الأدنى من الصلاحيات المطلوبة.
|
||||
</Warning>
|
||||
|
||||
## المتطلبات
|
||||
|
||||
- SqlAlchemy
|
||||
|
||||
@@ -4,6 +4,33 @@ description: "Product updates, improvements, and bug fixes for CrewAI"
|
||||
icon: "clock"
|
||||
mode: "wide"
|
||||
---
|
||||
<Update label="Apr 10, 2026">
|
||||
## v1.14.2a2
|
||||
|
||||
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.2a2)
|
||||
|
||||
## What's Changed
|
||||
|
||||
### Features
|
||||
- Add checkpoint TUI with tree view, fork support, and editable inputs/outputs
|
||||
- Enrich LLM token tracking with reasoning tokens and cache creation tokens
|
||||
- Add `from_checkpoint` parameter to kickoff methods
|
||||
- Embed `crewai_version` in checkpoints with migration framework
|
||||
- Add checkpoint forking with lineage tracking
|
||||
|
||||
### Bug Fixes
|
||||
- Fix strict mode forwarding to Anthropic and Bedrock providers
|
||||
- Harden NL2SQLTool with read-only default, query validation, and parameterized queries
|
||||
|
||||
### Documentation
|
||||
- Update changelog and version for v1.14.2a1
|
||||
|
||||
## Contributors
|
||||
|
||||
@alex-clawd, @github-actions[bot], @greysonlalonde, @lucasgomide
|
||||
|
||||
</Update>
|
||||
|
||||
<Update label="Apr 09, 2026">
|
||||
## v1.14.2a1
|
||||
|
||||
|
||||
@@ -54,6 +54,7 @@ crew = Crew(
|
||||
| `on_events` | `list[str]` | `["task_completed"]` | Event types that trigger a checkpoint |
|
||||
| `provider` | `BaseProvider` | `JsonProvider()` | Storage backend |
|
||||
| `max_checkpoints` | `int \| None` | `None` | Max checkpoints to keep. Oldest are pruned after each write. Pruning is handled by the provider. |
|
||||
| `restore_from` | `Path \| str \| None` | `None` | Path to a checkpoint to restore from. Used when passing config via a kickoff method's `from_checkpoint` parameter. |
|
||||
|
||||
### Inheritance and Opt-Out
|
||||
|
||||
@@ -79,13 +80,42 @@ crew = Crew(
|
||||
|
||||
## Resuming from a Checkpoint
|
||||
|
||||
Pass a `CheckpointConfig` with `restore_from` to any kickoff method. The crew restores from that checkpoint, skips completed tasks, and resumes.
|
||||
|
||||
```python
|
||||
# Restore and resume
|
||||
crew = Crew.from_checkpoint("./my_checkpoints/20260407T120000_abc123.json")
|
||||
result = crew.kickoff() # picks up from last completed task
|
||||
from crewai import Crew, CheckpointConfig
|
||||
|
||||
crew = Crew(agents=[...], tasks=[...])
|
||||
result = crew.kickoff(
|
||||
from_checkpoint=CheckpointConfig(
|
||||
restore_from="./my_checkpoints/20260407T120000_abc123.json",
|
||||
),
|
||||
)
|
||||
```
|
||||
|
||||
The restored crew skips already-completed tasks and resumes from the first incomplete one.
|
||||
Remaining `CheckpointConfig` fields apply to the new run, so checkpointing continues after the restore.
|
||||
|
||||
You can also use the classmethod directly:
|
||||
|
||||
```python
|
||||
config = CheckpointConfig(restore_from="./my_checkpoints/20260407T120000_abc123.json")
|
||||
crew = Crew.from_checkpoint(config)
|
||||
result = crew.kickoff()
|
||||
```
|
||||
|
||||
## Forking from a Checkpoint
|
||||
|
||||
`fork()` restores a checkpoint and starts a new execution branch. Useful for exploring alternative paths from the same point.
|
||||
|
||||
```python
|
||||
from crewai import Crew, CheckpointConfig
|
||||
|
||||
config = CheckpointConfig(restore_from="./my_checkpoints/20260407T120000_abc123.json")
|
||||
crew = Crew.fork(config, branch="experiment-a")
|
||||
result = crew.kickoff(inputs={"strategy": "aggressive"})
|
||||
```
|
||||
|
||||
Each fork gets a unique lineage ID so checkpoints from different branches don't collide. The `branch` label is optional and auto-generated if omitted.
|
||||
|
||||
## Works on Crew, Flow, and Agent
|
||||
|
||||
@@ -125,7 +155,8 @@ flow = MyFlow(
|
||||
result = flow.kickoff()
|
||||
|
||||
# Resume
|
||||
flow = MyFlow.from_checkpoint("./flow_cp/20260407T120000_abc123.json")
|
||||
config = CheckpointConfig(restore_from="./flow_cp/20260407T120000_abc123.json")
|
||||
flow = MyFlow.from_checkpoint(config)
|
||||
result = flow.kickoff()
|
||||
```
|
||||
|
||||
@@ -231,3 +262,44 @@ async def on_llm_done_async(source, event, state):
|
||||
The `state` argument is the `RuntimeState` passed automatically by the event bus when your handler accepts 3 parameters. You can register handlers on any event type listed in the [Event Listeners](/en/concepts/event-listener) documentation.
|
||||
|
||||
Checkpointing is best-effort: if a checkpoint write fails, the error is logged but execution continues uninterrupted.
|
||||
|
||||
## CLI
|
||||
|
||||
The `crewai checkpoint` command gives you a TUI for browsing, inspecting, resuming, and forking checkpoints. It auto-detects whether your checkpoints are JSON files or a SQLite database.
|
||||
|
||||
```bash
|
||||
# Launch the TUI — auto-detects .checkpoints/ or .checkpoints.db
|
||||
crewai checkpoint
|
||||
|
||||
# Point at a specific location
|
||||
crewai checkpoint --location ./my_checkpoints
|
||||
crewai checkpoint --location ./.checkpoints.db
|
||||
```
|
||||
|
||||
<Frame>
|
||||
<img src="/images/checkpointing.png" alt="Checkpoint TUI" />
|
||||
</Frame>
|
||||
|
||||
The left panel is a tree view. Checkpoints are grouped by branch, and forks nest under the checkpoint they diverged from. Select a checkpoint to see its metadata, entity state, and task progress in the detail panel. Hit **Resume** to pick up where it left off, or **Fork** to start a new branch from that point.
|
||||
|
||||
### Editing inputs and task outputs
|
||||
|
||||
When a checkpoint is selected, the detail panel shows:
|
||||
|
||||
- **Inputs** — if the original kickoff had inputs (e.g. `{topic}`), they appear as editable fields pre-filled with the original values. Change them before resuming or forking.
|
||||
- **Task outputs** — completed tasks show their output in editable text areas. Edit a task's output to change the context that downstream tasks receive. When you modify a task output and hit Fork, all subsequent tasks are invalidated and re-run with the new context.
|
||||
|
||||
This is useful for "what if" exploration — fork from a checkpoint, tweak a task's result, and see how it changes downstream behavior.
|
||||
|
||||
### Subcommands
|
||||
|
||||
```bash
|
||||
# List all checkpoints
|
||||
crewai checkpoint list ./my_checkpoints
|
||||
|
||||
# Inspect a specific checkpoint
|
||||
crewai checkpoint info ./my_checkpoints/20260407T120000_abc123.json
|
||||
|
||||
# Inspect latest in a SQLite database
|
||||
crewai checkpoint info ./.checkpoints.db
|
||||
```
|
||||
|
||||
@@ -13,7 +13,7 @@ This tool is used to convert natural language to SQL queries. When passed to the
|
||||
This enables multiple workflows like having an Agent to access the database fetch information based on the goal and then use the information to generate a response, report or any other output.
|
||||
Along with that, it provides the ability for the Agent to update the database based on its goal.
|
||||
|
||||
**Attention**: Make sure that the Agent has access to a Read-Replica or that is okay for the Agent to run insert/update queries on the database.
|
||||
**Attention**: By default the tool is read-only (SELECT/SHOW/DESCRIBE/EXPLAIN only). Write operations require `allow_dml=True` or the `CREWAI_NL2SQL_ALLOW_DML=true` environment variable. When write access is enabled, make sure the Agent uses a scoped database user or a read replica where possible.
|
||||
|
||||
## Security Model
|
||||
|
||||
@@ -38,6 +38,74 @@ Use all of the following in production:
|
||||
- Add `before_tool_call` hooks to enforce allowed query patterns
|
||||
- Enable query logging and alerting for destructive statements
|
||||
|
||||
## Read-Only Mode & DML Configuration
|
||||
|
||||
`NL2SQLTool` operates in **read-only mode by default**. Only the following statement types are permitted without additional configuration:
|
||||
|
||||
- `SELECT`
|
||||
- `SHOW`
|
||||
- `DESCRIBE`
|
||||
- `EXPLAIN`
|
||||
|
||||
Any attempt to execute a write operation (`INSERT`, `UPDATE`, `DELETE`, `DROP`, `CREATE`, `ALTER`, `TRUNCATE`, etc.) will raise an error unless DML is explicitly enabled.
|
||||
|
||||
Multi-statement queries containing semicolons (e.g. `SELECT 1; DROP TABLE users`) are also blocked in read-only mode to prevent injection attacks.
|
||||
|
||||
### Enabling Write Operations
|
||||
|
||||
You can enable DML (Data Manipulation Language) in two ways:
|
||||
|
||||
**Option 1 — constructor parameter:**
|
||||
|
||||
```python
|
||||
from crewai_tools import NL2SQLTool
|
||||
|
||||
nl2sql = NL2SQLTool(
|
||||
db_uri="postgresql://example@localhost:5432/test_db",
|
||||
allow_dml=True,
|
||||
)
|
||||
```
|
||||
|
||||
**Option 2 — environment variable:**
|
||||
|
||||
```bash
|
||||
CREWAI_NL2SQL_ALLOW_DML=true
|
||||
```
|
||||
|
||||
```python
|
||||
from crewai_tools import NL2SQLTool
|
||||
|
||||
# DML enabled via environment variable
|
||||
nl2sql = NL2SQLTool(db_uri="postgresql://example@localhost:5432/test_db")
|
||||
```
|
||||
|
||||
### Usage Examples
|
||||
|
||||
**Read-only (default) — safe for analytics and reporting:**
|
||||
|
||||
```python
|
||||
from crewai_tools import NL2SQLTool
|
||||
|
||||
# Only SELECT/SHOW/DESCRIBE/EXPLAIN are permitted
|
||||
nl2sql = NL2SQLTool(db_uri="postgresql://example@localhost:5432/test_db")
|
||||
```
|
||||
|
||||
**DML enabled — required for write workloads:**
|
||||
|
||||
```python
|
||||
from crewai_tools import NL2SQLTool
|
||||
|
||||
# INSERT, UPDATE, DELETE, DROP, etc. are permitted
|
||||
nl2sql = NL2SQLTool(
|
||||
db_uri="postgresql://example@localhost:5432/test_db",
|
||||
allow_dml=True,
|
||||
)
|
||||
```
|
||||
|
||||
<Warning>
|
||||
Enabling DML gives the agent the ability to modify or destroy data. Only enable this when your use case explicitly requires write access, and ensure the database credentials are scoped to the minimum required privileges.
|
||||
</Warning>
|
||||
|
||||
## Requirements
|
||||
|
||||
- SqlAlchemy
|
||||
|
||||
BIN
docs/images/checkpointing.png
Normal file
BIN
docs/images/checkpointing.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 315 KiB |
@@ -4,6 +4,33 @@ description: "CrewAI의 제품 업데이트, 개선 사항 및 버그 수정"
|
||||
icon: "clock"
|
||||
mode: "wide"
|
||||
---
|
||||
<Update label="2026년 4월 10일">
|
||||
## v1.14.2a2
|
||||
|
||||
[GitHub 릴리스 보기](https://github.com/crewAIInc/crewAI/releases/tag/1.14.2a2)
|
||||
|
||||
## 변경 사항
|
||||
|
||||
### 기능
|
||||
- 트리 뷰, 포크 지원 및 편집 가능한 입력/출력을 갖춘 체크포인트 TUI 추가
|
||||
- 추론 토큰 및 캐시 생성 토큰으로 LLM 토큰 추적 강화
|
||||
- 킥오프 메서드에 `from_checkpoint` 매개변수 추가
|
||||
- 마이그레이션 프레임워크와 함께 체크포인트에 `crewai_version` 포함
|
||||
- 계보 추적이 가능한 체크포인트 포킹 추가
|
||||
|
||||
### 버그 수정
|
||||
- Anthropic 및 Bedrock 공급자로의 엄격 모드 포워딩 수정
|
||||
- 읽기 전용 기본값, 쿼리 검증 및 매개변수화된 쿼리로 NL2SQLTool 강화
|
||||
|
||||
### 문서
|
||||
- v1.14.2a1에 대한 변경 로그 및 버전 업데이트
|
||||
|
||||
## 기여자
|
||||
|
||||
@alex-clawd, @github-actions[bot], @greysonlalonde, @lucasgomide
|
||||
|
||||
</Update>
|
||||
|
||||
<Update label="2026년 4월 9일">
|
||||
## v1.14.2a1
|
||||
|
||||
|
||||
@@ -11,7 +11,75 @@ mode: "wide"
|
||||
|
||||
이를 통해 에이전트가 데이터베이스에 접근하여 목표에 따라 정보를 가져오고, 해당 정보를 사용해 응답, 보고서 또는 기타 출력물을 생성하는 다양한 워크플로우가 가능해집니다. 또한 에이전트가 자신의 목표에 맞춰 데이터베이스를 업데이트할 수 있는 기능도 제공합니다.
|
||||
|
||||
**주의**: 에이전트가 Read-Replica에 접근할 수 있거나, 에이전트가 데이터베이스에 insert/update 쿼리를 실행해도 괜찮은지 반드시 확인하십시오.
|
||||
**주의**: 도구는 기본적으로 읽기 전용(SELECT/SHOW/DESCRIBE/EXPLAIN만 허용)으로 동작합니다. 쓰기 작업을 수행하려면 `allow_dml=True` 매개변수 또는 `CREWAI_NL2SQL_ALLOW_DML=true` 환경 변수가 필요합니다. 쓰기 접근이 활성화된 경우, 가능하면 권한이 제한된 데이터베이스 사용자나 읽기 복제본을 사용하십시오.
|
||||
|
||||
## 읽기 전용 모드 및 DML 구성
|
||||
|
||||
`NL2SQLTool`은 기본적으로 **읽기 전용 모드**로 동작합니다. 추가 구성 없이 허용되는 구문 유형은 다음과 같습니다:
|
||||
|
||||
- `SELECT`
|
||||
- `SHOW`
|
||||
- `DESCRIBE`
|
||||
- `EXPLAIN`
|
||||
|
||||
DML을 명시적으로 활성화하지 않으면 쓰기 작업(`INSERT`, `UPDATE`, `DELETE`, `DROP`, `CREATE`, `ALTER`, `TRUNCATE` 등)을 실행하려고 할 때 오류가 발생합니다.
|
||||
|
||||
읽기 전용 모드에서는 세미콜론이 포함된 다중 구문 쿼리(예: `SELECT 1; DROP TABLE users`)도 인젝션 공격을 방지하기 위해 차단됩니다.
|
||||
|
||||
### 쓰기 작업 활성화
|
||||
|
||||
DML(데이터 조작 언어)을 활성화하는 방법은 두 가지입니다:
|
||||
|
||||
**옵션 1 — 생성자 매개변수:**
|
||||
|
||||
```python
|
||||
from crewai_tools import NL2SQLTool
|
||||
|
||||
nl2sql = NL2SQLTool(
|
||||
db_uri="postgresql://example@localhost:5432/test_db",
|
||||
allow_dml=True,
|
||||
)
|
||||
```
|
||||
|
||||
**옵션 2 — 환경 변수:**
|
||||
|
||||
```bash
|
||||
CREWAI_NL2SQL_ALLOW_DML=true
|
||||
```
|
||||
|
||||
```python
|
||||
from crewai_tools import NL2SQLTool
|
||||
|
||||
# 환경 변수를 통해 DML 활성화
|
||||
nl2sql = NL2SQLTool(db_uri="postgresql://example@localhost:5432/test_db")
|
||||
```
|
||||
|
||||
### 사용 예시
|
||||
|
||||
**읽기 전용(기본값) — 분석 및 보고 워크로드에 안전:**
|
||||
|
||||
```python
|
||||
from crewai_tools import NL2SQLTool
|
||||
|
||||
# SELECT/SHOW/DESCRIBE/EXPLAIN만 허용
|
||||
nl2sql = NL2SQLTool(db_uri="postgresql://example@localhost:5432/test_db")
|
||||
```
|
||||
|
||||
**DML 활성화 — 쓰기 워크로드에 필요:**
|
||||
|
||||
```python
|
||||
from crewai_tools import NL2SQLTool
|
||||
|
||||
# INSERT, UPDATE, DELETE, DROP 등이 허용됨
|
||||
nl2sql = NL2SQLTool(
|
||||
db_uri="postgresql://example@localhost:5432/test_db",
|
||||
allow_dml=True,
|
||||
)
|
||||
```
|
||||
|
||||
<Warning>
|
||||
DML을 활성화하면 에이전트가 데이터를 수정하거나 삭제할 수 있습니다. 사용 사례에서 명시적으로 쓰기 접근이 필요한 경우에만 활성화하고, 데이터베이스 자격 증명이 최소 필요 권한으로 제한되어 있는지 확인하십시오.
|
||||
</Warning>
|
||||
|
||||
## 요구 사항
|
||||
|
||||
|
||||
@@ -4,6 +4,33 @@ description: "Atualizações de produto, melhorias e correções do CrewAI"
|
||||
icon: "clock"
|
||||
mode: "wide"
|
||||
---
|
||||
<Update label="10 abr 2026">
|
||||
## v1.14.2a2
|
||||
|
||||
[Ver release no GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.2a2)
|
||||
|
||||
## O que Mudou
|
||||
|
||||
### Funcionalidades
|
||||
- Adicionar TUI de ponto de verificação com visualização em árvore, suporte a bifurcações e entradas/saídas editáveis
|
||||
- Enriquecer o rastreamento de tokens LLM com tokens de raciocínio e tokens de criação de cache
|
||||
- Adicionar parâmetro `from_checkpoint` aos métodos de inicialização
|
||||
- Incorporar `crewai_version` em pontos de verificação com o framework de migração
|
||||
- Adicionar bifurcação de ponto de verificação com rastreamento de linhagem
|
||||
|
||||
### Correções de Bugs
|
||||
- Corrigir o encaminhamento em modo estrito para os provedores Anthropic e Bedrock
|
||||
- Fortalecer NL2SQLTool com padrão somente leitura, validação de consultas e consultas parametrizadas
|
||||
|
||||
### Documentação
|
||||
- Atualizar changelog e versão para v1.14.2a1
|
||||
|
||||
## Contribuidores
|
||||
|
||||
@alex-clawd, @github-actions[bot], @greysonlalonde, @lucasgomide
|
||||
|
||||
</Update>
|
||||
|
||||
<Update label="09 abr 2026">
|
||||
## v1.14.2a1
|
||||
|
||||
|
||||
@@ -11,7 +11,75 @@ Esta ferramenta é utilizada para converter linguagem natural em consultas SQL.
|
||||
|
||||
Isso possibilita múltiplos fluxos de trabalho, como por exemplo ter um Agente acessando o banco de dados para buscar informações com base em um objetivo e, então, usar essas informações para gerar uma resposta, relatório ou qualquer outro tipo de saída. Além disso, permite que o Agente atualize o banco de dados de acordo com seu objetivo.
|
||||
|
||||
**Atenção**: Certifique-se de que o Agente tenha acesso a um Read-Replica ou que seja permitido que o Agente execute consultas de inserção/atualização no banco de dados.
|
||||
**Atenção**: Por padrão, a ferramenta opera em modo somente leitura (apenas SELECT/SHOW/DESCRIBE/EXPLAIN). Operações de escrita exigem `allow_dml=True` ou a variável de ambiente `CREWAI_NL2SQL_ALLOW_DML=true`. Quando o acesso de escrita estiver habilitado, certifique-se de que o Agente use um usuário de banco de dados com privilégios mínimos ou um Read-Replica sempre que possível.
|
||||
|
||||
## Modo Somente Leitura e Configuração de DML
|
||||
|
||||
O `NL2SQLTool` opera em **modo somente leitura por padrão**. Apenas os seguintes tipos de instrução são permitidos sem configuração adicional:
|
||||
|
||||
- `SELECT`
|
||||
- `SHOW`
|
||||
- `DESCRIBE`
|
||||
- `EXPLAIN`
|
||||
|
||||
Qualquer tentativa de executar uma operação de escrita (`INSERT`, `UPDATE`, `DELETE`, `DROP`, `CREATE`, `ALTER`, `TRUNCATE`, etc.) resultará em erro, a menos que o DML seja habilitado explicitamente.
|
||||
|
||||
Consultas com múltiplas instruções contendo ponto e vírgula (ex.: `SELECT 1; DROP TABLE users`) também são bloqueadas no modo somente leitura para prevenir ataques de injeção.
|
||||
|
||||
### Habilitando Operações de Escrita
|
||||
|
||||
Você pode habilitar DML (Linguagem de Manipulação de Dados) de duas formas:
|
||||
|
||||
**Opção 1 — parâmetro do construtor:**
|
||||
|
||||
```python
|
||||
from crewai_tools import NL2SQLTool
|
||||
|
||||
nl2sql = NL2SQLTool(
|
||||
db_uri="postgresql://example@localhost:5432/test_db",
|
||||
allow_dml=True,
|
||||
)
|
||||
```
|
||||
|
||||
**Opção 2 — variável de ambiente:**
|
||||
|
||||
```bash
|
||||
CREWAI_NL2SQL_ALLOW_DML=true
|
||||
```
|
||||
|
||||
```python
|
||||
from crewai_tools import NL2SQLTool
|
||||
|
||||
# DML habilitado via variável de ambiente
|
||||
nl2sql = NL2SQLTool(db_uri="postgresql://example@localhost:5432/test_db")
|
||||
```
|
||||
|
||||
### Exemplos de Uso
|
||||
|
||||
**Somente leitura (padrão) — seguro para análise e relatórios:**
|
||||
|
||||
```python
|
||||
from crewai_tools import NL2SQLTool
|
||||
|
||||
# Apenas SELECT/SHOW/DESCRIBE/EXPLAIN são permitidos
|
||||
nl2sql = NL2SQLTool(db_uri="postgresql://example@localhost:5432/test_db")
|
||||
```
|
||||
|
||||
**Com DML habilitado — necessário para workloads de escrita:**
|
||||
|
||||
```python
|
||||
from crewai_tools import NL2SQLTool
|
||||
|
||||
# INSERT, UPDATE, DELETE, DROP, etc. são permitidos
|
||||
nl2sql = NL2SQLTool(
|
||||
db_uri="postgresql://example@localhost:5432/test_db",
|
||||
allow_dml=True,
|
||||
)
|
||||
```
|
||||
|
||||
<Warning>
|
||||
Habilitar DML concede ao agente a capacidade de modificar ou destruir dados. Ative apenas quando o seu caso de uso exigir explicitamente acesso de escrita e certifique-se de que as credenciais do banco de dados estejam limitadas aos privilégios mínimos necessários.
|
||||
</Warning>
|
||||
|
||||
## Requisitos
|
||||
|
||||
|
||||
@@ -152,4 +152,4 @@ __all__ = [
|
||||
"wrap_file_source",
|
||||
]
|
||||
|
||||
__version__ = "1.14.2a1"
|
||||
__version__ = "1.14.2a2"
|
||||
|
||||
@@ -10,7 +10,7 @@ requires-python = ">=3.10, <3.14"
|
||||
dependencies = [
|
||||
"pytube~=15.0.0",
|
||||
"requests~=2.32.5",
|
||||
"crewai==1.14.2a1",
|
||||
"crewai==1.14.2a2",
|
||||
"tiktoken~=0.8.0",
|
||||
"beautifulsoup4~=4.13.4",
|
||||
"python-docx~=1.2.0",
|
||||
|
||||
@@ -305,4 +305,4 @@ __all__ = [
|
||||
"ZapierActionTools",
|
||||
]
|
||||
|
||||
__version__ = "1.14.2a1"
|
||||
__version__ = "1.14.2a2"
|
||||
|
||||
@@ -1,7 +1,17 @@
|
||||
from collections.abc import Iterator
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
from typing import Any
|
||||
|
||||
|
||||
try:
|
||||
from typing import Self
|
||||
except ImportError:
|
||||
from typing_extensions import Self
|
||||
|
||||
from crewai.tools import BaseTool
|
||||
from pydantic import BaseModel, Field
|
||||
from pydantic import BaseModel, Field, model_validator
|
||||
|
||||
|
||||
try:
|
||||
@@ -12,6 +22,186 @@ try:
|
||||
except ImportError:
|
||||
SQLALCHEMY_AVAILABLE = False
|
||||
|
||||
# Module-level logger used to warn when DML/DDL execution is enabled.
logger = logging.getLogger(__name__)

# Commands allowed in read-only mode
# NOTE: WITH is intentionally excluded — writable CTEs start with WITH, so the
# CTE body must be inspected separately (see _validate_statement).
_READ_ONLY_COMMANDS = {"SELECT", "SHOW", "DESCRIBE", "DESC", "EXPLAIN"}

# Commands that mutate state and are blocked by default
_WRITE_COMMANDS = {
    "INSERT",
    "UPDATE",
    "DELETE",
    "DROP",
    "ALTER",
    "CREATE",
    "TRUNCATE",
    "GRANT",
    "REVOKE",
    "EXEC",
    "EXECUTE",
    "CALL",
    "MERGE",
    "REPLACE",
    "UPSERT",
    "LOAD",
    "COPY",
    "VACUUM",
    "ANALYZE",
    "ANALYSE",
    "REINDEX",
    "CLUSTER",
    "REFRESH",
    "COMMENT",
    "SET",
    "RESET",
}


# Subset of write commands that can realistically appear *inside* a CTE body.
# Narrower than _WRITE_COMMANDS to avoid false positives on identifiers like
# ``comment``, ``set``, or ``reset`` which are common column/table names.
_CTE_WRITE_INDICATORS = {
    "INSERT",
    "UPDATE",
    "DELETE",
    "DROP",
    "ALTER",
    "CREATE",
    "TRUNCATE",
    "MERGE",
}


# Matches the ``AS (`` opener of a CTE body; ``\s*`` tolerates any whitespace
# (spaces, tabs, newlines) between ``AS`` and the parenthesis.
_AS_PAREN_RE = re.compile(r"\bAS\s*\(", re.IGNORECASE)
|
||||
|
||||
|
||||
def _iter_as_paren_matches(stmt: str) -> Iterator[re.Match[str]]:
    """Yield each ``AS\\s*(`` regex match in *stmt* that lies outside a string literal."""
    # Record every character offset that sits inside a single-quoted literal
    # so matches starting at such offsets can be filtered out below.
    literal_positions: set[int] = set()
    pos = 0
    length = len(stmt)
    while pos < length:
        if stmt[pos] != "'":
            pos += 1
            continue
        literal_end = _skip_string_literal(stmt, pos)
        literal_positions.update(range(pos, literal_end))
        pos = literal_end

    yield from (
        match
        for match in _AS_PAREN_RE.finditer(stmt)
        if match.start() not in literal_positions
    )
|
||||
|
||||
|
||||
def _detect_writable_cte(stmt: str) -> str | None:
    """Return the first write command opening a CTE body in *stmt*, or None.

    Rather than tokenizing the whole statement (which would falsely flag
    column names such as ``comment``), only the first keyword following each
    ``AS (`` opener is inspected. Whitespace between ``AS`` and ``(`` is
    handled by the shared regex, and openers inside string literals are
    already skipped by ``_iter_as_paren_matches``.
    """
    for opener in _iter_as_paren_matches(stmt):
        cte_body = stmt[opener.end():].lstrip()
        tokens = cte_body.split()
        leading_keyword = tokens[0].upper().strip("()") if tokens else ""
        if leading_keyword in _CTE_WRITE_INDICATORS:
            return leading_keyword
    return None
|
||||
|
||||
|
||||
def _skip_string_literal(stmt: str, pos: int) -> int:
|
||||
"""Skip past a string literal starting at pos (single-quoted).
|
||||
|
||||
Handles escaped quotes ('') inside the literal.
|
||||
Returns the index after the closing quote.
|
||||
"""
|
||||
quote_char = stmt[pos]
|
||||
i = pos + 1
|
||||
while i < len(stmt):
|
||||
if stmt[i] == quote_char:
|
||||
# Check for escaped quote ('')
|
||||
if i + 1 < len(stmt) and stmt[i + 1] == quote_char:
|
||||
i += 2
|
||||
continue
|
||||
return i + 1
|
||||
i += 1
|
||||
return i # Unterminated literal — return end
|
||||
|
||||
|
||||
def _find_matching_close_paren(stmt: str, start: int) -> int:
    """Return the index just past the ``)`` balancing an already-open paren.

    *start* points just inside an opening parenthesis (depth 1).
    Single-quoted string literals are skipped so parens inside them do not
    affect nesting. If the parens never balance, the end of *stmt* is returned.
    """
    nesting = 1
    idx = start
    total = len(stmt)
    while idx < total and nesting:
        current = stmt[idx]
        if current == "'":
            idx = _skip_string_literal(stmt, idx)
        else:
            if current == "(":
                nesting += 1
            elif current == ")":
                nesting -= 1
            idx += 1
    return idx
|
||||
|
||||
|
||||
def _extract_main_query_after_cte(stmt: str) -> str | None:
    """Return the outer query that follows the final CTE body, or None.

    For ``WITH cte AS (SELECT 1) DELETE FROM users`` this yields
    ``DELETE FROM users``. Parentheses inside string literals (e.g.
    ``SELECT '(' FROM t``) are handled by the helper functions. None is
    returned when nothing remains after the last CTE body.
    """
    end_of_last_body = 0
    for opener in _iter_as_paren_matches(stmt):
        end_of_last_body = _find_matching_close_paren(stmt, opener.end())

    if not end_of_last_body:
        return None
    # Drop the comma separating chained CTE definitions, then any whitespace.
    tail = stmt[end_of_last_body:].strip().lstrip(",").strip()
    return tail or None
|
||||
|
||||
|
||||
def _resolve_explain_command(stmt: str) -> str | None:
|
||||
"""Resolve the underlying command from an EXPLAIN [ANALYZE] [VERBOSE] statement.
|
||||
|
||||
Returns the real command (e.g., 'DELETE') if ANALYZE is present, else None.
|
||||
Handles both space-separated and parenthesized syntax.
|
||||
"""
|
||||
rest = stmt.strip()[len("EXPLAIN") :].strip()
|
||||
if not rest:
|
||||
return None
|
||||
|
||||
analyze_found = False
|
||||
explain_opts = {"ANALYZE", "ANALYSE", "VERBOSE"}
|
||||
|
||||
if rest.startswith("("):
|
||||
close = rest.find(")")
|
||||
if close != -1:
|
||||
options_str = rest[1:close].upper()
|
||||
analyze_found = any(
|
||||
opt.strip() in ("ANALYZE", "ANALYSE") for opt in options_str.split(",")
|
||||
)
|
||||
rest = rest[close + 1 :].strip()
|
||||
else:
|
||||
while rest:
|
||||
first_opt = rest.split()[0].upper().rstrip(";") if rest.split() else ""
|
||||
if first_opt in ("ANALYZE", "ANALYSE"):
|
||||
analyze_found = True
|
||||
if first_opt not in explain_opts:
|
||||
break
|
||||
rest = rest[len(first_opt) :].strip()
|
||||
|
||||
if analyze_found and rest:
|
||||
return rest.split()[0].upper().rstrip(";")
|
||||
return None
|
||||
|
||||
|
||||
class NL2SQLToolInput(BaseModel):
|
||||
sql_query: str = Field(
|
||||
@@ -21,20 +211,70 @@ class NL2SQLToolInput(BaseModel):
|
||||
|
||||
|
||||
class NL2SQLTool(BaseTool):
|
||||
"""Tool that converts natural language to SQL and executes it against a database.
|
||||
|
||||
By default the tool operates in **read-only mode**: only SELECT, SHOW,
|
||||
DESCRIBE, EXPLAIN, and read-only CTEs (WITH … SELECT) are permitted. Write
|
||||
operations (INSERT, UPDATE, DELETE, DROP, ALTER, CREATE, TRUNCATE, …) are
|
||||
blocked unless ``allow_dml=True`` is set explicitly or the environment
|
||||
variable ``CREWAI_NL2SQL_ALLOW_DML=true`` is present.
|
||||
|
||||
Writable CTEs (``WITH d AS (DELETE …) SELECT …``) and
|
||||
``EXPLAIN ANALYZE <write-stmt>`` are treated as write operations and are
|
||||
blocked in read-only mode.
|
||||
|
||||
The ``_fetch_all_available_columns`` helper uses parameterised queries so
|
||||
that table names coming from the database catalogue cannot be used as an
|
||||
injection vector.
|
||||
"""
|
||||
|
||||
name: str = "NL2SQLTool"
|
||||
description: str = "Converts natural language to SQL queries and executes them."
|
||||
description: str = (
|
||||
"Converts natural language to SQL queries and executes them against a "
|
||||
"database. Read-only by default — only SELECT/SHOW/DESCRIBE/EXPLAIN "
|
||||
"queries (and read-only CTEs) are allowed unless configured with "
|
||||
"allow_dml=True."
|
||||
)
|
||||
db_uri: str = Field(
|
||||
title="Database URI",
|
||||
description="The URI of the database to connect to.",
|
||||
)
|
||||
allow_dml: bool = Field(
|
||||
default=False,
|
||||
title="Allow DML",
|
||||
description=(
|
||||
"When False (default) only read statements are permitted. "
|
||||
"Set to True to allow INSERT/UPDATE/DELETE/DROP and other "
|
||||
"write operations."
|
||||
),
|
||||
)
|
||||
tables: list[dict[str, Any]] = Field(default_factory=list)
|
||||
columns: dict[str, list[dict[str, Any]] | str] = Field(default_factory=dict)
|
||||
args_schema: type[BaseModel] = NL2SQLToolInput
|
||||
|
||||
    @model_validator(mode="after")
    def _apply_env_override(self) -> Self:
        """Allow CREWAI_NL2SQL_ALLOW_DML=true to override allow_dml at runtime.

        Runs after model construction. Only the literal value ``"true"``
        (case-insensitive, whitespace-trimmed) enables DML; any other value
        leaves ``allow_dml`` untouched.
        """
        if os.environ.get("CREWAI_NL2SQL_ALLOW_DML", "").strip().lower() == "true":
            # Warn only when the env var is the thing enabling writes, so an
            # explicit allow_dml=True from the constructor stays quiet here.
            if not self.allow_dml:
                logger.warning(
                    "NL2SQLTool: CREWAI_NL2SQL_ALLOW_DML env var is set — "
                    "DML/DDL operations are enabled. Ensure this is intentional."
                )
            self.allow_dml = True
        return self
||||
|
||||
def model_post_init(self, __context: Any) -> None:
|
||||
if not SQLALCHEMY_AVAILABLE:
|
||||
raise ImportError(
|
||||
"sqlalchemy is not installed. Please install it with `pip install crewai-tools[sqlalchemy]`"
|
||||
"sqlalchemy is not installed. Please install it with "
|
||||
"`pip install crewai-tools[sqlalchemy]`"
|
||||
)
|
||||
|
||||
if self.allow_dml:
|
||||
logger.warning(
|
||||
"NL2SQLTool: allow_dml=True — write operations (INSERT/UPDATE/"
|
||||
"DELETE/DROP/…) are permitted. Use with caution."
|
||||
)
|
||||
|
||||
data: dict[str, list[dict[str, Any]] | str] = {}
|
||||
@@ -50,42 +290,216 @@ class NL2SQLTool(BaseTool):
|
||||
self.tables = tables
|
||||
self.columns = data
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Query validation
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _validate_query(self, sql_query: str) -> None:
|
||||
"""Raise ValueError if *sql_query* is not permitted under the current config.
|
||||
|
||||
Splits the query on semicolons and validates each statement
|
||||
independently. When ``allow_dml=False`` (the default), multi-statement
|
||||
queries are rejected outright to prevent ``SELECT 1; DROP TABLE users``
|
||||
style bypasses. When ``allow_dml=True`` every statement is checked and
|
||||
a warning is emitted for write operations.
|
||||
"""
|
||||
statements = [s.strip() for s in sql_query.split(";") if s.strip()]
|
||||
|
||||
if not statements:
|
||||
raise ValueError("NL2SQLTool received an empty SQL query.")
|
||||
|
||||
if not self.allow_dml and len(statements) > 1:
|
||||
raise ValueError(
|
||||
"NL2SQLTool blocked a multi-statement query in read-only mode. "
|
||||
"Semicolons are not permitted when allow_dml=False."
|
||||
)
|
||||
|
||||
for stmt in statements:
|
||||
self._validate_statement(stmt)
|
||||
|
||||
def _validate_statement(self, stmt: str) -> None:
|
||||
"""Validate a single SQL statement (no semicolons)."""
|
||||
command = self._extract_command(stmt)
|
||||
|
||||
# EXPLAIN ANALYZE / EXPLAIN ANALYSE actually *executes* the underlying
|
||||
# query. Resolve the real command so write operations are caught.
|
||||
# Handles both space-separated ("EXPLAIN ANALYZE DELETE …") and
|
||||
# parenthesized ("EXPLAIN (ANALYZE) DELETE …", "EXPLAIN (ANALYZE, VERBOSE) DELETE …").
|
||||
# EXPLAIN ANALYZE actually executes the underlying query — resolve the
|
||||
# real command so write operations are caught.
|
||||
if command == "EXPLAIN":
|
||||
resolved = _resolve_explain_command(stmt)
|
||||
if resolved:
|
||||
command = resolved
|
||||
|
||||
# WITH starts a CTE. Read-only CTEs are fine; writable CTEs
|
||||
# (e.g. WITH d AS (DELETE …) SELECT …) must be blocked in read-only mode.
|
||||
if command == "WITH":
|
||||
# Check for write commands inside CTE bodies.
|
||||
write_found = _detect_writable_cte(stmt)
|
||||
if write_found:
|
||||
found = write_found
|
||||
if not self.allow_dml:
|
||||
raise ValueError(
|
||||
f"NL2SQLTool is configured in read-only mode and blocked a "
|
||||
f"writable CTE containing a '{found}' statement. To allow "
|
||||
f"write operations set allow_dml=True or "
|
||||
f"CREWAI_NL2SQL_ALLOW_DML=true."
|
||||
)
|
||||
logger.warning(
|
||||
"NL2SQLTool: executing writable CTE with '%s' because allow_dml=True.",
|
||||
found,
|
||||
)
|
||||
return
|
||||
|
||||
# Check the main query after the CTE definitions.
|
||||
main_query = _extract_main_query_after_cte(stmt)
|
||||
if main_query:
|
||||
main_cmd = main_query.split()[0].upper().rstrip(";")
|
||||
if main_cmd in _WRITE_COMMANDS:
|
||||
if not self.allow_dml:
|
||||
raise ValueError(
|
||||
f"NL2SQLTool is configured in read-only mode and blocked a "
|
||||
f"'{main_cmd}' statement after a CTE. To allow write "
|
||||
f"operations set allow_dml=True or "
|
||||
f"CREWAI_NL2SQL_ALLOW_DML=true."
|
||||
)
|
||||
logger.warning(
|
||||
"NL2SQLTool: executing '%s' after CTE because allow_dml=True.",
|
||||
main_cmd,
|
||||
)
|
||||
elif main_cmd not in _READ_ONLY_COMMANDS:
|
||||
if not self.allow_dml:
|
||||
raise ValueError(
|
||||
f"NL2SQLTool blocked an unrecognised SQL command '{main_cmd}' "
|
||||
f"after a CTE. Only {sorted(_READ_ONLY_COMMANDS)} are allowed "
|
||||
f"in read-only mode."
|
||||
)
|
||||
return
|
||||
|
||||
if command in _WRITE_COMMANDS:
|
||||
if not self.allow_dml:
|
||||
raise ValueError(
|
||||
f"NL2SQLTool is configured in read-only mode and blocked a "
|
||||
f"'{command}' statement. To allow write operations set "
|
||||
f"allow_dml=True or CREWAI_NL2SQL_ALLOW_DML=true."
|
||||
)
|
||||
logger.warning(
|
||||
"NL2SQLTool: executing write statement '%s' because allow_dml=True.",
|
||||
command,
|
||||
)
|
||||
elif command not in _READ_ONLY_COMMANDS:
|
||||
# Unknown command — block by default unless DML is explicitly enabled
|
||||
if not self.allow_dml:
|
||||
raise ValueError(
|
||||
f"NL2SQLTool blocked an unrecognised SQL command '{command}'. "
|
||||
f"Only {sorted(_READ_ONLY_COMMANDS)} are allowed in read-only "
|
||||
f"mode."
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _extract_command(sql_query: str) -> str:
|
||||
"""Return the uppercased first keyword of *sql_query*."""
|
||||
stripped = sql_query.strip().lstrip("(")
|
||||
first_token = stripped.split()[0] if stripped.split() else ""
|
||||
return first_token.upper().rstrip(";")
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Schema introspection helpers
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _fetch_available_tables(self) -> list[dict[str, Any]] | str:
|
||||
return self.execute_sql(
|
||||
"SELECT table_name FROM information_schema.tables WHERE table_schema = 'public';"
|
||||
"SELECT table_name FROM information_schema.tables "
|
||||
"WHERE table_schema = 'public';"
|
||||
)
|
||||
|
||||
def _fetch_all_available_columns(
|
||||
self, table_name: str
|
||||
) -> list[dict[str, Any]] | str:
|
||||
"""Fetch columns for *table_name* using a parameterised query.
|
||||
|
||||
The table name is bound via SQLAlchemy's ``:param`` syntax to prevent
|
||||
SQL injection from catalogue values.
|
||||
"""
|
||||
return self.execute_sql(
|
||||
f"SELECT column_name, data_type FROM information_schema.columns WHERE table_name = '{table_name}';" # noqa: S608
|
||||
"SELECT column_name, data_type FROM information_schema.columns "
|
||||
"WHERE table_name = :table_name",
|
||||
params={"table_name": table_name},
|
||||
)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Core execution
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _run(self, sql_query: str) -> list[dict[str, Any]] | str:
|
||||
try:
|
||||
self._validate_query(sql_query)
|
||||
data = self.execute_sql(sql_query)
|
||||
except ValueError:
|
||||
raise
|
||||
except Exception as exc:
|
||||
data = (
|
||||
f"Based on these tables {self.tables} and columns {self.columns}, "
|
||||
"you can create SQL queries to retrieve data from the database."
|
||||
f"Get the original request {sql_query} and the error {exc} and create the correct SQL query."
|
||||
"you can create SQL queries to retrieve data from the database. "
|
||||
f"Get the original request {sql_query} and the error {exc} and "
|
||||
"create the correct SQL query."
|
||||
)
|
||||
|
||||
return data
|
||||
|
||||
def execute_sql(self, sql_query: str) -> list[dict[str, Any]] | str:
|
||||
def execute_sql(
|
||||
self,
|
||||
sql_query: str,
|
||||
params: dict[str, Any] | None = None,
|
||||
) -> list[dict[str, Any]] | str:
|
||||
"""Execute *sql_query* and return the results as a list of dicts.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
sql_query:
|
||||
The SQL statement to run.
|
||||
params:
|
||||
Optional mapping of bind parameters (e.g. ``{"table_name": "users"}``).
|
||||
"""
|
||||
if not SQLALCHEMY_AVAILABLE:
|
||||
raise ImportError(
|
||||
"sqlalchemy is not installed. Please install it with `pip install crewai-tools[sqlalchemy]`"
|
||||
"sqlalchemy is not installed. Please install it with "
|
||||
"`pip install crewai-tools[sqlalchemy]`"
|
||||
)
|
||||
|
||||
# Check ALL statements so that e.g. "SELECT 1; DROP TABLE t" triggers a
|
||||
# commit when allow_dml=True, regardless of statement order.
|
||||
_stmts = [s.strip() for s in sql_query.split(";") if s.strip()]
|
||||
|
||||
def _is_write_stmt(s: str) -> bool:
|
||||
cmd = self._extract_command(s)
|
||||
if cmd in _WRITE_COMMANDS:
|
||||
return True
|
||||
if cmd == "EXPLAIN":
|
||||
# Resolve the underlying command for EXPLAIN ANALYZE
|
||||
resolved = _resolve_explain_command(s)
|
||||
if resolved and resolved in _WRITE_COMMANDS:
|
||||
return True
|
||||
if cmd == "WITH":
|
||||
if _detect_writable_cte(s):
|
||||
return True
|
||||
main_q = _extract_main_query_after_cte(s)
|
||||
if main_q:
|
||||
return main_q.split()[0].upper().rstrip(";") in _WRITE_COMMANDS
|
||||
return False
|
||||
|
||||
is_write = any(_is_write_stmt(s) for s in _stmts)
|
||||
|
||||
engine = create_engine(self.db_uri)
|
||||
Session = sessionmaker(bind=engine) # noqa: N806
|
||||
session = Session()
|
||||
try:
|
||||
result = session.execute(text(sql_query))
|
||||
session.commit()
|
||||
result = session.execute(text(sql_query), params or {})
|
||||
|
||||
# Only commit when the operation actually mutates state
|
||||
if self.allow_dml and is_write:
|
||||
session.commit()
|
||||
|
||||
if result.returns_rows: # type: ignore[attr-defined]
|
||||
columns = result.keys()
|
||||
|
||||
671
lib/crewai-tools/tests/tools/test_nl2sql_security.py
Normal file
671
lib/crewai-tools/tests/tools/test_nl2sql_security.py
Normal file
@@ -0,0 +1,671 @@
|
||||
"""Security tests for NL2SQLTool.
|
||||
|
||||
Uses an in-memory SQLite database so no external service is needed.
|
||||
SQLite does not have information_schema, so we patch the schema-introspection
|
||||
helpers to avoid bootstrap failures and focus purely on the security logic.
|
||||
"""
|
||||
import os
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
# Skip the entire module if SQLAlchemy is not installed
|
||||
pytest.importorskip("sqlalchemy")
|
||||
|
||||
from sqlalchemy import create_engine, text # noqa: E402
|
||||
|
||||
from crewai_tools.tools.nl2sql.nl2sql_tool import NL2SQLTool # noqa: E402
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
SQLITE_URI = "sqlite://" # in-memory
|
||||
|
||||
|
||||
def _make_tool(allow_dml: bool = False, **kwargs) -> NL2SQLTool:
|
||||
"""Return a NL2SQLTool wired to an in-memory SQLite DB.
|
||||
|
||||
Schema-introspection is patched out so we can create the tool without a
|
||||
real PostgreSQL information_schema.
|
||||
"""
|
||||
with (
|
||||
patch.object(NL2SQLTool, "_fetch_available_tables", return_value=[]),
|
||||
patch.object(NL2SQLTool, "_fetch_all_available_columns", return_value=[]),
|
||||
):
|
||||
return NL2SQLTool(db_uri=SQLITE_URI, allow_dml=allow_dml, **kwargs)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Read-only enforcement (allow_dml=False)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestReadOnlyMode:
|
||||
def test_select_allowed_by_default(self):
|
||||
tool = _make_tool()
|
||||
# SQLite supports SELECT without information_schema
|
||||
result = tool.execute_sql("SELECT 1 AS val")
|
||||
assert result == [{"val": 1}]
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"stmt",
|
||||
[
|
||||
"INSERT INTO t VALUES (1)",
|
||||
"UPDATE t SET col = 1",
|
||||
"DELETE FROM t",
|
||||
"DROP TABLE t",
|
||||
"ALTER TABLE t ADD col TEXT",
|
||||
"CREATE TABLE t (id INTEGER)",
|
||||
"TRUNCATE TABLE t",
|
||||
"GRANT SELECT ON t TO user1",
|
||||
"REVOKE SELECT ON t FROM user1",
|
||||
"EXEC sp_something",
|
||||
"EXECUTE sp_something",
|
||||
"CALL proc()",
|
||||
],
|
||||
)
|
||||
def test_write_statements_blocked_by_default(self, stmt: str):
|
||||
tool = _make_tool(allow_dml=False)
|
||||
with pytest.raises(ValueError, match="read-only mode"):
|
||||
tool._validate_query(stmt)
|
||||
|
||||
def test_explain_allowed(self):
|
||||
tool = _make_tool()
|
||||
# Should not raise
|
||||
tool._validate_query("EXPLAIN SELECT 1")
|
||||
|
||||
def test_read_only_cte_allowed(self):
|
||||
tool = _make_tool()
|
||||
tool._validate_query("WITH cte AS (SELECT 1) SELECT * FROM cte")
|
||||
|
||||
def test_show_allowed(self):
|
||||
tool = _make_tool()
|
||||
tool._validate_query("SHOW TABLES")
|
||||
|
||||
def test_describe_allowed(self):
|
||||
tool = _make_tool()
|
||||
tool._validate_query("DESCRIBE users")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# DML enabled (allow_dml=True)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestDMLEnabled:
|
||||
def test_insert_allowed_when_dml_enabled(self):
|
||||
tool = _make_tool(allow_dml=True)
|
||||
# Should not raise
|
||||
tool._validate_query("INSERT INTO t VALUES (1)")
|
||||
|
||||
def test_delete_allowed_when_dml_enabled(self):
|
||||
tool = _make_tool(allow_dml=True)
|
||||
tool._validate_query("DELETE FROM t WHERE id = 1")
|
||||
|
||||
def test_drop_allowed_when_dml_enabled(self):
|
||||
tool = _make_tool(allow_dml=True)
|
||||
tool._validate_query("DROP TABLE t")
|
||||
|
||||
def test_dml_actually_persists(self):
|
||||
"""End-to-end: INSERT commits when allow_dml=True."""
|
||||
# Use a file-based SQLite so we can verify persistence across sessions
|
||||
import tempfile, os
|
||||
with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f:
|
||||
db_path = f.name
|
||||
uri = f"sqlite:///{db_path}"
|
||||
try:
|
||||
tool = _make_tool(allow_dml=True)
|
||||
tool.db_uri = uri
|
||||
|
||||
engine = create_engine(uri)
|
||||
with engine.connect() as conn:
|
||||
conn.execute(text("CREATE TABLE items (id INTEGER PRIMARY KEY)"))
|
||||
conn.commit()
|
||||
|
||||
tool.execute_sql("INSERT INTO items VALUES (42)")
|
||||
|
||||
with engine.connect() as conn:
|
||||
rows = conn.execute(text("SELECT id FROM items")).fetchall()
|
||||
assert (42,) in rows
|
||||
finally:
|
||||
os.unlink(db_path)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Parameterised query — SQL injection prevention
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestParameterisedQueries:
|
||||
def test_table_name_is_parameterised(self):
|
||||
"""_fetch_all_available_columns must not interpolate table_name into SQL."""
|
||||
tool = _make_tool()
|
||||
captured_calls = []
|
||||
|
||||
def recording_execute_sql(self_inner, sql_query, params=None):
|
||||
captured_calls.append((sql_query, params))
|
||||
return []
|
||||
|
||||
with patch.object(NL2SQLTool, "execute_sql", recording_execute_sql):
|
||||
tool._fetch_all_available_columns("users'; DROP TABLE users; --")
|
||||
|
||||
assert len(captured_calls) == 1
|
||||
sql, params = captured_calls[0]
|
||||
# The raw SQL must NOT contain the injected string
|
||||
assert "DROP" not in sql
|
||||
# The table name must be passed as a parameter
|
||||
assert params is not None
|
||||
assert params.get("table_name") == "users'; DROP TABLE users; --"
|
||||
# The SQL template must use the :param syntax
|
||||
assert ":table_name" in sql
|
||||
|
||||
def test_injection_string_not_in_sql_template(self):
|
||||
"""The f-string vulnerability is gone — table name never lands in the SQL."""
|
||||
tool = _make_tool()
|
||||
injection = "'; DROP TABLE users; --"
|
||||
captured = {}
|
||||
|
||||
def spy(self_inner, sql_query, params=None):
|
||||
captured["sql"] = sql_query
|
||||
captured["params"] = params
|
||||
return []
|
||||
|
||||
with patch.object(NL2SQLTool, "execute_sql", spy):
|
||||
tool._fetch_all_available_columns(injection)
|
||||
|
||||
assert injection not in captured["sql"]
|
||||
assert captured["params"]["table_name"] == injection
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# session.commit() not called for read-only queries
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestNoCommitForReadOnly:
|
||||
def test_select_does_not_commit(self):
|
||||
tool = _make_tool(allow_dml=False)
|
||||
|
||||
mock_session = MagicMock()
|
||||
mock_result = MagicMock()
|
||||
mock_result.returns_rows = True
|
||||
mock_result.keys.return_value = ["val"]
|
||||
mock_result.fetchall.return_value = [(1,)]
|
||||
mock_session.execute.return_value = mock_result
|
||||
|
||||
mock_session_cls = MagicMock(return_value=mock_session)
|
||||
|
||||
with (
|
||||
patch("crewai_tools.tools.nl2sql.nl2sql_tool.create_engine"),
|
||||
patch(
|
||||
"crewai_tools.tools.nl2sql.nl2sql_tool.sessionmaker",
|
||||
return_value=mock_session_cls,
|
||||
),
|
||||
):
|
||||
tool.execute_sql("SELECT 1")
|
||||
|
||||
mock_session.commit.assert_not_called()
|
||||
|
||||
def test_write_with_dml_enabled_does_commit(self):
|
||||
tool = _make_tool(allow_dml=True)
|
||||
|
||||
mock_session = MagicMock()
|
||||
mock_result = MagicMock()
|
||||
mock_result.returns_rows = False
|
||||
mock_session.execute.return_value = mock_result
|
||||
|
||||
mock_session_cls = MagicMock(return_value=mock_session)
|
||||
|
||||
with (
|
||||
patch("crewai_tools.tools.nl2sql.nl2sql_tool.create_engine"),
|
||||
patch(
|
||||
"crewai_tools.tools.nl2sql.nl2sql_tool.sessionmaker",
|
||||
return_value=mock_session_cls,
|
||||
),
|
||||
):
|
||||
tool.execute_sql("INSERT INTO t VALUES (1)")
|
||||
|
||||
mock_session.commit.assert_called_once()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Environment-variable escape hatch
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestEnvVarEscapeHatch:
|
||||
def test_env_var_enables_dml(self):
|
||||
with patch.dict(os.environ, {"CREWAI_NL2SQL_ALLOW_DML": "true"}):
|
||||
tool = _make_tool(allow_dml=False)
|
||||
assert tool.allow_dml is True
|
||||
|
||||
def test_env_var_case_insensitive(self):
|
||||
with patch.dict(os.environ, {"CREWAI_NL2SQL_ALLOW_DML": "TRUE"}):
|
||||
tool = _make_tool(allow_dml=False)
|
||||
assert tool.allow_dml is True
|
||||
|
||||
def test_env_var_absent_keeps_default(self):
|
||||
env = {k: v for k, v in os.environ.items() if k != "CREWAI_NL2SQL_ALLOW_DML"}
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
tool = _make_tool(allow_dml=False)
|
||||
assert tool.allow_dml is False
|
||||
|
||||
def test_env_var_false_does_not_enable_dml(self):
|
||||
with patch.dict(os.environ, {"CREWAI_NL2SQL_ALLOW_DML": "false"}):
|
||||
tool = _make_tool(allow_dml=False)
|
||||
assert tool.allow_dml is False
|
||||
|
||||
def test_dml_write_blocked_without_env_var(self):
|
||||
env = {k: v for k, v in os.environ.items() if k != "CREWAI_NL2SQL_ALLOW_DML"}
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
tool = _make_tool(allow_dml=False)
|
||||
with pytest.raises(ValueError, match="read-only mode"):
|
||||
tool._validate_query("DROP TABLE sensitive_data")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _run() propagates ValueError from _validate_query
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestRunValidation:
|
||||
def test_run_raises_on_blocked_query(self):
|
||||
tool = _make_tool(allow_dml=False)
|
||||
with pytest.raises(ValueError, match="read-only mode"):
|
||||
tool._run("DELETE FROM users")
|
||||
|
||||
def test_run_returns_results_for_select(self):
|
||||
tool = _make_tool(allow_dml=False)
|
||||
result = tool._run("SELECT 1 AS n")
|
||||
assert result == [{"n": 1}]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Multi-statement / semicolon injection prevention
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestSemicolonInjection:
|
||||
def test_multi_statement_blocked_in_read_only_mode(self):
|
||||
"""SELECT 1; DROP TABLE users must be rejected when allow_dml=False."""
|
||||
tool = _make_tool(allow_dml=False)
|
||||
with pytest.raises(ValueError, match="multi-statement"):
|
||||
tool._validate_query("SELECT 1; DROP TABLE users")
|
||||
|
||||
def test_multi_statement_blocked_even_with_only_selects(self):
|
||||
"""Two SELECT statements are still rejected in read-only mode."""
|
||||
tool = _make_tool(allow_dml=False)
|
||||
with pytest.raises(ValueError, match="multi-statement"):
|
||||
tool._validate_query("SELECT 1; SELECT 2")
|
||||
|
||||
def test_trailing_semicolon_allowed_single_statement(self):
|
||||
"""A single statement with a trailing semicolon should pass."""
|
||||
tool = _make_tool(allow_dml=False)
|
||||
# Should not raise — the part after the semicolon is empty
|
||||
tool._validate_query("SELECT 1;")
|
||||
|
||||
def test_multi_statement_allowed_when_dml_enabled(self):
|
||||
"""Multiple statements are permitted when allow_dml=True."""
|
||||
tool = _make_tool(allow_dml=True)
|
||||
# Should not raise
|
||||
tool._validate_query("SELECT 1; INSERT INTO t VALUES (1)")
|
||||
|
||||
def test_multi_statement_write_still_blocked_individually(self):
|
||||
"""Even with allow_dml=False, a single write statement is blocked."""
|
||||
tool = _make_tool(allow_dml=False)
|
||||
with pytest.raises(ValueError, match="read-only mode"):
|
||||
tool._validate_query("DROP TABLE users")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Writable CTEs (WITH … DELETE/INSERT/UPDATE)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestWritableCTE:
|
||||
def test_writable_cte_delete_blocked_in_read_only(self):
|
||||
"""WITH d AS (DELETE FROM users RETURNING *) SELECT * FROM d — blocked."""
|
||||
tool = _make_tool(allow_dml=False)
|
||||
with pytest.raises(ValueError, match="read-only mode"):
|
||||
tool._validate_query(
|
||||
"WITH deleted AS (DELETE FROM users RETURNING *) SELECT * FROM deleted"
|
||||
)
|
||||
|
||||
def test_writable_cte_insert_blocked_in_read_only(self):
|
||||
tool = _make_tool(allow_dml=False)
|
||||
with pytest.raises(ValueError, match="read-only mode"):
|
||||
tool._validate_query(
|
||||
"WITH ins AS (INSERT INTO t VALUES (1) RETURNING id) SELECT * FROM ins"
|
||||
)
|
||||
|
||||
def test_writable_cte_update_blocked_in_read_only(self):
|
||||
tool = _make_tool(allow_dml=False)
|
||||
with pytest.raises(ValueError, match="read-only mode"):
|
||||
tool._validate_query(
|
||||
"WITH upd AS (UPDATE t SET x=1 RETURNING id) SELECT * FROM upd"
|
||||
)
|
||||
|
||||
def test_writable_cte_allowed_when_dml_enabled(self):
|
||||
tool = _make_tool(allow_dml=True)
|
||||
# Should not raise
|
||||
tool._validate_query(
|
||||
"WITH deleted AS (DELETE FROM users RETURNING *) SELECT * FROM deleted"
|
||||
)
|
||||
|
||||
def test_plain_read_only_cte_still_allowed(self):
|
||||
tool = _make_tool(allow_dml=False)
|
||||
# No write commands in the CTE body — must pass
|
||||
tool._validate_query("WITH cte AS (SELECT id FROM users) SELECT * FROM cte")
|
||||
|
||||
def test_cte_with_comment_column_not_false_positive(self):
|
||||
"""Column named 'comment' should NOT trigger writable CTE detection."""
|
||||
tool = _make_tool(allow_dml=False)
|
||||
# 'comment' is a column name, not a SQL command
|
||||
tool._validate_query(
|
||||
"WITH cte AS (SELECT comment FROM posts) SELECT * FROM cte"
|
||||
)
|
||||
|
||||
def test_cte_with_set_column_not_false_positive(self):
|
||||
"""Column named 'set' should NOT trigger writable CTE detection."""
|
||||
tool = _make_tool(allow_dml=False)
|
||||
tool._validate_query(
|
||||
"WITH cte AS (SELECT set, reset FROM config) SELECT * FROM cte"
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# EXPLAIN ANALYZE executes the underlying query
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_cte_with_write_main_query_blocked(self):
|
||||
"""WITH cte AS (SELECT 1) DELETE FROM users — main query must be caught."""
|
||||
tool = _make_tool(allow_dml=False)
|
||||
with pytest.raises(ValueError, match="read-only mode"):
|
||||
tool._validate_query(
|
||||
"WITH cte AS (SELECT 1) DELETE FROM users"
|
||||
)
|
||||
|
||||
def test_cte_with_write_main_query_allowed_with_dml(self):
|
||||
"""Main query write after CTE should pass when allow_dml=True."""
|
||||
tool = _make_tool(allow_dml=True)
|
||||
tool._validate_query(
|
||||
"WITH cte AS (SELECT id FROM users) INSERT INTO archive SELECT * FROM cte"
|
||||
)
|
||||
|
||||
def test_cte_with_newline_before_paren_blocked(self):
|
||||
"""AS followed by newline then ( should still detect writable CTE."""
|
||||
tool = _make_tool(allow_dml=False)
|
||||
with pytest.raises(ValueError, match="read-only mode"):
|
||||
tool._validate_query(
|
||||
"WITH cte AS\n(DELETE FROM users RETURNING *) SELECT * FROM cte"
|
||||
)
|
||||
|
||||
def test_cte_with_tab_before_paren_blocked(self):
|
||||
"""AS followed by tab then ( should still detect writable CTE."""
|
||||
tool = _make_tool(allow_dml=False)
|
||||
with pytest.raises(ValueError, match="read-only mode"):
|
||||
tool._validate_query(
|
||||
"WITH cte AS\t(DELETE FROM users RETURNING *) SELECT * FROM cte"
|
||||
)
|
||||
|
||||
|
||||
class TestExplainAnalyze:
|
||||
def test_explain_analyze_delete_blocked_in_read_only(self):
|
||||
"""EXPLAIN ANALYZE DELETE actually runs the delete — block it."""
|
||||
tool = _make_tool(allow_dml=False)
|
||||
with pytest.raises(ValueError, match="read-only mode"):
|
||||
tool._validate_query("EXPLAIN ANALYZE DELETE FROM users")
|
||||
|
||||
def test_explain_analyse_delete_blocked_in_read_only(self):
|
||||
"""British spelling ANALYSE is also caught."""
|
||||
tool = _make_tool(allow_dml=False)
|
||||
with pytest.raises(ValueError, match="read-only mode"):
|
||||
tool._validate_query("EXPLAIN ANALYSE DELETE FROM users")
|
||||
|
||||
def test_explain_analyze_drop_blocked_in_read_only(self):
|
||||
tool = _make_tool(allow_dml=False)
|
||||
with pytest.raises(ValueError, match="read-only mode"):
|
||||
tool._validate_query("EXPLAIN ANALYZE DROP TABLE users")
|
||||
|
||||
def test_explain_analyze_select_allowed_in_read_only(self):
|
||||
"""EXPLAIN ANALYZE on a SELECT is safe — must be permitted."""
|
||||
tool = _make_tool(allow_dml=False)
|
||||
tool._validate_query("EXPLAIN ANALYZE SELECT * FROM users")
|
||||
|
||||
def test_explain_without_analyze_allowed(self):
|
||||
tool = _make_tool(allow_dml=False)
|
||||
tool._validate_query("EXPLAIN SELECT * FROM users")
|
||||
|
||||
def test_explain_analyze_delete_allowed_when_dml_enabled(self):
|
||||
tool = _make_tool(allow_dml=True)
|
||||
tool._validate_query("EXPLAIN ANALYZE DELETE FROM users")
|
||||
|
||||
def test_explain_paren_analyze_delete_blocked_in_read_only(self):
|
||||
"""EXPLAIN (ANALYZE) DELETE actually runs the delete — block it."""
|
||||
tool = _make_tool(allow_dml=False)
|
||||
with pytest.raises(ValueError, match="read-only mode"):
|
||||
tool._validate_query("EXPLAIN (ANALYZE) DELETE FROM users")
|
||||
|
||||
def test_explain_paren_analyze_verbose_delete_blocked_in_read_only(self):
|
||||
"""EXPLAIN (ANALYZE, VERBOSE) DELETE actually runs the delete — block it."""
|
||||
tool = _make_tool(allow_dml=False)
|
||||
with pytest.raises(ValueError, match="read-only mode"):
|
||||
tool._validate_query("EXPLAIN (ANALYZE, VERBOSE) DELETE FROM users")
|
||||
|
||||
def test_explain_paren_verbose_select_allowed_in_read_only(self):
|
||||
"""EXPLAIN (VERBOSE) SELECT is safe — no ANALYZE means no execution."""
|
||||
tool = _make_tool(allow_dml=False)
|
||||
tool._validate_query("EXPLAIN (VERBOSE) SELECT * FROM users")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Multi-statement commit covers ALL statements (not just the first)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestMultiStatementCommit:
|
||||
def test_select_then_insert_triggers_commit(self):
|
||||
"""SELECT 1; INSERT … — commit must happen because INSERT is a write."""
|
||||
tool = _make_tool(allow_dml=True)
|
||||
|
||||
mock_session = MagicMock()
|
||||
mock_result = MagicMock()
|
||||
mock_result.returns_rows = False
|
||||
mock_session.execute.return_value = mock_result
|
||||
mock_session_cls = MagicMock(return_value=mock_session)
|
||||
|
||||
with (
|
||||
patch("crewai_tools.tools.nl2sql.nl2sql_tool.create_engine"),
|
||||
patch(
|
||||
"crewai_tools.tools.nl2sql.nl2sql_tool.sessionmaker",
|
||||
return_value=mock_session_cls,
|
||||
),
|
||||
):
|
||||
tool.execute_sql("SELECT 1; INSERT INTO t VALUES (1)")
|
||||
|
||||
mock_session.commit.assert_called_once()
|
||||
|
||||
def test_select_only_multi_statement_does_not_commit(self):
|
||||
"""Two SELECTs must not trigger a commit even when allow_dml=True."""
|
||||
tool = _make_tool(allow_dml=True)
|
||||
|
||||
mock_session = MagicMock()
|
||||
mock_result = MagicMock()
|
||||
mock_result.returns_rows = True
|
||||
mock_result.keys.return_value = ["v"]
|
||||
mock_result.fetchall.return_value = [(1,)]
|
||||
mock_session.execute.return_value = mock_result
|
||||
mock_session_cls = MagicMock(return_value=mock_session)
|
||||
|
||||
with (
|
||||
patch("crewai_tools.tools.nl2sql.nl2sql_tool.create_engine"),
|
||||
patch(
|
||||
"crewai_tools.tools.nl2sql.nl2sql_tool.sessionmaker",
|
||||
return_value=mock_session_cls,
|
||||
),
|
||||
):
|
||||
tool.execute_sql("SELECT 1; SELECT 2")
|
||||
|
||||
def test_writable_cte_triggers_commit(self):
|
||||
"""WITH d AS (DELETE ...) must trigger commit when allow_dml=True."""
|
||||
tool = _make_tool(allow_dml=True)
|
||||
|
||||
mock_session = MagicMock()
|
||||
mock_result = MagicMock()
|
||||
mock_result.returns_rows = True
|
||||
mock_result.keys.return_value = ["id"]
|
||||
mock_result.fetchall.return_value = [(1,)]
|
||||
mock_session.execute.return_value = mock_result
|
||||
mock_session_cls = MagicMock(return_value=mock_session)
|
||||
|
||||
with (
|
||||
patch("crewai_tools.tools.nl2sql.nl2sql_tool.create_engine"),
|
||||
patch(
|
||||
"crewai_tools.tools.nl2sql.nl2sql_tool.sessionmaker",
|
||||
return_value=mock_session_cls,
|
||||
),
|
||||
):
|
||||
tool.execute_sql(
|
||||
"WITH d AS (DELETE FROM users RETURNING *) SELECT * FROM d"
|
||||
)
|
||||
mock_session.commit.assert_called_once()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Extended _WRITE_COMMANDS coverage
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestExtendedWriteCommands:
|
||||
@pytest.mark.parametrize(
|
||||
"stmt",
|
||||
[
|
||||
"UPSERT INTO t VALUES (1)",
|
||||
"LOAD DATA INFILE 'f.csv' INTO TABLE t",
|
||||
"COPY t FROM '/tmp/f.csv'",
|
||||
"VACUUM ANALYZE t",
|
||||
"ANALYZE t",
|
||||
"ANALYSE t",
|
||||
"REINDEX TABLE t",
|
||||
"CLUSTER t USING idx",
|
||||
"REFRESH MATERIALIZED VIEW v",
|
||||
"COMMENT ON TABLE t IS 'desc'",
|
||||
"SET search_path = myschema",
|
||||
"RESET search_path",
|
||||
],
|
||||
)
|
||||
def test_extended_write_commands_blocked_by_default(self, stmt: str):
|
||||
tool = _make_tool(allow_dml=False)
|
||||
with pytest.raises(ValueError, match="read-only mode"):
|
||||
tool._validate_query(stmt)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# EXPLAIN ANALYZE VERBOSE handling
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestExplainAnalyzeVerbose:
|
||||
def test_explain_analyze_verbose_select_allowed(self):
|
||||
"""EXPLAIN ANALYZE VERBOSE SELECT should be allowed (read-only)."""
|
||||
tool = _make_tool(allow_dml=False)
|
||||
tool._validate_query("EXPLAIN ANALYZE VERBOSE SELECT * FROM users")
|
||||
|
||||
def test_explain_analyze_verbose_delete_blocked(self):
|
||||
"""EXPLAIN ANALYZE VERBOSE DELETE should be blocked."""
|
||||
tool = _make_tool(allow_dml=False)
|
||||
with pytest.raises(ValueError, match="read-only mode"):
|
||||
tool._validate_query("EXPLAIN ANALYZE VERBOSE DELETE FROM users")
|
||||
|
||||
def test_explain_verbose_select_allowed(self):
|
||||
"""EXPLAIN VERBOSE SELECT (no ANALYZE) should be allowed."""
|
||||
tool = _make_tool(allow_dml=False)
|
||||
tool._validate_query("EXPLAIN VERBOSE SELECT * FROM users")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CTE with string literal parens
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestCTEStringLiteralParens:
|
||||
def test_cte_string_paren_does_not_bypass(self):
|
||||
"""Parens inside string literals should not confuse the paren walker."""
|
||||
tool = _make_tool(allow_dml=False)
|
||||
with pytest.raises(ValueError, match="read-only mode"):
|
||||
tool._validate_query(
|
||||
"WITH cte AS (SELECT '(' FROM t) DELETE FROM users"
|
||||
)
|
||||
|
||||
def test_cte_string_paren_read_only_allowed(self):
|
||||
"""Read-only CTE with string literal parens should be allowed."""
|
||||
tool = _make_tool(allow_dml=False)
|
||||
tool._validate_query(
|
||||
"WITH cte AS (SELECT '(' FROM t) SELECT * FROM cte"
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# EXPLAIN ANALYZE commit logic
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestExplainAnalyzeCommit:
|
||||
def test_explain_analyze_delete_triggers_commit(self):
|
||||
"""EXPLAIN ANALYZE DELETE should trigger commit when allow_dml=True."""
|
||||
tool = _make_tool(allow_dml=True)
|
||||
|
||||
mock_session = MagicMock()
|
||||
mock_result = MagicMock()
|
||||
mock_result.returns_rows = True
|
||||
mock_result.keys.return_value = ["QUERY PLAN"]
|
||||
mock_result.fetchall.return_value = [("Delete on users",)]
|
||||
mock_session.execute.return_value = mock_result
|
||||
mock_session_cls = MagicMock(return_value=mock_session)
|
||||
|
||||
with (
|
||||
patch("crewai_tools.tools.nl2sql.nl2sql_tool.create_engine"),
|
||||
patch(
|
||||
"crewai_tools.tools.nl2sql.nl2sql_tool.sessionmaker",
|
||||
return_value=mock_session_cls,
|
||||
),
|
||||
):
|
||||
tool.execute_sql("EXPLAIN ANALYZE DELETE FROM users")
|
||||
mock_session.commit.assert_called_once()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# AS( inside string literals must not confuse CTE detection
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestCTEStringLiteralAS:
|
||||
def test_as_paren_inside_string_does_not_bypass(self):
|
||||
"""'AS (' inside a string literal must not be treated as a CTE body."""
|
||||
tool = _make_tool(allow_dml=False)
|
||||
with pytest.raises(ValueError, match="read-only mode"):
|
||||
tool._validate_query(
|
||||
"WITH cte AS (SELECT 'AS (' FROM t) DELETE FROM users"
|
||||
)
|
||||
|
||||
def test_as_paren_inside_string_read_only_ok(self):
|
||||
"""Read-only CTE with 'AS (' in a string should be allowed."""
|
||||
tool = _make_tool(allow_dml=False)
|
||||
tool._validate_query(
|
||||
"WITH cte AS (SELECT 'AS (' FROM t) SELECT * FROM cte"
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Unknown command after CTE should be blocked
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestCTEUnknownCommand:
|
||||
def test_unknown_command_after_cte_blocked(self):
|
||||
"""WITH cte AS (SELECT 1) FOOBAR should be blocked as unknown."""
|
||||
tool = _make_tool(allow_dml=False)
|
||||
with pytest.raises(ValueError, match="unrecognised"):
|
||||
tool._validate_query("WITH cte AS (SELECT 1) FOOBAR")
|
||||
@@ -14051,7 +14051,7 @@
|
||||
}
|
||||
},
|
||||
{
|
||||
"description": "Converts natural language to SQL queries and executes them.",
|
||||
"description": "Converts natural language to SQL queries and executes them against a database. Read-only by default \u2014 only SELECT/SHOW/DESCRIBE/EXPLAIN queries (and read-only CTEs) are allowed unless configured with allow_dml=True.",
|
||||
"env_vars": [],
|
||||
"humanized_name": "NL2SQLTool",
|
||||
"init_params_schema": {
|
||||
@@ -14092,7 +14092,14 @@
|
||||
"type": "object"
|
||||
}
|
||||
},
|
||||
"description": "Tool that converts natural language to SQL and executes it against a database.\n\nBy default the tool operates in **read-only mode**: only SELECT, SHOW,\nDESCRIBE, EXPLAIN, and read-only CTEs (WITH \u2026 SELECT) are permitted. Write\noperations (INSERT, UPDATE, DELETE, DROP, ALTER, CREATE, TRUNCATE, \u2026) are\nblocked unless ``allow_dml=True`` is set explicitly or the environment\nvariable ``CREWAI_NL2SQL_ALLOW_DML=true`` is present.\n\nWritable CTEs (``WITH d AS (DELETE \u2026) SELECT \u2026``) and\n``EXPLAIN ANALYZE <write-stmt>`` are treated as write operations and are\nblocked in read-only mode.\n\nThe ``_fetch_all_available_columns`` helper uses parameterised queries so\nthat table names coming from the database catalogue cannot be used as an\ninjection vector.",
|
||||
"properties": {
|
||||
"allow_dml": {
|
||||
"default": false,
|
||||
"description": "When False (default) only read statements are permitted. Set to True to allow INSERT/UPDATE/DELETE/DROP and other write operations.",
|
||||
"title": "Allow DML",
|
||||
"type": "boolean"
|
||||
},
|
||||
"columns": {
|
||||
"additionalProperties": {
|
||||
"anyOf": [
|
||||
|
||||
@@ -55,7 +55,7 @@ Repository = "https://github.com/crewAIInc/crewAI"
|
||||
|
||||
[project.optional-dependencies]
|
||||
tools = [
|
||||
"crewai-tools==1.14.2a1",
|
||||
"crewai-tools==1.14.2a2",
|
||||
]
|
||||
embeddings = [
|
||||
"tiktoken~=0.8.0"
|
||||
@@ -74,8 +74,8 @@ qdrant = [
|
||||
"qdrant-client[fastembed]~=1.14.3",
|
||||
]
|
||||
aws = [
|
||||
"boto3~=1.40.38",
|
||||
"aiobotocore~=2.25.2",
|
||||
"boto3~=1.42.79",
|
||||
"aiobotocore~=3.4.0",
|
||||
]
|
||||
watson = [
|
||||
"ibm-watsonx-ai~=1.3.39",
|
||||
@@ -87,7 +87,7 @@ litellm = [
|
||||
"litellm~=1.83.0",
|
||||
]
|
||||
bedrock = [
|
||||
"boto3~=1.40.45",
|
||||
"boto3~=1.42.79",
|
||||
]
|
||||
google-genai = [
|
||||
"google-genai~=1.65.0",
|
||||
|
||||
@@ -46,7 +46,7 @@ def _suppress_pydantic_deprecation_warnings() -> None:
|
||||
|
||||
_suppress_pydantic_deprecation_warnings()
|
||||
|
||||
__version__ = "1.14.2a1"
|
||||
__version__ = "1.14.2a2"
|
||||
_telemetry_submitted = False
|
||||
|
||||
|
||||
|
||||
@@ -51,7 +51,6 @@ from crewai.utilities.string_utils import interpolate_only
|
||||
if TYPE_CHECKING:
|
||||
from crewai.context import ExecutionContext
|
||||
from crewai.crew import Crew
|
||||
from crewai.state.provider.core import BaseProvider
|
||||
|
||||
|
||||
def _validate_crew_ref(value: Any) -> Any:
|
||||
@@ -338,19 +337,16 @@ class BaseAgent(BaseModel, ABC, metaclass=AgentMeta):
|
||||
execution_context: ExecutionContext | None = Field(default=None)
|
||||
|
||||
@classmethod
|
||||
def from_checkpoint(
|
||||
cls, path: str, *, provider: BaseProvider | None = None
|
||||
) -> Self:
|
||||
"""Restore an Agent from a checkpoint file."""
|
||||
def from_checkpoint(cls, config: CheckpointConfig) -> Self:
|
||||
"""Restore an Agent from a checkpoint.
|
||||
|
||||
Args:
|
||||
config: Checkpoint configuration with ``restore_from`` set.
|
||||
"""
|
||||
from crewai.context import apply_execution_context
|
||||
from crewai.state.provider.json_provider import JsonProvider
|
||||
from crewai.state.runtime import RuntimeState
|
||||
|
||||
state = RuntimeState.from_checkpoint(
|
||||
path,
|
||||
provider=provider or JsonProvider(),
|
||||
context={"from_checkpoint": True},
|
||||
)
|
||||
state = RuntimeState.from_checkpoint(config, context={"from_checkpoint": True})
|
||||
for entity in state.root:
|
||||
if isinstance(entity, cls):
|
||||
if entity.execution_context is not None:
|
||||
@@ -359,7 +355,9 @@ class BaseAgent(BaseModel, ABC, metaclass=AgentMeta):
|
||||
entity.agent_executor.agent = entity
|
||||
entity.agent_executor._resuming = True
|
||||
return entity
|
||||
raise ValueError(f"No {cls.__name__} found in checkpoint: {path}")
|
||||
raise ValueError(
|
||||
f"No {cls.__name__} found in checkpoint: {config.restore_from}"
|
||||
)
|
||||
|
||||
@model_validator(mode="before")
|
||||
@classmethod
|
||||
|
||||
@@ -6,12 +6,16 @@ from datetime import datetime
|
||||
import glob
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sqlite3
|
||||
from typing import Any
|
||||
|
||||
import click
|
||||
|
||||
|
||||
_PLACEHOLDER_RE = re.compile(r"\{([A-Za-z_][A-Za-z0-9_\-]*)}")
|
||||
|
||||
|
||||
_SQLITE_MAGIC = b"SQLite format 3\x00"
|
||||
|
||||
_SELECT_ALL = """
|
||||
@@ -34,6 +38,25 @@ LIMIT 1
|
||||
"""
|
||||
|
||||
|
||||
_DEFAULT_DIR = "./.checkpoints"
|
||||
_DEFAULT_DB = "./.checkpoints.db"
|
||||
|
||||
|
||||
def _detect_location(location: str) -> str:
|
||||
"""Resolve the default checkpoint location.
|
||||
|
||||
When the caller passes the default directory path, check whether a
|
||||
SQLite database exists at the conventional ``.db`` path and prefer it.
|
||||
"""
|
||||
if (
|
||||
location == _DEFAULT_DIR
|
||||
and not os.path.exists(_DEFAULT_DIR)
|
||||
and os.path.exists(_DEFAULT_DB)
|
||||
):
|
||||
return _DEFAULT_DB
|
||||
return location
|
||||
|
||||
|
||||
def _is_sqlite(path: str) -> bool:
|
||||
"""Check if a file is a SQLite database by reading its magic bytes."""
|
||||
if not os.path.isfile(path):
|
||||
@@ -52,13 +75,7 @@ def _parse_checkpoint_json(raw: str, source: str) -> dict[str, Any]:
|
||||
nodes = data.get("event_record", {}).get("nodes", {})
|
||||
event_count = len(nodes)
|
||||
|
||||
trigger_event = None
|
||||
if nodes:
|
||||
last_node = max(
|
||||
nodes.values(),
|
||||
key=lambda n: n.get("event", {}).get("emission_sequence") or 0,
|
||||
)
|
||||
trigger_event = last_node.get("event", {}).get("type")
|
||||
trigger_event = data.get("trigger")
|
||||
|
||||
parsed_entities: list[dict[str, Any]] = []
|
||||
for entity in entities:
|
||||
@@ -76,16 +93,47 @@ def _parse_checkpoint_json(raw: str, source: str) -> dict[str, Any]:
|
||||
{
|
||||
"description": t.get("description", ""),
|
||||
"completed": t.get("output") is not None,
|
||||
"output": (t.get("output") or {}).get("raw", ""),
|
||||
}
|
||||
for t in tasks
|
||||
]
|
||||
parsed_entities.append(info)
|
||||
|
||||
inputs: dict[str, Any] = {}
|
||||
for entity in entities:
|
||||
cp_inputs = entity.get("checkpoint_inputs")
|
||||
if isinstance(cp_inputs, dict) and cp_inputs:
|
||||
inputs = dict(cp_inputs)
|
||||
break
|
||||
|
||||
for entity in entities:
|
||||
for task in entity.get("tasks", []):
|
||||
for field in (
|
||||
"checkpoint_original_description",
|
||||
"checkpoint_original_expected_output",
|
||||
):
|
||||
text = task.get(field) or ""
|
||||
for match in _PLACEHOLDER_RE.findall(text):
|
||||
if match not in inputs:
|
||||
inputs[match] = ""
|
||||
for agent in entity.get("agents", []):
|
||||
for field in ("role", "goal", "backstory"):
|
||||
text = agent.get(field) or ""
|
||||
for match in _PLACEHOLDER_RE.findall(text):
|
||||
if match not in inputs:
|
||||
inputs[match] = ""
|
||||
|
||||
branch = data.get("branch", "main")
|
||||
parent_id = data.get("parent_id")
|
||||
|
||||
return {
|
||||
"source": source,
|
||||
"event_count": event_count,
|
||||
"trigger": trigger_event,
|
||||
"entities": parsed_entities,
|
||||
"branch": branch,
|
||||
"parent_id": parent_id,
|
||||
"inputs": inputs,
|
||||
}
|
||||
|
||||
|
||||
@@ -189,6 +237,7 @@ def _list_sqlite(db_path: str) -> list[dict[str, Any]]:
|
||||
"entities": [],
|
||||
"source": checkpoint_id,
|
||||
}
|
||||
meta["db"] = db_path
|
||||
results.append(meta)
|
||||
return results
|
||||
|
||||
@@ -311,6 +360,10 @@ def _print_info(meta: dict[str, Any]) -> None:
|
||||
trigger = meta.get("trigger")
|
||||
if trigger:
|
||||
click.echo(f"Trigger: {trigger}")
|
||||
click.echo(f"Branch: {meta.get('branch', 'main')}")
|
||||
parent_id = meta.get("parent_id")
|
||||
if parent_id:
|
||||
click.echo(f"Parent: {parent_id}")
|
||||
|
||||
for ent in meta.get("entities", []):
|
||||
eid = str(ent.get("id", ""))[:8]
|
||||
|
||||
@@ -2,17 +2,23 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections import defaultdict
|
||||
from typing import Any, ClassVar
|
||||
|
||||
from textual.app import App, ComposeResult
|
||||
from textual.binding import Binding
|
||||
from textual.containers import Horizontal, Vertical
|
||||
from textual.screen import ModalScreen
|
||||
from textual.widgets import Button, Footer, Header, OptionList, Static
|
||||
from textual.widgets.option_list import Option
|
||||
from textual.containers import Horizontal, Vertical, VerticalScroll
|
||||
from textual.widgets import (
|
||||
Button,
|
||||
Footer,
|
||||
Header,
|
||||
Input,
|
||||
Static,
|
||||
TextArea,
|
||||
Tree,
|
||||
)
|
||||
|
||||
from crewai.cli.checkpoint_cli import (
|
||||
_entity_summary,
|
||||
_format_size,
|
||||
_is_sqlite,
|
||||
_list_json,
|
||||
@@ -34,151 +40,54 @@ def _load_entries(location: str) -> list[dict[str, Any]]:
|
||||
return _list_json(location)
|
||||
|
||||
|
||||
def _format_list_label(entry: dict[str, Any]) -> str:
|
||||
"""Format a checkpoint entry for the list panel."""
|
||||
name = entry.get("name", "")
|
||||
ts = entry.get("ts") or ""
|
||||
trigger = entry.get("trigger") or ""
|
||||
summary = _entity_summary(entry.get("entities", []))
|
||||
|
||||
line1 = f"[bold]{name}[/]"
|
||||
parts = []
|
||||
if ts:
|
||||
parts.append(f"[dim]{ts}[/]")
|
||||
if "size" in entry:
|
||||
parts.append(f"[dim]{_format_size(entry['size'])}[/]")
|
||||
if trigger:
|
||||
parts.append(f"[{_PRIMARY}]{trigger}[/]")
|
||||
line2 = " ".join(parts)
|
||||
line3 = f" [{_DIM}]{summary}[/]"
|
||||
|
||||
return f"{line1}\n{line2}\n{line3}"
|
||||
def _short_id(name: str) -> str:
|
||||
"""Shorten a checkpoint name for tree display."""
|
||||
if len(name) > 30:
|
||||
return name[:27] + "..."
|
||||
return name
|
||||
|
||||
|
||||
def _format_detail(entry: dict[str, Any]) -> str:
|
||||
"""Format checkpoint details for the right panel."""
|
||||
def _entry_id(entry: dict[str, Any]) -> str:
|
||||
"""Normalize an entry's name into its checkpoint ID.
|
||||
|
||||
JSON filenames are ``{ts}_{uuid}_p-{parent}.json``; SQLite IDs
|
||||
are already ``{ts}_{uuid}``. This strips the JSON suffix so
|
||||
fork-parent lookups work in both providers.
|
||||
"""
|
||||
name = str(entry.get("name", ""))
|
||||
if name.endswith(".json"):
|
||||
name = name[: -len(".json")]
|
||||
idx = name.find("_p-")
|
||||
if idx != -1:
|
||||
name = name[:idx]
|
||||
return name
|
||||
|
||||
|
||||
def _build_entity_header(ent: dict[str, Any]) -> str:
|
||||
"""Build rich text header for an entity (progress bar only)."""
|
||||
lines: list[str] = []
|
||||
|
||||
# Header
|
||||
name = entry.get("name", "")
|
||||
lines.append(f"[bold {_PRIMARY}]{name}[/]")
|
||||
lines.append(f"[{_DIM}]{'─' * 50}[/]")
|
||||
lines.append("")
|
||||
|
||||
# Metadata table
|
||||
ts = entry.get("ts") or "unknown"
|
||||
trigger = entry.get("trigger") or ""
|
||||
lines.append(f" [bold]Time[/] {ts}")
|
||||
if "size" in entry:
|
||||
lines.append(f" [bold]Size[/] {_format_size(entry['size'])}")
|
||||
lines.append(f" [bold]Events[/] {entry.get('event_count', 0)}")
|
||||
if trigger:
|
||||
lines.append(f" [bold]Trigger[/] [{_PRIMARY}]{trigger}[/]")
|
||||
if "path" in entry:
|
||||
lines.append(f" [bold]Path[/] [{_DIM}]{entry['path']}[/]")
|
||||
if "db" in entry:
|
||||
lines.append(f" [bold]Database[/] [{_DIM}]{entry['db']}[/]")
|
||||
|
||||
# Entities
|
||||
for ent in entry.get("entities", []):
|
||||
eid = str(ent.get("id", ""))[:8]
|
||||
etype = ent.get("type", "unknown")
|
||||
ename = ent.get("name", "unnamed")
|
||||
|
||||
lines.append("")
|
||||
lines.append(f" [{_DIM}]{'─' * 50}[/]")
|
||||
lines.append(f" [bold {_SECONDARY}]{etype}[/]: {ename} [{_DIM}]{eid}[/]")
|
||||
|
||||
tasks = ent.get("tasks")
|
||||
if isinstance(tasks, list):
|
||||
completed = ent.get("tasks_completed", 0)
|
||||
total = ent.get("tasks_total", 0)
|
||||
pct = int(completed / total * 100) if total else 0
|
||||
bar_len = 20
|
||||
filled = int(bar_len * completed / total) if total else 0
|
||||
bar = f"[{_PRIMARY}]{'█' * filled}[/][{_DIM}]{'░' * (bar_len - filled)}[/]"
|
||||
|
||||
lines.append(f" {bar} {completed}/{total} tasks ({pct}%)")
|
||||
lines.append("")
|
||||
|
||||
for i, task in enumerate(tasks):
|
||||
if task.get("completed"):
|
||||
icon = "[green]✓[/]"
|
||||
else:
|
||||
icon = "[yellow]○[/]"
|
||||
desc = str(task.get("description", ""))
|
||||
if len(desc) > 55:
|
||||
desc = desc[:52] + "..."
|
||||
lines.append(f" {icon} {i + 1}. {desc}")
|
||||
|
||||
tasks = ent.get("tasks")
|
||||
if isinstance(tasks, list):
|
||||
completed = ent.get("tasks_completed", 0)
|
||||
total = ent.get("tasks_total", 0)
|
||||
pct = int(completed / total * 100) if total else 0
|
||||
bar_len = 20
|
||||
filled = int(bar_len * completed / total) if total else 0
|
||||
bar = f"[{_PRIMARY}]{'█' * filled}[/][{_DIM}]{'░' * (bar_len - filled)}[/]"
|
||||
lines.append(f"{bar} {completed}/{total} tasks ({pct}%)")
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
class ConfirmResumeScreen(ModalScreen[bool]):
|
||||
"""Modal confirmation before resuming from a checkpoint."""
|
||||
|
||||
CSS = f"""
|
||||
ConfirmResumeScreen {{
|
||||
align: center middle;
|
||||
}}
|
||||
#confirm-dialog {{
|
||||
width: 60;
|
||||
height: auto;
|
||||
padding: 1 2;
|
||||
background: {_BG_PANEL};
|
||||
border: round {_PRIMARY};
|
||||
}}
|
||||
#confirm-label {{
|
||||
width: 100%;
|
||||
content-align: center middle;
|
||||
margin-bottom: 1;
|
||||
}}
|
||||
#confirm-name {{
|
||||
width: 100%;
|
||||
content-align: center middle;
|
||||
color: {_PRIMARY};
|
||||
text-style: bold;
|
||||
margin-bottom: 1;
|
||||
}}
|
||||
#confirm-buttons {{
|
||||
width: 100%;
|
||||
height: 3;
|
||||
layout: horizontal;
|
||||
align: center middle;
|
||||
}}
|
||||
Button {{
|
||||
margin: 0 2;
|
||||
min-width: 12;
|
||||
}}
|
||||
"""
|
||||
|
||||
def __init__(self, checkpoint_name: str) -> None:
|
||||
super().__init__()
|
||||
self._checkpoint_name = checkpoint_name
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
with Vertical(id="confirm-dialog"):
|
||||
yield Static("Resume from this checkpoint?", id="confirm-label")
|
||||
yield Static(self._checkpoint_name, id="confirm-name")
|
||||
with Horizontal(id="confirm-buttons"):
|
||||
yield Button("Resume", variant="success", id="btn-yes")
|
||||
yield Button("Cancel", variant="default", id="btn-no")
|
||||
|
||||
def on_button_pressed(self, event: Button.Pressed) -> None:
|
||||
self.dismiss(event.button.id == "btn-yes")
|
||||
|
||||
def on_key(self, event: Any) -> None:
|
||||
if event.key == "y":
|
||||
self.dismiss(True)
|
||||
elif event.key in ("n", "escape"):
|
||||
self.dismiss(False)
|
||||
# Return type: (location, action, inputs, task_output_overrides)
|
||||
_TuiResult = tuple[str, str, dict[str, Any] | None, dict[int, str] | None] | None
|
||||
|
||||
|
||||
class CheckpointTUI(App[str | None]):
|
||||
class CheckpointTUI(App[_TuiResult]):
|
||||
"""TUI to browse and inspect checkpoints.
|
||||
|
||||
Returns the checkpoint location string to resume from, or None if
|
||||
the user quit without selecting.
|
||||
Returns ``(location, action, inputs)`` where action is ``"resume"`` or
|
||||
``"fork"`` and inputs is a parsed dict or ``None``,
|
||||
or ``None`` if the user quit without selecting.
|
||||
"""
|
||||
|
||||
TITLE = "CrewAI Checkpoints"
|
||||
@@ -199,145 +108,431 @@ class CheckpointTUI(App[str | None]):
|
||||
background: {_PRIMARY};
|
||||
color: {_TERTIARY};
|
||||
}}
|
||||
Horizontal {{
|
||||
#main-layout {{
|
||||
height: 1fr;
|
||||
}}
|
||||
#cp-list {{
|
||||
width: 38%;
|
||||
#tree-panel {{
|
||||
width: 45%;
|
||||
background: {_BG_PANEL};
|
||||
border: round {_SECONDARY};
|
||||
padding: 0 1;
|
||||
scrollbar-color: {_PRIMARY};
|
||||
}}
|
||||
#cp-list:focus {{
|
||||
#tree-panel:focus-within {{
|
||||
border: round {_PRIMARY};
|
||||
}}
|
||||
#cp-list > .option-list--option-highlighted {{
|
||||
background: {_SECONDARY};
|
||||
color: {_TERTIARY};
|
||||
text-style: none;
|
||||
}}
|
||||
#cp-list > .option-list--option-highlighted * {{
|
||||
color: {_TERTIARY};
|
||||
}}
|
||||
#detail-container {{
|
||||
width: 62%;
|
||||
padding: 0 1;
|
||||
width: 55%;
|
||||
height: 1fr;
|
||||
}}
|
||||
#detail {{
|
||||
#detail-scroll {{
|
||||
height: 1fr;
|
||||
background: {_BG_PANEL};
|
||||
border: round {_SECONDARY};
|
||||
padding: 1 2;
|
||||
overflow-y: auto;
|
||||
scrollbar-color: {_PRIMARY};
|
||||
}}
|
||||
#detail:focus {{
|
||||
#detail-scroll:focus-within {{
|
||||
border: round {_PRIMARY};
|
||||
}}
|
||||
#detail-header {{
|
||||
margin-bottom: 1;
|
||||
}}
|
||||
#status {{
|
||||
height: 1;
|
||||
padding: 0 2;
|
||||
color: {_DIM};
|
||||
}}
|
||||
#inputs-section {{
|
||||
display: none;
|
||||
height: auto;
|
||||
max-height: 8;
|
||||
padding: 0 1;
|
||||
}}
|
||||
#inputs-section.visible {{
|
||||
display: block;
|
||||
}}
|
||||
#inputs-label {{
|
||||
height: 1;
|
||||
color: {_DIM};
|
||||
padding: 0 1;
|
||||
}}
|
||||
.input-row {{
|
||||
height: 3;
|
||||
padding: 0 1;
|
||||
}}
|
||||
.input-row Static {{
|
||||
width: auto;
|
||||
min-width: 12;
|
||||
padding: 1 1 0 0;
|
||||
color: {_TERTIARY};
|
||||
}}
|
||||
.input-row Input {{
|
||||
width: 1fr;
|
||||
}}
|
||||
#no-inputs-label {{
|
||||
height: 1;
|
||||
color: {_DIM};
|
||||
padding: 0 1;
|
||||
}}
|
||||
#action-buttons {{
|
||||
height: 3;
|
||||
align: right middle;
|
||||
padding: 0 1;
|
||||
display: none;
|
||||
}}
|
||||
#action-buttons.visible {{
|
||||
display: block;
|
||||
}}
|
||||
#action-buttons Button {{
|
||||
margin: 0 0 0 1;
|
||||
min-width: 10;
|
||||
}}
|
||||
#btn-resume {{
|
||||
background: {_SECONDARY};
|
||||
color: {_TERTIARY};
|
||||
}}
|
||||
#btn-resume:hover {{
|
||||
background: {_PRIMARY};
|
||||
}}
|
||||
#btn-fork {{
|
||||
background: {_PRIMARY};
|
||||
color: {_TERTIARY};
|
||||
}}
|
||||
#btn-fork:hover {{
|
||||
background: {_SECONDARY};
|
||||
}}
|
||||
.entity-title {{
|
||||
padding: 1 1 0 1;
|
||||
}}
|
||||
.entity-detail {{
|
||||
padding: 0 1;
|
||||
}}
|
||||
.task-output-editor {{
|
||||
height: auto;
|
||||
max-height: 10;
|
||||
margin: 0 1 1 1;
|
||||
border: round {_DIM};
|
||||
}}
|
||||
.task-output-editor:focus {{
|
||||
border: round {_PRIMARY};
|
||||
}}
|
||||
.task-label {{
|
||||
padding: 0 1;
|
||||
}}
|
||||
Tree {{
|
||||
background: {_BG_PANEL};
|
||||
}}
|
||||
Tree > .tree--cursor {{
|
||||
background: {_SECONDARY};
|
||||
color: {_TERTIARY};
|
||||
}}
|
||||
"""
|
||||
|
||||
BINDINGS: ClassVar[list[Binding | tuple[str, str] | tuple[str, str, str]]] = [
|
||||
("q", "quit", "Quit"),
|
||||
("r", "refresh", "Refresh"),
|
||||
("j", "cursor_down", "Down"),
|
||||
("k", "cursor_up", "Up"),
|
||||
]
|
||||
|
||||
def __init__(self, location: str = "./.checkpoints") -> None:
|
||||
super().__init__()
|
||||
self._location = location
|
||||
self._entries: list[dict[str, Any]] = []
|
||||
self._selected_idx: int = 0
|
||||
self._pending_location: str = ""
|
||||
self._selected_entry: dict[str, Any] | None = None
|
||||
self._input_keys: list[str] = []
|
||||
self._task_output_ids: list[tuple[int, str, str]] = []
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
yield Header(show_clock=False)
|
||||
with Horizontal():
|
||||
yield OptionList(id="cp-list")
|
||||
with Horizontal(id="main-layout"):
|
||||
tree: Tree[dict[str, Any]] = Tree("Checkpoints", id="tree-panel")
|
||||
tree.show_root = True
|
||||
tree.guide_depth = 3
|
||||
yield tree
|
||||
with Vertical(id="detail-container"):
|
||||
yield Static("", id="status")
|
||||
yield Static(
|
||||
f"\n [{_DIM}]Select a checkpoint from the list[/]", # noqa: S608
|
||||
id="detail",
|
||||
)
|
||||
with VerticalScroll(id="detail-scroll"):
|
||||
yield Static(
|
||||
f"[{_DIM}]Select a checkpoint from the tree[/]", # noqa: S608
|
||||
id="detail-header",
|
||||
)
|
||||
with Vertical(id="inputs-section"):
|
||||
yield Static("Inputs", id="inputs-label")
|
||||
with Horizontal(id="action-buttons"):
|
||||
yield Button("Resume", id="btn-resume")
|
||||
yield Button("Fork", id="btn-fork")
|
||||
yield Footer()
|
||||
|
||||
async def on_mount(self) -> None:
|
||||
self.query_one("#cp-list", OptionList).border_title = "Checkpoints"
|
||||
self.query_one("#detail", Static).border_title = "Detail"
|
||||
self._refresh_list()
|
||||
self._refresh_tree()
|
||||
self.query_one("#tree-panel", Tree).root.expand()
|
||||
|
||||
def _refresh_list(self) -> None:
|
||||
def _refresh_tree(self) -> None:
|
||||
self._entries = _load_entries(self._location)
|
||||
option_list = self.query_one("#cp-list", OptionList)
|
||||
option_list.clear_options()
|
||||
self._selected_entry = None
|
||||
|
||||
tree = self.query_one("#tree-panel", Tree)
|
||||
tree.clear()
|
||||
|
||||
if not self._entries:
|
||||
self.query_one("#detail", Static).update(
|
||||
f"\n [{_DIM}]No checkpoints in {self._location}[/]"
|
||||
self.query_one("#detail-header", Static).update(
|
||||
f"[{_DIM}]No checkpoints in {self._location}[/]"
|
||||
)
|
||||
self.query_one("#status", Static).update("")
|
||||
self.sub_title = self._location
|
||||
return
|
||||
|
||||
# Group by branch
|
||||
branches: dict[str, list[dict[str, Any]]] = defaultdict(list)
|
||||
for entry in self._entries:
|
||||
option_list.add_option(Option(_format_list_label(entry)))
|
||||
branch = entry.get("branch", "main")
|
||||
branches[branch].append(entry)
|
||||
|
||||
# Index checkpoint names to tree nodes so forks can attach
|
||||
node_by_name: dict[str, Any] = {}
|
||||
|
||||
def _make_label(e: dict[str, Any]) -> str:
|
||||
name = e.get("name", "")
|
||||
ts = e.get("ts") or ""
|
||||
trigger = e.get("trigger") or ""
|
||||
parts = [f"[bold]{_short_id(name)}[/]"]
|
||||
if ts:
|
||||
time_part = ts.split(" ")[-1] if " " in ts else ts
|
||||
parts.append(f"[{_DIM}]{time_part}[/]")
|
||||
if trigger:
|
||||
parts.append(f"[{_PRIMARY}]{trigger}[/]")
|
||||
return " ".join(parts)
|
||||
|
||||
fork_parents: set[str] = set()
|
||||
for branch_name, entries in branches.items():
|
||||
if branch_name == "main" or not entries:
|
||||
continue
|
||||
oldest = min(entries, key=lambda e: str(e.get("name", "")))
|
||||
first_parent = oldest.get("parent_id")
|
||||
if first_parent:
|
||||
fork_parents.add(str(first_parent))
|
||||
|
||||
def _add_checkpoint(parent_node: Any, e: dict[str, Any]) -> None:
|
||||
"""Add a checkpoint node — expandable only if a fork attaches to it."""
|
||||
cp_id = _entry_id(e)
|
||||
if cp_id in fork_parents:
|
||||
node = parent_node.add(
|
||||
_make_label(e), data=e, expand=False, allow_expand=True
|
||||
)
|
||||
else:
|
||||
node = parent_node.add_leaf(_make_label(e), data=e)
|
||||
node_by_name[cp_id] = node
|
||||
|
||||
if "main" in branches:
|
||||
for entry in reversed(branches["main"]):
|
||||
_add_checkpoint(tree.root, entry)
|
||||
|
||||
fork_branches = [
|
||||
(name, sorted(entries, key=lambda e: str(e.get("name", ""))))
|
||||
for name, entries in branches.items()
|
||||
if name != "main"
|
||||
]
|
||||
remaining = fork_branches
|
||||
max_passes = len(remaining) + 1
|
||||
while remaining and max_passes > 0:
|
||||
max_passes -= 1
|
||||
deferred = []
|
||||
made_progress = False
|
||||
for branch_name, entries in remaining:
|
||||
first_parent = entries[0].get("parent_id") if entries else None
|
||||
if first_parent and str(first_parent) not in node_by_name:
|
||||
deferred.append((branch_name, entries))
|
||||
continue
|
||||
attach_to: Any = tree.root
|
||||
if first_parent:
|
||||
attach_to = node_by_name.get(str(first_parent), tree.root)
|
||||
branch_label = (
|
||||
f"[bold {_SECONDARY}]{branch_name}[/] [{_DIM}]({len(entries)})[/]"
|
||||
)
|
||||
branch_node = attach_to.add(branch_label, expand=False)
|
||||
for entry in entries:
|
||||
_add_checkpoint(branch_node, entry)
|
||||
made_progress = True
|
||||
remaining = deferred
|
||||
if not made_progress:
|
||||
break
|
||||
|
||||
for branch_name, entries in remaining:
|
||||
branch_label = (
|
||||
f"[bold {_SECONDARY}]{branch_name}[/] "
|
||||
f"[{_DIM}]({len(entries)})[/] [{_DIM}](orphaned)[/]"
|
||||
)
|
||||
branch_node = tree.root.add(branch_label, expand=False)
|
||||
for entry in entries:
|
||||
_add_checkpoint(branch_node, entry)
|
||||
|
||||
count = len(self._entries)
|
||||
storage = "SQLite" if _is_sqlite(self._location) else "JSON"
|
||||
self.sub_title = f"{self._location}"
|
||||
self.sub_title = self._location
|
||||
self.query_one("#status", Static).update(f" {count} checkpoint(s) | {storage}")
|
||||
|
||||
async def on_option_list_option_highlighted(
|
||||
self,
|
||||
event: OptionList.OptionHighlighted,
|
||||
) -> None:
|
||||
idx = event.option_index
|
||||
if idx is None:
|
||||
return
|
||||
if idx < len(self._entries):
|
||||
self._selected_idx = idx
|
||||
entry = self._entries[idx]
|
||||
self.query_one("#detail", Static).update(_format_detail(entry))
|
||||
async def _show_detail(self, entry: dict[str, Any]) -> None:
|
||||
"""Update the detail panel for a checkpoint entry."""
|
||||
self._selected_entry = entry
|
||||
self.query_one("#action-buttons").add_class("visible")
|
||||
|
||||
def action_cursor_down(self) -> None:
|
||||
self.query_one("#cp-list", OptionList).action_cursor_down()
|
||||
detail_scroll = self.query_one("#detail-scroll", VerticalScroll)
|
||||
|
||||
def action_cursor_up(self) -> None:
|
||||
self.query_one("#cp-list", OptionList).action_cursor_up()
|
||||
# Remove all dynamic children except the header — await so IDs are freed
|
||||
to_remove = [c for c in detail_scroll.children if c.id != "detail-header"]
|
||||
for child in to_remove:
|
||||
await child.remove()
|
||||
|
||||
async def on_option_list_option_selected(
|
||||
self,
|
||||
event: OptionList.OptionSelected,
|
||||
) -> None:
|
||||
idx = event.option_index
|
||||
if idx is None or idx >= len(self._entries):
|
||||
return
|
||||
entry = self._entries[idx]
|
||||
# Header
|
||||
name = entry.get("name", "")
|
||||
ts = entry.get("ts") or "unknown"
|
||||
trigger = entry.get("trigger") or ""
|
||||
branch = entry.get("branch", "main")
|
||||
parent_id = entry.get("parent_id")
|
||||
|
||||
header_lines = [
|
||||
f"[bold {_PRIMARY}]{name}[/]",
|
||||
f"[{_DIM}]{'─' * 50}[/]",
|
||||
"",
|
||||
f" [bold]Time[/] {ts}",
|
||||
]
|
||||
if "size" in entry:
|
||||
header_lines.append(f" [bold]Size[/] {_format_size(entry['size'])}")
|
||||
header_lines.append(f" [bold]Events[/] {entry.get('event_count', 0)}")
|
||||
if trigger:
|
||||
header_lines.append(f" [bold]Trigger[/] [{_PRIMARY}]{trigger}[/]")
|
||||
header_lines.append(f" [bold]Branch[/] [{_SECONDARY}]{branch}[/]")
|
||||
if parent_id:
|
||||
header_lines.append(f" [bold]Parent[/] [{_DIM}]{parent_id}[/]")
|
||||
if "path" in entry:
|
||||
loc = entry["path"]
|
||||
elif _is_sqlite(self._location):
|
||||
loc = f"{self._location}#{entry['name']}"
|
||||
else:
|
||||
loc = entry.get("name", "")
|
||||
self._pending_location = loc
|
||||
name = entry.get("name", loc)
|
||||
self.push_screen(ConfirmResumeScreen(name), self._on_confirm)
|
||||
header_lines.append(f" [bold]Path[/] [{_DIM}]{entry['path']}[/]")
|
||||
if "db" in entry:
|
||||
header_lines.append(f" [bold]Database[/] [{_DIM}]{entry['db']}[/]")
|
||||
|
||||
def _on_confirm(self, confirmed: bool | None) -> None:
|
||||
if confirmed:
|
||||
self.exit(self._pending_location)
|
||||
else:
|
||||
self._pending_location = ""
|
||||
self.query_one("#detail-header", Static).update("\n".join(header_lines))
|
||||
|
||||
# Entity details and editable task outputs — mounted flat for scrolling
|
||||
self._task_output_ids = []
|
||||
flat_task_idx = 0
|
||||
for ent_idx, ent in enumerate(entry.get("entities", [])):
|
||||
etype = ent.get("type", "unknown")
|
||||
ename = ent.get("name", "unnamed")
|
||||
completed = ent.get("tasks_completed")
|
||||
total = ent.get("tasks_total")
|
||||
entity_title = f"[bold {_SECONDARY}]{etype}: {ename}[/]"
|
||||
if completed is not None and total is not None:
|
||||
entity_title += f" [{_DIM}]{completed}/{total} tasks[/]"
|
||||
await detail_scroll.mount(Static(entity_title, classes="entity-title"))
|
||||
await detail_scroll.mount(
|
||||
Static(_build_entity_header(ent), classes="entity-detail")
|
||||
)
|
||||
|
||||
tasks = ent.get("tasks", [])
|
||||
for i, task in enumerate(tasks):
|
||||
desc = str(task.get("description", ""))
|
||||
if len(desc) > 55:
|
||||
desc = desc[:52] + "..."
|
||||
if task.get("completed"):
|
||||
icon = "[green]✓[/]"
|
||||
await detail_scroll.mount(
|
||||
Static(f" {icon} {i + 1}. {desc}", classes="task-label")
|
||||
)
|
||||
output_text = task.get("output", "")
|
||||
editor_id = f"task-output-{ent_idx}-{i}"
|
||||
await detail_scroll.mount(
|
||||
TextArea(
|
||||
str(output_text),
|
||||
classes="task-output-editor",
|
||||
id=editor_id,
|
||||
)
|
||||
)
|
||||
self._task_output_ids.append(
|
||||
(flat_task_idx, editor_id, str(output_text))
|
||||
)
|
||||
else:
|
||||
icon = "[yellow]○[/]"
|
||||
await detail_scroll.mount(
|
||||
Static(f" {icon} {i + 1}. {desc}", classes="task-label")
|
||||
)
|
||||
flat_task_idx += 1
|
||||
|
||||
# Build input fields
|
||||
await self._build_input_fields(entry.get("inputs", {}))
|
||||
|
||||
async def _build_input_fields(self, inputs: dict[str, Any]) -> None:
|
||||
"""Rebuild the inputs section with one field per input key."""
|
||||
section = self.query_one("#inputs-section")
|
||||
|
||||
# Remove old dynamic children — await so IDs are freed
|
||||
for widget in list(section.query(".input-row, .no-inputs")):
|
||||
await widget.remove()
|
||||
|
||||
self._input_keys = []
|
||||
|
||||
if not inputs:
|
||||
await section.mount(Static(f"[{_DIM}]No inputs[/]", classes="no-inputs"))
|
||||
section.add_class("visible")
|
||||
return
|
||||
|
||||
for key, value in inputs.items():
|
||||
self._input_keys.append(key)
|
||||
row = Horizontal(classes="input-row")
|
||||
row.compose_add_child(Static(f"[bold]{key}[/]"))
|
||||
row.compose_add_child(
|
||||
Input(value=str(value), placeholder=key, id=f"input-{key}")
|
||||
)
|
||||
await section.mount(row)
|
||||
|
||||
section.add_class("visible")
|
||||
|
||||
def _collect_inputs(self) -> dict[str, Any] | None:
|
||||
"""Collect current values from input fields."""
|
||||
if not self._input_keys:
|
||||
return None
|
||||
result: dict[str, Any] = {}
|
||||
for key in self._input_keys:
|
||||
widget = self.query_one(f"#input-{key}", Input)
|
||||
result[key] = widget.value
|
||||
return result
|
||||
|
||||
def _collect_task_overrides(self) -> dict[int, str] | None:
|
||||
"""Collect edited task outputs. Returns only changed values."""
|
||||
if not self._task_output_ids or self._selected_entry is None:
|
||||
return None
|
||||
overrides: dict[int, str] = {}
|
||||
for task_idx, editor_id, original in self._task_output_ids:
|
||||
editor = self.query_one(f"#{editor_id}", TextArea)
|
||||
if editor.text != original:
|
||||
overrides[task_idx] = editor.text
|
||||
return overrides or None
|
||||
|
||||
def _resolve_location(self, entry: dict[str, Any]) -> str:
|
||||
"""Get the restore location string for a checkpoint entry."""
|
||||
if "path" in entry:
|
||||
return str(entry["path"])
|
||||
if _is_sqlite(self._location):
|
||||
return f"{self._location}#{entry['name']}"
|
||||
return str(entry.get("name", ""))
|
||||
|
||||
async def on_tree_node_highlighted(
|
||||
self, event: Tree.NodeHighlighted[dict[str, Any]]
|
||||
) -> None:
|
||||
if event.node.data is not None:
|
||||
await self._show_detail(event.node.data)
|
||||
|
||||
def on_button_pressed(self, event: Button.Pressed) -> None:
|
||||
if self._selected_entry is None:
|
||||
return
|
||||
inputs = self._collect_inputs()
|
||||
overrides = self._collect_task_overrides()
|
||||
loc = self._resolve_location(self._selected_entry)
|
||||
if event.button.id == "btn-resume":
|
||||
self.exit((loc, "resume", inputs, overrides))
|
||||
elif event.button.id == "btn-fork":
|
||||
self.exit((loc, "fork", inputs, overrides))
|
||||
|
||||
def action_refresh(self) -> None:
|
||||
self._refresh_list()
|
||||
self._refresh_tree()
|
||||
|
||||
|
||||
async def _run_checkpoint_tui_async(location: str) -> None:
|
||||
@@ -345,17 +540,78 @@ async def _run_checkpoint_tui_async(location: str) -> None:
|
||||
import click
|
||||
|
||||
app = CheckpointTUI(location=location)
|
||||
selected = await app.run_async()
|
||||
selection = await app.run_async()
|
||||
|
||||
if selected is None:
|
||||
if selection is None:
|
||||
return
|
||||
|
||||
click.echo(f"\nResuming from: {selected}\n")
|
||||
selected, action, inputs, task_overrides = selection
|
||||
|
||||
from crewai.crew import Crew
|
||||
from crewai.state.checkpoint_config import CheckpointConfig
|
||||
|
||||
crew = Crew.from_checkpoint(selected)
|
||||
result = await crew.akickoff()
|
||||
config = CheckpointConfig(restore_from=selected)
|
||||
|
||||
if action == "fork":
|
||||
click.echo(f"\nForking from: {selected}\n")
|
||||
crew = Crew.fork(config)
|
||||
else:
|
||||
click.echo(f"\nResuming from: {selected}\n")
|
||||
crew = Crew.from_checkpoint(config)
|
||||
|
||||
if task_overrides:
|
||||
click.echo("Modifications:")
|
||||
overridden_agents: set[int] = set()
|
||||
for task_idx, new_output in task_overrides.items():
|
||||
if task_idx < len(crew.tasks) and crew.tasks[task_idx].output is not None:
|
||||
desc = crew.tasks[task_idx].description or f"Task {task_idx + 1}"
|
||||
if len(desc) > 60:
|
||||
desc = desc[:57] + "..."
|
||||
crew.tasks[task_idx].output.raw = new_output # type: ignore[union-attr]
|
||||
preview = new_output.replace("\n", " ")
|
||||
if len(preview) > 80:
|
||||
preview = preview[:77] + "..."
|
||||
click.echo(f" Task {task_idx + 1}: {desc}")
|
||||
click.echo(f" -> {preview}")
|
||||
agent = crew.tasks[task_idx].agent
|
||||
if agent and agent.agent_executor:
|
||||
nth = sum(1 for t in crew.tasks[:task_idx] if t.agent is agent)
|
||||
messages = agent.agent_executor.messages
|
||||
system_positions = [
|
||||
i for i, m in enumerate(messages) if m.get("role") == "system"
|
||||
]
|
||||
if nth < len(system_positions):
|
||||
seg_start = system_positions[nth]
|
||||
seg_end = (
|
||||
system_positions[nth + 1]
|
||||
if nth + 1 < len(system_positions)
|
||||
else len(messages)
|
||||
)
|
||||
for j in range(seg_end - 1, seg_start, -1):
|
||||
if messages[j].get("role") == "assistant":
|
||||
messages[j]["content"] = new_output
|
||||
break
|
||||
overridden_agents.add(id(agent))
|
||||
|
||||
earliest = min(task_overrides)
|
||||
for offset, subsequent in enumerate(
|
||||
crew.tasks[earliest + 1 :], start=earliest + 1
|
||||
):
|
||||
if subsequent.output and offset not in task_overrides:
|
||||
subsequent.output = None
|
||||
if subsequent.agent and subsequent.agent.agent_executor:
|
||||
subsequent.agent.agent_executor._resuming = False
|
||||
if id(subsequent.agent) not in overridden_agents:
|
||||
subsequent.agent.agent_executor.messages = []
|
||||
click.echo()
|
||||
|
||||
if inputs:
|
||||
click.echo("Inputs:")
|
||||
for k, v in inputs.items():
|
||||
click.echo(f" {k}: {v}")
|
||||
click.echo()
|
||||
|
||||
result = await crew.akickoff(inputs=inputs)
|
||||
click.echo(f"\nResult: {getattr(result, 'raw', result)}")
|
||||
|
||||
|
||||
|
||||
@@ -793,6 +793,9 @@ def traces_status() -> None:
|
||||
@click.pass_context
|
||||
def checkpoint(ctx: click.Context, location: str) -> None:
|
||||
"""Browse and inspect checkpoints. Launches a TUI when called without a subcommand."""
|
||||
from crewai.cli.checkpoint_cli import _detect_location
|
||||
|
||||
location = _detect_location(location)
|
||||
ctx.ensure_object(dict)
|
||||
ctx.obj["location"] = location
|
||||
if ctx.invoked_subcommand is None:
|
||||
@@ -805,18 +808,18 @@ def checkpoint(ctx: click.Context, location: str) -> None:
|
||||
@click.argument("location", default="./.checkpoints")
|
||||
def checkpoint_list(location: str) -> None:
|
||||
"""List checkpoints in a directory."""
|
||||
from crewai.cli.checkpoint_cli import list_checkpoints
|
||||
from crewai.cli.checkpoint_cli import _detect_location, list_checkpoints
|
||||
|
||||
list_checkpoints(location)
|
||||
list_checkpoints(_detect_location(location))
|
||||
|
||||
|
||||
@checkpoint.command("info")
|
||||
@click.argument("path", default="./.checkpoints")
|
||||
def checkpoint_info(path: str) -> None:
|
||||
"""Show details of a checkpoint. Pass a file or directory for latest."""
|
||||
from crewai.cli.checkpoint_cli import info_checkpoint
|
||||
from crewai.cli.checkpoint_cli import _detect_location, info_checkpoint
|
||||
|
||||
info_checkpoint(path)
|
||||
info_checkpoint(_detect_location(path))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -13,7 +13,6 @@ from packaging import version
|
||||
import tomli
|
||||
|
||||
from crewai.cli.utils import read_toml
|
||||
from crewai.cli.version import get_crewai_version
|
||||
from crewai.crew import Crew
|
||||
from crewai.llm import LLM
|
||||
from crewai.llms.base_llm import BaseLLM
|
||||
@@ -21,6 +20,7 @@ from crewai.types.crew_chat import ChatInputField, ChatInputs
|
||||
from crewai.utilities.llm_utils import create_llm
|
||||
from crewai.utilities.printer import PRINTER
|
||||
from crewai.utilities.types import LLMMessage
|
||||
from crewai.utilities.version import get_crewai_version
|
||||
|
||||
|
||||
MIN_REQUIRED_VERSION: Final[Literal["0.98.0"]] = "0.98.0"
|
||||
|
||||
@@ -7,7 +7,7 @@ from rich.console import Console
|
||||
from crewai.cli.authentication.main import Oauth2Settings, ProviderFactory
|
||||
from crewai.cli.command import BaseCommand
|
||||
from crewai.cli.settings.main import SettingsCommand
|
||||
from crewai.cli.version import get_crewai_version
|
||||
from crewai.utilities.version import get_crewai_version
|
||||
|
||||
|
||||
console = Console()
|
||||
|
||||
@@ -6,7 +6,7 @@ import httpx
|
||||
|
||||
from crewai.cli.config import Settings
|
||||
from crewai.cli.constants import DEFAULT_CREWAI_ENTERPRISE_URL
|
||||
from crewai.cli.version import get_crewai_version
|
||||
from crewai.utilities.version import get_crewai_version
|
||||
|
||||
|
||||
class PlusAPI:
|
||||
|
||||
@@ -5,7 +5,7 @@ import click
|
||||
from packaging import version
|
||||
|
||||
from crewai.cli.utils import build_env_with_all_tool_credentials, read_toml
|
||||
from crewai.cli.version import get_crewai_version
|
||||
from crewai.utilities.version import get_crewai_version
|
||||
|
||||
|
||||
class CrewType(Enum):
|
||||
|
||||
@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
|
||||
authors = [{ name = "Your Name", email = "you@example.com" }]
|
||||
requires-python = ">=3.10,<3.14"
|
||||
dependencies = [
|
||||
"crewai[tools]==1.14.2a1"
|
||||
"crewai[tools]==1.14.2a2"
|
||||
]
|
||||
|
||||
[project.scripts]
|
||||
|
||||
@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
|
||||
authors = [{ name = "Your Name", email = "you@example.com" }]
|
||||
requires-python = ">=3.10,<3.14"
|
||||
dependencies = [
|
||||
"crewai[tools]==1.14.2a1"
|
||||
"crewai[tools]==1.14.2a2"
|
||||
]
|
||||
|
||||
[project.scripts]
|
||||
|
||||
@@ -5,7 +5,7 @@ description = "Power up your crews with {{folder_name}}"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.10,<3.14"
|
||||
dependencies = [
|
||||
"crewai[tools]==1.14.2a1"
|
||||
"crewai[tools]==1.14.2a2"
|
||||
]
|
||||
|
||||
[tool.crewai]
|
||||
|
||||
@@ -3,7 +3,6 @@
|
||||
from collections.abc import Mapping
|
||||
from datetime import datetime, timedelta
|
||||
from functools import lru_cache
|
||||
import importlib.metadata
|
||||
import json
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
@@ -13,6 +12,8 @@ from urllib.error import URLError
|
||||
import appdirs
|
||||
from packaging.version import InvalidVersion, Version, parse
|
||||
|
||||
from crewai.utilities.version import get_crewai_version
|
||||
|
||||
|
||||
@lru_cache(maxsize=1)
|
||||
def _get_cache_file() -> Path:
|
||||
@@ -25,11 +26,6 @@ def _get_cache_file() -> Path:
|
||||
return cache_dir / "version_cache.json"
|
||||
|
||||
|
||||
def get_crewai_version() -> str:
|
||||
"""Get the version number of CrewAI running the CLI."""
|
||||
return importlib.metadata.version("crewai")
|
||||
|
||||
|
||||
def _is_cache_valid(cache_data: Mapping[str, Any]) -> bool:
|
||||
"""Check if the cache is still valid, less than 24 hours old."""
|
||||
if "timestamp" not in cache_data:
|
||||
|
||||
@@ -42,7 +42,6 @@ if TYPE_CHECKING:
|
||||
from opentelemetry.trace import Span
|
||||
|
||||
from crewai.context import ExecutionContext
|
||||
from crewai.state.provider.core import BaseProvider
|
||||
|
||||
try:
|
||||
from crewai_files import get_supported_content_types
|
||||
@@ -104,7 +103,11 @@ from crewai.rag.types import SearchResult
|
||||
from crewai.security.fingerprint import Fingerprint
|
||||
from crewai.security.security_config import SecurityConfig
|
||||
from crewai.skills.models import Skill
|
||||
from crewai.state.checkpoint_config import CheckpointConfig, _coerce_checkpoint
|
||||
from crewai.state.checkpoint_config import (
|
||||
CheckpointConfig,
|
||||
_coerce_checkpoint,
|
||||
apply_checkpoint,
|
||||
)
|
||||
from crewai.task import Task
|
||||
from crewai.tasks.conditional_task import ConditionalTask
|
||||
from crewai.tasks.task_output import TaskOutput
|
||||
@@ -365,32 +368,21 @@ class Crew(FlowTrackable, BaseModel):
|
||||
checkpoint_kickoff_event_id: str | None = Field(default=None)
|
||||
|
||||
@classmethod
|
||||
def from_checkpoint(
|
||||
cls, path: str, *, provider: BaseProvider | None = None
|
||||
) -> Crew:
|
||||
"""Restore a Crew from a checkpoint file, ready to resume via kickoff().
|
||||
def from_checkpoint(cls, config: CheckpointConfig) -> Crew:
|
||||
"""Restore a Crew from a checkpoint, ready to resume via kickoff().
|
||||
|
||||
Args:
|
||||
path: Path to a checkpoint JSON file.
|
||||
provider: Storage backend to read from. Defaults to JsonProvider.
|
||||
config: Checkpoint configuration with ``restore_from`` set to
|
||||
the path of the checkpoint to load.
|
||||
|
||||
Returns:
|
||||
A Crew instance. Call kickoff() to resume from the last completed task.
|
||||
"""
|
||||
from crewai.context import apply_execution_context
|
||||
from crewai.events.event_bus import crewai_event_bus
|
||||
from crewai.state.provider.json_provider import JsonProvider
|
||||
from crewai.state.provider.utils import detect_provider
|
||||
from crewai.state.runtime import RuntimeState
|
||||
|
||||
if provider is None:
|
||||
provider = detect_provider(path)
|
||||
|
||||
state = RuntimeState.from_checkpoint(
|
||||
path,
|
||||
provider=provider or JsonProvider(),
|
||||
context={"from_checkpoint": True},
|
||||
)
|
||||
state = RuntimeState.from_checkpoint(config, context={"from_checkpoint": True})
|
||||
crewai_event_bus.set_runtime_state(state)
|
||||
for entity in state.root:
|
||||
if isinstance(entity, cls):
|
||||
@@ -398,7 +390,32 @@ class Crew(FlowTrackable, BaseModel):
|
||||
apply_execution_context(entity.execution_context)
|
||||
entity._restore_runtime()
|
||||
return entity
|
||||
raise ValueError(f"No Crew found in checkpoint: {path}")
|
||||
raise ValueError(f"No Crew found in checkpoint: {config.restore_from}")
|
||||
|
||||
@classmethod
|
||||
def fork(
|
||||
cls,
|
||||
config: CheckpointConfig,
|
||||
branch: str | None = None,
|
||||
) -> Crew:
|
||||
"""Fork a Crew from a checkpoint, creating a new execution branch.
|
||||
|
||||
Args:
|
||||
config: Checkpoint configuration with ``restore_from`` set.
|
||||
branch: Branch label for the fork. Auto-generated if not provided.
|
||||
|
||||
Returns:
|
||||
A Crew instance on the new branch. Call kickoff() to run.
|
||||
"""
|
||||
crew = cls.from_checkpoint(config)
|
||||
state = crewai_event_bus._runtime_state
|
||||
if state is None:
|
||||
raise RuntimeError(
|
||||
"Cannot fork: no runtime state on the event bus. "
|
||||
"Ensure from_checkpoint() succeeded before calling fork()."
|
||||
)
|
||||
state.fork(branch)
|
||||
return crew
|
||||
|
||||
def _restore_runtime(self) -> None:
|
||||
"""Re-create runtime objects after restoring from a checkpoint."""
|
||||
@@ -419,6 +436,13 @@ class Crew(FlowTrackable, BaseModel):
|
||||
if agent.agent_executor is not None and task.output is None:
|
||||
agent.agent_executor.task = task
|
||||
break
|
||||
for task in self.tasks:
|
||||
if task.checkpoint_original_description is not None:
|
||||
task._original_description = task.checkpoint_original_description
|
||||
if task.checkpoint_original_expected_output is not None:
|
||||
task._original_expected_output = (
|
||||
task.checkpoint_original_expected_output
|
||||
)
|
||||
if self.checkpoint_inputs is not None:
|
||||
self._inputs = self.checkpoint_inputs
|
||||
if self.checkpoint_kickoff_event_id is not None:
|
||||
@@ -854,16 +878,23 @@ class Crew(FlowTrackable, BaseModel):
|
||||
self,
|
||||
inputs: dict[str, Any] | None = None,
|
||||
input_files: dict[str, FileInput] | None = None,
|
||||
from_checkpoint: CheckpointConfig | None = None,
|
||||
) -> CrewOutput | CrewStreamingOutput:
|
||||
"""Execute the crew's workflow.
|
||||
|
||||
Args:
|
||||
inputs: Optional input dictionary for task interpolation.
|
||||
input_files: Optional dict of named file inputs for the crew.
|
||||
from_checkpoint: Optional checkpoint config. If ``restore_from``
|
||||
is set, the crew resumes from that checkpoint. Remaining
|
||||
config fields enable checkpointing for the run.
|
||||
|
||||
Returns:
|
||||
CrewOutput or CrewStreamingOutput if streaming is enabled.
|
||||
"""
|
||||
restored = apply_checkpoint(self, from_checkpoint)
|
||||
if restored is not None:
|
||||
return restored.kickoff(inputs=inputs, input_files=input_files) # type: ignore[no-any-return]
|
||||
get_env_context()
|
||||
if self.stream:
|
||||
enable_agent_streaming(self.agents)
|
||||
@@ -976,12 +1007,15 @@ class Crew(FlowTrackable, BaseModel):
|
||||
self,
|
||||
inputs: dict[str, Any] | None = None,
|
||||
input_files: dict[str, FileInput] | None = None,
|
||||
from_checkpoint: CheckpointConfig | None = None,
|
||||
) -> CrewOutput | CrewStreamingOutput:
|
||||
"""Asynchronous kickoff method to start the crew execution.
|
||||
|
||||
Args:
|
||||
inputs: Optional input dictionary for task interpolation.
|
||||
input_files: Optional dict of named file inputs for the crew.
|
||||
from_checkpoint: Optional checkpoint config. If ``restore_from``
|
||||
is set, the crew resumes from that checkpoint.
|
||||
|
||||
Returns:
|
||||
CrewOutput or CrewStreamingOutput if streaming is enabled.
|
||||
@@ -990,6 +1024,9 @@ class Crew(FlowTrackable, BaseModel):
|
||||
to get stream chunks. After iteration completes, access the final result
|
||||
via .result.
|
||||
"""
|
||||
restored = apply_checkpoint(self, from_checkpoint)
|
||||
if restored is not None:
|
||||
return await restored.kickoff_async(inputs=inputs, input_files=input_files) # type: ignore[no-any-return]
|
||||
inputs = inputs or {}
|
||||
|
||||
if self.stream:
|
||||
@@ -1050,6 +1087,7 @@ class Crew(FlowTrackable, BaseModel):
|
||||
self,
|
||||
inputs: dict[str, Any] | None = None,
|
||||
input_files: dict[str, FileInput] | None = None,
|
||||
from_checkpoint: CheckpointConfig | None = None,
|
||||
) -> CrewOutput | CrewStreamingOutput:
|
||||
"""Native async kickoff method using async task execution throughout.
|
||||
|
||||
@@ -1060,10 +1098,15 @@ class Crew(FlowTrackable, BaseModel):
|
||||
Args:
|
||||
inputs: Optional input dictionary for task interpolation.
|
||||
input_files: Optional dict of named file inputs for the crew.
|
||||
from_checkpoint: Optional checkpoint config. If ``restore_from``
|
||||
is set, the crew resumes from that checkpoint.
|
||||
|
||||
Returns:
|
||||
CrewOutput or CrewStreamingOutput if streaming is enabled.
|
||||
"""
|
||||
restored = apply_checkpoint(self, from_checkpoint)
|
||||
if restored is not None:
|
||||
return await restored.akickoff(inputs=inputs, input_files=input_files) # type: ignore[no-any-return]
|
||||
if self.stream:
|
||||
enable_agent_streaming(self.agents)
|
||||
ctx = StreamingContext(use_async=True)
|
||||
|
||||
@@ -13,13 +13,13 @@ from crewai.cli.authentication.token import AuthError, get_auth_token
|
||||
from crewai.cli.config import Settings
|
||||
from crewai.cli.constants import DEFAULT_CREWAI_ENTERPRISE_URL
|
||||
from crewai.cli.plus_api import PlusAPI
|
||||
from crewai.cli.version import get_crewai_version
|
||||
from crewai.events.listeners.tracing.types import TraceEvent
|
||||
from crewai.events.listeners.tracing.utils import (
|
||||
get_user_id,
|
||||
is_tracing_enabled_in_context,
|
||||
should_auto_collect_first_time_traces,
|
||||
)
|
||||
from crewai.utilities.version import get_crewai_version
|
||||
|
||||
|
||||
logger = getLogger(__name__)
|
||||
|
||||
@@ -7,7 +7,6 @@ import uuid
|
||||
from typing_extensions import Self
|
||||
|
||||
from crewai.cli.authentication.token import AuthError, get_auth_token
|
||||
from crewai.cli.version import get_crewai_version
|
||||
from crewai.events.base_event_listener import BaseEventListener
|
||||
from crewai.events.base_events import BaseEvent
|
||||
from crewai.events.event_bus import CrewAIEventsBus
|
||||
@@ -127,6 +126,7 @@ from crewai.events.types.tool_usage_events import (
|
||||
ToolUsageStartedEvent,
|
||||
)
|
||||
from crewai.events.utils.console_formatter import ConsoleFormatter
|
||||
from crewai.utilities.version import get_crewai_version
|
||||
|
||||
|
||||
class TraceCollectionListener(BaseEventListener):
|
||||
|
||||
@@ -87,7 +87,6 @@ class LLMStreamChunkEvent(LLMEventBase):
|
||||
tool_call: ToolCall | None = None
|
||||
call_type: LLMCallType | None = None
|
||||
response_id: str | None = None
|
||||
run_id: str | None = None
|
||||
|
||||
|
||||
class LLMThinkingChunkEvent(LLMEventBase):
|
||||
|
||||
@@ -113,7 +113,11 @@ from crewai.flow.utils import (
|
||||
)
|
||||
from crewai.memory.memory_scope import MemoryScope, MemorySlice
|
||||
from crewai.memory.unified_memory import Memory
|
||||
from crewai.state.checkpoint_config import CheckpointConfig, _coerce_checkpoint
|
||||
from crewai.state.checkpoint_config import (
|
||||
CheckpointConfig,
|
||||
_coerce_checkpoint,
|
||||
apply_checkpoint,
|
||||
)
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -122,7 +126,6 @@ if TYPE_CHECKING:
|
||||
from crewai.context import ExecutionContext
|
||||
from crewai.flow.async_feedback.types import PendingFeedbackContext
|
||||
from crewai.llms.base_llm import BaseLLM
|
||||
from crewai.state.provider.core import BaseProvider
|
||||
|
||||
from crewai.flow.visualization import build_flow_structure, render_interactive
|
||||
from crewai.types.streaming import CrewStreamingOutput, FlowStreamingOutput
|
||||
@@ -928,20 +931,21 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
] = Field(default=None)
|
||||
|
||||
@classmethod
|
||||
def from_checkpoint(
|
||||
cls, path: str, *, provider: BaseProvider | None = None
|
||||
) -> Flow: # type: ignore[type-arg]
|
||||
"""Restore a Flow from a checkpoint file."""
|
||||
def from_checkpoint(cls, config: CheckpointConfig) -> Flow: # type: ignore[type-arg]
|
||||
"""Restore a Flow from a checkpoint.
|
||||
|
||||
Args:
|
||||
config: Checkpoint configuration with ``restore_from`` set to
|
||||
the path of the checkpoint to load.
|
||||
|
||||
Returns:
|
||||
A Flow instance ready to resume.
|
||||
"""
|
||||
from crewai.context import apply_execution_context
|
||||
from crewai.events.event_bus import crewai_event_bus
|
||||
from crewai.state.provider.json_provider import JsonProvider
|
||||
from crewai.state.runtime import RuntimeState
|
||||
|
||||
state = RuntimeState.from_checkpoint(
|
||||
path,
|
||||
provider=provider or JsonProvider(),
|
||||
context={"from_checkpoint": True},
|
||||
)
|
||||
state = RuntimeState.from_checkpoint(config, context={"from_checkpoint": True})
|
||||
crewai_event_bus.set_runtime_state(state)
|
||||
for entity in state.root:
|
||||
if not isinstance(entity, Flow):
|
||||
@@ -958,7 +962,32 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
instance.checkpoint_state = entity.checkpoint_state
|
||||
instance._restore_from_checkpoint()
|
||||
return instance
|
||||
raise ValueError(f"No Flow found in checkpoint: {path}")
|
||||
raise ValueError(f"No Flow found in checkpoint: {config.restore_from}")
|
||||
|
||||
@classmethod
|
||||
def fork(
|
||||
cls,
|
||||
config: CheckpointConfig,
|
||||
branch: str | None = None,
|
||||
) -> Flow: # type: ignore[type-arg]
|
||||
"""Fork a Flow from a checkpoint, creating a new execution branch.
|
||||
|
||||
Args:
|
||||
config: Checkpoint configuration with ``restore_from`` set.
|
||||
branch: Branch label for the fork. Auto-generated if not provided.
|
||||
|
||||
Returns:
|
||||
A Flow instance on the new branch. Call kickoff() to run.
|
||||
"""
|
||||
flow = cls.from_checkpoint(config)
|
||||
state = crewai_event_bus._runtime_state
|
||||
if state is None:
|
||||
raise RuntimeError(
|
||||
"Cannot fork: no runtime state on the event bus. "
|
||||
"Ensure from_checkpoint() succeeded before calling fork()."
|
||||
)
|
||||
state.fork(branch)
|
||||
return flow
|
||||
|
||||
checkpoint_completed_methods: set[str] | None = Field(default=None)
|
||||
checkpoint_method_outputs: list[Any] | None = Field(default=None)
|
||||
@@ -1956,6 +1985,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
self,
|
||||
inputs: dict[str, Any] | None = None,
|
||||
input_files: dict[str, FileInput] | None = None,
|
||||
from_checkpoint: CheckpointConfig | None = None,
|
||||
) -> Any | FlowStreamingOutput:
|
||||
"""Start the flow execution in a synchronous context.
|
||||
|
||||
@@ -1965,10 +1995,15 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
Args:
|
||||
inputs: Optional dictionary containing input values and/or a state ID.
|
||||
input_files: Optional dict of named file inputs for the flow.
|
||||
from_checkpoint: Optional checkpoint config. If ``restore_from``
|
||||
is set, the flow resumes from that checkpoint.
|
||||
|
||||
Returns:
|
||||
The final output from the flow or FlowStreamingOutput if streaming.
|
||||
"""
|
||||
restored = apply_checkpoint(self, from_checkpoint)
|
||||
if restored is not None:
|
||||
return restored.kickoff(inputs=inputs, input_files=input_files)
|
||||
get_env_context()
|
||||
if self.stream:
|
||||
result_holder: list[Any] = []
|
||||
@@ -2025,6 +2060,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
self,
|
||||
inputs: dict[str, Any] | None = None,
|
||||
input_files: dict[str, FileInput] | None = None,
|
||||
from_checkpoint: CheckpointConfig | None = None,
|
||||
) -> Any | FlowStreamingOutput:
|
||||
"""Start the flow execution asynchronously.
|
||||
|
||||
@@ -2036,10 +2072,15 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
Args:
|
||||
inputs: Optional dictionary containing input values and/or a state ID for restoration.
|
||||
input_files: Optional dict of named file inputs for the flow.
|
||||
from_checkpoint: Optional checkpoint config. If ``restore_from``
|
||||
is set, the flow resumes from that checkpoint.
|
||||
|
||||
Returns:
|
||||
The final output from the flow, which is the result of the last executed method.
|
||||
"""
|
||||
restored = apply_checkpoint(self, from_checkpoint)
|
||||
if restored is not None:
|
||||
return await restored.kickoff_async(inputs=inputs, input_files=input_files)
|
||||
if self.stream:
|
||||
result_holder: list[Any] = []
|
||||
current_task_info: TaskInfo = {
|
||||
@@ -2298,17 +2339,20 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
self,
|
||||
inputs: dict[str, Any] | None = None,
|
||||
input_files: dict[str, FileInput] | None = None,
|
||||
from_checkpoint: CheckpointConfig | None = None,
|
||||
) -> Any | FlowStreamingOutput:
|
||||
"""Native async method to start the flow execution. Alias for kickoff_async.
|
||||
|
||||
Args:
|
||||
inputs: Optional dictionary containing input values and/or a state ID for restoration.
|
||||
input_files: Optional dict of named file inputs for the flow.
|
||||
from_checkpoint: Optional checkpoint config. If ``restore_from``
|
||||
is set, the flow resumes from that checkpoint.
|
||||
|
||||
Returns:
|
||||
The final output from the flow, which is the result of the last executed method.
|
||||
"""
|
||||
return await self.kickoff_async(inputs, input_files)
|
||||
return await self.kickoff_async(inputs, input_files, from_checkpoint)
|
||||
|
||||
async def _execute_start_method(self, start_method_name: FlowMethodName) -> None:
|
||||
"""Executes a flow's start method and its triggered listeners.
|
||||
|
||||
@@ -38,7 +38,6 @@ from crewai.llms.base_llm import (
|
||||
get_current_call_id,
|
||||
llm_call_context,
|
||||
)
|
||||
from crewai.utilities.streaming import get_current_stream_run_id
|
||||
from crewai.llms.constants import (
|
||||
ANTHROPIC_MODELS,
|
||||
AZURE_MODELS,
|
||||
@@ -791,7 +790,6 @@ class LLM(BaseLLM):
|
||||
call_type=LLMCallType.LLM_CALL,
|
||||
response_id=response_id,
|
||||
call_id=get_current_call_id(),
|
||||
run_id=get_current_stream_run_id(),
|
||||
),
|
||||
)
|
||||
# --- 4) Fallback to non-streaming if no content received
|
||||
@@ -1005,7 +1003,6 @@ class LLM(BaseLLM):
|
||||
call_type=LLMCallType.TOOL_CALL,
|
||||
response_id=response_id,
|
||||
call_id=get_current_call_id(),
|
||||
run_id=get_current_stream_run_id(),
|
||||
),
|
||||
)
|
||||
|
||||
@@ -1459,7 +1456,6 @@ class LLM(BaseLLM):
|
||||
from_agent=from_agent,
|
||||
response_id=response_id,
|
||||
call_id=get_current_call_id(),
|
||||
run_id=get_current_stream_run_id(),
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
@@ -36,7 +36,6 @@ from crewai.events.types.llm_events import (
|
||||
LLMStreamChunkEvent,
|
||||
LLMThinkingChunkEvent,
|
||||
)
|
||||
from crewai.utilities.streaming import get_current_stream_run_id
|
||||
from crewai.events.types.tool_usage_events import (
|
||||
ToolUsageErrorEvent,
|
||||
ToolUsageFinishedEvent,
|
||||
@@ -173,6 +172,8 @@ class BaseLLM(BaseModel, ABC):
|
||||
"completion_tokens": 0,
|
||||
"successful_requests": 0,
|
||||
"cached_prompt_tokens": 0,
|
||||
"reasoning_tokens": 0,
|
||||
"cache_creation_tokens": 0,
|
||||
}
|
||||
)
|
||||
|
||||
@@ -528,7 +529,6 @@ class BaseLLM(BaseModel, ABC):
|
||||
call_type=call_type,
|
||||
response_id=response_id,
|
||||
call_id=get_current_call_id(),
|
||||
run_id=get_current_stream_run_id(),
|
||||
),
|
||||
)
|
||||
|
||||
@@ -810,14 +810,24 @@ class BaseLLM(BaseModel, ABC):
|
||||
cached_tokens = (
|
||||
usage_data.get("cached_tokens")
|
||||
or usage_data.get("cached_prompt_tokens")
|
||||
or usage_data.get("cache_read_input_tokens")
|
||||
or 0
|
||||
)
|
||||
if not cached_tokens:
|
||||
prompt_details = usage_data.get("prompt_tokens_details")
|
||||
if isinstance(prompt_details, dict):
|
||||
cached_tokens = prompt_details.get("cached_tokens", 0) or 0
|
||||
|
||||
reasoning_tokens = usage_data.get("reasoning_tokens", 0) or 0
|
||||
cache_creation_tokens = usage_data.get("cache_creation_tokens", 0) or 0
|
||||
|
||||
self._token_usage["prompt_tokens"] += prompt_tokens
|
||||
self._token_usage["completion_tokens"] += completion_tokens
|
||||
self._token_usage["total_tokens"] += prompt_tokens + completion_tokens
|
||||
self._token_usage["successful_requests"] += 1
|
||||
self._token_usage["cached_prompt_tokens"] += cached_tokens
|
||||
self._token_usage["reasoning_tokens"] += reasoning_tokens
|
||||
self._token_usage["cache_creation_tokens"] += cache_creation_tokens
|
||||
|
||||
def get_token_usage_summary(self) -> UsageMetrics:
|
||||
"""Get summary of token usage for this LLM instance.
|
||||
|
||||
@@ -494,6 +494,10 @@ class AnthropicCompletion(BaseLLM):
|
||||
"required": [],
|
||||
}
|
||||
|
||||
func_info = tool.get("function", {})
|
||||
if func_info.get("strict"):
|
||||
anthropic_tool["strict"] = True
|
||||
|
||||
anthropic_tools.append(anthropic_tool)
|
||||
|
||||
return anthropic_tools
|
||||
@@ -1704,18 +1708,23 @@ class AnthropicCompletion(BaseLLM):
|
||||
def _extract_anthropic_token_usage(
|
||||
response: Message | BetaMessage,
|
||||
) -> dict[str, Any]:
|
||||
"""Extract token usage from Anthropic response."""
|
||||
"""Extract token usage and response metadata from Anthropic response."""
|
||||
if hasattr(response, "usage") and response.usage:
|
||||
usage = response.usage
|
||||
input_tokens = getattr(usage, "input_tokens", 0)
|
||||
output_tokens = getattr(usage, "output_tokens", 0)
|
||||
cache_read_tokens = getattr(usage, "cache_read_input_tokens", 0) or 0
|
||||
return {
|
||||
cache_creation_tokens = (
|
||||
getattr(usage, "cache_creation_input_tokens", 0) or 0
|
||||
)
|
||||
result: dict[str, Any] = {
|
||||
"input_tokens": input_tokens,
|
||||
"output_tokens": output_tokens,
|
||||
"total_tokens": input_tokens + output_tokens,
|
||||
"cached_prompt_tokens": cache_read_tokens,
|
||||
"cache_creation_tokens": cache_creation_tokens,
|
||||
}
|
||||
return result
|
||||
return {"total_tokens": 0}
|
||||
|
||||
def supports_multimodal(self) -> bool:
|
||||
|
||||
@@ -1076,19 +1076,27 @@ class AzureCompletion(BaseLLM):
|
||||
|
||||
@staticmethod
|
||||
def _extract_azure_token_usage(response: ChatCompletions) -> dict[str, Any]:
|
||||
"""Extract token usage from Azure response."""
|
||||
"""Extract token usage and response metadata from Azure response."""
|
||||
if hasattr(response, "usage") and response.usage:
|
||||
usage = response.usage
|
||||
cached_tokens = 0
|
||||
prompt_details = getattr(usage, "prompt_tokens_details", None)
|
||||
if prompt_details:
|
||||
cached_tokens = getattr(prompt_details, "cached_tokens", 0) or 0
|
||||
return {
|
||||
reasoning_tokens = 0
|
||||
completion_details = getattr(usage, "completion_tokens_details", None)
|
||||
if completion_details:
|
||||
reasoning_tokens = (
|
||||
getattr(completion_details, "reasoning_tokens", 0) or 0
|
||||
)
|
||||
result: dict[str, Any] = {
|
||||
"prompt_tokens": getattr(usage, "prompt_tokens", 0),
|
||||
"completion_tokens": getattr(usage, "completion_tokens", 0),
|
||||
"total_tokens": getattr(usage, "total_tokens", 0),
|
||||
"cached_prompt_tokens": cached_tokens,
|
||||
"reasoning_tokens": reasoning_tokens,
|
||||
}
|
||||
return result
|
||||
return {"total_tokens": 0}
|
||||
|
||||
async def aclose(self) -> None:
|
||||
|
||||
@@ -169,6 +169,7 @@ class ToolSpec(TypedDict, total=False):
|
||||
name: Required[str]
|
||||
description: Required[str]
|
||||
inputSchema: ToolInputSchema
|
||||
strict: bool
|
||||
|
||||
|
||||
class ConverseToolTypeDef(TypedDict):
|
||||
@@ -1965,6 +1966,10 @@ class BedrockCompletion(BaseLLM):
|
||||
input_schema: ToolInputSchema = {"json": parameters}
|
||||
tool_spec["inputSchema"] = input_schema
|
||||
|
||||
func_info = tool.get("function", {})
|
||||
if func_info.get("strict"):
|
||||
tool_spec["strict"] = True
|
||||
|
||||
converse_tool: ConverseToolTypeDef = {"toolSpec": tool_spec}
|
||||
|
||||
converse_tools.append(converse_tool)
|
||||
@@ -2025,11 +2030,18 @@ class BedrockCompletion(BaseLLM):
|
||||
input_tokens = usage.get("inputTokens", 0)
|
||||
output_tokens = usage.get("outputTokens", 0)
|
||||
total_tokens = usage.get("totalTokens", input_tokens + output_tokens)
|
||||
raw_cached = (
|
||||
usage.get("cacheReadInputTokenCount")
|
||||
or usage.get("cacheReadInputTokens")
|
||||
or 0
|
||||
)
|
||||
cached_tokens = raw_cached if isinstance(raw_cached, int) else 0
|
||||
|
||||
self._token_usage["prompt_tokens"] += input_tokens
|
||||
self._token_usage["completion_tokens"] += output_tokens
|
||||
self._token_usage["total_tokens"] += total_tokens
|
||||
self._token_usage["successful_requests"] += 1
|
||||
self._token_usage["cached_prompt_tokens"] += cached_tokens
|
||||
|
||||
def supports_function_calling(self) -> bool:
|
||||
"""Check if the model supports function calling."""
|
||||
|
||||
@@ -1306,17 +1306,20 @@ class GeminiCompletion(BaseLLM):
|
||||
|
||||
@staticmethod
|
||||
def _extract_token_usage(response: GenerateContentResponse) -> dict[str, Any]:
|
||||
"""Extract token usage from Gemini response."""
|
||||
"""Extract token usage and response metadata from Gemini response."""
|
||||
if response.usage_metadata:
|
||||
usage = response.usage_metadata
|
||||
cached_tokens = getattr(usage, "cached_content_token_count", 0) or 0
|
||||
return {
|
||||
thinking_tokens = getattr(usage, "thoughts_token_count", 0) or 0
|
||||
result: dict[str, Any] = {
|
||||
"prompt_token_count": getattr(usage, "prompt_token_count", 0),
|
||||
"candidates_token_count": getattr(usage, "candidates_token_count", 0),
|
||||
"total_token_count": getattr(usage, "total_token_count", 0),
|
||||
"total_tokens": getattr(usage, "total_token_count", 0),
|
||||
"cached_prompt_tokens": cached_tokens,
|
||||
"reasoning_tokens": thinking_tokens,
|
||||
}
|
||||
return result
|
||||
return {"total_tokens": 0}
|
||||
|
||||
@staticmethod
|
||||
|
||||
@@ -1324,19 +1324,23 @@ class OpenAICompletion(BaseLLM):
|
||||
]
|
||||
|
||||
def _extract_responses_token_usage(self, response: Response) -> dict[str, Any]:
|
||||
"""Extract token usage from Responses API response."""
|
||||
"""Extract token usage and response metadata from Responses API response."""
|
||||
if response.usage:
|
||||
result = {
|
||||
result: dict[str, Any] = {
|
||||
"prompt_tokens": response.usage.input_tokens,
|
||||
"completion_tokens": response.usage.output_tokens,
|
||||
"total_tokens": response.usage.total_tokens,
|
||||
}
|
||||
# Extract cached prompt tokens from input_tokens_details
|
||||
input_details = getattr(response.usage, "input_tokens_details", None)
|
||||
if input_details:
|
||||
result["cached_prompt_tokens"] = (
|
||||
getattr(input_details, "cached_tokens", 0) or 0
|
||||
)
|
||||
output_details = getattr(response.usage, "output_tokens_details", None)
|
||||
if output_details:
|
||||
result["reasoning_tokens"] = (
|
||||
getattr(output_details, "reasoning_tokens", 0) or 0
|
||||
)
|
||||
return result
|
||||
return {"total_tokens": 0}
|
||||
|
||||
@@ -2307,20 +2311,24 @@ class OpenAICompletion(BaseLLM):
|
||||
def _extract_openai_token_usage(
|
||||
self, response: ChatCompletion | ChatCompletionChunk
|
||||
) -> dict[str, Any]:
|
||||
"""Extract token usage from OpenAI ChatCompletion or ChatCompletionChunk response."""
|
||||
"""Extract token usage and response metadata from OpenAI ChatCompletion."""
|
||||
if hasattr(response, "usage") and response.usage:
|
||||
usage = response.usage
|
||||
result = {
|
||||
result: dict[str, Any] = {
|
||||
"prompt_tokens": getattr(usage, "prompt_tokens", 0),
|
||||
"completion_tokens": getattr(usage, "completion_tokens", 0),
|
||||
"total_tokens": getattr(usage, "total_tokens", 0),
|
||||
}
|
||||
# Extract cached prompt tokens from prompt_tokens_details
|
||||
prompt_details = getattr(usage, "prompt_tokens_details", None)
|
||||
if prompt_details:
|
||||
result["cached_prompt_tokens"] = (
|
||||
getattr(prompt_details, "cached_tokens", 0) or 0
|
||||
)
|
||||
completion_details = getattr(usage, "completion_tokens_details", None)
|
||||
if completion_details:
|
||||
result["reasoning_tokens"] = (
|
||||
getattr(completion_details, "reasoning_tokens", 0) or 0
|
||||
)
|
||||
return result
|
||||
return {"total_tokens": 0}
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Annotated, Any, Literal
|
||||
|
||||
from pydantic import BaseModel, Field, model_validator
|
||||
@@ -201,11 +202,20 @@ class CheckpointConfig(BaseModel):
|
||||
description="Maximum checkpoints to keep. Oldest are pruned after "
|
||||
"each write. None means keep all.",
|
||||
)
|
||||
restore_from: Path | str | None = Field(
|
||||
default=None,
|
||||
description="Path or location of a checkpoint to restore from. "
|
||||
"When passed via a kickoff method's from_checkpoint parameter, "
|
||||
"the crew or flow resumes from this checkpoint.",
|
||||
)
|
||||
|
||||
@model_validator(mode="after")
|
||||
def _register_handlers(self) -> CheckpointConfig:
|
||||
from crewai.state.checkpoint_listener import _ensure_handlers_registered
|
||||
|
||||
if isinstance(self.provider, SqliteProvider) and not Path(self.location).suffix:
|
||||
self.location = f"{self.location}.db"
|
||||
|
||||
_ensure_handlers_registered()
|
||||
return self
|
||||
|
||||
@@ -216,3 +226,25 @@ class CheckpointConfig(BaseModel):
|
||||
@property
|
||||
def trigger_events(self) -> set[str]:
|
||||
return set(self.on_events)
|
||||
|
||||
|
||||
def apply_checkpoint(instance: Any, from_checkpoint: CheckpointConfig | None) -> Any:
|
||||
"""Handle checkpoint config for a kickoff method.
|
||||
|
||||
If *from_checkpoint* carries a ``restore_from`` path, builds and returns a
|
||||
restored instance (with ``restore_from`` cleared). The caller should
|
||||
dispatch into its own kickoff variant on that restored instance.
|
||||
|
||||
If *from_checkpoint* is present but has no ``restore_from``, sets
|
||||
``instance.checkpoint`` and returns ``None`` (proceed normally).
|
||||
|
||||
If *from_checkpoint* is ``None``, returns ``None`` immediately.
|
||||
"""
|
||||
if from_checkpoint is None:
|
||||
return None
|
||||
if from_checkpoint.restore_from is not None:
|
||||
restored = type(instance).from_checkpoint(from_checkpoint)
|
||||
restored.checkpoint = from_checkpoint.model_copy(update={"restore_from": None})
|
||||
return restored
|
||||
instance.checkpoint = from_checkpoint
|
||||
return None
|
||||
|
||||
@@ -7,6 +7,7 @@ avoids per-event overhead when no entity uses checkpointing.
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import threading
|
||||
from typing import Any
|
||||
@@ -102,14 +103,25 @@ def _find_checkpoint(source: Any) -> CheckpointConfig | None:
|
||||
return None
|
||||
|
||||
|
||||
def _do_checkpoint(state: RuntimeState, cfg: CheckpointConfig) -> None:
|
||||
def _do_checkpoint(
|
||||
state: RuntimeState, cfg: CheckpointConfig, event: BaseEvent | None = None
|
||||
) -> None:
|
||||
"""Write a checkpoint and prune old ones if configured."""
|
||||
_prepare_entities(state.root)
|
||||
data = state.model_dump_json()
|
||||
cfg.provider.checkpoint(data, cfg.location)
|
||||
payload = state.model_dump(mode="json")
|
||||
if event is not None:
|
||||
payload["trigger"] = event.type
|
||||
data = json.dumps(payload)
|
||||
location = cfg.provider.checkpoint(
|
||||
data,
|
||||
cfg.location,
|
||||
parent_id=state._parent_id,
|
||||
branch=state._branch,
|
||||
)
|
||||
state._chain_lineage(cfg.provider, location)
|
||||
|
||||
if cfg.max_checkpoints is not None:
|
||||
cfg.provider.prune(cfg.location, cfg.max_checkpoints)
|
||||
cfg.provider.prune(cfg.location, cfg.max_checkpoints, branch=state._branch)
|
||||
|
||||
|
||||
def _should_checkpoint(source: Any, event: BaseEvent) -> CheckpointConfig | None:
|
||||
@@ -128,7 +140,7 @@ def _on_any_event(source: Any, event: BaseEvent, state: Any) -> None:
|
||||
if cfg is None:
|
||||
return
|
||||
try:
|
||||
_do_checkpoint(state, cfg)
|
||||
_do_checkpoint(state, cfg, event)
|
||||
except Exception:
|
||||
logger.warning("Auto-checkpoint failed for event %s", event.type, exc_info=True)
|
||||
|
||||
|
||||
@@ -17,12 +17,21 @@ class BaseProvider(BaseModel, ABC):
|
||||
provider_type: str = "base"
|
||||
|
||||
@abstractmethod
|
||||
def checkpoint(self, data: str, location: str) -> str:
|
||||
def checkpoint(
|
||||
self,
|
||||
data: str,
|
||||
location: str,
|
||||
*,
|
||||
parent_id: str | None = None,
|
||||
branch: str = "main",
|
||||
) -> str:
|
||||
"""Persist a snapshot synchronously.
|
||||
|
||||
Args:
|
||||
data: The serialized string to persist.
|
||||
location: Storage destination (directory, file path, URI, etc.).
|
||||
parent_id: ID of the parent checkpoint for lineage tracking.
|
||||
branch: Branch label for this checkpoint.
|
||||
|
||||
Returns:
|
||||
A location identifier for the saved checkpoint.
|
||||
@@ -30,12 +39,21 @@ class BaseProvider(BaseModel, ABC):
|
||||
...
|
||||
|
||||
@abstractmethod
|
||||
async def acheckpoint(self, data: str, location: str) -> str:
|
||||
async def acheckpoint(
|
||||
self,
|
||||
data: str,
|
||||
location: str,
|
||||
*,
|
||||
parent_id: str | None = None,
|
||||
branch: str = "main",
|
||||
) -> str:
|
||||
"""Persist a snapshot asynchronously.
|
||||
|
||||
Args:
|
||||
data: The serialized string to persist.
|
||||
location: Storage destination (directory, file path, URI, etc.).
|
||||
parent_id: ID of the parent checkpoint for lineage tracking.
|
||||
branch: Branch label for this checkpoint.
|
||||
|
||||
Returns:
|
||||
A location identifier for the saved checkpoint.
|
||||
@@ -43,12 +61,25 @@ class BaseProvider(BaseModel, ABC):
|
||||
...
|
||||
|
||||
@abstractmethod
|
||||
def prune(self, location: str, max_keep: int) -> None:
|
||||
"""Remove old checkpoints, keeping at most *max_keep*.
|
||||
def prune(self, location: str, max_keep: int, *, branch: str = "main") -> None:
|
||||
"""Remove old checkpoints, keeping at most *max_keep* per branch.
|
||||
|
||||
Args:
|
||||
location: The storage destination passed to ``checkpoint``.
|
||||
max_keep: Maximum number of checkpoints to retain.
|
||||
branch: Only prune checkpoints on this branch.
|
||||
"""
|
||||
...
|
||||
|
||||
@abstractmethod
|
||||
def extract_id(self, location: str) -> str:
|
||||
"""Extract the checkpoint ID from a location string.
|
||||
|
||||
Args:
|
||||
location: The identifier returned by a previous ``checkpoint`` call.
|
||||
|
||||
Returns:
|
||||
The checkpoint ID.
|
||||
"""
|
||||
...
|
||||
|
||||
|
||||
@@ -19,48 +19,87 @@ from crewai.state.provider.core import BaseProvider
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _safe_branch(base: str, branch: str) -> None:
|
||||
"""Validate that a branch name doesn't escape the base directory.
|
||||
|
||||
Raises:
|
||||
ValueError: If the branch resolves outside the base directory.
|
||||
"""
|
||||
base_resolved = str(Path(base).resolve())
|
||||
target_resolved = str((Path(base) / branch).resolve())
|
||||
if (
|
||||
not target_resolved.startswith(base_resolved + os.sep)
|
||||
and target_resolved != base_resolved
|
||||
):
|
||||
raise ValueError(f"Branch name escapes checkpoint directory: {branch!r}")
|
||||
|
||||
|
||||
class JsonProvider(BaseProvider):
|
||||
"""Persists runtime state checkpoints as JSON files on the local filesystem."""
|
||||
|
||||
provider_type: Literal["json"] = "json"
|
||||
|
||||
def checkpoint(self, data: str, location: str) -> str:
|
||||
def checkpoint(
|
||||
self,
|
||||
data: str,
|
||||
location: str,
|
||||
*,
|
||||
parent_id: str | None = None,
|
||||
branch: str = "main",
|
||||
) -> str:
|
||||
"""Write a JSON checkpoint file.
|
||||
|
||||
Args:
|
||||
data: The serialized JSON string to persist.
|
||||
location: Directory where the checkpoint will be saved.
|
||||
location: Base directory where checkpoints are saved.
|
||||
parent_id: ID of the parent checkpoint for lineage tracking.
|
||||
Encoded in the filename for queryable lineage without
|
||||
parsing the blob.
|
||||
branch: Branch label. Files are stored under ``location/branch/``.
|
||||
|
||||
Returns:
|
||||
The path to the written checkpoint file.
|
||||
"""
|
||||
file_path = _build_path(location)
|
||||
file_path = _build_path(location, branch, parent_id)
|
||||
file_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
with open(file_path, "w") as f:
|
||||
f.write(data)
|
||||
return str(file_path)
|
||||
|
||||
async def acheckpoint(self, data: str, location: str) -> str:
|
||||
async def acheckpoint(
|
||||
self,
|
||||
data: str,
|
||||
location: str,
|
||||
*,
|
||||
parent_id: str | None = None,
|
||||
branch: str = "main",
|
||||
) -> str:
|
||||
"""Write a JSON checkpoint file asynchronously.
|
||||
|
||||
Args:
|
||||
data: The serialized JSON string to persist.
|
||||
location: Directory where the checkpoint will be saved.
|
||||
location: Base directory where checkpoints are saved.
|
||||
parent_id: ID of the parent checkpoint for lineage tracking.
|
||||
Encoded in the filename for queryable lineage without
|
||||
parsing the blob.
|
||||
branch: Branch label. Files are stored under ``location/branch/``.
|
||||
|
||||
Returns:
|
||||
The path to the written checkpoint file.
|
||||
"""
|
||||
file_path = _build_path(location)
|
||||
file_path = _build_path(location, branch, parent_id)
|
||||
await aiofiles.os.makedirs(str(file_path.parent), exist_ok=True)
|
||||
|
||||
async with aiofiles.open(file_path, "w") as f:
|
||||
await f.write(data)
|
||||
return str(file_path)
|
||||
|
||||
def prune(self, location: str, max_keep: int) -> None:
|
||||
"""Remove oldest checkpoint files beyond *max_keep*."""
|
||||
pattern = os.path.join(location, "*.json")
|
||||
def prune(self, location: str, max_keep: int, *, branch: str = "main") -> None:
|
||||
"""Remove oldest checkpoint files beyond *max_keep* on a branch."""
|
||||
_safe_branch(location, branch)
|
||||
branch_dir = os.path.join(location, branch)
|
||||
pattern = os.path.join(branch_dir, "*.json")
|
||||
files = sorted(glob.glob(pattern), key=os.path.getmtime)
|
||||
for path in files if max_keep == 0 else files[:-max_keep]:
|
||||
try:
|
||||
@@ -68,6 +107,16 @@ class JsonProvider(BaseProvider):
|
||||
except OSError: # noqa: PERF203
|
||||
logger.debug("Failed to remove %s", path, exc_info=True)
|
||||
|
||||
def extract_id(self, location: str) -> str:
|
||||
"""Extract the checkpoint ID from a file path.
|
||||
|
||||
The filename format is ``{ts}_{uuid8}_p-{parent}.json``.
|
||||
The checkpoint ID is the ``{ts}_{uuid8}`` prefix.
|
||||
"""
|
||||
stem = Path(location).stem
|
||||
idx = stem.find("_p-")
|
||||
return stem[:idx] if idx != -1 else stem
|
||||
|
||||
def from_checkpoint(self, location: str) -> str:
|
||||
"""Read a JSON checkpoint file.
|
||||
|
||||
@@ -92,15 +141,24 @@ class JsonProvider(BaseProvider):
|
||||
return await f.read()
|
||||
|
||||
|
||||
def _build_path(directory: str) -> Path:
|
||||
"""Build a timestamped checkpoint file path.
|
||||
def _build_path(
|
||||
directory: str, branch: str = "main", parent_id: str | None = None
|
||||
) -> Path:
|
||||
"""Build a timestamped checkpoint file path under a branch subdirectory.
|
||||
|
||||
Filename format: ``{ts}_{uuid8}_p-{parent_id}.json``
|
||||
|
||||
Args:
|
||||
directory: Parent directory for the checkpoint file.
|
||||
directory: Base directory for checkpoints.
|
||||
branch: Branch label used as a subdirectory name.
|
||||
parent_id: Parent checkpoint ID to encode in the filename.
|
||||
|
||||
Returns:
|
||||
The target file path.
|
||||
"""
|
||||
_safe_branch(directory, branch)
|
||||
ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S")
|
||||
filename = f"{ts}_{uuid.uuid4().hex[:8]}.json"
|
||||
return Path(directory) / filename
|
||||
short_uuid = uuid.uuid4().hex[:8]
|
||||
parent_suffix = parent_id or "none"
|
||||
filename = f"{ts}_{short_uuid}_p-{parent_suffix}.json"
|
||||
return Path(directory) / branch / filename
|
||||
|
||||
@@ -17,15 +17,20 @@ _CREATE_TABLE = """
|
||||
CREATE TABLE IF NOT EXISTS checkpoints (
|
||||
id TEXT PRIMARY KEY,
|
||||
created_at TEXT NOT NULL,
|
||||
parent_id TEXT,
|
||||
branch TEXT NOT NULL DEFAULT 'main',
|
||||
data JSONB NOT NULL
|
||||
)
|
||||
"""
|
||||
|
||||
_INSERT = "INSERT INTO checkpoints (id, created_at, data) VALUES (?, ?, jsonb(?))"
|
||||
_INSERT = (
|
||||
"INSERT INTO checkpoints (id, created_at, parent_id, branch, data) "
|
||||
"VALUES (?, ?, ?, ?, jsonb(?))"
|
||||
)
|
||||
_SELECT = "SELECT json(data) FROM checkpoints WHERE id = ?"
|
||||
_PRUNE = """
|
||||
DELETE FROM checkpoints WHERE rowid NOT IN (
|
||||
SELECT rowid FROM checkpoints ORDER BY rowid DESC LIMIT ?
|
||||
DELETE FROM checkpoints WHERE branch = ? AND rowid NOT IN (
|
||||
SELECT rowid FROM checkpoints WHERE branch = ? ORDER BY rowid DESC LIMIT ?
|
||||
)
|
||||
"""
|
||||
|
||||
@@ -50,12 +55,21 @@ class SqliteProvider(BaseProvider):
|
||||
|
||||
provider_type: Literal["sqlite"] = "sqlite"
|
||||
|
||||
def checkpoint(self, data: str, location: str) -> str:
|
||||
def checkpoint(
|
||||
self,
|
||||
data: str,
|
||||
location: str,
|
||||
*,
|
||||
parent_id: str | None = None,
|
||||
branch: str = "main",
|
||||
) -> str:
|
||||
"""Write a checkpoint to the SQLite database.
|
||||
|
||||
Args:
|
||||
data: The serialized JSON string to persist.
|
||||
location: Path to the SQLite database file.
|
||||
parent_id: ID of the parent checkpoint for lineage tracking.
|
||||
branch: Branch label for this checkpoint.
|
||||
|
||||
Returns:
|
||||
A location string in the format ``"db_path#checkpoint_id"``.
|
||||
@@ -65,16 +79,25 @@ class SqliteProvider(BaseProvider):
|
||||
with sqlite3.connect(location) as conn:
|
||||
conn.execute("PRAGMA journal_mode=WAL")
|
||||
conn.execute(_CREATE_TABLE)
|
||||
conn.execute(_INSERT, (checkpoint_id, ts, data))
|
||||
conn.execute(_INSERT, (checkpoint_id, ts, parent_id, branch, data))
|
||||
conn.commit()
|
||||
return f"{location}#{checkpoint_id}"
|
||||
|
||||
async def acheckpoint(self, data: str, location: str) -> str:
|
||||
async def acheckpoint(
|
||||
self,
|
||||
data: str,
|
||||
location: str,
|
||||
*,
|
||||
parent_id: str | None = None,
|
||||
branch: str = "main",
|
||||
) -> str:
|
||||
"""Write a checkpoint to the SQLite database asynchronously.
|
||||
|
||||
Args:
|
||||
data: The serialized JSON string to persist.
|
||||
location: Path to the SQLite database file.
|
||||
parent_id: ID of the parent checkpoint for lineage tracking.
|
||||
branch: Branch label for this checkpoint.
|
||||
|
||||
Returns:
|
||||
A location string in the format ``"db_path#checkpoint_id"``.
|
||||
@@ -84,16 +107,20 @@ class SqliteProvider(BaseProvider):
|
||||
async with aiosqlite.connect(location) as db:
|
||||
await db.execute("PRAGMA journal_mode=WAL")
|
||||
await db.execute(_CREATE_TABLE)
|
||||
await db.execute(_INSERT, (checkpoint_id, ts, data))
|
||||
await db.execute(_INSERT, (checkpoint_id, ts, parent_id, branch, data))
|
||||
await db.commit()
|
||||
return f"{location}#{checkpoint_id}"
|
||||
|
||||
def prune(self, location: str, max_keep: int) -> None:
|
||||
"""Remove oldest checkpoint rows beyond *max_keep*."""
|
||||
def prune(self, location: str, max_keep: int, *, branch: str = "main") -> None:
|
||||
"""Remove oldest checkpoint rows beyond *max_keep* on a branch."""
|
||||
with sqlite3.connect(location) as conn:
|
||||
conn.execute(_PRUNE, (max_keep,))
|
||||
conn.execute(_PRUNE, (branch, branch, max_keep))
|
||||
conn.commit()
|
||||
|
||||
def extract_id(self, location: str) -> str:
|
||||
"""Extract the checkpoint ID from a ``db_path#id`` string."""
|
||||
return location.rsplit("#", 1)[1]
|
||||
|
||||
def from_checkpoint(self, location: str) -> str:
|
||||
"""Read a checkpoint from the SQLite database.
|
||||
|
||||
|
||||
@@ -9,8 +9,11 @@ via ``RuntimeState.model_rebuild()``.
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Any
|
||||
import uuid
|
||||
|
||||
from packaging.version import Version
|
||||
from pydantic import (
|
||||
ModelWrapValidatorHandler,
|
||||
PrivateAttr,
|
||||
@@ -20,9 +23,14 @@ from pydantic import (
|
||||
)
|
||||
|
||||
from crewai.context import capture_execution_context
|
||||
from crewai.state.checkpoint_config import CheckpointConfig
|
||||
from crewai.state.event_record import EventRecord
|
||||
from crewai.state.provider.core import BaseProvider
|
||||
from crewai.state.provider.json_provider import JsonProvider
|
||||
from crewai.utilities.version import get_crewai_version
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -58,12 +66,51 @@ def _sync_checkpoint_fields(entity: object) -> None:
|
||||
entity.checkpoint_inputs = entity._inputs
|
||||
entity.checkpoint_train = entity._train
|
||||
entity.checkpoint_kickoff_event_id = entity._kickoff_event_id
|
||||
for task in entity.tasks:
|
||||
task.checkpoint_original_description = task._original_description
|
||||
task.checkpoint_original_expected_output = task._original_expected_output
|
||||
|
||||
|
||||
def _migrate(data: dict[str, Any]) -> dict[str, Any]:
|
||||
"""Apply version-based migrations to checkpoint data.
|
||||
|
||||
Each block handles checkpoints older than a specific version,
|
||||
transforming them forward to the current format. Blocks run in
|
||||
version order so migrations compose.
|
||||
|
||||
Args:
|
||||
data: The raw deserialized checkpoint dict.
|
||||
|
||||
Returns:
|
||||
The migrated checkpoint dict.
|
||||
"""
|
||||
raw = data.get("crewai_version")
|
||||
current = Version(get_crewai_version())
|
||||
stored = Version(raw) if raw else Version("0.0.0")
|
||||
|
||||
if raw is None:
|
||||
logger.warning("Checkpoint has no crewai_version — treating as 0.0.0")
|
||||
elif stored != current:
|
||||
logger.debug(
|
||||
"Migrating checkpoint from crewAI %s to %s",
|
||||
stored,
|
||||
current,
|
||||
)
|
||||
|
||||
# --- migrations in version order ---
|
||||
# if stored < Version("X.Y.Z"):
|
||||
# data.setdefault("some_field", "default")
|
||||
|
||||
return data
|
||||
|
||||
|
||||
class RuntimeState(RootModel): # type: ignore[type-arg]
|
||||
root: list[Entity]
|
||||
_provider: BaseProvider = PrivateAttr(default_factory=JsonProvider)
|
||||
_event_record: EventRecord = PrivateAttr(default_factory=EventRecord)
|
||||
_checkpoint_id: str | None = PrivateAttr(default=None)
|
||||
_parent_id: str | None = PrivateAttr(default=None)
|
||||
_branch: str = PrivateAttr(default="main")
|
||||
|
||||
@property
|
||||
def event_record(self) -> EventRecord:
|
||||
@@ -73,8 +120,11 @@ class RuntimeState(RootModel): # type: ignore[type-arg]
|
||||
@model_serializer(mode="plain")
|
||||
def _serialize(self) -> dict[str, Any]:
|
||||
return {
|
||||
"crewai_version": get_crewai_version(),
|
||||
"parent_id": self._parent_id,
|
||||
"branch": self._branch,
|
||||
"entities": [e.model_dump(mode="json") for e in self.root],
|
||||
"event_record": self._event_record.model_dump(),
|
||||
"event_record": self._event_record.model_dump(mode="json"),
|
||||
}
|
||||
|
||||
@model_validator(mode="wrap")
|
||||
@@ -83,13 +133,29 @@ class RuntimeState(RootModel): # type: ignore[type-arg]
|
||||
cls, data: Any, handler: ModelWrapValidatorHandler[RuntimeState]
|
||||
) -> RuntimeState:
|
||||
if isinstance(data, dict) and "entities" in data:
|
||||
data = _migrate(data)
|
||||
record_data = data.get("event_record")
|
||||
state = handler(data["entities"])
|
||||
if record_data:
|
||||
state._event_record = EventRecord.model_validate(record_data)
|
||||
state._parent_id = data.get("parent_id")
|
||||
state._branch = data.get("branch", "main")
|
||||
return state
|
||||
return handler(data)
|
||||
|
||||
def _chain_lineage(self, provider: BaseProvider, location: str) -> None:
|
||||
"""Update lineage fields after a successful checkpoint write.
|
||||
|
||||
Sets ``_checkpoint_id`` and ``_parent_id`` so the next write
|
||||
records the correct parent in the lineage chain.
|
||||
|
||||
Args:
|
||||
provider: The provider that performed the write.
|
||||
location: The location string returned by the provider.
|
||||
"""
|
||||
self._checkpoint_id = provider.extract_id(location)
|
||||
self._parent_id = self._checkpoint_id
|
||||
|
||||
def checkpoint(self, location: str) -> str:
|
||||
"""Write a checkpoint.
|
||||
|
||||
@@ -101,7 +167,14 @@ class RuntimeState(RootModel): # type: ignore[type-arg]
|
||||
A location identifier for the saved checkpoint.
|
||||
"""
|
||||
_prepare_entities(self.root)
|
||||
return self._provider.checkpoint(self.model_dump_json(), location)
|
||||
result = self._provider.checkpoint(
|
||||
self.model_dump_json(),
|
||||
location,
|
||||
parent_id=self._parent_id,
|
||||
branch=self._branch,
|
||||
)
|
||||
self._chain_lineage(self._provider, result)
|
||||
return result
|
||||
|
||||
async def acheckpoint(self, location: str) -> str:
|
||||
"""Async version of :meth:`checkpoint`.
|
||||
@@ -114,41 +187,84 @@ class RuntimeState(RootModel): # type: ignore[type-arg]
|
||||
A location identifier for the saved checkpoint.
|
||||
"""
|
||||
_prepare_entities(self.root)
|
||||
return await self._provider.acheckpoint(self.model_dump_json(), location)
|
||||
result = await self._provider.acheckpoint(
|
||||
self.model_dump_json(),
|
||||
location,
|
||||
parent_id=self._parent_id,
|
||||
branch=self._branch,
|
||||
)
|
||||
self._chain_lineage(self._provider, result)
|
||||
return result
|
||||
|
||||
def fork(self, branch: str | None = None) -> None:
|
||||
"""Create a new execution branch and write an initial checkpoint.
|
||||
|
||||
If this state was restored from a checkpoint, an initial checkpoint
|
||||
is written on the new branch so the fork point is recorded.
|
||||
|
||||
Args:
|
||||
branch: Branch label. Auto-generated from the current checkpoint
|
||||
ID if not provided. Always unique — safe to call multiple
|
||||
times without collisions.
|
||||
"""
|
||||
if branch:
|
||||
self._branch = branch
|
||||
elif self._checkpoint_id:
|
||||
self._branch = f"fork/{self._checkpoint_id}_{uuid.uuid4().hex[:6]}"
|
||||
else:
|
||||
self._branch = f"fork/{uuid.uuid4().hex[:8]}"
|
||||
|
||||
@classmethod
|
||||
def from_checkpoint(
|
||||
cls, location: str, provider: BaseProvider, **kwargs: Any
|
||||
) -> RuntimeState:
|
||||
def from_checkpoint(cls, config: CheckpointConfig, **kwargs: Any) -> RuntimeState:
|
||||
"""Restore a RuntimeState from a checkpoint.
|
||||
|
||||
Args:
|
||||
location: The identifier returned by a previous ``checkpoint`` call.
|
||||
provider: The storage backend to read from.
|
||||
config: Checkpoint configuration with ``restore_from`` set.
|
||||
**kwargs: Passed to ``model_validate_json``.
|
||||
|
||||
Returns:
|
||||
A restored RuntimeState.
|
||||
"""
|
||||
from crewai.state.provider.utils import detect_provider
|
||||
|
||||
if config.restore_from is None:
|
||||
raise ValueError("CheckpointConfig.restore_from must be set")
|
||||
location = str(config.restore_from)
|
||||
provider = detect_provider(location)
|
||||
raw = provider.from_checkpoint(location)
|
||||
return cls.model_validate_json(raw, **kwargs)
|
||||
state = cls.model_validate_json(raw, **kwargs)
|
||||
state._provider = provider
|
||||
checkpoint_id = provider.extract_id(location)
|
||||
state._checkpoint_id = checkpoint_id
|
||||
state._parent_id = checkpoint_id
|
||||
return state
|
||||
|
||||
@classmethod
|
||||
async def afrom_checkpoint(
|
||||
cls, location: str, provider: BaseProvider, **kwargs: Any
|
||||
cls, config: CheckpointConfig, **kwargs: Any
|
||||
) -> RuntimeState:
|
||||
"""Async version of :meth:`from_checkpoint`.
|
||||
|
||||
Args:
|
||||
location: The identifier returned by a previous ``acheckpoint`` call.
|
||||
provider: The storage backend to read from.
|
||||
config: Checkpoint configuration with ``restore_from`` set.
|
||||
**kwargs: Passed to ``model_validate_json``.
|
||||
|
||||
Returns:
|
||||
A restored RuntimeState.
|
||||
"""
|
||||
from crewai.state.provider.utils import detect_provider
|
||||
|
||||
if config.restore_from is None:
|
||||
raise ValueError("CheckpointConfig.restore_from must be set")
|
||||
location = str(config.restore_from)
|
||||
provider = detect_provider(location)
|
||||
raw = await provider.afrom_checkpoint(location)
|
||||
return cls.model_validate_json(raw, **kwargs)
|
||||
state = cls.model_validate_json(raw, **kwargs)
|
||||
state._provider = provider
|
||||
checkpoint_id = provider.extract_id(location)
|
||||
state._checkpoint_id = checkpoint_id
|
||||
state._parent_id = checkpoint_id
|
||||
return state
|
||||
|
||||
|
||||
def _prepare_entities(root: list[Entity]) -> None:
|
||||
|
||||
@@ -230,6 +230,8 @@ class Task(BaseModel):
|
||||
_original_description: str | None = PrivateAttr(default=None)
|
||||
_original_expected_output: str | None = PrivateAttr(default=None)
|
||||
_original_output_file: str | None = PrivateAttr(default=None)
|
||||
checkpoint_original_description: str | None = Field(default=None, exclude=False)
|
||||
checkpoint_original_expected_output: str | None = Field(default=None, exclude=False)
|
||||
_thread: threading.Thread | None = PrivateAttr(default=None)
|
||||
model_config = {"arbitrary_types_allowed": True}
|
||||
|
||||
|
||||
@@ -29,6 +29,14 @@ class UsageMetrics(BaseModel):
|
||||
completion_tokens: int = Field(
|
||||
default=0, description="Number of tokens used in completions."
|
||||
)
|
||||
reasoning_tokens: int = Field(
|
||||
default=0,
|
||||
description="Number of reasoning/thinking tokens (e.g. OpenAI o-series, Gemini thinking).",
|
||||
)
|
||||
cache_creation_tokens: int = Field(
|
||||
default=0,
|
||||
description="Number of cache creation tokens (e.g. Anthropic cache writes).",
|
||||
)
|
||||
successful_requests: int = Field(
|
||||
default=0, description="Number of successful requests made."
|
||||
)
|
||||
@@ -43,4 +51,6 @@ class UsageMetrics(BaseModel):
|
||||
self.prompt_tokens += usage_metrics.prompt_tokens
|
||||
self.cached_prompt_tokens += usage_metrics.cached_prompt_tokens
|
||||
self.completion_tokens += usage_metrics.completion_tokens
|
||||
self.reasoning_tokens += usage_metrics.reasoning_tokens
|
||||
self.cache_creation_tokens += usage_metrics.cache_creation_tokens
|
||||
self.successful_requests += usage_metrics.successful_requests
|
||||
|
||||
@@ -7,7 +7,6 @@ import logging
|
||||
import queue
|
||||
import threading
|
||||
from typing import Any, NamedTuple
|
||||
import uuid
|
||||
|
||||
from typing_extensions import TypedDict
|
||||
|
||||
@@ -26,17 +25,6 @@ from crewai.utilities.string_utils import sanitize_tool_name
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ContextVar that tracks the current streaming run_id.
|
||||
# Set by create_streaming_state() so that LLM emit paths can stamp events.
|
||||
_current_stream_run_id: contextvars.ContextVar[str | None] = contextvars.ContextVar(
|
||||
"_current_stream_run_id", default=None
|
||||
)
|
||||
|
||||
|
||||
def get_current_stream_run_id() -> str | None:
|
||||
"""Return the active streaming run_id for the current context, if any."""
|
||||
return _current_stream_run_id.get()
|
||||
|
||||
|
||||
class TaskInfo(TypedDict):
|
||||
"""Task context information for streaming."""
|
||||
@@ -118,7 +106,6 @@ def _create_stream_handler(
|
||||
sync_queue: queue.Queue[StreamChunk | None | Exception],
|
||||
async_queue: asyncio.Queue[StreamChunk | None | Exception] | None = None,
|
||||
loop: asyncio.AbstractEventLoop | None = None,
|
||||
run_id: str | None = None,
|
||||
) -> Callable[[Any, BaseEvent], None]:
|
||||
"""Create a stream handler function.
|
||||
|
||||
@@ -127,9 +114,6 @@ def _create_stream_handler(
|
||||
sync_queue: Synchronous queue for chunks.
|
||||
async_queue: Optional async queue for chunks.
|
||||
loop: Optional event loop for async operations.
|
||||
run_id: Unique identifier for this streaming run. When set, the handler
|
||||
only accepts events whose ``run_id`` matches, preventing cross-run
|
||||
chunk contamination in concurrent streaming scenarios.
|
||||
|
||||
Returns:
|
||||
Handler function that can be registered with the event bus.
|
||||
@@ -145,10 +129,6 @@ def _create_stream_handler(
|
||||
if not isinstance(event, LLMStreamChunkEvent):
|
||||
return
|
||||
|
||||
# Filter: only accept events belonging to this streaming run.
|
||||
if run_id is not None and event.run_id is not None and event.run_id != run_id:
|
||||
return
|
||||
|
||||
chunk = _create_stream_chunk(event, current_task_info)
|
||||
|
||||
if async_queue is not None and loop is not None:
|
||||
@@ -207,16 +187,6 @@ def create_streaming_state(
|
||||
) -> StreamingState:
|
||||
"""Create and register streaming state.
|
||||
|
||||
Each call assigns a ``run_id`` that is:
|
||||
* stored in a ``contextvars.ContextVar`` so that downstream LLM emit
|
||||
paths can stamp ``LLMStreamChunkEvent.run_id`` automatically, and
|
||||
* passed to the stream handler so it only accepts events with a
|
||||
matching ``run_id``, preventing cross-run chunk contamination.
|
||||
|
||||
If the current context already carries a ``run_id`` (e.g. a parent
|
||||
flow already created a streaming state), the existing value is reused
|
||||
so that nested streaming (flow → crew) shares the same scope.
|
||||
|
||||
Args:
|
||||
current_task_info: Task context info.
|
||||
result_holder: List to hold the final result.
|
||||
@@ -225,9 +195,6 @@ def create_streaming_state(
|
||||
Returns:
|
||||
Initialized StreamingState with registered handler.
|
||||
"""
|
||||
run_id = _current_stream_run_id.get() or str(uuid.uuid4())
|
||||
_current_stream_run_id.set(run_id)
|
||||
|
||||
sync_queue: queue.Queue[StreamChunk | None | Exception] = queue.Queue()
|
||||
async_queue: asyncio.Queue[StreamChunk | None | Exception] | None = None
|
||||
loop: asyncio.AbstractEventLoop | None = None
|
||||
@@ -236,9 +203,7 @@ def create_streaming_state(
|
||||
async_queue = asyncio.Queue()
|
||||
loop = asyncio.get_event_loop()
|
||||
|
||||
handler = _create_stream_handler(
|
||||
current_task_info, sync_queue, async_queue, loop, run_id=run_id
|
||||
)
|
||||
handler = _create_stream_handler(current_task_info, sync_queue, async_queue, loop)
|
||||
crewai_event_bus.register_handler(LLMStreamChunkEvent, handler)
|
||||
|
||||
return StreamingState(
|
||||
|
||||
12
lib/crewai/src/crewai/utilities/version.py
Normal file
12
lib/crewai/src/crewai/utilities/version.py
Normal file
@@ -0,0 +1,12 @@
|
||||
"""Version utilities for crewAI."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from functools import cache
|
||||
import importlib.metadata
|
||||
|
||||
|
||||
@cache
|
||||
def get_crewai_version() -> str:
|
||||
"""Get the installed crewAI version string."""
|
||||
return importlib.metadata.version("crewai")
|
||||
@@ -174,3 +174,51 @@ class TestEmitCallCompletedEventPassesUsage:
|
||||
event = mock_emit.call_args[1]["event"]
|
||||
assert isinstance(event, LLMCallCompletedEvent)
|
||||
assert event.usage is None
|
||||
|
||||
class TestUsageMetricsNewFields:
|
||||
def test_add_usage_metrics_aggregates_reasoning_and_cache_creation(self):
|
||||
from crewai.types.usage_metrics import UsageMetrics
|
||||
|
||||
metrics1 = UsageMetrics(
|
||||
total_tokens=100,
|
||||
prompt_tokens=60,
|
||||
completion_tokens=40,
|
||||
cached_prompt_tokens=10,
|
||||
reasoning_tokens=15,
|
||||
cache_creation_tokens=5,
|
||||
successful_requests=1,
|
||||
)
|
||||
metrics2 = UsageMetrics(
|
||||
total_tokens=200,
|
||||
prompt_tokens=120,
|
||||
completion_tokens=80,
|
||||
cached_prompt_tokens=20,
|
||||
reasoning_tokens=25,
|
||||
cache_creation_tokens=10,
|
||||
successful_requests=1,
|
||||
)
|
||||
|
||||
metrics1.add_usage_metrics(metrics2)
|
||||
|
||||
assert metrics1.total_tokens == 300
|
||||
assert metrics1.prompt_tokens == 180
|
||||
assert metrics1.completion_tokens == 120
|
||||
assert metrics1.cached_prompt_tokens == 30
|
||||
assert metrics1.reasoning_tokens == 40
|
||||
assert metrics1.cache_creation_tokens == 15
|
||||
assert metrics1.successful_requests == 2
|
||||
|
||||
def test_new_fields_default_to_zero(self):
|
||||
from crewai.types.usage_metrics import UsageMetrics
|
||||
|
||||
metrics = UsageMetrics()
|
||||
assert metrics.reasoning_tokens == 0
|
||||
assert metrics.cache_creation_tokens == 0
|
||||
|
||||
def test_model_dump_includes_new_fields(self):
|
||||
from crewai.types.usage_metrics import UsageMetrics
|
||||
|
||||
metrics = UsageMetrics(reasoning_tokens=10, cache_creation_tokens=5)
|
||||
dumped = metrics.model_dump()
|
||||
assert dumped["reasoning_tokens"] == 10
|
||||
assert dumped["cache_creation_tokens"] == 5
|
||||
|
||||
@@ -1463,3 +1463,45 @@ def test_tool_search_saves_input_tokens():
|
||||
f"Expected tool_search ({usage_search.prompt_tokens}) to use fewer input tokens "
|
||||
f"than no search ({usage_no_search.prompt_tokens})"
|
||||
)
|
||||
|
||||
|
||||
def test_anthropic_cache_creation_tokens_extraction():
|
||||
"""Test that cache_creation_input_tokens are extracted from Anthropic responses."""
|
||||
llm = LLM(model="anthropic/claude-3-5-sonnet-20241022")
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.content = [MagicMock(text="test response")]
|
||||
mock_response.usage = MagicMock(
|
||||
input_tokens=100,
|
||||
output_tokens=50,
|
||||
cache_read_input_tokens=30,
|
||||
cache_creation_input_tokens=20,
|
||||
)
|
||||
mock_response.stop_reason = None
|
||||
mock_response.model = None
|
||||
|
||||
usage = llm._extract_anthropic_token_usage(mock_response)
|
||||
assert usage["input_tokens"] == 100
|
||||
assert usage["output_tokens"] == 50
|
||||
assert usage["total_tokens"] == 150
|
||||
assert usage["cached_prompt_tokens"] == 30
|
||||
assert usage["cache_creation_tokens"] == 20
|
||||
|
||||
|
||||
def test_anthropic_missing_cache_fields_default_to_zero():
|
||||
"""Test that missing cache fields default to zero."""
|
||||
llm = LLM(model="anthropic/claude-3-5-sonnet-20241022")
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.content = [MagicMock(text="test response")]
|
||||
mock_response.usage = MagicMock(
|
||||
input_tokens=40,
|
||||
output_tokens=20,
|
||||
spec=["input_tokens", "output_tokens"],
|
||||
)
|
||||
mock_response.usage.cache_read_input_tokens = None
|
||||
mock_response.usage.cache_creation_input_tokens = None
|
||||
|
||||
usage = llm._extract_anthropic_token_usage(mock_response)
|
||||
assert usage["cached_prompt_tokens"] == 0
|
||||
assert usage["cache_creation_tokens"] == 0
|
||||
|
||||
@@ -1403,3 +1403,44 @@ def test_azure_stop_words_still_applied_to_regular_responses():
|
||||
assert "Observation:" not in result
|
||||
assert "Found results" not in result
|
||||
assert "I need to search for more information" in result
|
||||
|
||||
|
||||
def test_azure_reasoning_tokens_and_cached_tokens():
|
||||
"""Test that reasoning_tokens and cached_tokens are extracted from Azure responses."""
|
||||
llm = LLM(model="azure/gpt-4")
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.usage = MagicMock(
|
||||
prompt_tokens=100,
|
||||
completion_tokens=200,
|
||||
total_tokens=300,
|
||||
)
|
||||
mock_response.usage.prompt_tokens_details = MagicMock(cached_tokens=40)
|
||||
mock_response.usage.completion_tokens_details = MagicMock(reasoning_tokens=60)
|
||||
|
||||
usage = llm._extract_azure_token_usage(mock_response)
|
||||
assert usage["prompt_tokens"] == 100
|
||||
assert usage["completion_tokens"] == 200
|
||||
assert usage["total_tokens"] == 300
|
||||
assert usage["cached_prompt_tokens"] == 40
|
||||
assert usage["reasoning_tokens"] == 60
|
||||
|
||||
|
||||
def test_azure_no_detail_fields():
|
||||
"""Test Azure extraction without detail fields."""
|
||||
llm = LLM(model="azure/gpt-4")
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.usage = MagicMock(
|
||||
prompt_tokens=50,
|
||||
completion_tokens=30,
|
||||
total_tokens=80,
|
||||
)
|
||||
mock_response.usage.prompt_tokens_details = None
|
||||
mock_response.usage.completion_tokens_details = None
|
||||
|
||||
usage = llm._extract_azure_token_usage(mock_response)
|
||||
assert usage["prompt_tokens"] == 50
|
||||
assert usage["completion_tokens"] == 30
|
||||
assert usage["cached_prompt_tokens"] == 0
|
||||
assert usage["reasoning_tokens"] == 0
|
||||
|
||||
@@ -1175,3 +1175,81 @@ def test_bedrock_tool_results_not_merged_across_assistant_messages():
|
||||
)
|
||||
assert tool_result_messages[0]["content"][0]["toolResult"]["toolUseId"] == "call_a"
|
||||
assert tool_result_messages[1]["content"][0]["toolResult"]["toolUseId"] == "call_b"
|
||||
|
||||
|
||||
def test_bedrock_cached_token_tracking():
|
||||
"""Test that cached tokens (cacheReadInputTokenCount) are tracked for Bedrock."""
|
||||
llm = LLM(model="bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0")
|
||||
|
||||
with patch.object(llm._client, 'converse') as mock_converse:
|
||||
mock_response = {
|
||||
'output': {
|
||||
'message': {
|
||||
'role': 'assistant',
|
||||
'content': [{'text': 'test response'}]
|
||||
}
|
||||
},
|
||||
'usage': {
|
||||
'inputTokens': 100,
|
||||
'outputTokens': 50,
|
||||
'totalTokens': 150,
|
||||
'cacheReadInputTokenCount': 30,
|
||||
}
|
||||
}
|
||||
mock_converse.return_value = mock_response
|
||||
|
||||
result = llm.call("Hello")
|
||||
assert result == "test response"
|
||||
assert llm._token_usage['prompt_tokens'] == 100
|
||||
assert llm._token_usage['completion_tokens'] == 50
|
||||
assert llm._token_usage['total_tokens'] == 150
|
||||
assert llm._token_usage['cached_prompt_tokens'] == 30
|
||||
|
||||
|
||||
def test_bedrock_cached_token_alternate_key():
|
||||
"""Test that the alternate key cacheReadInputTokens also works."""
|
||||
llm = LLM(model="bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0")
|
||||
|
||||
with patch.object(llm._client, 'converse') as mock_converse:
|
||||
mock_response = {
|
||||
'output': {
|
||||
'message': {
|
||||
'role': 'assistant',
|
||||
'content': [{'text': 'test response'}]
|
||||
}
|
||||
},
|
||||
'usage': {
|
||||
'inputTokens': 80,
|
||||
'outputTokens': 40,
|
||||
'totalTokens': 120,
|
||||
'cacheReadInputTokens': 25,
|
||||
}
|
||||
}
|
||||
mock_converse.return_value = mock_response
|
||||
|
||||
llm.call("Hello")
|
||||
assert llm._token_usage['cached_prompt_tokens'] == 25
|
||||
|
||||
|
||||
def test_bedrock_no_cache_tokens_defaults_to_zero():
|
||||
"""Test that missing cache token keys default to zero."""
|
||||
llm = LLM(model="bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0")
|
||||
|
||||
with patch.object(llm._client, 'converse') as mock_converse:
|
||||
mock_response = {
|
||||
'output': {
|
||||
'message': {
|
||||
'role': 'assistant',
|
||||
'content': [{'text': 'test response'}]
|
||||
}
|
||||
},
|
||||
'usage': {
|
||||
'inputTokens': 60,
|
||||
'outputTokens': 30,
|
||||
'totalTokens': 90,
|
||||
}
|
||||
}
|
||||
mock_converse.return_value = mock_response
|
||||
|
||||
llm.call("Hello")
|
||||
assert llm._token_usage['cached_prompt_tokens'] == 0
|
||||
|
||||
@@ -1190,3 +1190,42 @@ def test_gemini_cached_prompt_tokens_with_tools():
|
||||
# cached_prompt_tokens should be populated (may be 0 if Gemini
|
||||
# doesn't cache for this particular request, but the field should exist)
|
||||
assert usage.cached_prompt_tokens >= 0
|
||||
|
||||
|
||||
def test_gemini_reasoning_tokens_extraction():
|
||||
"""Test that thoughts_token_count is extracted as reasoning_tokens from Gemini."""
|
||||
llm = LLM(model="google/gemini-2.0-flash-001")
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.usage_metadata = MagicMock(
|
||||
prompt_token_count=100,
|
||||
candidates_token_count=50,
|
||||
total_token_count=150,
|
||||
cached_content_token_count=10,
|
||||
thoughts_token_count=30,
|
||||
)
|
||||
usage = llm._extract_token_usage(mock_response)
|
||||
assert usage["prompt_token_count"] == 100
|
||||
assert usage["candidates_token_count"] == 50
|
||||
assert usage["total_tokens"] == 150
|
||||
assert usage["cached_prompt_tokens"] == 10
|
||||
assert usage["reasoning_tokens"] == 30
|
||||
|
||||
|
||||
def test_gemini_no_thinking_tokens_defaults_to_zero():
|
||||
"""Test that missing thoughts_token_count defaults to zero."""
|
||||
llm = LLM(model="google/gemini-2.0-flash-001")
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.usage_metadata = MagicMock(
|
||||
prompt_token_count=80,
|
||||
candidates_token_count=40,
|
||||
total_token_count=120,
|
||||
cached_content_token_count=0,
|
||||
thoughts_token_count=None,
|
||||
)
|
||||
mock_response.candidates = []
|
||||
|
||||
usage = llm._extract_token_usage(mock_response)
|
||||
assert usage["reasoning_tokens"] == 0
|
||||
assert usage["cached_prompt_tokens"] == 0
|
||||
|
||||
@@ -1929,6 +1929,47 @@ def test_openai_streaming_returns_tool_calls_without_available_functions():
|
||||
assert result[0]["type"] == "function"
|
||||
|
||||
|
||||
def test_openai_responses_api_reasoning_tokens_extraction():
|
||||
"""Test that reasoning_tokens are extracted from Responses API responses."""
|
||||
llm = LLM(model="openai/gpt-4o")
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.usage = MagicMock(
|
||||
input_tokens=100,
|
||||
output_tokens=200,
|
||||
total_tokens=300,
|
||||
)
|
||||
mock_response.usage.input_tokens_details = MagicMock(cached_tokens=25)
|
||||
mock_response.usage.output_tokens_details = MagicMock(reasoning_tokens=80)
|
||||
|
||||
usage = llm._extract_responses_token_usage(mock_response)
|
||||
assert usage["prompt_tokens"] == 100
|
||||
assert usage["completion_tokens"] == 200
|
||||
assert usage["total_tokens"] == 300
|
||||
assert usage["cached_prompt_tokens"] == 25
|
||||
assert usage["reasoning_tokens"] == 80
|
||||
|
||||
|
||||
def test_openai_responses_api_no_detail_fields_omitted():
|
||||
"""Test that reasoning/cached fields are omitted when Responses API details are absent."""
|
||||
llm = LLM(model="openai/gpt-4o")
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.usage = MagicMock(
|
||||
input_tokens=50,
|
||||
output_tokens=30,
|
||||
total_tokens=80,
|
||||
)
|
||||
mock_response.usage.input_tokens_details = None
|
||||
mock_response.usage.output_tokens_details = None
|
||||
|
||||
usage = llm._extract_responses_token_usage(mock_response)
|
||||
assert usage["prompt_tokens"] == 50
|
||||
assert usage["completion_tokens"] == 30
|
||||
assert "cached_prompt_tokens" not in usage
|
||||
assert "reasoning_tokens" not in usage
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_openai_async_streaming_returns_tool_calls_without_available_functions():
|
||||
"""Test that async streaming returns tool calls list when available_functions is None.
|
||||
@@ -2018,3 +2059,44 @@ async def test_openai_async_streaming_returns_tool_calls_without_available_funct
|
||||
assert result[0]["function"]["arguments"] == '{"expression": "1+1"}'
|
||||
assert result[0]["id"] == "call_abc123"
|
||||
assert result[0]["type"] == "function"
|
||||
|
||||
|
||||
def test_openai_reasoning_tokens_extraction():
|
||||
"""Test that reasoning_tokens are extracted from OpenAI o-series responses."""
|
||||
llm = LLM(model="openai/gpt-4o")
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.usage = MagicMock(
|
||||
prompt_tokens=100,
|
||||
completion_tokens=200,
|
||||
total_tokens=300,
|
||||
)
|
||||
mock_response.usage.prompt_tokens_details = MagicMock(cached_tokens=25)
|
||||
mock_response.usage.completion_tokens_details = MagicMock(reasoning_tokens=80)
|
||||
|
||||
usage = llm._extract_openai_token_usage(mock_response)
|
||||
assert usage["prompt_tokens"] == 100
|
||||
assert usage["completion_tokens"] == 200
|
||||
assert usage["total_tokens"] == 300
|
||||
assert usage["cached_prompt_tokens"] == 25
|
||||
assert usage["reasoning_tokens"] == 80
|
||||
|
||||
|
||||
def test_openai_no_detail_fields_omitted():
|
||||
"""Test that reasoning/cached fields are omitted when details are absent."""
|
||||
llm = LLM(model="openai/gpt-4o")
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.usage = MagicMock(
|
||||
prompt_tokens=50,
|
||||
completion_tokens=30,
|
||||
total_tokens=80,
|
||||
)
|
||||
mock_response.usage.prompt_tokens_details = None
|
||||
mock_response.usage.completion_tokens_details = None
|
||||
|
||||
usage = llm._extract_openai_token_usage(mock_response)
|
||||
assert usage["prompt_tokens"] == 50
|
||||
assert usage["completion_tokens"] == 30
|
||||
assert "cached_prompt_tokens" not in usage
|
||||
assert "reasoning_tokens" not in usage
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
"""Tests for CheckpointConfig, checkpoint listener, and pruning."""
|
||||
"""Tests for CheckpointConfig, checkpoint listener, pruning, and forking."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import sqlite3
|
||||
import tempfile
|
||||
import time
|
||||
from typing import Any
|
||||
@@ -21,6 +23,8 @@ from crewai.state.checkpoint_listener import (
|
||||
_SENTINEL,
|
||||
)
|
||||
from crewai.state.provider.json_provider import JsonProvider
|
||||
from crewai.state.provider.sqlite_provider import SqliteProvider
|
||||
from crewai.state.runtime import RuntimeState
|
||||
from crewai.task import Task
|
||||
|
||||
|
||||
@@ -116,35 +120,41 @@ class TestFindCheckpoint:
|
||||
class TestPrune:
|
||||
def test_prune_keeps_newest(self) -> None:
|
||||
with tempfile.TemporaryDirectory() as d:
|
||||
branch_dir = os.path.join(d, "main")
|
||||
os.makedirs(branch_dir)
|
||||
for i in range(5):
|
||||
path = os.path.join(d, f"cp_{i}.json")
|
||||
path = os.path.join(branch_dir, f"cp_{i}.json")
|
||||
with open(path, "w") as f:
|
||||
f.write("{}")
|
||||
# Ensure distinct mtime
|
||||
time.sleep(0.01)
|
||||
|
||||
JsonProvider().prune(d, max_keep=2)
|
||||
remaining = os.listdir(d)
|
||||
JsonProvider().prune(d, max_keep=2, branch="main")
|
||||
remaining = os.listdir(branch_dir)
|
||||
assert len(remaining) == 2
|
||||
assert "cp_3.json" in remaining
|
||||
assert "cp_4.json" in remaining
|
||||
|
||||
def test_prune_zero_removes_all(self) -> None:
|
||||
with tempfile.TemporaryDirectory() as d:
|
||||
branch_dir = os.path.join(d, "main")
|
||||
os.makedirs(branch_dir)
|
||||
for i in range(3):
|
||||
with open(os.path.join(d, f"cp_{i}.json"), "w") as f:
|
||||
with open(os.path.join(branch_dir, f"cp_{i}.json"), "w") as f:
|
||||
f.write("{}")
|
||||
|
||||
JsonProvider().prune(d, max_keep=0)
|
||||
assert os.listdir(d) == []
|
||||
JsonProvider().prune(d, max_keep=0, branch="main")
|
||||
assert os.listdir(branch_dir) == []
|
||||
|
||||
def test_prune_more_than_existing(self) -> None:
|
||||
with tempfile.TemporaryDirectory() as d:
|
||||
with open(os.path.join(d, "cp.json"), "w") as f:
|
||||
branch_dir = os.path.join(d, "main")
|
||||
os.makedirs(branch_dir)
|
||||
with open(os.path.join(branch_dir, "cp.json"), "w") as f:
|
||||
f.write("{}")
|
||||
|
||||
JsonProvider().prune(d, max_keep=10)
|
||||
assert len(os.listdir(d)) == 1
|
||||
JsonProvider().prune(d, max_keep=10, branch="main")
|
||||
assert len(os.listdir(branch_dir)) == 1
|
||||
|
||||
|
||||
# ---------- CheckpointConfig ----------
|
||||
@@ -162,8 +172,368 @@ class TestCheckpointConfig:
|
||||
cfg = CheckpointConfig(on_events=["*"])
|
||||
assert cfg.trigger_all
|
||||
|
||||
def test_restore_from_field(self) -> None:
|
||||
cfg = CheckpointConfig(restore_from="/path/to/checkpoint.json")
|
||||
assert cfg.restore_from == "/path/to/checkpoint.json"
|
||||
|
||||
def test_restore_from_default_none(self) -> None:
|
||||
cfg = CheckpointConfig()
|
||||
assert cfg.restore_from is None
|
||||
|
||||
def test_trigger_events(self) -> None:
|
||||
cfg = CheckpointConfig(
|
||||
on_events=["task_completed", "crew_kickoff_completed"]
|
||||
)
|
||||
assert cfg.trigger_events == {"task_completed", "crew_kickoff_completed"}
|
||||
|
||||
|
||||
# ---------- RuntimeState lineage ----------
|
||||
|
||||
|
||||
class TestRuntimeStateLineage:
|
||||
def _make_state(self) -> RuntimeState:
|
||||
from crewai import Agent, Crew
|
||||
|
||||
agent = Agent(role="r", goal="g", backstory="b", llm="gpt-4o-mini")
|
||||
crew = Crew(agents=[agent], tasks=[], verbose=False)
|
||||
return RuntimeState(root=[crew])
|
||||
|
||||
def test_default_lineage_fields(self) -> None:
|
||||
state = self._make_state()
|
||||
assert state._checkpoint_id is None
|
||||
assert state._parent_id is None
|
||||
assert state._branch == "main"
|
||||
|
||||
def test_serialize_includes_version(self) -> None:
|
||||
from crewai.utilities.version import get_crewai_version
|
||||
|
||||
state = self._make_state()
|
||||
dumped = json.loads(state.model_dump_json())
|
||||
assert dumped["crewai_version"] == get_crewai_version()
|
||||
|
||||
def test_deserialize_migrates_on_version_mismatch(self, caplog: Any) -> None:
|
||||
import logging
|
||||
|
||||
state = self._make_state()
|
||||
raw = state.model_dump_json()
|
||||
data = json.loads(raw)
|
||||
data["crewai_version"] = "0.1.0"
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
RuntimeState.model_validate_json(
|
||||
json.dumps(data), context={"from_checkpoint": True}
|
||||
)
|
||||
assert "Migrating checkpoint from crewAI 0.1.0" in caplog.text
|
||||
|
||||
def test_deserialize_warns_on_missing_version(self, caplog: Any) -> None:
|
||||
import logging
|
||||
|
||||
state = self._make_state()
|
||||
raw = state.model_dump_json()
|
||||
data = json.loads(raw)
|
||||
data.pop("crewai_version", None)
|
||||
with caplog.at_level(logging.WARNING):
|
||||
RuntimeState.model_validate_json(
|
||||
json.dumps(data), context={"from_checkpoint": True}
|
||||
)
|
||||
assert "treating as 0.0.0" in caplog.text
|
||||
|
||||
def test_serialize_includes_lineage(self) -> None:
|
||||
state = self._make_state()
|
||||
state._parent_id = "parent456"
|
||||
state._branch = "experiment"
|
||||
dumped = json.loads(state.model_dump_json())
|
||||
assert dumped["parent_id"] == "parent456"
|
||||
assert dumped["branch"] == "experiment"
|
||||
assert "checkpoint_id" not in dumped
|
||||
|
||||
def test_deserialize_restores_lineage(self) -> None:
|
||||
state = self._make_state()
|
||||
state._parent_id = "parent456"
|
||||
state._branch = "experiment"
|
||||
raw = state.model_dump_json()
|
||||
restored = RuntimeState.model_validate_json(
|
||||
raw, context={"from_checkpoint": True}
|
||||
)
|
||||
assert restored._parent_id == "parent456"
|
||||
assert restored._branch == "experiment"
|
||||
|
||||
def test_deserialize_defaults_missing_lineage(self) -> None:
|
||||
state = self._make_state()
|
||||
raw = state.model_dump_json()
|
||||
data = json.loads(raw)
|
||||
data.pop("parent_id", None)
|
||||
data.pop("branch", None)
|
||||
restored = RuntimeState.model_validate_json(
|
||||
json.dumps(data), context={"from_checkpoint": True}
|
||||
)
|
||||
assert restored._parent_id is None
|
||||
assert restored._branch == "main"
|
||||
|
||||
def test_from_checkpoint_sets_checkpoint_id(self) -> None:
|
||||
"""from_checkpoint sets _checkpoint_id from the location, not the blob."""
|
||||
state = self._make_state()
|
||||
state._provider = JsonProvider()
|
||||
with tempfile.TemporaryDirectory() as d:
|
||||
loc = state.checkpoint(d)
|
||||
written_id = state._checkpoint_id
|
||||
|
||||
cfg = CheckpointConfig(restore_from=loc)
|
||||
restored = RuntimeState.from_checkpoint(
|
||||
cfg, context={"from_checkpoint": True}
|
||||
)
|
||||
assert restored._checkpoint_id == written_id
|
||||
assert restored._parent_id == written_id
|
||||
|
||||
def test_fork_sets_branch(self) -> None:
|
||||
state = self._make_state()
|
||||
state._checkpoint_id = "abc12345"
|
||||
state._parent_id = "abc12345"
|
||||
state.fork("my-experiment")
|
||||
assert state._branch == "my-experiment"
|
||||
assert state._parent_id == "abc12345"
|
||||
|
||||
def test_fork_auto_branch(self) -> None:
|
||||
state = self._make_state()
|
||||
state._checkpoint_id = "20260409T120000_abc12345"
|
||||
state.fork()
|
||||
assert state._branch.startswith("fork/20260409T120000_abc12345_")
|
||||
assert len(state._branch) == len("fork/20260409T120000_abc12345_") + 6
|
||||
|
||||
def test_fork_no_checkpoint_id_unique(self) -> None:
|
||||
state = self._make_state()
|
||||
state.fork()
|
||||
assert state._branch.startswith("fork/")
|
||||
assert len(state._branch) == len("fork/") + 8
|
||||
# Two forks without checkpoint_id produce different branches
|
||||
first = state._branch
|
||||
state.fork()
|
||||
assert state._branch != first
|
||||
|
||||
|
||||
# ---------- JsonProvider forking ----------
|
||||
|
||||
|
||||
class TestJsonProviderFork:
    """Branch-aware checkpointing behavior of ``JsonProvider``.

    Exercises branch subdirectories, branch-scoped pruning, checkpoint-id
    extraction, path-traversal rejection, parent-id encoding in filenames,
    and lineage chaining through ``RuntimeState.checkpoint``/``acheckpoint``.
    """

    def test_checkpoint_writes_to_branch_subdir(self) -> None:
        """A checkpoint on branch ``main`` lands in a ``main/`` subdirectory."""
        provider = JsonProvider()
        with tempfile.TemporaryDirectory() as d:
            path = provider.checkpoint("{}", d, branch="main")
            assert "/main/" in path
            assert path.endswith(".json")
            assert os.path.isfile(path)

    def test_checkpoint_fork_branch_subdir(self) -> None:
        """Fork branches nest under ``fork/<name>/``."""
        provider = JsonProvider()
        with tempfile.TemporaryDirectory() as d:
            path = provider.checkpoint("{}", d, branch="fork/exp1")
            assert "/fork/exp1/" in path
            assert os.path.isfile(path)

    def test_prune_branch_aware(self) -> None:
        """Pruning one branch must not delete checkpoints on another."""
        provider = JsonProvider()
        with tempfile.TemporaryDirectory() as d:
            # Write 3 checkpoints on main, 2 on fork.  The short sleeps keep
            # timestamp-derived filenames distinct between writes.
            for _ in range(3):
                provider.checkpoint("{}", d, branch="main")
                time.sleep(0.01)
            for _ in range(2):
                provider.checkpoint("{}", d, branch="fork/a")
                time.sleep(0.01)

            # Prune main to 1
            provider.prune(d, max_keep=1, branch="main")

            main_dir = os.path.join(d, "main")
            fork_dir = os.path.join(d, "fork", "a")
            assert len(os.listdir(main_dir)) == 1
            assert len(os.listdir(fork_dir)) == 2  # untouched

    def test_extract_id(self) -> None:
        """extract_id strips the directory, parent suffix, and extension."""
        provider = JsonProvider()
        assert provider.extract_id("/dir/main/20260409T120000_abc12345_p-none.json") == "20260409T120000_abc12345"
        assert provider.extract_id("/dir/main/20260409T120000_abc12345_p-20260409T115900_def67890.json") == "20260409T120000_abc12345"

    def test_branch_traversal_rejected(self) -> None:
        """Branch names that escape the checkpoint directory are rejected."""
        provider = JsonProvider()
        with tempfile.TemporaryDirectory() as d:
            with pytest.raises(ValueError, match="escapes checkpoint directory"):
                provider.checkpoint("{}", d, branch="../../etc")
            with pytest.raises(ValueError, match="escapes checkpoint directory"):
                provider.prune(d, max_keep=1, branch="../../etc")

    def test_filename_encodes_parent_id(self) -> None:
        """Checkpoint filenames carry the parent id (``none`` for roots)."""
        provider = JsonProvider()
        with tempfile.TemporaryDirectory() as d:
            # First checkpoint — no parent
            path1 = provider.checkpoint("{}", d, branch="main")
            assert "_p-none.json" in path1

            # Second checkpoint — with parent
            id1 = provider.extract_id(path1)
            path2 = provider.checkpoint("{}", d, parent_id=id1, branch="main")
            assert f"_p-{id1}.json" in path2

    def test_checkpoint_chaining(self) -> None:
        """RuntimeState.checkpoint() chains parent_id after each write."""
        state = self._make_state()
        state._provider = JsonProvider()
        with tempfile.TemporaryDirectory() as d:
            state.checkpoint(d)
            id1 = state._checkpoint_id
            assert id1 is not None
            assert state._parent_id == id1

            loc2 = state.checkpoint(d)
            id2 = state._checkpoint_id
            assert id2 is not None
            assert id2 != id1
            assert state._parent_id == id2

            # Verify the second checkpoint blob has parent_id == id1
            with open(loc2) as f:
                data2 = json.loads(f.read())
            assert data2["parent_id"] == id1

    @pytest.mark.asyncio
    async def test_acheckpoint_chaining(self) -> None:
        """Async checkpoint path chains lineage identically to sync."""
        state = self._make_state()
        state._provider = JsonProvider()
        with tempfile.TemporaryDirectory() as d:
            await state.acheckpoint(d)
            id1 = state._checkpoint_id
            assert id1 is not None
            # Fix: mirror the sync test — the first write must also seed the
            # parent pointer used by the next checkpoint.
            assert state._parent_id == id1

            loc2 = await state.acheckpoint(d)
            id2 = state._checkpoint_id
            assert id2 != id1
            assert state._parent_id == id2

            # The second blob must reference the first checkpoint as parent.
            with open(loc2) as f:
                data2 = json.loads(f.read())
            assert data2["parent_id"] == id1

    def _make_state(self) -> RuntimeState:
        """Build a minimal RuntimeState wrapping a one-agent, zero-task crew."""
        from crewai import Agent, Crew

        agent = Agent(role="r", goal="g", backstory="b", llm="gpt-4o-mini")
        crew = Crew(agents=[agent], tasks=[], verbose=False)
        return RuntimeState(root=[crew])
||||
# ---------- SqliteProvider forking ----------
|
||||
|
||||
|
||||
class TestSqliteProviderFork:
    """Branch and parent-id bookkeeping of ``SqliteProvider`` checkpoints."""

    def test_checkpoint_stores_branch_and_parent(self) -> None:
        """A checkpoint row records the branch and parent id it was given."""
        sqlite_provider = SqliteProvider()
        with tempfile.TemporaryDirectory() as workdir:
            db_path = os.path.join(workdir, "cp.db")
            location = sqlite_provider.checkpoint(
                "{}", db_path, parent_id="p1", branch="exp"
            )
            checkpoint_id = sqlite_provider.extract_id(location)

            with sqlite3.connect(db_path) as conn:
                stored = conn.execute(
                    "SELECT parent_id, branch FROM checkpoints WHERE id = ?",
                    (checkpoint_id,),
                ).fetchone()
            assert stored == ("p1", "exp")

    def test_prune_branch_aware(self) -> None:
        """Pruning is scoped to a branch and leaves other branches intact."""
        sqlite_provider = SqliteProvider()
        with tempfile.TemporaryDirectory() as workdir:
            db_path = os.path.join(workdir, "cp.db")
            # Three rows on main, two on the fork branch.
            for _ in range(3):
                sqlite_provider.checkpoint("{}", db_path, branch="main")
            for _ in range(2):
                sqlite_provider.checkpoint("{}", db_path, branch="fork/a")

            sqlite_provider.prune(db_path, max_keep=1, branch="main")

            with sqlite3.connect(db_path) as conn:
                remaining_on_main = conn.execute(
                    "SELECT COUNT(*) FROM checkpoints WHERE branch = 'main'"
                ).fetchone()[0]
                remaining_on_fork = conn.execute(
                    "SELECT COUNT(*) FROM checkpoints WHERE branch = 'fork/a'"
                ).fetchone()[0]
            assert remaining_on_main == 1
            assert remaining_on_fork == 2

    def test_extract_id(self) -> None:
        """extract_id returns the fragment after the '#' separator."""
        sqlite_provider = SqliteProvider()
        location = "/path/to/db#abc123"
        assert sqlite_provider.extract_id(location) == "abc123"

    def test_checkpoint_chaining_sqlite(self) -> None:
        """RuntimeState.checkpoint() chains parent ids across sqlite writes."""
        state = self._make_state()
        state._provider = SqliteProvider()
        with tempfile.TemporaryDirectory() as workdir:
            db_path = os.path.join(workdir, "cp.db")
            state.checkpoint(db_path)
            first_id = state._checkpoint_id

            state.checkpoint(db_path)
            second_id = state._checkpoint_id
            assert second_id != first_id

            # Second row should have parent_id == first_id
            with sqlite3.connect(db_path) as conn:
                parent_row = conn.execute(
                    "SELECT parent_id FROM checkpoints WHERE id = ?", (second_id,)
                ).fetchone()
            assert parent_row[0] == first_id

    def _make_state(self) -> RuntimeState:
        """Build a minimal RuntimeState around a single-agent, zero-task crew."""
        from crewai import Agent, Crew

        solo_agent = Agent(role="r", goal="g", backstory="b", llm="gpt-4o-mini")
        solo_crew = Crew(agents=[solo_agent], tasks=[], verbose=False)
        return RuntimeState(root=[solo_crew])
|
||||
# ---------- Kickoff from_checkpoint parameter ----------
|
||||
|
||||
|
||||
class TestKickoffFromCheckpoint:
    """Behavior of the ``from_checkpoint`` parameter on kickoff methods."""

    def test_crew_kickoff_delegates_to_from_checkpoint(self) -> None:
        """With restore_from set, kickoff restores a crew and delegates to it."""
        restored = MagicMock(spec=Crew)
        restored.kickoff.return_value = "result"

        checkpoint_cfg = CheckpointConfig(restore_from="/path/to/cp.json")
        with patch.object(Crew, "from_checkpoint", return_value=restored):
            agent = Agent(role="r", goal="g", backstory="b", llm="gpt-4o-mini")
            crew = Crew(agents=[agent], tasks=[], verbose=False)
            outcome = crew.kickoff(inputs={"k": "v"}, from_checkpoint=checkpoint_cfg)

        restored.kickoff.assert_called_once_with(
            inputs={"k": "v"}, input_files=None
        )
        # restore_from on the restored crew's config must end up None.
        assert restored.checkpoint.restore_from is None
        assert outcome == "result"

    def test_crew_kickoff_config_only_sets_checkpoint(self) -> None:
        """A config without restore_from is attached to the crew before kickoff."""
        checkpoint_cfg = CheckpointConfig(on_events=["task_completed"])
        agent = Agent(role="r", goal="g", backstory="b", llm="gpt-4o-mini")
        crew = Crew(agents=[agent], tasks=[], verbose=False)
        assert crew.checkpoint is None
        # Abort kickoff early via prepare_kickoff so only the config
        # attachment is exercised.
        with (
            patch("crewai.crew.get_env_context"),
            patch("crewai.crew.prepare_kickoff", side_effect=RuntimeError("stop")),
            pytest.raises(RuntimeError, match="stop"),
        ):
            crew.kickoff(from_checkpoint=checkpoint_cfg)
        assert isinstance(crew.checkpoint, CheckpointConfig)
        assert crew.checkpoint.on_events == ["task_completed"]

    def test_flow_kickoff_delegates_to_from_checkpoint(self) -> None:
        """Flow.kickoff with restore_from delegates to the restored flow."""
        restored = MagicMock(spec=Flow)
        restored.kickoff.return_value = "flow_result"

        checkpoint_cfg = CheckpointConfig(restore_from="/path/to/flow_cp.json")
        with patch.object(Flow, "from_checkpoint", return_value=restored):
            flow = Flow()
            outcome = flow.kickoff(from_checkpoint=checkpoint_cfg)

        restored.kickoff.assert_called_once_with(
            inputs=None, input_files=None
        )
        assert restored.checkpoint.restore_from is None
        assert outcome == "flow_result"
||||
@@ -1001,6 +1001,8 @@ def test_usage_info_non_streaming_with_call():
|
||||
"completion_tokens": 0,
|
||||
"successful_requests": 0,
|
||||
"cached_prompt_tokens": 0,
|
||||
"reasoning_tokens": 0,
|
||||
"cache_creation_tokens": 0,
|
||||
}
|
||||
assert llm.stream is False
|
||||
|
||||
@@ -1025,6 +1027,8 @@ def test_usage_info_streaming_with_call():
|
||||
"completion_tokens": 0,
|
||||
"successful_requests": 0,
|
||||
"cached_prompt_tokens": 0,
|
||||
"reasoning_tokens": 0,
|
||||
"cache_creation_tokens": 0,
|
||||
}
|
||||
assert llm.stream is True
|
||||
|
||||
@@ -1056,6 +1060,8 @@ async def test_usage_info_non_streaming_with_acall():
|
||||
"completion_tokens": 0,
|
||||
"successful_requests": 0,
|
||||
"cached_prompt_tokens": 0,
|
||||
"reasoning_tokens": 0,
|
||||
"cache_creation_tokens": 0,
|
||||
}
|
||||
|
||||
with patch.object(
|
||||
@@ -1089,6 +1095,8 @@ async def test_usage_info_non_streaming_with_acall_and_stop():
|
||||
"completion_tokens": 0,
|
||||
"successful_requests": 0,
|
||||
"cached_prompt_tokens": 0,
|
||||
"reasoning_tokens": 0,
|
||||
"cache_creation_tokens": 0,
|
||||
}
|
||||
|
||||
with patch.object(
|
||||
@@ -1121,6 +1129,8 @@ async def test_usage_info_streaming_with_acall():
|
||||
"completion_tokens": 0,
|
||||
"successful_requests": 0,
|
||||
"cached_prompt_tokens": 0,
|
||||
"reasoning_tokens": 0,
|
||||
"cache_creation_tokens": 0,
|
||||
}
|
||||
|
||||
with patch.object(
|
||||
|
||||
@@ -861,265 +861,6 @@ class TestStreamingCancellation:
|
||||
assert not streaming.is_cancelled
|
||||
|
||||
|
||||
class TestStreamingRunIsolation:
    """Tests for concurrent streaming run isolation (issue #5376).

    The singleton event bus fans out events to all registered handlers.
    Without run_id scoping, concurrent streaming runs receive each other's
    chunks. These tests verify that the run_id filtering prevents
    cross-run chunk contamination.
    """

    def test_handler_ignores_events_from_different_run(self) -> None:
        """A handler with run_id must reject events carrying a different run_id."""
        import queue as _queue

        from crewai.utilities.streaming import _create_stream_handler, TaskInfo

        task_info: TaskInfo = {
            "index": 0,
            "name": "task-a",
            "id": "tid-a",
            "agent_role": "Agent",
            "agent_id": "aid-a",
        }
        q: _queue.Queue[StreamChunk | None | Exception] = _queue.Queue()
        handler = _create_stream_handler(task_info, q, run_id="run-A")

        # Event from a *different* run – must be silently dropped.
        foreign_event = LLMStreamChunkEvent(
            chunk="foreign-chunk",
            call_id="cid",
            run_id="run-B",
        )
        handler(None, foreign_event)
        assert q.empty(), "Handler must not enqueue events from a different run_id"

        # Event from the *same* run – must be enqueued.
        own_event = LLMStreamChunkEvent(
            chunk="own-chunk",
            call_id="cid",
            run_id="run-A",
        )
        handler(None, own_event)
        assert not q.empty(), "Handler must enqueue events with matching run_id"
        item = q.get_nowait()
        assert item.content == "own-chunk"

    def test_concurrent_streaming_states_do_not_cross_contaminate(self) -> None:
        """Two streaming states created in separate contexts (simulating
        concurrent runs) must each receive only their own events, even
        though both handlers are registered on the same global event bus.
        """
        import contextvars

        from crewai.utilities.streaming import (
            create_streaming_state,
            _current_stream_run_id,
            TaskInfo,
            _unregister_handler,
        )

        task_a: TaskInfo = {
            "index": 0,
            "name": "task-a",
            "id": "tid-a",
            "agent_role": "Agent-A",
            "agent_id": "aid-a",
        }
        task_b: TaskInfo = {
            "index": 1,
            "name": "task-b",
            "id": "tid-b",
            "agent_role": "Agent-B",
            "agent_id": "aid-b",
        }

        def _create_in_fresh_context(
            task_info: TaskInfo,
        ) -> "StreamingState":
            """Reset the run_id contextvar and create streaming state."""
            _current_stream_run_id.set(None)
            return create_streaming_state(task_info, [])

        # Create each streaming state in a *separate* context so they get
        # distinct run_ids (simulates truly concurrent runs).
        state_a = contextvars.copy_context().run(_create_in_fresh_context, task_a)
        state_b = contextvars.copy_context().run(_create_in_fresh_context, task_b)

        # Extract run_ids from handler closures.
        def _get_run_id_from_handler(handler: Any) -> str | None:
            """Extract the run_id captured in the handler closure."""
            fn = handler
            if hasattr(fn, "__wrapped__"):
                fn = fn.__wrapped__
            # Scan closure cells for a UUID-shaped string (36 chars, 4 dashes);
            # presumably the run_id is the only such captured value — TODO confirm.
            for cell in (fn.__closure__ or []):
                try:
                    val = cell.cell_contents
                    if isinstance(val, str) and len(val) == 36 and val.count("-") == 4:
                        return val
                except ValueError:
                    # Empty cells raise ValueError on cell_contents access.
                    continue
            return None

        rid_a = _get_run_id_from_handler(state_a.handler)
        rid_b = _get_run_id_from_handler(state_b.handler)
        assert rid_a is not None and rid_b is not None
        assert rid_a != rid_b, "Each streaming state must have a unique run_id"

        # Emit events for run A.
        for i in range(3):
            crewai_event_bus.emit(
                self,
                LLMStreamChunkEvent(
                    chunk=f"A-{i}",
                    call_id="cid-a",
                    run_id=rid_a,
                ),
            )

        # Emit events for run B.
        for i in range(3):
            crewai_event_bus.emit(
                self,
                LLMStreamChunkEvent(
                    chunk=f"B-{i}",
                    call_id="cid-b",
                    run_id=rid_b,
                ),
            )

        # Drain queues.
        chunks_a = []
        while not state_a.sync_queue.empty():
            chunks_a.append(state_a.sync_queue.get_nowait())

        chunks_b = []
        while not state_b.sync_queue.empty():
            chunks_b.append(state_b.sync_queue.get_nowait())

        # Verify isolation.
        contents_a = [c.content for c in chunks_a]
        contents_b = [c.content for c in chunks_b]

        assert contents_a == ["A-0", "A-1", "A-2"], (
            f"State A must only contain its own chunks, got {contents_a}"
        )
        assert contents_b == ["B-0", "B-1", "B-2"], (
            f"State B must only contain its own chunks, got {contents_b}"
        )

        # No cross-contamination.
        for c in contents_a:
            assert not c.startswith("B-"), f"Run A received run B chunk: {c}"
        for c in contents_b:
            assert not c.startswith("A-"), f"Run B received run A chunk: {c}"

        # Cleanup.
        _unregister_handler(state_a.handler)
        _unregister_handler(state_b.handler)

    def test_concurrent_threads_isolated(self) -> None:
        """Simulate two concurrent streaming runs in separate threads and
        verify that each collects only its own chunks.
        """
        import contextvars
        import threading
        import time

        from crewai.utilities.streaming import (
            create_streaming_state,
            get_current_stream_run_id,
            TaskInfo,
            _unregister_handler,
        )

        results: dict[str, list[str]] = {"A": [], "B": []}
        errors: list[Exception] = []

        def run_streaming(label: str, task_info: TaskInfo) -> None:
            # Worker body: create a state, emit chunks stamped with this
            # thread's run_id, then collect whatever landed in its queue.
            try:
                state = create_streaming_state(task_info, [])
                run_id = get_current_stream_run_id()
                assert run_id is not None

                # Simulate LLM emitting chunks stamped with this run's id.
                for i in range(5):
                    crewai_event_bus.emit(
                        None,
                        LLMStreamChunkEvent(
                            chunk=f"{label}-{i}",
                            call_id=f"cid-{label}",
                            run_id=run_id,
                        ),
                    )
                    time.sleep(0.005)

                # Drain the queue.
                while not state.sync_queue.empty():
                    item = state.sync_queue.get_nowait()
                    results[label].append(item.content)

                _unregister_handler(state.handler)
            except Exception as exc:
                # Captured for assertion on the main thread; a raise here
                # would otherwise die silently inside the worker thread.
                errors.append(exc)

        task_a: TaskInfo = {
            "index": 0,
            "name": "task-a",
            "id": "tid-a",
            "agent_role": "Agent-A",
            "agent_id": "aid-a",
        }
        task_b: TaskInfo = {
            "index": 1,
            "name": "task-b",
            "id": "tid-b",
            "agent_role": "Agent-B",
            "agent_id": "aid-b",
        }

        t_a = threading.Thread(target=run_streaming, args=("A", task_a))
        t_b = threading.Thread(target=run_streaming, args=("B", task_b))

        t_a.start()
        t_b.start()
        t_a.join(timeout=10)
        t_b.join(timeout=10)

        assert not errors, f"Threads raised errors: {errors}"

        # Each thread must see only its own chunks.
        for c in results["A"]:
            assert c.startswith("A-"), f"Run A received foreign chunk: {c}"
        for c in results["B"]:
            assert c.startswith("B-"), f"Run B received foreign chunk: {c}"

        assert len(results["A"]) == 5, (
            f"Run A expected 5 chunks, got {len(results['A'])}: {results['A']}"
        )
        assert len(results["B"]) == 5, (
            f"Run B expected 5 chunks, got {len(results['B'])}: {results['B']}"
        )

    def test_run_id_stamped_on_llm_stream_chunk_event(self) -> None:
        """Verify that LLMStreamChunkEvent accepts and stores run_id."""
        event = LLMStreamChunkEvent(
            chunk="test",
            call_id="cid",
            run_id="my-run-id",
        )
        assert event.run_id == "my-run-id"

    def test_run_id_defaults_to_none(self) -> None:
        """Verify that run_id defaults to None when not provided."""
        event = LLMStreamChunkEvent(
            chunk="test",
            call_id="cid",
        )
        assert event.run_id is None
|
||||
class TestStreamingImports:
|
||||
"""Tests for correct imports of streaming types."""
|
||||
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
"""CrewAI development tools."""
|
||||
|
||||
__version__ = "1.14.2a1"
|
||||
__version__ = "1.14.2a2"
|
||||
|
||||
@@ -166,14 +166,14 @@ exclude-newer = "2026-04-10" # pinned for CVE-2026-39892; restore to "3 days" a
|
||||
# composio-core pins rich<14 but textual requires rich>=14.
|
||||
# onnxruntime 1.24+ dropped Python 3.10 wheels; cap it so qdrant[fastembed] resolves on 3.10.
|
||||
# fastembed 0.7.x and docling 2.63 cap pillow<12; the removed APIs don't affect them.
|
||||
# langchain-core <1.2.11 has SSRF via image_url token counting (CVE-2026-26013).
|
||||
# langchain-core <1.2.28 has GHSA-926x-3r5x-gfhw (incomplete f-string validation).
|
||||
# transformers 4.57.6 has CVE-2026-1839; force 5.4+ (docling 2.84 allows huggingface-hub>=1).
|
||||
# cryptography 46.0.6 has CVE-2026-39892; force 46.0.7+.
|
||||
override-dependencies = [
|
||||
"rich>=13.7.1",
|
||||
"onnxruntime<1.24; python_version < '3.11'",
|
||||
"pillow>=12.1.1",
|
||||
"langchain-core>=1.2.11,<2",
|
||||
"langchain-core>=1.2.28,<2",
|
||||
"urllib3>=2.6.3",
|
||||
"transformers>=5.4.0; python_version >= '3.10'",
|
||||
"cryptography>=46.0.7",
|
||||
|
||||
Reference in New Issue
Block a user