Compare commits

..

2 Commits

Author SHA1 Message Date
lorenzejay
3855076670 self improve with skills 2026-05-05 14:45:52 -07:00
lorenzejay
4af40c64f2 self improve with skills 2026-05-05 14:42:25 -07:00
340 changed files with 10263 additions and 12166 deletions

View File

@@ -5,10 +5,6 @@ on:
- cron: '0 6 * * *' # daily at 6am UTC
workflow_dispatch:
concurrency:
group: nightly-publish
cancel-in-progress: false
jobs:
check:
name: Check for new commits
@@ -22,11 +18,10 @@ jobs:
with:
fetch-depth: 0
- name: Check for recent commits
- name: Check for commits in last 24h
id: check
run: |
# 25h window absorbs cron-vs-commit timing skew at the boundary.
RECENT=$(git log --since="25 hours ago" --oneline | head -1)
RECENT=$(git log --since="24 hours ago" --oneline | head -1)
if [ -n "$RECENT" ]; then
echo "has_changes=true" >> "$GITHUB_OUTPUT"
else
@@ -43,42 +38,34 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Install uv
uses: astral-sh/setup-uv@v6
- name: Set up Python
uses: actions/setup-python@v5
with:
version: "0.11.3"
python-version: "3.12"
enable-cache: false
- name: Install uv
uses: astral-sh/setup-uv@v4
- name: Stamp nightly versions
run: |
DATE=$(date +%Y%m%d)
# All workspace packages share the same base version and are released together.
BASE=$(python -c "
import re
print(re.search(r'__version__\s*=\s*\"(.*?)\"', open('lib/crewai/src/crewai/__init__.py').read()).group(1))
")
NIGHTLY="${BASE}.dev${DATE}"
echo "Nightly version: ${NIGHTLY}"
for init_file in \
lib/crewai/src/crewai/__init__.py \
lib/crewai-core/src/crewai_core/__init__.py \
lib/crewai-tools/src/crewai_tools/__init__.py \
lib/crewai-files/src/crewai_files/__init__.py \
lib/cli/src/crewai_cli/__init__.py; do
lib/crewai-files/src/crewai_files/__init__.py; do
CURRENT=$(python -c "
import re
text = open('$init_file').read()
print(re.search(r'__version__\s*=\s*\"(.*?)\"\s*$', text, re.MULTILINE).group(1))
")
NIGHTLY="${CURRENT}.dev${DATE}"
sed -i "s/__version__ = .*/__version__ = \"${NIGHTLY}\"/" "$init_file"
echo "Stamped $init_file -> $NIGHTLY"
echo "$init_file: $CURRENT -> $NIGHTLY"
done
# Update all cross-package dependency pins to the nightly version.
sed -i "s/\"crewai==[^\"]*\"/\"crewai==${NIGHTLY}\"/" lib/crewai-tools/pyproject.toml
sed -i "s/\"crewai-core==[^\"]*\"/\"crewai-core==${NIGHTLY}\"/" lib/crewai/pyproject.toml
sed -i "s/\"crewai-cli==[^\"]*\"/\"crewai-cli==${NIGHTLY}\"/" lib/crewai/pyproject.toml
# Update cross-package dependency pins to nightly versions
sed -i "s/\"crewai-tools==[^\"]*\"/\"crewai-tools==${NIGHTLY}\"/" lib/crewai/pyproject.toml
sed -i "s/\"crewai-files==[^\"]*\"/\"crewai-files==${NIGHTLY}\"/" lib/crewai/pyproject.toml
sed -i "s/\"crewai-core==[^\"]*\"/\"crewai-core==${NIGHTLY}\"/" lib/cli/pyproject.toml
sed -i "s/\"crewai==[^\"]*\"/\"crewai==${NIGHTLY}\"/" lib/crewai-tools/pyproject.toml
echo "Updated cross-package dependency pins to ${NIGHTLY}"
- name: Build packages
@@ -98,10 +85,13 @@ jobs:
runs-on: ubuntu-latest
environment:
name: pypi
url: https://pypi.org/p/crewai
permissions:
id-token: write
contents: read
steps:
- uses: actions/checkout@v4
- name: Install uv
uses: astral-sh/setup-uv@v6
with:
@@ -126,8 +116,7 @@ jobs:
continue
fi
echo "Publishing $package"
# --check-url skips files already on PyPI so manual re-runs on the same day are idempotent.
if ! uv publish --check-url https://pypi.org/simple/ "$package"; then
if ! uv publish "$package"; then
echo "Failed to publish $package"
failed=1
fi

View File

@@ -46,11 +46,9 @@ jobs:
- name: Run pip-audit
run: |
uv run pip-audit --desc --aliases --skip-editable --format json --output pip-audit-report.json \
--ignore-vuln CVE-2026-3219 \
--ignore-vuln GHSA-r374-rxx8-8654
--ignore-vuln CVE-2026-3219
# Ignored CVEs:
# CVE-2026-3219 - pip 26.0.1 (GHSA-58qw-9mgm-455v): no fix available, archive handling issue
# GHSA-r374-rxx8-8654 - paramiko 4.0.0 (SHA-1 in rsakey.py): no fix available; transitive via composio-core
# CVE-2026-3219 - pip 26.0.1 (GHSA-58qw-9mgm-455v): no fix available, archive handling issue
continue-on-error: true
- name: Display results

View File

@@ -19,7 +19,7 @@ repos:
language: system
pass_filenames: true
types: [python]
exclude: ^(lib/crewai/src/crewai/cli/templates/|lib/cli/src/crewai_cli/templates/|lib/cli/tests/|lib/crewai/tests/|lib/crewai-tools/tests/|lib/crewai-files/tests/|lib/devtools/tests/)
exclude: ^(lib/crewai/src/crewai/cli/templates/|lib/crewai/tests/|lib/crewai-tools/tests/|lib/crewai-files/tests/)
- repo: https://github.com/astral-sh/uv-pre-commit
rev: 0.11.3
hooks:

View File

@@ -54,13 +54,12 @@ _original_from_serialized_response = getattr(
)
if _original_from_serialized_response is not None:
_from_serialized: Any = _original_from_serialized_response
def _patched_from_serialized_response(
request: Any, serialized_response: Any, history: Any = None
) -> Any:
"""Patched version that ensures response._content is properly set."""
response = _from_serialized(request, serialized_response, history)
response = _original_from_serialized_response(request, serialized_response, history)
# Explicitly set _content to avoid ResponseNotRead errors
# The content was passed to the constructor but the mocked read() prevents
# proper initialization of the internal state
@@ -256,8 +255,7 @@ def vcr_cassette_dir(request: Any) -> str:
for parent in test_file.parents:
if (
parent.name
in ("crewai", "crewai-tools", "crewai-files", "cli", "crewai-core")
parent.name in ("crewai", "crewai-tools", "crewai-files")
and parent.parent.name == "lib"
):
package_root = parent

View File

@@ -26,7 +26,7 @@ mode: "wide"
</Step>
<Step title="مراقبة التقدم">
استخدم `GET /status/{kickoff_id}` للتحقق من حالة التنفيذ واسترجاع النتائج.
استخدم `GET /{kickoff_id}/status` للتحقق من حالة التنفيذ واسترجاع النتائج.
</Step>
</Steps>
@@ -65,7 +65,7 @@ https://your-crew-name.crewai.com
1. **الاكتشاف**: استدعِ `GET /inputs` لفهم ما يحتاجه طاقمك
2. **التنفيذ**: أرسل المدخلات عبر `POST /kickoff` لبدء المعالجة
3. **المراقبة**: استعلم عن `GET /status/{kickoff_id}` حتى الاكتمال
3. **المراقبة**: استعلم عن `GET /{kickoff_id}/status` حتى الاكتمال
4. **النتائج**: استخرج المخرجات النهائية من الاستجابة المكتملة
## معالجة الأخطاء

View File

@@ -1,6 +1,6 @@
---
title: "GET /status/{kickoff_id}"
title: "GET /{kickoff_id}/status"
description: "الحصول على حالة التنفيذ"
openapi: "/enterprise-api.en.yaml GET /status/{kickoff_id}"
openapi: "/enterprise-api.en.yaml GET /{kickoff_id}/status"
mode: "wide"
---

View File

@@ -4,80 +4,6 @@ description: "تحديثات المنتج والتحسينات وإصلاحات
icon: "clock"
mode: "wide"
---
<Update label="13 مايو 2026">
## v1.14.5a5
[عرض الإصدار على GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.5a5)
## ما الذي تغير
### الميزات
- إلغاء استخدام CrewAgentExecutor، وتعيين وكلاء Crew الافتراضيين إلى AgentExecutor
- تحسين أدوات صندوق الرمل Daytona
### إصلاحات الأخطاء
- إصلاح كتلة الكود المفقودة في دليل التدفق الأول باللغة البرتغالية (pt-BR)
- تسجيل أخطاء المراجعة المسبقة والتقطير HITL، إضافة learn_strict
- تصحيح urllib3 للثغرات الأمنية
- تصحيح gitpython و langchain-core؛ تجاهل CVE paramiko غير المصححة
- تحديث جميع حزم مساحة العمل المنشورة على uv lock/sync
### الوثائق
- إضافة دليل ترحيل لـ `inputs.id` إلى `restoreFromStateId`
- إضافة دليل ترقية OSS ودليل ترحيل crew-to-flow
- تحديث سجل التغييرات والإصدار لـ v1.14.5a4
## المساهمون
@akaKuruma, @greysonlalonde, @iris-clawd, @lorenzejay, @mislavivanda
</Update>
<Update label="9 مايو 2026">
## v1.14.5a4
[عرض الإصدار على GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.5a4)
## ما الذي تغير
### الميزات
- تحديث قوائم LLM
### إصلاحات الأخطاء
- إصلاح مشكلة الاعتماد من خلال نقل `textual` إلى `crewai-cli` وإضافة `certifi`
### الوثائق
- تحديث سجل التغييرات والإصدار لـ v1.14.5a3
## المساهمون
@cgoeppinger, @greysonlalonde
</Update>
<Update label="7 مايو 2026">
## v1.14.5a3
[عرض الإصدار على GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.5a3)
## ما الذي تغير
### إصلاحات الأخطاء
- إصلاح مسار نقطة النهاية للحالة من /{kickoff_id}/status إلى /status/{kickoff_id}
- تحديث تبعية gitpython إلى الإصدار >=3.1.47 للامتثال الأمني
### إعادة هيكلة
- استخراج واجهة سطر الأوامر إلى حزمة crewai-cli المستقلة
### الوثائق
- تحديث سجل التغييرات والإصدار للإصدار v1.14.5a2
## المساهمون
@greysonlalonde, @iris-clawd
</Update>
<Update label="4 مايو 2026">
## v1.14.5a2

View File

@@ -29,7 +29,6 @@ from crewai.flow.flow import Flow, listen, start
from dotenv import load_dotenv
from litellm import completion
load_dotenv()
class ExampleFlow(Flow):
model = "gpt-4o-mini"

View File

@@ -1,102 +0,0 @@
---
title: "الانتقال من inputs.id إلى restore_from_state_id"
description: "نقل تدفقات @persist من ترطيب inputs.id المهجور إلى حقل restore_from_state_id المدعوم"
icon: "arrow-right-arrow-left"
---
<Warning>
تمرير `id` داخل `inputs` لترطيب تدفق `@persist` هو **مهجور** ومقرر إزالته في إصدار مستقبلي. البديل، `restore_from_state_id`، متاح في CrewAI **v1.14.5 وما بعده** — الخطوات أدناه تنطبق بمجرد أن تقوم بالتحديث.
</Warning>
## نظرة عامة
الطريقة الموثقة لترطيب تدفق `@persist` من تنفيذ سابق هي تمرير UUID لذلك التنفيذ كـ `inputs.id`. الآن، تكشف CrewAI عن حقل مخصص، `restore_from_state_id`، الذي يقوم بنفس الترطيب دون تحميل حمولة `inputs` — ودون ربط مفتاح الترطيب بهوية التنفيذ الجديد.
## الانتقال
إذا كنت حالياً تبدأ تدفق `@persist` باستخدام `inputs={"id": ...}`:
```python
# مهجور
flow = CounterFlow()
flow.kickoff(inputs={"id": "abcd1234-5678-90ef-ghij-klmnopqrstuv"})
```
انتقل إلى `restore_from_state_id`:
```python
# مدعوم
flow = CounterFlow()
flow.kickoff(restore_from_state_id="abcd1234-5678-90ef-ghij-klmnopqrstuv")
```
تتمتع الوضعيتان بمعاني سلالة مختلفة:
- `inputs={"id": <uuid>}` (مهجور) — **استئناف**: تكتب الكتابات تحت المعرف المقدم، مما يمدد نفس تاريخ `flow_uuid`.
- `restore_from_state_id=<uuid>` — **تفرع**: يترطب الحالة من اللقطة، ثم يكتب تحت `state.id` جديدة. يتم الحفاظ على تاريخ التدفق المصدر.
لأغلب سيناريوهات الإنتاج — إعادة تشغيل تدفق تم تهيئته من حالة سابقة — فإن التفرع هو ما تريده. راجع [إتقان حالة التدفق](/ar/guides/flows/mastering-flow-state) للحصول على النموذج الذهني الكامل.
إذا كنت تبدأ تدفقك عبر واجهة برمجة تطبيقات CrewAI AMP REST، راجع [AMP](#amp) أدناه لهجرة الحمولة المعادلة.
## لماذا نقوم بإهمال `inputs.id` لـ `@persist`
`inputs.id` هو حالياً الطريقة الموثقة لاستئناف تدفق `@persist` من تنفيذ سابق. المشكلة هي أن نفس UUID يقوم بوظيفتين في وقت واحد:
1. **يحدد أي لقطة يترطب منها `@persist`** — تحميل الحالة المحفوظة تحت ذلك UUID.
2. **يصبح معرف تنفيذ التدفق الجديد** (`state.id` في SDK؛ يظهر كـ `flow_id` في بعض السياقات) — كل كتابة `@persist` من هذه البداية أيضاً تقع تحت نفس UUID.
هذه الوظيفة المزدوجة هي السبب الجذري للمشاكل التي يصفها هذا الدليل. لأن UUID المقدم هو أيضاً معرف التنفيذ الجديد، فإن بدايتين تمرران نفس `inputs.id` ليست تنفيذين متميزين — إنهما تشتركان في معرف، وتشاركان في سجل الاستمرارية، و(على AMP) تشتركان في صف في قائمة التنفيذات. لا توجد طريقة للقول "ترطب من هذه اللقطة، ولكن سجل هذا التشغيل بشكل منفصل" دون تقسيم المسؤوليتين.
`restore_from_state_id` هو هذا الانقسام. إنه يخبر `@persist` من أي لقطة يترطب، بينما يترك التنفيذ الجديد حراً لاستلام `state.id` جديدة. لم يعد مصدر الترطيب والتشغيل المسجل نفس UUID — وهو ما تريده معظم سيناريوهات الإنتاج فعلياً.
## جدول إزالة
من المقرر إزالة `inputs.id` لترطيب `@persist` في إصدار مستقبلي من CrewAI. لا يوجد قطع صارم فوري — تظل التدفقات الحالية تعمل — ولكن بمجرد أن تقوم بالتحديث إلى v1.14.5 أو ما بعده، يجب أن يستخدم الكود الجديد `restore_from_state_id`، ويجب أن تهاجر التدفقات الحالية في الفرصة المناسبة التالية.
## AMP
إذا كنت تنشر تدفقك إلى CrewAI AMP، فإن الهجرة تمتد إلى الحمولة التي تبدأ بها المرسلة إلى طاقمك المنشور، وتظهر الأعراض المرئية لإعادة استخدام `inputs.id` على لوحة معلومات النشر. تغطي القسمان الفرعيان أدناه كلاهما.
### هجرة حمولة البداية
إذا كنت حالياً تبدأ تدفقاً منشوراً عن طريق تضمين `id` في `inputs`:
```bash
# مهجور
curl -X POST \
-H "Content-Type: application/json" \
-H "Authorization: Bearer YOUR_CREW_TOKEN" \
-d '{"inputs": {"id": "abcd1234-5678-90ef-ghij-klmnopqrstuv", "topic": "AI Agent Frameworks"}}' \
https://your-crew-url.crewai.com/kickoff
```
نقل UUID إلى حقل `restoreFromStateId` في المستوى الأعلى:
```bash
# مدعوم
curl -X POST \
-H "Content-Type: application/json" \
-H "Authorization: Bearer YOUR_CREW_TOKEN" \
-d '{
"inputs": {"topic": "AI Agent Frameworks"},
"restoreFromStateId": "abcd1234-5678-90ef-ghij-klmnopqrstuv"
}' \
https://your-crew-url.crewai.com/kickoff
```
يجلس `restoreFromStateId` بجانب `inputs` في حمولة البداية، وليس داخلها. الآن، يحمل كائن `inputs` فقط القيم التي تستهلكها تدفقك فعلياً.
### ماذا يحدث عند إعادة استخدام `inputs.id`
عندما تتلقى AMP بداية لتدفق يتطابق `inputs.id` الخاص به مع تنفيذ موجود، فإنه يحل إلى السجل الموجود بدلاً من إنشاء سجل جديد. من لوحة معلومات النشر سترى:
- **حالة التنفيذ** — حالة التشغيل الجديد تحل محل حالة التشغيل السابق. يمكن أن تعود تنفيذات مكتملة إلى `جارية`، أو يمكن أن تتحول تشغيلات `مكتملة` إلى `خطأ` إذا فشلت البداية الجديدة — في كلتا الحالتين، لم تعد لوحة المعلومات تعكس التشغيل الأصلي.
- **التتبع** — تتراكم تتبعات OTel عبر البدايات لأنها تشترك في نفس معرف التنفيذ؛ تتبعات التشغيل السابق إما تُستبدل بـ، أو تُخلط مع، تشغيل الجديد. لم يعد إعادة التشغيل خطوة بخطوة يتوافق مع تنفيذ واحد.
- **قائمة التنفيذات** — البدايات التي يجب أن تظهر كصفوف منفصلة تتقلص إلى إدخال واحد، مما يخفي التاريخ.
تساعد الهجرة إلى `restoreFromStateId` في الحفاظ على كل بداية كتنفيذ خاص بها — مع حالتها الخاصة، وتتبعها، وصفها في القائمة — بينما لا تزال ترطب الحالة من تشغيل سابق.
<Card title="هل تحتاج مساعدة؟" icon="headset" href="mailto:support@crewai.com">
اتصل بفريق الدعم لدينا إذا لم تكن متأكداً من أي وضع يحتاجه تدفقك أو واجهت مشاكل أثناء الهجرة.
</Card>

View File

@@ -1,190 +0,0 @@
---
title: "ترقية CrewAI"
description: "كيفية ترقية CrewAI في مشروعك والتكيّف مع التغييرات الجذرية بين الإصدارات."
icon: "arrow-up-circle"
---
## نظرة عامة
تجلب إصدارات CrewAI قدرات جديدة بانتظام. يرشدك هذا الدليل خلال الخطوات العملية للحفاظ على تثبيتك محدّثًا — سواء أداة سطر الأوامر أو البيئة الافتراضية لمشروعك.
إذا كنت تبدأ من الصفر، راجع [التثبيت](/ar/installation). إذا كنت قادمًا من إطار عمل آخر، راجع [الترحيل من LangGraph](/ar/guides/migration/migrating-from-langgraph).
---
## الشيئان اللذان قد ترغب في ترقيتهما
يوجد CrewAI في مكانين على جهازك، ويتم ترقيتهما بشكل مستقل:
| ماذا | كيف يُثبَّت | كيف تتم الترقية |
|---|---|---|
| **أداة سطر الأوامر العامة `crewai`** | `uv tool install crewai` | `uv tool install crewai --upgrade` |
| **بيئة venv للمشروع** (حيث يعمل الكود) | `crewai install` / `uv sync` | `uv add "crewai[...]>=X.Y.Z"` ثم `crewai install` |
يمكن لهما — وغالبًا ما يحدث — أن يخرجا عن التزامن. تشغيل `crewai --version` يُظهر إصدار سطر الأوامر. تشغيل `uv pip show crewai` داخل مشروعك يُظهر إصدار venv. إذا اختلفا، فهذا طبيعي؛ ما يهم بالنسبة للكود قيد التشغيل هو إصدار venv.
## لماذا لا يقوم `crewai install` وحده بالترقية
`crewai install` هو غلاف رفيع حول `uv sync`. يُثبّت بالضبط ما يقوله ملف `uv.lock` الحالي — وهو **لا** يرفع أي قيود إصدار.
إذا كان `pyproject.toml` يقول `crewai>=1.11.1` وقد قام ملف القفل بحلّه إلى `1.11.1`، فإن تشغيل `crewai install` سيُبقيك على `1.11.1` للأبد، حتى وإن كان الإصدار `1.14.4` متاحًا.
للترقية فعلًا، عليك:
1. تحديث قيد الإصدار في `pyproject.toml`
2. إعادة حلّ ملف القفل
3. مزامنة venv
`uv add` يقوم بالثلاثة في خطوة واحدة.
## كيفية ترقية مشروعك
```bash
# يرفع القيد ويعيد القفل في أمر واحد
uv add "crewai[tools]>=1.14.4"
# يزامن venv (crewai install يستدعي uv sync تحت الغطاء)
crewai install
# تحقّق
uv pip show crewai
# → Version: 1.14.4
```
استبدل `[tools]` بأي إضافات يستخدمها مشروعك (مثلًا `[tools,anthropic]`). تحقّق من قائمة `dependencies` في `pyproject.toml` إن لم تكن متأكدًا.
<Note>
يحدّث `uv add` كلا من `pyproject.toml` **و** `uv.lock` بشكل ذرّي. إذا قمت بتحرير `pyproject.toml` يدويًا، فإنك لا تزال بحاجة إلى تشغيل `uv lock --upgrade-package crewai` لإعادة حلّ ملف القفل قبل أن يلتقط `crewai install` الإصدار الجديد.
</Note>
## ترقية أداة سطر الأوامر العامة
أداة سطر الأوامر العامة منفصلة عن مشروعك. قم بترقيتها عبر:
```bash
uv tool install crewai --upgrade
```
إذا حذّرك الـ shell بشأن `PATH` بعد الترقية، قم بتحديثه:
```bash
uv tool update-shell
```
هذا **لا** يمسّ بيئة venv الخاصة بمشروعك — لا تزال بحاجة إلى `uv add` + `crewai install` داخل المشروع.
## التحقق من تزامن الاثنين
```bash
# إصدار سطر الأوامر العام
crewai --version
# إصدار venv للمشروع
uv pip show crewai | grep Version
```
ليس من الضروري أن يتطابقا — لكن إصدار venv للمشروع هو ما يهم لسلوك التشغيل.
<Note>
يتطلب CrewAI `Python >=3.10, <3.14`. إذا كان `uv` مثبَّتًا مقابل مفسّر أقدم، فأعد إنشاء venv للمشروع باستخدام إصدار Python مدعوم قبل تشغيل `crewai install`.
</Note>
---
## التغييرات الجذرية وملاحظات الترحيل
تتطلب معظم الترقيات تعديلات صغيرة فقط. المناطق أدناه هي تلك التي تنكسر بصمت أو بتتبعات مكدّس مربكة.
### مسارات الاستيراد: tools و`BaseTool`
الموقع الرسمي لاستيراد الـ tools هو `crewai.tools`. لا تزال المسارات القديمة تظهر في الدروس لكن يجب تحديثها.
```python
# قبل
from crewai_tools import BaseTool
from crewai.agents.tools import tool
# بعد
from crewai.tools import BaseTool, tool
```
كلٌ من المُزخرف `@tool` والفئة الفرعية `BaseTool` يقعان في `crewai.tools`. `AgentFinish` والرموز الأخرى الداخلية للوكيل لم تعد جزءًا من السطح العام — إذا كنت تستوردها، فانتقل إلى event listeners أو callbacks الـ `Task` بدلًا منها.
### تغييرات معاملات `Agent`
```python
from crewai import Agent
agent = Agent(
role="Researcher",
goal="Find authoritative sources on {topic}",
backstory="You are a careful, source-driven researcher.",
llm="gpt-4o-mini", # اسم نموذج كسلسلة نصية أو كائن LLM
verbose=True, # bool وليس مستوى عددي صحيح
max_iter=15, # تغيّر الافتراضي بين الإصدارات — حدّده بشكل صريح
allow_delegation=False,
)
```
- يقبل `llm` إما اسم نموذج كسلسلة نصية (يُحلَّ عبر المزوّد المهيّأ) أو كائن `LLM` للتحكم الدقيق.
- `verbose` هو `bool` بسيط. تمرير عدد صحيح لم يعد يبدّل مستويات السجل.
- تغيّرت افتراضات `max_iter` بين الإصدارات. إذا توقف وكيلك بصمت عن التكرار بعد أول استدعاء tool، فحدّد `max_iter` صراحةً.
### معاملات `Crew`
```python
from crewai import Crew, Process
crew = Crew(
agents=[...],
tasks=[...],
process=Process.sequential, # أو Process.hierarchical
memory=True,
cache=True,
embedder={"provider": "openai", "config": {"model": "text-embedding-3-small"}},
)
```
- يتطلب `process=Process.hierarchical` إما `manager_llm=` أو `manager_agent=`. بدون أحدهما، يرفع kickoff خطأً عند التحقّق.
- `memory=True` مع مزوّد embedding غير افتراضي يحتاج إلى قاموس `embedder` — راجع [إعداد الذاكرة وembedder](#memory-embedder-config) أدناه.
### الإخراج المُهيكل لـ `Task`
استخدم `output_pydantic` أو `output_json` أو `output_file` لإلزام نتيجة المهمة بشكل مكتوب الأنواع:
```python
from pydantic import BaseModel
from crewai import Task
class Article(BaseModel):
title: str
body: str
write = Task(
description="Write an article about {topic}",
expected_output="A short article with a title and body",
agent=writer,
output_pydantic=Article, # الفئة، وليس مثيلًا منها
output_file="output/article.md",
)
```
`output_pydantic` يأخذ **الفئة** نفسها. تمرير `Article(title="", body="")` خطأ شائع ويفشل بخطأ تحقّق مربك.
### إعداد الذاكرة وembedder {#memory-embedder-config}
إذا كان `memory=True` وأنت لا تستخدم embeddings الافتراضية الخاصة بـ OpenAI، فيجب أن تمرّر `embedder`:
```python
crew = Crew(
agents=[...],
tasks=[...],
memory=True,
embedder={
"provider": "ollama",
"config": {"model": "nomic-embed-text"},
},
)
```
ضع بيانات اعتماد المزوّد المعنيّة (`OPENAI_API_KEY`, `OLLAMA_HOST`, إلخ) في ملف `.env`. مسارات تخزين الذاكرة محلية بالنسبة للمشروع افتراضيًا — احذف مجلد ذاكرة المشروع إذا غيّرت embedders، لأن الأبعاد لا تختلط.

View File

@@ -13,7 +13,7 @@ The Daytona sandbox tools give CrewAI agents access to isolated, ephemeral compu
- **`DaytonaExecTool`** — run any shell command inside a sandbox.
- **`DaytonaPythonTool`** — execute a block of Python source code inside a sandbox.
- **`DaytonaFileTool`** — read, write, append, list, delete, and inspect files inside a sandbox; also supports `move`, `find` (content grep), `search` (filename glob), `chmod` (permissions), `replace` (bulk find-and-replace), and `exists`.
- **`DaytonaFileTool`** — read, write, append, list, delete, and inspect files inside a sandbox.
All three tools share the same sandbox lifecycle controls, so you can mix and match them while keeping state in a single persistent sandbox.
@@ -55,7 +55,7 @@ from crewai_tools import DaytonaPythonTool
tool = DaytonaPythonTool()
result = tool.run(code="print(sum(range(10)))")
print(result)
# {"exit_code": 0, "result": "45\n", "artifacts": ExecutionArtifacts(stdout="45\n", charts=[])}
# {"exit_code": 0, "result": "45\n", "artifacts": None}
```
### Multi-step shell session (persistent)
@@ -63,22 +63,17 @@ print(result)
```python Code
from crewai_tools import DaytonaExecTool, DaytonaFileTool
# Create the persistent sandbox via the first tool, then attach the second
# tool to it so both share state (installed packages, files, env vars).
exec_tool = DaytonaExecTool(persistent=True)
exec_tool.run(command="pip install httpx -q")
file_tool = DaytonaFileTool(sandbox_id=exec_tool.active_sandbox_id)
file_tool = DaytonaFileTool(persistent=True)
file_tool.run(
action="write",
path="workspace/script.py",
content="import httpx; print(f'httpx loaded, version {httpx.__version__}')",
)
exec_tool.run(command="python workspace/script.py")
# Install a package, then write and run a script — all in the same sandbox
exec_tool.run(command="pip install httpx -q")
file_tool.run(action="write", path="/workspace/fetch.py", content="import httpx; print(httpx.get('https://httpbin.org/get').status_code)")
exec_tool.run(command="python /workspace/fetch.py")
```
<Note>
By default, each tool with `persistent=True` lazily creates its **own** sandbox on first use. The pattern above shares a single sandbox across multiple tools by reading the first tool's `active_sandbox_id` after a `.run()` call and passing it to the others via `sandbox_id=...`. With `persistent=False` (the default), every `.run()` call gets a fresh sandbox that's deleted at the end of that call.
Each tool instance maintains its own persistent sandbox. To share **one** sandbox across two tools, create the first tool, grab its sandbox id via `tool._persistent_sandbox.id`, and pass it to the second tool via `sandbox_id=...`.
</Note>
### Attach to an existing sandbox
@@ -87,7 +82,7 @@ By default, each tool with `persistent=True` lazily creates its **own** sandbox
from crewai_tools import DaytonaExecTool
tool = DaytonaExecTool(sandbox_id="my-long-lived-sandbox")
result = tool.run(command="ls workspace")
result = tool.run(command="ls /workspace")
```
### Custom sandbox parameters
@@ -107,41 +102,6 @@ tool = DaytonaExecTool(
)
```
### Searching, moving, and modifying files
```python Code
from crewai_tools import DaytonaFileTool
file_tool = DaytonaFileTool(persistent=True)
# Find every TODO in the source tree (grep file contents recursively)
file_tool.run(action="find", path="workspace/src", pattern="TODO:")
# Find all Python files (glob match on filenames)
file_tool.run(action="search", path="workspace", pattern="*.py")
# Make a script executable
file_tool.run(action="chmod", path="workspace/run.sh", mode="755")
# Rename or move a file
file_tool.run(
action="move",
path="workspace/draft.md",
destination="workspace/final.md",
)
# Bulk find-and-replace across multiple files
file_tool.run(
action="replace",
paths=["workspace/src/a.py", "workspace/src/b.py"],
pattern="old_function",
replacement="new_function",
)
# Quick existence check before a destructive op
file_tool.run(action="exists", path="workspace/cache.db")
```
### Agent integration
```python Code
@@ -161,7 +121,7 @@ coder = Agent(
)
task = Task(
description="Write a Python script that prints the first 10 Fibonacci numbers, save it to workspace/fib.py, and run it.",
description="Write a Python script that prints the first 10 Fibonacci numbers, save it to /workspace/fib.py, and run it.",
expected_output="The first 10 Fibonacci numbers printed to stdout.",
agent=coder,
)
@@ -208,22 +168,12 @@ All three tools accept these parameters at initialization:
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `action` | `str` | ✓ | One of: `read`, `write`, `append`, `list`, `delete`, `mkdir`, `info`, `exists`, `move`, `find`, `search`, `chmod`, `replace`. |
| `path` | `str \| None` | ✓ for all actions except `replace` | Absolute path inside the sandbox. |
| `content` | `str \| None` | ✓ for `append` | Content to write or append. |
| `action` | `str` | ✓ | One of: `read`, `write`, `append`, `list`, `delete`, `mkdir`, `info`. |
| `path` | `str` | ✓ | Absolute path inside the sandbox. |
| `content` | `str \| None` | | Content to write or append. Required for `append`. |
| `binary` | `bool` | | If `True`, `content` is base64 on write; returns base64 on read. |
| `recursive` | `bool` | | For `delete`: remove directories recursively. |
| `mode` | `str \| None` | | For `mkdir`: octal permissions for the new directory (defaults to `"0755"`). For `chmod`: octal permissions to apply to the target. |
| `destination` | `str \| None` | ✓ for `move` | Destination path for `move`. |
| `pattern` | `str \| None` | ✓ for `find`, `search`, `replace` | For `find`: substring matched against file CONTENTS. For `search`: glob matched against file NAMES (e.g. `*.py`). For `replace`: text to replace inside files. |
| `replacement` | `str \| None` | ✓ for `replace` | Replacement text for `pattern`. |
| `paths` | `list[str] \| None` | ✓ for `replace` | List of file paths in which to replace text. |
| `owner` | `str \| None` | | For `chmod`: new file owner. |
| `group` | `str \| None` | | For `chmod`: new file group. |
<Note>
For `chmod`, pass at least one of `mode`, `owner`, or `group` — any field left as `None` is left unchanged on the target.
</Note>
| `mode` | `str` | | For `mkdir`: octal permission string (default `"0755"`). |
<Tip>
For files larger than a few KB, create the file first with `action="write"` and empty content, then send the body via multiple `action="append"` calls of ~4 KB each to stay within tool-call payload limits.

File diff suppressed because it is too large Load Diff

View File

@@ -26,7 +26,7 @@ Welcome to the CrewAI AMP API reference. This API allows you to programmatically
</Step>
<Step title="Monitor Progress">
Use `GET /status/{kickoff_id}` to check execution status and retrieve results.
Use `GET /{kickoff_id}/status` to check execution status and retrieve results.
</Step>
</Steps>
@@ -65,7 +65,7 @@ Replace `your-crew-name` with your actual crew's URL from the dashboard.
1. **Discovery**: Call `GET /inputs` to understand what your crew needs
2. **Execution**: Submit inputs via `POST /kickoff` to start processing
3. **Monitoring**: Poll `GET /status/{kickoff_id}` until completion
3. **Monitoring**: Poll `GET /{kickoff_id}/status` until completion
4. **Results**: Extract the final output from the completed response
## Error Handling

View File

@@ -1,6 +1,6 @@
---
title: "GET /status/{kickoff_id}"
title: "GET /{kickoff_id}/status"
description: "Get execution status"
openapi: "/enterprise-api.en.yaml GET /status/{kickoff_id}"
openapi: "/enterprise-api.en.yaml GET /{kickoff_id}/status"
mode: "wide"
---

View File

@@ -4,80 +4,6 @@ description: "Product updates, improvements, and bug fixes for CrewAI"
icon: "clock"
mode: "wide"
---
<Update label="May 13, 2026">
## v1.14.5a5
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.5a5)
## What's Changed
### Features
- Deprecate CrewAgentExecutor, default Crew agents to AgentExecutor
- Improve Daytona sandbox tools
### Bug Fixes
- Fix missing code block in pt-BR first-flow guide
- Log HITL pre-review and distillation failures, add learn_strict
- Patch urllib3 for security vulnerabilities
- Patch gitpython and langchain-core; ignore unpatched paramiko CVE
- Refresh all published workspace packages on uv lock/sync
### Documentation
- Add migration guide for `inputs.id` to `restoreFromStateId`
- Add OSS upgrade and crew-to-flow migration guide
- Update changelog and version for v1.14.5a4
## Contributors
@akaKuruma, @greysonlalonde, @iris-clawd, @lorenzejay, @mislavivanda
</Update>
<Update label="May 09, 2026">
## v1.14.5a4
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.5a4)
## What's Changed
### Features
- Update LLM listings
### Bug Fixes
- Fix dependency issue by moving `textual` to `crewai-cli` and adding `certifi`
### Documentation
- Update changelog and version for v1.14.5a3
## Contributors
@cgoeppinger, @greysonlalonde
</Update>
<Update label="May 07, 2026">
## v1.14.5a3
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.5a3)
## What's Changed
### Bug Fixes
- Fix status endpoint path from /{kickoff_id}/status to /status/{kickoff_id}
- Bump gitpython dependency to version >=3.1.47 for security compliance
### Refactoring
- Extract CLI into standalone crewai-cli package
### Documentation
- Update changelog and version for v1.14.5a2
## Contributors
@greysonlalonde, @iris-clawd
</Update>
<Update label="May 04, 2026">
## v1.14.5a2

View File

@@ -29,7 +29,6 @@ from crewai.flow.flow import Flow, listen, start
from dotenv import load_dotenv
from litellm import completion
load_dotenv()
class ExampleFlow(Flow):
model = "gpt-4o-mini"

View File

@@ -1,143 +0,0 @@
---
title: "Migrating from inputs.id to restore_from_state_id"
description: "Move @persist flows off the deprecated inputs.id hydration onto the supported restore_from_state_id field"
icon: "arrow-right-arrow-left"
---
<Warning>
Passing `id` inside `inputs` to hydrate a `@persist` flow is **deprecated** and
scheduled for removal in a future release. The replacement, `restore_from_state_id`,
is available in CrewAI **v1.14.5 and later** — the steps below apply once you
upgrade.
</Warning>
## Overview
The documented way to hydrate a `@persist` flow from a previous execution is to pass
that execution's UUID as `inputs.id`. CrewAI now exposes a dedicated field,
`restore_from_state_id`, that performs the same hydration without overloading the
`inputs` payload — and without coupling the hydration key to the new execution's
identity.
## Migration
If you currently kickoff a `@persist` flow with `inputs={"id": ...}`:
```python
# Deprecated
flow = CounterFlow()
flow.kickoff(inputs={"id": "abcd1234-5678-90ef-1234-567890abcdef"})
```
Switch to `restore_from_state_id`:
```python
# Supported
flow = CounterFlow()
flow.kickoff(restore_from_state_id="abcd1234-5678-90ef-1234-567890abcdef")
```
The two modes have different lineage semantics:
- `inputs={"id": <uuid>}` (deprecated) — **resume**: writes land under the supplied
id, extending the same `flow_uuid` history.
- `restore_from_state_id=<uuid>` — **fork**: hydrates state from the snapshot, then
writes under a fresh `state.id`. The source flow's history is preserved.
For most production scenarios — re-running a flow seeded from a previous state — fork
is what you want. See [Mastering Flow State](/en/guides/flows/mastering-flow-state)
for the full mental model.
If you kickoff your flow over the CrewAI AMP REST API, see [AMP](#amp) below for the
equivalent payload migration.
## Why we are deprecating `inputs.id` for `@persist`
`inputs.id` is currently the documented way to resume a `@persist` flow from a
previous execution. The problem is that the same UUID does two jobs at once:
1. **It selects which snapshot `@persist` hydrates from** — load the state saved
under that UUID.
2. **It becomes the new execution's Flow Execution ID** (`state.id` in the SDK;
surfaced as `flow_id` in some contexts) — every `@persist` write from this
kickoff also lands under that same UUID.
This dual role is the root cause of the issues this guide describes. Because the
supplied UUID is also the new execution's id, two kickoffs that pass the same
`inputs.id` are not two distinct executions — they share an id, share a persistence
record, and (on AMP) share a row in the executions list. There is no way to say
"hydrate from this snapshot, but record this run separately" without splitting the
two responsibilities.
`restore_from_state_id` is that split. It tells `@persist` which snapshot to hydrate
from, while leaving the new execution free to receive a fresh `state.id`. The
hydration source and the recorded run are no longer the same UUID — which is what
most production scenarios actually want.
## Removal timeline
`inputs.id` for `@persist` hydration is scheduled for removal in a future release of
CrewAI. There is no immediate hard cut-off — existing flows continue to work — but
once you upgrade to v1.14.5 or later, new code should use `restore_from_state_id`, and
existing flows should migrate at the next convenient opportunity.
## AMP
If you deploy your flow to CrewAI AMP, the migration extends to the kickoff payload
sent to your deployed crew, and the visible symptoms of reusing `inputs.id` show up
on the deployment dashboard. The two subsections below cover both.
### Migrating the kickoff payload
If you currently kickoff a deployed flow by embedding `id` in `inputs`:
```bash
# Deprecated
curl -X POST \
-H "Content-Type: application/json" \
-H "Authorization: Bearer YOUR_CREW_TOKEN" \
-d '{"inputs": {"id": "abcd1234-5678-90ef-1234-567890abcdef", "topic": "AI Agent Frameworks"}}' \
https://your-crew-url.crewai.com/kickoff
```
Move the UUID to the top-level `restoreFromStateId` field:
```bash
# Supported
curl -X POST \
-H "Content-Type: application/json" \
-H "Authorization: Bearer YOUR_CREW_TOKEN" \
-d '{
"inputs": {"topic": "AI Agent Frameworks"},
"restoreFromStateId": "abcd1234-5678-90ef-1234-567890abcdef"
}' \
https://your-crew-url.crewai.com/kickoff
```
`restoreFromStateId` sits next to `inputs` in the kickoff payload, not inside it. The
`inputs` object now only carries values your flow actually consumes.
### What happens when `inputs.id` is reused
When AMP receives a kickoff for a flow whose `inputs.id` matches an existing
execution, it resolves to the existing record rather than creating a new one. From
the deployment dashboard you'll see:
- **Execution status** — the new run's status overwrites the previous run's. A
finished execution can flip back to `running`, or a `completed` run can flip to
`error` if the new kickoff fails — either way the dashboard no longer reflects
the original run.
- **Traces** — OTel traces stack across kickoffs because they share the same
execution id; the previous run's traces are either replaced by, or mixed with,
the new run's. A step-by-step replay no longer corresponds to a single execution.
- **Executions list** — kickoffs that should appear as separate rows collapse into
a single entry, hiding history.
Migrating to `restoreFromStateId` keeps every kickoff as its own execution — with
its own status, traces, and row in the list — while still hydrating state from a
previous run.
<Card title="Need Help?" icon="headset" href="mailto:support@crewai.com">
Contact our support team if you're unsure which mode your flow needs or hit issues
during the migration.
</Card>

View File

@@ -313,9 +313,9 @@ flow1 = PersistentCounterFlow()
result1 = flow1.kickoff()
print(f"First run result: {result1}")
# Second run - pass the ID to load the persisted state
# Second run - state is automatically loaded
flow2 = PersistentCounterFlow()
result2 = flow2.kickoff(inputs={"id": flow1.state.id})
result2 = flow2.kickoff()
print(f"Second run result: {result2}") # Will be higher due to persisted state
```

View File

@@ -1,190 +0,0 @@
---
title: "Upgrading CrewAI"
description: "How to upgrade CrewAI in your project and adapt to breaking changes between versions."
icon: "arrow-up-circle"
---
## Overview
CrewAI releases ship new capabilities regularly. This guide walks you through the practical steps to keep your installation up to date — both the CLI and your project's virtual environment.
If you're starting fresh, see [Installation](/en/installation). If you're coming from another framework, see [Migrating from LangGraph](/en/guides/migration/migrating-from-langgraph).
---
## The Two Things You Might Want to Upgrade
CrewAI lives in two places on your machine, and they upgrade independently:
| What | How it's installed | How to upgrade |
|---|---|---|
| The **global `crewai` CLI** | `uv tool install crewai` | `uv tool install crewai --upgrade` |
| The **project venv** (what your code runs) | `crewai install` / `uv sync` | `uv add "crewai[...]>=X.Y.Z"` then `crewai install` |
These can — and often do — get out of sync. Running `crewai --version` tells you the CLI version. Running `uv pip show crewai` inside your project tells you the venv version. If they differ, that's normal; what matters for your running code is the venv version.
## Why `crewai install` Alone Doesn't Upgrade
`crewai install` is a thin wrapper around `uv sync`. It installs exactly what the current `uv.lock` file says — it does **not** bump any version constraints.
If your `pyproject.toml` says `crewai>=1.11.1` and the lock file resolved to `1.11.1`, running `crewai install` will keep you on `1.11.1` forever, even if `1.14.4` is available.
To actually upgrade, you need to:
1. Update the version constraint in `pyproject.toml`
2. Re-solve the lock file
3. Sync the venv
`uv add` does all three in one shot.
## How to Upgrade Your Project
```bash
# Bump the constraint and re-lock in one command
uv add "crewai[tools]>=1.14.4"
# Sync the venv (crewai install calls uv sync under the hood)
crewai install
# Verify
uv pip show crewai
# → Version: 1.14.4
```
Replace `[tools]` with whatever extras your project uses (e.g. `[tools,anthropic]`). Check your `pyproject.toml` `dependencies` list if you're unsure.
<Note>
`uv add` updates both `pyproject.toml` **and** `uv.lock` atomically. If you edit `pyproject.toml` manually, you still need to run `uv lock --upgrade-package crewai` to re-solve the lock file before `crewai install` will pick up the new version.
</Note>
## Upgrading the Global CLI
The global CLI is separate from your project. Upgrade it with:
```bash
uv tool install crewai --upgrade
```
If your shell warns about `PATH` after the upgrade, refresh it:
```bash
uv tool update-shell
```
This does **not** touch your project's venv — you still need `uv add` + `crewai install` inside the project.
## Verify Both Are in Sync
```bash
# Global CLI version
crewai --version
# Project venv version
uv pip show crewai | grep Version
```
They don't need to match — but your project venv version is what matters for runtime behavior.
<Note>
CrewAI requires `Python >=3.10, <3.14`. If `uv` was installed against an older interpreter, recreate the project venv with a supported Python before running `crewai install`.
</Note>
---
## Breaking Changes & Migration Notes
Most upgrades only require small adjustments. The areas below are the ones that break silently or with confusing tracebacks.
### Import paths: tools and `BaseTool`
The canonical import location for tools is `crewai.tools`. Older paths still surface in tutorials but should be updated.
```python
# Before
from crewai_tools import BaseTool
from crewai.agents.tools import tool
# After
from crewai.tools import BaseTool, tool
```
The `@tool` decorator and `BaseTool` subclass both live in `crewai.tools`. `AgentFinish` and other internal-agent symbols are no longer part of the public surface — if you were importing them, switch to event listeners or `Task` callbacks instead.
### `Agent` parameter changes
```python
from crewai import Agent
agent = Agent(
role="Researcher",
goal="Find authoritative sources on {topic}",
backstory="You are a careful, source-driven researcher.",
llm="gpt-4o-mini", # string model name OR an LLM object
verbose=True, # bool, not an int level
max_iter=15, # default has changed across versions — set explicitly
allow_delegation=False,
)
```
- `llm` accepts either a string model name (resolved via the configured provider) or an `LLM` object for fine-grained control.
- `verbose` is a plain `bool`. Passing an integer no longer toggles log levels.
- `max_iter` defaults have shifted between releases. If your agent silently stops looping after the first tool call, set `max_iter` explicitly.
### `Crew` parameters
```python
from crewai import Crew, Process
crew = Crew(
agents=[...],
tasks=[...],
process=Process.sequential, # or Process.hierarchical
memory=True,
cache=True,
embedder={"provider": "openai", "config": {"model": "text-embedding-3-small"}},
)
```
- `process=Process.hierarchical` requires either `manager_llm=` or `manager_agent=`. Without one, kickoff raises at validation time.
- `memory=True` with a non-default embedding provider needs an `embedder` dict — see [Memory & embedder config](#memory-embedder-config) below.
### `Task` structured output
Use `output_pydantic`, `output_json`, or `output_file` to coerce a task's result into a typed shape:
```python
from pydantic import BaseModel
from crewai import Task
class Article(BaseModel):
title: str
body: str
write = Task(
description="Write an article about {topic}",
expected_output="A short article with a title and body",
agent=writer,
output_pydantic=Article, # the class, NOT an instance
output_file="output/article.md",
)
```
`output_pydantic` takes the **class** itself. Passing `Article(title="", body="")` is a common mistake and fails with a confusing validation error.
### Memory & embedder config {#memory-embedder-config}
If `memory=True` and you're not using the default OpenAI embeddings, you must pass an `embedder`:
```python
crew = Crew(
agents=[...],
tasks=[...],
memory=True,
embedder={
"provider": "ollama",
"config": {"model": "nomic-embed-text"},
},
)
```
Set the relevant provider credentials (`OPENAI_API_KEY`, `OLLAMA_HOST`, etc.) in your `.env` file. Memory storage paths are project-local by default — delete the project's memory directory if you change embedders, since dimensions don't mix.

View File

@@ -1,131 +0,0 @@
---
title: Platform Tools CLI
description: Create, publish, and install custom tools on the CrewAI platform using the CLI.
icon: terminal
mode: "wide"
---
## Overview
The CrewAI CLI provides commands to manage custom tools on the **CrewAI platform** — a hosted tool registry that lets you share tools within your organization without publishing to PyPI.
| Command | Purpose |
|---------|---------|
| `crewai tool create <handle>` | Scaffold a new tool project |
| `crewai tool publish` | Publish the tool to the CrewAI platform |
| `crewai tool install <handle>` | Install a platform tool into your crew project |
<Note type="info" title="Platform vs PyPI">
These commands manage tools on the **CrewAI platform registry**. If you want to publish a standalone Python package to PyPI instead, see the [Publish Custom Tools to PyPI](/en/guides/tools/publish-custom-tools) guide.
</Note>
## Prerequisites
- **CrewAI CLI** installed (`uv tool install crewai`)
- **Authenticated** with the platform — run `crewai login` first
---
## Step 1: Create a Tool Project
Scaffold a new tool project:
```bash
crewai tool create my_custom_tool
```
This generates a project structure with the boilerplate you need to start building your tool.
<Tip>
The `handle` is the unique identifier for your tool on the platform. Choose something descriptive and specific to what the tool does.
</Tip>
### Implement Your Tool
Edit the generated tool file to add your logic. The tool follows the standard CrewAI tools contract — you can subclass `BaseTool` or use the `@tool` decorator:
```python
from crewai.tools import BaseTool
class MyCustomTool(BaseTool):
name: str = "My Custom Tool"
description: str = "Description of what this tool does — be specific so agents know when to use it."
def _run(self, argument: str) -> str:
# Your tool logic here
return "result"
```
For the full tools API reference (input schemas, caching, async support, error handling), see the [Create Custom Tools](/en/learn/create-custom-tools) guide.
---
## Step 2: Publish to the Platform
From your tool project directory, publish it to the CrewAI platform:
```bash
crewai tool publish
```
### Options
| Flag | Description |
|------|-------------|
| `--force` | Bypass Git remote validations |
Tools are published privately to your organization by default.
---
## Step 3: Install a Platform Tool
To install a tool that's been published to the platform:
```bash
crewai tool install my_custom_tool
```
Once installed, you can use the tool in your crew like any other tool — assign it to an agent via the `tools` parameter.
---
## Full Lifecycle Example
```bash
# 1. Authenticate with the platform
crewai login
# 2. Scaffold a new tool
crewai tool create weather_lookup
# 3. Implement your logic in the generated project
cd weather_lookup
# ... edit the tool file ...
# 4. Publish to the platform
crewai tool publish
# 5. In another project, install and use it
crewai tool install weather_lookup
```
---
## Platform Tools vs PyPI Packages
| | Platform Tools | PyPI Packages |
|---|---|---|
| **Publish** | `crewai tool publish` | `uv build` + `uv publish` |
| **Registry** | CrewAI platform | PyPI |
| **Install** | `crewai tool install <handle>` | `pip install <package>` |
| **Auth** | `crewai login` | PyPI account + token |
| **Visibility** | Organization-scoped (private) | Always public |
| **Guide** | This page | [Publish Custom Tools](/en/guides/tools/publish-custom-tools) |
---
## Related
- [Create Custom Tools](/en/learn/create-custom-tools) — Python API reference for building tools (BaseTool, @tool decorator)
- [Publish Custom Tools to PyPI](/en/guides/tools/publish-custom-tools) — package and distribute tools as standalone Python libraries

View File

@@ -106,9 +106,6 @@ If you haven't installed `uv` yet, follow **step 1** to quickly get it set up on
```shell
uv tool install crewai --upgrade
```
<Note>
This upgrades the **global `crewai` CLI tool** only. To upgrade the `crewai` version inside your project's virtual environment, see [Upgrading CrewAI in a project](/en/guides/migration/upgrading-crewai).
</Note>
<Check>Installation successful! You're ready to create your first crew! 🎉</Check>
</Step>

View File

@@ -12,9 +12,7 @@ incorporating the latest functionalities such as tool delegation, error handling
enabling agents to perform a wide range of actions.
<Tip>
**Want to publish your tool to the CrewAI platform?** Use the CLI to scaffold, publish, and share tools directly on the platform — see the [Platform Tools CLI](/en/guides/tools/platform-tools-cli) guide.
**Prefer publishing to PyPI?** Check out the [Publish Custom Tools](/en/guides/tools/publish-custom-tools) guide to package and distribute your tool as a standalone Python library.
**Want to publish your tool for the community?** If you're building a tool that others could benefit from, check out the [Publish Custom Tools](/en/guides/tools/publish-custom-tools) guide to learn how to package and distribute your tool on PyPI.
</Tip>
### Subclassing `BaseTool`

View File

@@ -13,7 +13,7 @@ The Daytona sandbox tools give CrewAI agents access to isolated, ephemeral compu
- **`DaytonaExecTool`** — run any shell command inside a sandbox.
- **`DaytonaPythonTool`** — execute a block of Python source code inside a sandbox.
- **`DaytonaFileTool`** — read, write, append, list, delete, and inspect files inside a sandbox; also supports `move`, `find` (content grep), `search` (filename glob), `chmod` (permissions), `replace` (bulk find-and-replace), and `exists`.
- **`DaytonaFileTool`** — read, write, append, list, delete, and inspect files inside a sandbox.
All three tools share the same sandbox lifecycle controls, so you can mix and match them while keeping state in a single persistent sandbox.
@@ -55,7 +55,7 @@ from crewai_tools import DaytonaPythonTool
tool = DaytonaPythonTool()
result = tool.run(code="print(sum(range(10)))")
print(result)
# {"exit_code": 0, "result": "45\n", "artifacts": ExecutionArtifacts(stdout="45\n", charts=[])}
# {"exit_code": 0, "result": "45\n", "artifacts": None}
```
### Multi-step shell session (persistent)
@@ -63,22 +63,17 @@ print(result)
```python Code
from crewai_tools import DaytonaExecTool, DaytonaFileTool
# Create the persistent sandbox via the first tool, then attach the second
# tool to it so both share state (installed packages, files, env vars).
exec_tool = DaytonaExecTool(persistent=True)
exec_tool.run(command="pip install httpx -q")
file_tool = DaytonaFileTool(sandbox_id=exec_tool.active_sandbox_id)
file_tool = DaytonaFileTool(persistent=True)
file_tool.run(
action="write",
path="workspace/script.py",
content="import httpx; print(f'httpx loaded, version {httpx.__version__}')",
)
exec_tool.run(command="python workspace/script.py")
# Install a package, then write and run a script — all in the same sandbox
exec_tool.run(command="pip install httpx -q")
file_tool.run(action="write", path="/workspace/fetch.py", content="import httpx; print(httpx.get('https://httpbin.org/get').status_code)")
exec_tool.run(command="python /workspace/fetch.py")
```
<Note>
By default, each tool with `persistent=True` lazily creates its **own** sandbox on first use. The pattern above shares a single sandbox across multiple tools by reading the first tool's `active_sandbox_id` after a `.run()` call and passing it to the others via `sandbox_id=...`. With `persistent=False` (the default), every `.run()` call gets a fresh sandbox that's deleted at the end of that call.
Each tool instance maintains its own persistent sandbox. To share **one** sandbox across two tools, create the first tool, grab its sandbox id via `tool._persistent_sandbox.id`, and pass it to the second tool via `sandbox_id=...`.
</Note>
### Attach to an existing sandbox
@@ -87,7 +82,7 @@ By default, each tool with `persistent=True` lazily creates its **own** sandbox
from crewai_tools import DaytonaExecTool
tool = DaytonaExecTool(sandbox_id="my-long-lived-sandbox")
result = tool.run(command="ls workspace")
result = tool.run(command="ls /workspace")
```
### Custom sandbox parameters
@@ -107,41 +102,6 @@ tool = DaytonaExecTool(
)
```
### Searching, moving, and modifying files
```python Code
from crewai_tools import DaytonaFileTool
file_tool = DaytonaFileTool(persistent=True)
# Find every TODO in the source tree (grep file contents recursively)
file_tool.run(action="find", path="workspace/src", pattern="TODO:")
# Find all Python files (glob match on filenames)
file_tool.run(action="search", path="workspace", pattern="*.py")
# Make a script executable
file_tool.run(action="chmod", path="workspace/run.sh", mode="755")
# Rename or move a file
file_tool.run(
action="move",
path="workspace/draft.md",
destination="workspace/final.md",
)
# Bulk find-and-replace across multiple files
file_tool.run(
action="replace",
paths=["workspace/src/a.py", "workspace/src/b.py"],
pattern="old_function",
replacement="new_function",
)
# Quick existence check before a destructive op
file_tool.run(action="exists", path="workspace/cache.db")
```
### Agent integration
```python Code
@@ -161,7 +121,7 @@ coder = Agent(
)
task = Task(
description="Write a Python script that prints the first 10 Fibonacci numbers, save it to workspace/fib.py, and run it.",
description="Write a Python script that prints the first 10 Fibonacci numbers, save it to /workspace/fib.py, and run it.",
expected_output="The first 10 Fibonacci numbers printed to stdout.",
agent=coder,
)
@@ -208,22 +168,12 @@ All three tools accept these parameters at initialization:
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `action` | `str` | ✓ | One of: `read`, `write`, `append`, `list`, `delete`, `mkdir`, `info`, `exists`, `move`, `find`, `search`, `chmod`, `replace`. |
| `path` | `str \| None` | ✓ for all actions except `replace` | Absolute path inside the sandbox. |
| `content` | `str \| None` | ✓ for `append` | Content to write or append. |
| `action` | `str` | ✓ | One of: `read`, `write`, `append`, `list`, `delete`, `mkdir`, `info`. |
| `path` | `str` | ✓ | Absolute path inside the sandbox. |
| `content` | `str \| None` | | Content to write or append. Required for `append`. |
| `binary` | `bool` | | If `True`, `content` is base64 on write; returns base64 on read. |
| `recursive` | `bool` | | For `delete`: remove directories recursively. |
| `mode` | `str \| None` | | For `mkdir`: octal permissions for the new directory (defaults to `"0755"`). For `chmod`: octal permissions to apply to the target. |
| `destination` | `str \| None` | ✓ for `move` | Destination path for `move`. |
| `pattern` | `str \| None` | ✓ for `find`, `search`, `replace` | For `find`: substring matched against file CONTENTS. For `search`: glob matched against file NAMES (e.g. `*.py`). For `replace`: text to replace inside files. |
| `replacement` | `str \| None` | ✓ for `replace` | Replacement text for `pattern`. |
| `paths` | `list[str] \| None` | ✓ for `replace` | List of file paths in which to replace text. |
| `owner` | `str \| None` | | For `chmod`: new file owner. |
| `group` | `str \| None` | | For `chmod`: new file group. |
<Note>
For `chmod`, pass at least one of `mode`, `owner`, or `group` — any field left as `None` is left unchanged on the target.
</Note>
| `mode` | `str` | | For `mkdir`: octal permission string (default `"0755"`). |
<Tip>
For files larger than a few KB, create the file first with `action="write"` and empty content, then send the body via multiple `action="append"` calls of ~4 KB each to stay within tool-call payload limits.

View File

@@ -54,14 +54,6 @@ These tools enable your agents to search the web, research topics, and find info
Extract structured content from web pages using the Tavily API.
</Card>
<Card title="Tavily Research Tool" icon="flask" href="/en/tools/search-research/tavilyresearchtool">
Run multi-step research tasks and get cited reports using the Tavily Research API.
</Card>
<Card title="Tavily Get Research Tool" icon="clipboard-list" href="/en/tools/search-research/tavilygetresearchtool">
Retrieve the status and results of an existing Tavily research task.
</Card>
<Card title="Arxiv Paper Tool" icon="box-archive" href="/en/tools/search-research/arxivpapertool">
Search arXiv and optionally download PDFs.
</Card>
@@ -84,15 +76,7 @@ These tools enable your agents to search the web, research topics, and find info
- **Academic Research**: Find scholarly articles and technical papers
```python
from crewai_tools import (
GitHubSearchTool,
SerperDevTool,
TavilyExtractorTool,
TavilyGetResearchTool,
TavilyResearchTool,
TavilySearchTool,
YoutubeVideoSearchTool,
)
from crewai_tools import SerperDevTool, GitHubSearchTool, YoutubeVideoSearchTool, TavilySearchTool, TavilyExtractorTool
# Create research tools
web_search = SerperDevTool()
@@ -100,21 +84,11 @@ code_search = GitHubSearchTool()
video_research = YoutubeVideoSearchTool()
tavily_search = TavilySearchTool()
content_extractor = TavilyExtractorTool()
tavily_research = TavilyResearchTool()
tavily_get_research = TavilyGetResearchTool()
# Add to your agent
agent = Agent(
role="Research Analyst",
tools=[
web_search,
code_search,
video_research,
tavily_search,
content_extractor,
tavily_research,
tavily_get_research,
],
tools=[web_search, code_search, video_research, tavily_search, content_extractor],
goal="Gather comprehensive information on any topic"
)
```

View File

@@ -1,85 +0,0 @@
---
title: "Tavily Get Research Tool"
description: "Retrieve the status and results of an existing Tavily research task"
icon: "clipboard-list"
mode: "wide"
---
The `TavilyGetResearchTool` lets CrewAI agents check an existing Tavily research task by `request_id`. Use it when a research task was started earlier and you need to retrieve its current status or final results.
If you need to start a new research job, use the [Tavily Research Tool](/en/tools/search-research/tavilyresearchtool). This tool is specifically for looking up an existing Tavily research request after you already have its `request_id`.
## Installation
To use the `TavilyGetResearchTool`, install the `tavily-python` library alongside `crewai-tools`:
```shell
uv add 'crewai[tools]' tavily-python
```
## Environment Variables
Set your Tavily API key:
```bash
export TAVILY_API_KEY='your_tavily_api_key'
```
Get an API key at [https://app.tavily.com/](https://app.tavily.com/) (sign up, then create a key).
## Example Usage
```python
from crewai_tools import TavilyGetResearchTool
tavily_get_research_tool = TavilyGetResearchTool()
status_result = tavily_get_research_tool.run(
request_id="your-research-request-id"
)
print(status_result)
```
## Common Workflow
Use `TavilyGetResearchTool` when your application or another service has already created a Tavily research task and saved its `request_id`.
Typical cases include:
- Polling for completion after kicking off research in a background job.
- Looking up the latest status of a long-running research task.
- Fetching final research output from a previously created Tavily request.
## Configuration Options
The `TavilyGetResearchTool` accepts the following argument when calling the `run` method:
- `request_id` (str): **Required.** The existing Tavily research request ID to retrieve.
## Async Usage
Use `_arun` when your application is already running inside an async event loop:
```python
from crewai_tools import TavilyGetResearchTool
tavily_get_research_tool = TavilyGetResearchTool()
status_result = await tavily_get_research_tool._arun(
request_id="your-research-request-id"
)
```
## Features
- **Research status retrieval**: Fetch the current status of an existing Tavily research task.
- **Result retrieval**: Return available research output once Tavily has completed the task.
- **Sync and async**: Use either `_run`/`run` or `_arun` depending on your application's runtime.
- **JSON output**: Returns Tavily responses as formatted JSON strings.
## Response Format
The tool returns a JSON string containing the current research task status and any available results from Tavily. The exact response shape depends on the task state returned by Tavily, so incomplete tasks may return status information before the final research output is available.
Refer to the [Tavily API documentation](https://docs.tavily.com/) for full details on the Research API.

View File

@@ -35,7 +35,7 @@ info:
1. **Discover inputs** using `GET /inputs`
2. **Start execution** using `POST /kickoff`
3. **Monitor progress** using `GET /status/{kickoff_id}`
3. **Monitor progress** using `GET /{kickoff_id}/status`
version: 1.0.0
contact:
name: CrewAI Support
@@ -207,7 +207,7 @@ paths:
"500":
$ref: "#/components/responses/ServerError"
/status/{kickoff_id}:
/{kickoff_id}/status:
get:
summary: Get Execution Status
description: |

View File

@@ -35,7 +35,7 @@ info:
1. **Discover inputs** using `GET /inputs`
2. **Start execution** using `POST /kickoff`
3. **Monitor progress** using `GET /status/{kickoff_id}`
3. **Monitor progress** using `GET /{kickoff_id}/status`
version: 1.0.0
contact:
name: CrewAI Support
@@ -207,7 +207,7 @@ paths:
"500":
$ref: "#/components/responses/ServerError"
/status/{kickoff_id}:
/{kickoff_id}/status:
get:
summary: Get Execution Status
description: |

View File

@@ -84,7 +84,7 @@ paths:
'500':
$ref: '#/components/responses/ServerError'
/status/{kickoff_id}:
/{kickoff_id}/status:
get:
summary: 실행 상태 조회
description: |

View File

@@ -35,7 +35,7 @@ info:
1. **Descubra os inputs** usando `GET /inputs`
2. **Inicie a execução** usando `POST /kickoff`
3. **Monitore o progresso** usando `GET /status/{kickoff_id}`
3. **Monitore o progresso** usando `GET /{kickoff_id}/status`
version: 1.0.0
contact:
name: CrewAI Suporte
@@ -120,7 +120,7 @@ paths:
"500":
$ref: "#/components/responses/ServerError"
/status/{kickoff_id}:
/{kickoff_id}/status:
get:
summary: Obter Status da Execução
description: |

View File

@@ -26,7 +26,7 @@ CrewAI 엔터프라이즈 API 참고 자료에 오신 것을 환영합니다.
</Step>
<Step title="진행 상황 모니터링">
`GET /status/{kickoff_id}`를 사용하여 실행 상태를 확인하고 결과를 조회하세요.
`GET /{kickoff_id}/status`를 사용하여 실행 상태를 확인하고 결과를 조회하세요.
</Step>
</Steps>
@@ -65,7 +65,7 @@ https://your-crew-name.crewai.com
1. **탐색**: `GET /inputs`를 호출하여 crew가 필요한 것을 파악합니다.
2. **실행**: `POST /kickoff`를 통해 입력값을 제출하여 처리를 시작합니다.
3. **모니터링**: 완료될 때까지 `GET /status/{kickoff_id}`를 주기적으로 조회합니다.
3. **모니터링**: 완료될 때까지 `GET /{kickoff_id}/status`를 주기적으로 조회합니다.
4. **결과**: 완료된 응답에서 최종 출력을 추출합니다.
## 오류 처리

View File

@@ -1,6 +1,6 @@
---
title: "GET /status/{kickoff_id}"
title: "GET /{kickoff_id}/status"
description: "실행 상태 조회"
openapi: "/enterprise-api.ko.yaml GET /status/{kickoff_id}"
openapi: "/enterprise-api.ko.yaml GET /{kickoff_id}/status"
mode: "wide"
---

View File

@@ -4,80 +4,6 @@ description: "CrewAI의 제품 업데이트, 개선 사항 및 버그 수정"
icon: "clock"
mode: "wide"
---
<Update label="2026년 5월 13일">
## v1.14.5a5
[GitHub 릴리스 보기](https://github.com/crewAIInc/crewAI/releases/tag/1.14.5a5)
## 변경 사항
### 기능
- CrewAgentExecutor 사용 중단, 기본 Crew 에이전트를 AgentExecutor로 설정
- Daytona 샌드박스 도구 개선
### 버그 수정
- pt-BR 첫 번째 흐름 가이드에서 누락된 코드 블록 수정
- HITL 사전 검토 및 증류 실패 로그 기록, learn_strict 추가
- 보안 취약점을 위한 urllib3 패치
- gitpython 및 langchain-core 패치; 패치되지 않은 paramiko CVE 무시
- uv 잠금/동기화 시 모든 게시된 작업공간 패키지 새로 고침
### 문서
- `inputs.id`에서 `restoreFromStateId`로의 마이그레이션 가이드 추가
- OSS 업그레이드 및 crew-to-flow 마이그레이션 가이드 추가
- v1.14.5a4의 변경 로그 및 버전 업데이트
## 기여자
@akaKuruma, @greysonlalonde, @iris-clawd, @lorenzejay, @mislavivanda
</Update>
<Update label="2026년 5월 9일">
## v1.14.5a4
[GitHub 릴리스 보기](https://github.com/crewAIInc/crewAI/releases/tag/1.14.5a4)
## 변경 사항
### 기능
- LLM 목록 업데이트
### 버그 수정
- `textual`을 `crewai-cli`로 이동하고 `certifi`를 추가하여 의존성 문제 수정
### 문서
- v1.14.5a3의 변경 로그 및 버전 업데이트
## 기여자
@cgoeppinger, @greysonlalonde
</Update>
<Update label="2026년 5월 7일">
## v1.14.5a3
[GitHub 릴리스 보기](https://github.com/crewAIInc/crewAI/releases/tag/1.14.5a3)
## 변경 사항
### 버그 수정
- 상태 엔드포인트 경로를 /{kickoff_id}/status에서 /status/{kickoff_id}로 수정
- 보안 준수를 위해 gitpython 의존성을 버전 >=3.1.47로 업데이트
### 리팩토링
- CLI를 독립형 crewai-cli 패키지로 분리
### 문서
- v1.14.5a2에 대한 변경 로그 및 버전 업데이트
## 기여자
@greysonlalonde, @iris-clawd
</Update>
<Update label="2026년 5월 4일">
## v1.14.5a2

View File

@@ -29,7 +29,6 @@ from crewai.flow.flow import Flow, listen, start
from dotenv import load_dotenv
from litellm import completion
load_dotenv()
class ExampleFlow(Flow):
model = "gpt-4o-mini"

View File

@@ -1,125 +0,0 @@
---
title: "inputs.id에서 restore_from_state_id로 마이그레이션"
description: "더 이상 지원되지 않는 inputs.id 하이드레이션에서 지원되는 restore_from_state_id 필드로 @persist 흐름을 이동"
icon: "arrow-right-arrow-left"
---
<Warning>
`inputs` 내에서 `id`를 전달하여 `@persist` 흐름을 하이드레이트하는 것은 **더 이상 지원되지 않으며**
향후 릴리스에서 제거될 예정입니다. 대체품인 `restore_from_state_id`는 CrewAI **v1.14.5 이상**에서 사용할 수 있으며,
아래 단계는 업그레이드 후 적용됩니다.
</Warning>
## 개요
이전 실행에서 `@persist` 흐름을 하이드레이트하는 문서화된 방법은
해당 실행의 UUID를 `inputs.id`로 전달하는 것입니다. CrewAI는 이제
`inputs` 페이로드를 과부하하지 않고 동일한 하이드레이션을 수행하는 전용 필드인
`restore_from_state_id`를 제공합니다 — 그리고 하이드레이션 키를 새로운 실행의
정체성과 결합하지 않습니다.
## 마이그레이션
현재 `inputs={"id": ...}`로 `@persist` 흐름을 시작하는 경우:
```python
# 더 이상 지원되지 않음
flow = CounterFlow()
flow.kickoff(inputs={"id": "abcd1234-5678-90ef-ghij-klmnopqrstuv"})
```
`restore_from_state_id`로 전환하십시오:
```python
# 지원됨
flow = CounterFlow()
flow.kickoff(restore_from_state_id="abcd1234-5678-90ef-ghij-klmnopqrstuv")
```
두 모드는 서로 다른 계보 의미론을 가지고 있습니다:
- `inputs={"id": <uuid>}` (더 이상 지원되지 않음) — **재개**: 제공된
id 아래에 기록이 작성되어 동일한 `flow_uuid` 이력이 확장됩니다.
- `restore_from_state_id=<uuid>` — **분기**: 스냅샷에서 상태를 하이드레이트한 후
새로운 `state.id` 아래에 기록합니다. 원본 흐름의 이력은 보존됩니다.
대부분의 프로덕션 시나리오에서는 — 이전 상태에서 시드된 흐름을 다시 실행하는 경우 — 분기가
필요합니다. 전체 정신 모델은 [Flow State 마스터링](/ko/guides/flows/mastering-flow-state)을 참조하십시오.
CrewAI AMP REST API를 통해 흐름을 시작하는 경우, 아래 [AMP](#amp)에서
동일한 페이로드 마이그레이션을 참조하십시오.
## 왜 `@persist`에 대해 `inputs.id`를 더 이상 지원하지 않습니까?
`inputs.id`는 현재 이전 실행에서 `@persist` 흐름을 재개하는 문서화된 방법입니다. 문제는
동일한 UUID가 두 가지 작업을 동시에 수행한다는 것입니다:
1. **어떤 스냅샷에서 `@persist`가 하이드레이트되는지를 선택합니다** — 해당 UUID 아래에 저장된 상태를 로드합니다.
2. **새 실행의 흐름 실행 ID가 됩니다** (`state.id`는 SDK에서; 일부 컨텍스트에서는 `flow_id`로 표시됨) — 이
시작에서의 모든 `@persist` 기록도 동일한 UUID 아래에 작성됩니다.
이 이중 역할이 이 가이드에서 설명하는 문제의 근본 원인입니다. 제공된 UUID가 새 실행의 id이기도 하므로,
동일한 `inputs.id`를 전달하는 두 번의 시작은 두 개의 별도 실행이 아닙니다 — 그들은 id를 공유하고,
지속성 기록을 공유하며, (AMP에서) 실행 목록에서 행을 공유합니다. "이 스냅샷에서 하이드레이트하지만,
이 실행을 별도로 기록하십시오"라고 말할 방법이 없습니다.
`restore_from_state_id`가 그 분리입니다. 이는 `@persist`에 어떤 스냅샷에서 하이드레이트할지를 알려주며,
새 실행이 새로운 `state.id`를 받을 수 있도록 합니다. 하이드레이션 소스와 기록된 실행은 더 이상 동일한 UUID가 아닙니다 — 이는 대부분의 프로덕션 시나리오에서 실제로 원하는 것입니다.
## 제거 일정
`@persist` 하이드레이션을 위한 `inputs.id`는 CrewAI의 향후 릴리스에서 제거될 예정입니다. 즉각적인 강제 종료는 없으며 — 기존 흐름은 계속 작동합니다 — 하지만 v1.14.5 이상으로 업그레이드하면,
새 코드에서는 `restore_from_state_id`를 사용해야 하며, 기존 흐름은 다음 편리한 기회에 마이그레이션해야 합니다.
## AMP
흐름을 CrewAI AMP에 배포하는 경우, 마이그레이션은 배포된 팀에 전송되는 시작 페이로드로 확장되며,
`inputs.id`를 재사용하는 가시적인 증상은 배포 대시보드에 나타납니다. 아래 두 개의 하위 섹션이 이를 다룹니다.
### 시작 페이로드 마이그레이션
현재 `inputs`에 `id`를 포함하여 배포된 흐름을 시작하는 경우:
```bash
# 더 이상 지원되지 않음
curl -X POST \
-H "Content-Type: application/json" \
-H "Authorization: Bearer YOUR_CREW_TOKEN" \
-d '{"inputs": {"id": "abcd1234-5678-90ef-ghij-klmnopqrstuv", "topic": "AI Agent Frameworks"}}' \
https://your-crew-url.crewai.com/kickoff
```
UUID를 최상위 `restoreFromStateId` 필드로 이동하십시오:
```bash
# 지원됨
curl -X POST \
-H "Content-Type: application/json" \
-H "Authorization: Bearer YOUR_CREW_TOKEN" \
-d '{
"inputs": {"topic": "AI Agent Frameworks"},
"restoreFromStateId": "abcd1234-5678-90ef-ghij-klmnopqrstuv"
}' \
https://your-crew-url.crewai.com/kickoff
```
`restoreFromStateId`는 시작 페이로드에서 `inputs` 옆에 위치하며, 내부에 있지 않습니다.
`inputs` 객체는 이제 흐름이 실제로 소비하는 값만 포함합니다.
### `inputs.id`가 재사용될 때 발생하는 일
AMP가 기존 실행과 `inputs.id`가 일치하는 흐름의 시작을 수신하면,
새로운 기록을 생성하는 대신 기존 기록으로 해결됩니다. 배포 대시보드에서 다음을 확인할 수 있습니다:
- **실행 상태** — 새로운 실행의 상태가 이전 실행의 상태를 덮어씁니다. 완료된 실행은
다시 `실행 중`으로 전환되거나, `완료`된 실행은 새로운 시작이 실패할 경우 `오류`로 전환될 수 있습니다 — 어쨌든 대시보드는 더 이상
원래 실행을 반영하지 않습니다.
- **추적** — OTel 추적이 시작 간에 쌓이기 때문에 동일한 실행 id를 공유합니다; 이전 실행의 추적은
새로운 실행의 추적과 교체되거나 혼합됩니다. 단계별 재생은 더 이상 단일 실행에 해당하지 않습니다.
- **실행 목록** — 별도의 행으로 나타나야 할 시작이 단일 항목으로 축소되어 이력을 숨깁니다.
`restoreFromStateId`로 마이그레이션하면 모든 시작이 자체 실행으로 유지됩니다 — 각자의 상태, 추적 및 목록의 행을 가지며 — 여전히 이전 실행에서 상태를 하이드레이트합니다.
<Card title="도움이 필요하신가요?" icon="headset" href="mailto:support@crewai.com">
흐름이 어떤 모드가 필요한지 확실하지 않거나 마이그레이션 중 문제가 발생하면 지원 팀에 문의하십시오.
</Card>

View File

@@ -1,190 +0,0 @@
---
title: "CrewAI 업그레이드"
description: "프로젝트에서 CrewAI를 업그레이드하고 버전 간 브레이킹 체인지에 적응하는 방법."
icon: "arrow-up-circle"
---
## 개요
CrewAI 릴리스는 정기적으로 새로운 기능을 제공합니다. 이 가이드는 CLI와 프로젝트의 가상 환경을 모두 최신 상태로 유지하기 위한 실용적인 단계를 안내합니다.
새로 시작한다면 [설치](/ko/installation)를 참고하세요. 다른 프레임워크에서 옮겨오는 경우라면 [LangGraph에서 마이그레이션](/ko/guides/migration/migrating-from-langgraph)을 참고하세요.
---
## 업그레이드할 수 있는 두 가지
CrewAI는 사용자의 머신에 두 곳에 존재하며, 각각 독립적으로 업그레이드됩니다:
| 무엇 | 설치 방법 | 업그레이드 방법 |
|---|---|---|
| **전역 `crewai` CLI** | `uv tool install crewai` | `uv tool install crewai --upgrade` |
| **프로젝트 venv** (코드가 실행되는 곳) | `crewai install` / `uv sync` | `uv add "crewai[...]>=X.Y.Z"` 후 `crewai install` |
이 둘은 — 그리고 자주 — 동기화가 어긋날 수 있습니다. `crewai --version`은 CLI 버전을 알려줍니다. 프로젝트 안에서 `uv pip show crewai`를 실행하면 venv 버전을 알려줍니다. 둘이 다른 것은 정상이며, 실행 중인 코드에 중요한 것은 venv 버전입니다.
## 왜 `crewai install`만으로는 업그레이드되지 않는가
`crewai install`은 `uv sync`를 감싼 얇은 래퍼입니다. 현재 `uv.lock` 파일이 지시하는 것 그대로를 설치할 뿐이며 — 어떤 버전 제약도 올리지 **않습니다**.
`pyproject.toml`이 `crewai>=1.11.1`이라 적혀 있고 lock 파일이 `1.11.1`로 해소되었다면, `crewai install`을 실행해도 `1.14.4`가 사용 가능하더라도 영원히 `1.11.1`에 머무릅니다.
실제로 업그레이드하려면 다음을 해야 합니다:
1. `pyproject.toml`의 버전 제약 업데이트
2. lock 파일 재해소
3. venv 동기화
`uv add`는 이 세 가지를 한 번에 처리합니다.
## 프로젝트 업그레이드 방법
```bash
# 제약을 올리고 lock을 다시 만드는 한 번의 명령
uv add "crewai[tools]>=1.14.4"
# venv 동기화 (crewai install은 내부적으로 uv sync를 호출)
crewai install
# 확인
uv pip show crewai
# → Version: 1.14.4
```
`[tools]`를 프로젝트에서 사용하는 extras로 바꾸세요 (예: `[tools,anthropic]`). 잘 모르겠다면 `pyproject.toml`의 `dependencies` 목록을 확인하세요.
<Note>
`uv add`는 `pyproject.toml`과 `uv.lock`을 **둘 다** 원자적으로 업데이트합니다. `pyproject.toml`을 수동으로 편집하는 경우, `crewai install`이 새 버전을 가져가도록 하기 전에 `uv lock --upgrade-package crewai`를 실행해 lock 파일을 다시 해소해야 합니다.
</Note>
## 전역 CLI 업그레이드
전역 CLI는 프로젝트와 분리되어 있습니다. 다음 명령으로 업그레이드하세요:
```bash
uv tool install crewai --upgrade
```
업그레이드 후 셸이 `PATH`에 대해 경고하면 새로고침하세요:
```bash
uv tool update-shell
```
이 명령은 프로젝트의 venv를 **건드리지 않습니다** — 프로젝트 내부에서는 여전히 `uv add` + `crewai install`이 필요합니다.
## 둘이 동기화되었는지 확인
```bash
# 전역 CLI 버전
crewai --version
# 프로젝트 venv 버전
uv pip show crewai | grep Version
```
둘이 일치할 필요는 없지만 — 런타임 동작에 중요한 것은 프로젝트 venv 버전입니다.
<Note>
CrewAI는 `Python >=3.10, <3.14`를 요구합니다. `uv`가 더 오래된 인터프리터로 설치되어 있다면, `crewai install`을 실행하기 전에 지원되는 Python으로 프로젝트 venv를 다시 만드세요.
</Note>
---
## 브레이킹 체인지 및 마이그레이션 노트
대부분의 업그레이드는 작은 조정만 필요합니다. 아래 항목들은 조용히 깨지거나 헷갈리는 트레이스백을 내는 영역들입니다.
### Import 경로: tools와 `BaseTool`
tools의 정식 import 위치는 `crewai.tools`입니다. 옛 경로들이 아직 튜토리얼에 등장하지만 업데이트해야 합니다.
```python
# 이전
from crewai_tools import BaseTool
from crewai.agents.tools import tool
# 이후
from crewai.tools import BaseTool, tool
```
`@tool` 데코레이터와 `BaseTool` 서브클래스는 모두 `crewai.tools`에 있습니다. `AgentFinish` 등 내부 에이전트 심볼들은 더 이상 공개 표면이 아닙니다 — import 중이었다면 event listener나 `Task` 콜백으로 전환하세요.
### `Agent` 파라미터 변경
```python
from crewai import Agent
agent = Agent(
role="Researcher",
goal="Find authoritative sources on {topic}",
backstory="You are a careful, source-driven researcher.",
llm="gpt-4o-mini", # 모델명 문자열 또는 LLM 객체
verbose=True, # 정수 레벨이 아닌 bool
max_iter=15, # 버전마다 기본값이 바뀌었음 — 명시적으로 지정
allow_delegation=False,
)
```
- `llm`은 문자열 모델명(설정된 provider를 통해 해소)이나 세밀한 제어를 위한 `LLM` 객체를 받습니다.
- `verbose`는 일반 `bool`입니다. 정수를 전달해도 더 이상 로그 레벨을 토글하지 않습니다.
- `max_iter`의 기본값은 릴리스 사이에 변경되었습니다. 첫 tool 호출 후 에이전트가 조용히 반복을 멈춘다면 `max_iter`를 명시적으로 지정하세요.
### `Crew` 파라미터
```python
from crewai import Crew, Process
crew = Crew(
agents=[...],
tasks=[...],
process=Process.sequential, # 또는 Process.hierarchical
memory=True,
cache=True,
embedder={"provider": "openai", "config": {"model": "text-embedding-3-small"}},
)
```
- `process=Process.hierarchical`은 `manager_llm=` 또는 `manager_agent=` 중 하나가 필요합니다. 둘 다 없으면 kickoff 시 검증 단계에서 오류가 발생합니다.
- 기본이 아닌 임베딩 provider와 함께 `memory=True`를 쓰려면 `embedder` dict가 필요합니다 — 아래의 [메모리와 embedder 설정](#memory-embedder-config)을 참고하세요.
### `Task` 구조화된 출력
`output_pydantic`, `output_json`, 또는 `output_file`을 사용해 task 결과를 타입이 지정된 형태로 강제할 수 있습니다:
```python
from pydantic import BaseModel
from crewai import Task
class Article(BaseModel):
title: str
body: str
write = Task(
description="Write an article about {topic}",
expected_output="A short article with a title and body",
agent=writer,
output_pydantic=Article, # 인스턴스가 아닌 클래스
output_file="output/article.md",
)
```
`output_pydantic`은 **클래스** 자체를 받습니다. `Article(title="", body="")`을 전달하는 것은 흔한 실수이며 헷갈리는 검증 오류로 실패합니다.
### 메모리와 embedder 설정 {#memory-embedder-config}
`memory=True`이고 OpenAI의 기본 임베딩을 사용하지 않는다면, `embedder`를 반드시 전달해야 합니다:
```python
crew = Crew(
agents=[...],
tasks=[...],
memory=True,
embedder={
"provider": "ollama",
"config": {"model": "nomic-embed-text"},
},
)
```
해당 provider의 자격 증명(`OPENAI_API_KEY`, `OLLAMA_HOST` 등)을 `.env` 파일에 설정하세요. 메모리 저장 경로는 기본적으로 프로젝트-로컬입니다 — embedder를 바꾸면 차원이 호환되지 않으므로 프로젝트의 메모리 디렉터리를 삭제하세요.

View File

@@ -13,7 +13,7 @@ The Daytona sandbox tools give CrewAI agents access to isolated, ephemeral compu
- **`DaytonaExecTool`** — run any shell command inside a sandbox.
- **`DaytonaPythonTool`** — execute a block of Python source code inside a sandbox.
- **`DaytonaFileTool`** — read, write, append, list, delete, and inspect files inside a sandbox; also supports `move`, `find` (content grep), `search` (filename glob), `chmod` (permissions), `replace` (bulk find-and-replace), and `exists`.
- **`DaytonaFileTool`** — read, write, append, list, delete, and inspect files inside a sandbox.
All three tools share the same sandbox lifecycle controls, so you can mix and match them while keeping state in a single persistent sandbox.
@@ -55,7 +55,7 @@ from crewai_tools import DaytonaPythonTool
tool = DaytonaPythonTool()
result = tool.run(code="print(sum(range(10)))")
print(result)
# {"exit_code": 0, "result": "45\n", "artifacts": ExecutionArtifacts(stdout="45\n", charts=[])}
# {"exit_code": 0, "result": "45\n", "artifacts": None}
```
### Multi-step shell session (persistent)
@@ -63,22 +63,17 @@ print(result)
```python Code
from crewai_tools import DaytonaExecTool, DaytonaFileTool
# Create the persistent sandbox via the first tool, then attach the second
# tool to it so both share state (installed packages, files, env vars).
exec_tool = DaytonaExecTool(persistent=True)
exec_tool.run(command="pip install httpx -q")
file_tool = DaytonaFileTool(sandbox_id=exec_tool.active_sandbox_id)
file_tool = DaytonaFileTool(persistent=True)
file_tool.run(
action="write",
path="workspace/script.py",
content="import httpx; print(f'httpx loaded, version {httpx.__version__}')",
)
exec_tool.run(command="python workspace/script.py")
# Install a package, then write and run a script — all in the same sandbox
exec_tool.run(command="pip install httpx -q")
file_tool.run(action="write", path="/workspace/fetch.py", content="import httpx; print(httpx.get('https://httpbin.org/get').status_code)")
exec_tool.run(command="python /workspace/fetch.py")
```
<Note>
By default, each tool with `persistent=True` lazily creates its **own** sandbox on first use. The pattern above shares a single sandbox across multiple tools by reading the first tool's `active_sandbox_id` after a `.run()` call and passing it to the others via `sandbox_id=...`. With `persistent=False` (the default), every `.run()` call gets a fresh sandbox that's deleted at the end of that call.
Each tool instance maintains its own persistent sandbox. To share **one** sandbox across two tools, create the first tool, grab its sandbox id via `tool._persistent_sandbox.id`, and pass it to the second tool via `sandbox_id=...`.
</Note>
### Attach to an existing sandbox
@@ -87,7 +82,7 @@ By default, each tool with `persistent=True` lazily creates its **own** sandbox
from crewai_tools import DaytonaExecTool
tool = DaytonaExecTool(sandbox_id="my-long-lived-sandbox")
result = tool.run(command="ls workspace")
result = tool.run(command="ls /workspace")
```
### Custom sandbox parameters
@@ -107,41 +102,6 @@ tool = DaytonaExecTool(
)
```
### Searching, moving, and modifying files
```python Code
from crewai_tools import DaytonaFileTool
file_tool = DaytonaFileTool(persistent=True)
# Find every TODO in the source tree (grep file contents recursively)
file_tool.run(action="find", path="workspace/src", pattern="TODO:")
# Find all Python files (glob match on filenames)
file_tool.run(action="search", path="workspace", pattern="*.py")
# Make a script executable
file_tool.run(action="chmod", path="workspace/run.sh", mode="755")
# Rename or move a file
file_tool.run(
action="move",
path="workspace/draft.md",
destination="workspace/final.md",
)
# Bulk find-and-replace across multiple files
file_tool.run(
action="replace",
paths=["workspace/src/a.py", "workspace/src/b.py"],
pattern="old_function",
replacement="new_function",
)
# Quick existence check before a destructive op
file_tool.run(action="exists", path="workspace/cache.db")
```
### Agent integration
```python Code
@@ -161,7 +121,7 @@ coder = Agent(
)
task = Task(
description="Write a Python script that prints the first 10 Fibonacci numbers, save it to workspace/fib.py, and run it.",
description="Write a Python script that prints the first 10 Fibonacci numbers, save it to /workspace/fib.py, and run it.",
expected_output="The first 10 Fibonacci numbers printed to stdout.",
agent=coder,
)
@@ -208,22 +168,12 @@ All three tools accept these parameters at initialization:
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `action` | `str` | ✓ | One of: `read`, `write`, `append`, `list`, `delete`, `mkdir`, `info`, `exists`, `move`, `find`, `search`, `chmod`, `replace`. |
| `path` | `str \| None` | ✓ for all actions except `replace` | Absolute path inside the sandbox. |
| `content` | `str \| None` | ✓ for `append` | Content to write or append. |
| `action` | `str` | ✓ | One of: `read`, `write`, `append`, `list`, `delete`, `mkdir`, `info`. |
| `path` | `str` | ✓ | Absolute path inside the sandbox. |
| `content` | `str \| None` | | Content to write or append. Required for `append`. |
| `binary` | `bool` | | If `True`, `content` is base64 on write; returns base64 on read. |
| `recursive` | `bool` | | For `delete`: remove directories recursively. |
| `mode` | `str \| None` | | For `mkdir`: octal permissions for the new directory (defaults to `"0755"`). For `chmod`: octal permissions to apply to the target. |
| `destination` | `str \| None` | ✓ for `move` | Destination path for `move`. |
| `pattern` | `str \| None` | ✓ for `find`, `search`, `replace` | For `find`: substring matched against file CONTENTS. For `search`: glob matched against file NAMES (e.g. `*.py`). For `replace`: text to replace inside files. |
| `replacement` | `str \| None` | ✓ for `replace` | Replacement text for `pattern`. |
| `paths` | `list[str] \| None` | ✓ for `replace` | List of file paths in which to replace text. |
| `owner` | `str \| None` | | For `chmod`: new file owner. |
| `group` | `str \| None` | | For `chmod`: new file group. |
<Note>
For `chmod`, pass at least one of `mode`, `owner`, or `group` — any field left as `None` is left unchanged on the target.
</Note>
| `mode` | `str` | | For `mkdir`: octal permission string (default `"0755"`). |
<Tip>
For files larger than a few KB, create the file first with `action="write"` and empty content, then send the body via multiple `action="append"` calls of ~4 KB each to stay within tool-call payload limits.

View File

@@ -26,7 +26,7 @@ Bem-vindo à referência da API do CrewAI AMP. Esta API permite que você intera
</Step>
<Step title="Monitore o Progresso">
Use `GET /status/{kickoff_id}` para checar o status da execução e recuperar os resultados.
Use `GET /{kickoff_id}/status` para checar o status da execução e recuperar os resultados.
</Step>
</Steps>
@@ -65,7 +65,7 @@ Substitua `your-crew-name` pela URL real do seu crew no painel.
1. **Descoberta**: Chame `GET /inputs` para entender o que seu crew precisa
2. **Execução**: Envie os inputs via `POST /kickoff` para iniciar o processamento
3. **Monitoramento**: Faça polling em `GET /status/{kickoff_id}` até a conclusão
3. **Monitoramento**: Faça polling em `GET /{kickoff_id}/status` até a conclusão
4. **Resultados**: Extraia o output final da resposta concluída
## Tratamento de Erros

View File

@@ -1,6 +1,6 @@
---
title: "GET /status/{kickoff_id}"
title: "GET /{kickoff_id}/status"
description: "Obter o status da execução"
openapi: "/enterprise-api.pt-BR.yaml GET /status/{kickoff_id}"
openapi: "/enterprise-api.pt-BR.yaml GET /{kickoff_id}/status"
mode: "wide"
---

View File

@@ -4,80 +4,6 @@ description: "Atualizações de produto, melhorias e correções do CrewAI"
icon: "clock"
mode: "wide"
---
<Update label="13 mai 2026">
## v1.14.5a5
[Ver release no GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.5a5)
## O que Mudou
### Recursos
- Deprecar CrewAgentExecutor, definir agentes Crew como AgentExecutor
- Melhorar ferramentas de sandbox Daytona
### Correções de Bugs
- Corrigir bloco de código ausente no guia de primeiro fluxo em pt-BR
- Registrar falhas de pré-revisão e destilação HITL, adicionar learn_strict
- Corrigir urllib3 para vulnerabilidades de segurança
- Corrigir gitpython e langchain-core; ignorar CVE paramiko não corrigido
- Atualizar todos os pacotes de workspace publicados no bloqueio/sincronização uv
### Documentação
- Adicionar guia de migração de `inputs.id` para `restoreFromStateId`
- Adicionar guia de atualização OSS e migração de crew para flow
- Atualizar changelog e versão para v1.14.5a4
## Contribuidores
@akaKuruma, @greysonlalonde, @iris-clawd, @lorenzejay, @mislavivanda
</Update>
<Update label="09 mai 2026">
## v1.14.5a4
[Ver release no GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.5a4)
## O que Mudou
### Funcionalidades
- Atualizar listagens de LLM
### Correções de Bugs
- Corrigir problema de dependência movendo `textual` para `crewai-cli` e adicionando `certifi`
### Documentação
- Atualizar changelog e versão para v1.14.5a3
## Contribuidores
@cgoeppinger, @greysonlalonde
</Update>
<Update label="07 mai 2026">
## v1.14.5a3
[Ver release no GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.5a3)
## O que Mudou
### Correções de Bugs
- Corrigir o caminho do endpoint de status de /{kickoff_id}/status para /status/{kickoff_id}
- Atualizar a dependência gitpython para a versão >=3.1.47 para conformidade de segurança
### Refatoração
- Extrair CLI para o pacote independente crewai-cli
### Documentação
- Atualizar o changelog e a versão para v1.14.5a2
## Contribuidores
@greysonlalonde, @iris-clawd
</Update>
<Update label="04 mai 2026">
## v1.14.5a2

View File

@@ -24,63 +24,7 @@ Os flows permitem que você crie fluxos de trabalho estruturados e orientados po
Vamos criar um Flow simples no qual você usará a OpenAI para gerar uma cidade aleatória em uma tarefa e, em seguida, usará essa cidade para gerar uma curiosidade em outra tarefa.
```python Code
from crewai.flow.flow import Flow, listen, start
from dotenv import load_dotenv
from litellm import completion
load_dotenv()
class ExampleFlow(Flow):
model = "gpt-4o-mini"
@start()
def generate_city(self):
print("Starting flow")
# Cada estado do flow recebe automaticamente um ID único
print(f"Flow State ID: {self.state['id']}")
response = completion(
model=self.model,
messages=[
{
"role": "user",
"content": "Return the name of a random city in the world.",
},
],
)
random_city = response["choices"][0]["message"]["content"]
# Armazena a cidade no nosso estado
self.state["city"] = random_city
print(f"Random City: {random_city}")
return random_city
@listen(generate_city)
def generate_fun_fact(self, random_city):
response = completion(
model=self.model,
messages=[
{
"role": "user",
"content": f"Tell me a fun fact about {random_city}",
},
],
)
fun_fact = response["choices"][0]["message"]["content"]
# Armazena a curiosidade no nosso estado
self.state["fun_fact"] = fun_fact
return fun_fact
flow = ExampleFlow()
flow.plot()
result = flow.kickoff()
print(f"Generated fun fact: {result}")
# (O código não é traduzido)
```
Na ilustração acima, criamos um Flow simples que gera uma cidade aleatória usando a OpenAI e depois cria uma curiosidade sobre essa cidade. O Flow consiste em duas tarefas: `generate_city` e `generate_fun_fact`. A tarefa `generate_city` é o ponto de início do Flow, enquanto a tarefa `generate_fun_fact` fica escutando o resultado da tarefa `generate_city`.
@@ -112,16 +56,12 @@ O decorador `@listen()` pode ser usado de várias formas:
1. **Escutando um Método pelo Nome**: Você pode passar o nome do método ao qual deseja escutar como string. Quando esse método concluir, o método ouvinte será chamado.
```python Code
@listen("generate_city")
def generate_fun_fact(self, random_city):
# Implementação
# (O código não é traduzido)
```
2. **Escutando um Método Diretamente**: Você pode passar o próprio método. Quando esse método concluir, o método ouvinte será chamado.
```python Code
@listen(generate_city)
def generate_fun_fact(self, random_city):
# Implementação
# (O código não é traduzido)
```
### Saída de um Flow
@@ -136,24 +76,7 @@ Veja como acessar a saída final:
<CodeGroup>
```python Code
from crewai.flow.flow import Flow, listen, start
class OutputExampleFlow(Flow):
@start()
def first_method(self):
return "Output from first_method"
@listen(first_method)
def second_method(self, first_output):
return f"Second method received: {first_output}"
flow = OutputExampleFlow()
flow.plot("my_flow_plot")
final_output = flow.kickoff()
print("---- Final Output ----")
print(final_output)
# (O código não é traduzido)
```
```text Output
@@ -174,34 +97,8 @@ Além de recuperar a saída final, você pode acessar e atualizar o estado dentr
Veja um exemplo de como atualizar e acessar o estado:
<CodeGroup>
```python Code
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel
class ExampleState(BaseModel):
counter: int = 0
message: str = ""
class StateExampleFlow(Flow[ExampleState]):
@start()
def first_method(self):
self.state.message = "Hello from first_method"
self.state.counter += 1
@listen(first_method)
def second_method(self):
self.state.message += " - updated by second_method"
self.state.counter += 1
return self.state.message
flow = StateExampleFlow()
flow.plot("my_flow_plot")
final_output = flow.kickoff()
print(f"Final Output: {final_output}")
print("Final State:")
print(flow.state)
# (O código não é traduzido)
```
```text Output
@@ -231,33 +128,7 @@ Essa abordagem oferece flexibilidade, permitindo que o desenvolvedor adicione ou
Mesmo com estados não estruturados, os flows do CrewAI geram e mantêm automaticamente um identificador único (UUID) para cada instância de estado.
```python Code
from crewai.flow.flow import Flow, listen, start
class UnstructuredExampleFlow(Flow):
@start()
def first_method(self):
# O estado inclui automaticamente um campo 'id'
print(f"State ID: {self.state['id']}")
self.state['counter'] = 0
self.state['message'] = "Hello from structured flow"
@listen(first_method)
def second_method(self):
self.state['counter'] += 1
self.state['message'] += " - updated"
@listen(second_method)
def third_method(self):
self.state['counter'] += 1
self.state['message'] += " - updated again"
print(f"State after third_method: {self.state}")
flow = UnstructuredExampleFlow()
flow.plot("my_flow_plot")
flow.kickoff()
# (O código não é traduzido)
```
![Flow Visual image](/images/crewai-flow-3.png)
@@ -277,39 +148,7 @@ Ao usar modelos como o `BaseModel` da Pydantic, os desenvolvedores podem definir
Cada estado nos flows do CrewAI recebe automaticamente um identificador único (UUID) para ajudar no rastreamento e gerenciamento. Esse ID é gerado e mantido automaticamente pelo sistema de flows.
```python Code
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel
class ExampleState(BaseModel):
# Nota: o campo 'id' é adicionado automaticamente a todos os estados
counter: int = 0
message: str = ""
class StructuredExampleFlow(Flow[ExampleState]):
@start()
def first_method(self):
# Acesse o ID gerado automaticamente, se necessário
print(f"State ID: {self.state.id}")
self.state.message = "Hello from structured flow"
@listen(first_method)
def second_method(self):
self.state.counter += 1
self.state.message += " - updated"
@listen(second_method)
def third_method(self):
self.state.counter += 1
self.state.message += " - updated again"
print(f"State after third_method: {self.state}")
flow = StructuredExampleFlow()
flow.kickoff()
# (O código não é traduzido)
```
![Flow Visual image](/images/crewai-flow-3.png)
@@ -343,19 +182,7 @@ O decorador @persist permite a persistência automática do estado nos flows do
Quando aplicado no nível da classe, o decorador @persist garante a persistência automática de todos os estados dos métodos do flow:
```python
@persist # Usa SQLiteFlowPersistence por padrão
class MyFlow(Flow[MyState]):
@start()
def initialize_flow(self):
# Este método terá seu estado persistido automaticamente
self.state.counter = 1
print("Initialized flow. State ID:", self.state.id)
@listen(initialize_flow)
def next_step(self):
# O estado (incluindo self.state.id) é recarregado automaticamente
self.state.counter += 1
print("Flow state is persisted. Counter:", self.state.counter)
# (O código não é traduzido)
```
### Persistência no Nível de Método
@@ -363,14 +190,7 @@ class MyFlow(Flow[MyState]):
Para um controle mais granular, você pode aplicar @persist em métodos específicos:
```python
class AnotherFlow(Flow[dict]):
@persist # Persiste apenas o estado deste método
@start()
def begin(self):
if "runs" not in self.state:
self.state["runs"] = 0
self.state["runs"] += 1
print("Method-level persisted runs:", self.state["runs"])
# (O código não é traduzido)
```
### Forking de Estado Persistido
@@ -462,29 +282,8 @@ A arquitetura de persistência enfatiza precisão técnica e opções de persona
A função `or_` nos flows permite escutar múltiplos métodos e acionar o método ouvinte quando qualquer um dos métodos especificados gerar uma saída.
<CodeGroup>
```python Code
from crewai.flow.flow import Flow, listen, or_, start
class OrExampleFlow(Flow):
@start()
def start_method(self):
return "Hello from the start method"
@listen(start_method)
def second_method(self):
return "Hello from the second method"
@listen(or_(start_method, second_method))
def logger(self, result):
print(f"Logger: {result}")
flow = OrExampleFlow()
flow.plot("my_flow_plot")
flow.kickoff()
# (O código não é traduzido)
```
```text Output
@@ -503,28 +302,8 @@ A função `or_` serve para escutar vários métodos e disparar o método ouvint
A função `and_` nos flows permite escutar múltiplos métodos e acionar o método ouvinte apenas quando todos os métodos especificados emitirem uma saída.
<CodeGroup>
```python Code
from crewai.flow.flow import Flow, and_, listen, start
class AndExampleFlow(Flow):
@start()
def start_method(self):
self.state["greeting"] = "Hello from the start method"
@listen(start_method)
def second_method(self):
self.state["joke"] = "What do computers eat? Microchips."
@listen(and_(start_method, second_method))
def logger(self):
print("---- Logger ----")
print(self.state)
flow = AndExampleFlow()
flow.plot()
flow.kickoff()
# (O código não é traduzido)
```
```text Output
@@ -544,42 +323,8 @@ O decorador `@router()` nos flows permite definir lógica de roteamento condicio
Você pode especificar diferentes rotas conforme a saída do método, permitindo controlar o fluxo de execução de forma dinâmica.
<CodeGroup>
```python Code
import random
from crewai.flow.flow import Flow, listen, router, start
from pydantic import BaseModel
class ExampleState(BaseModel):
success_flag: bool = False
class RouterFlow(Flow[ExampleState]):
@start()
def start_method(self):
print("Starting the structured flow")
random_boolean = random.choice([True, False])
self.state.success_flag = random_boolean
@router(start_method)
def second_method(self):
if self.state.success_flag:
return "success"
else:
return "failed"
@listen("success")
def third_method(self):
print("Third method running")
@listen("failed")
def fourth_method(self):
print("Fourth method running")
flow = RouterFlow()
flow.plot("my_flow_plot")
flow.kickoff()
# (O código não é traduzido)
```
```text Output
@@ -656,105 +401,7 @@ Para um guia completo sobre feedback humano em flows, incluindo feedback assínc
Os agentes podem ser integrados facilmente aos seus flows, oferecendo uma alternativa leve às crews completas quando você precisar executar tarefas simples e focadas. Veja um exemplo de como utilizar um agente em um flow para realizar uma pesquisa de mercado:
```python
import asyncio
from typing import Any, Dict, List
from crewai_tools import SerperDevTool
from pydantic import BaseModel, Field
from crewai.agent import Agent
from crewai.flow.flow import Flow, listen, start
# Define um formato de saída estruturado
class MarketAnalysis(BaseModel):
key_trends: List[str] = Field(description="List of identified market trends")
market_size: str = Field(description="Estimated market size")
competitors: List[str] = Field(description="Major competitors in the space")
# Define the flow state
class MarketResearchState(BaseModel):
    # Product under research; filled from kickoff inputs
    product: str = ""
    # Structured result; None until analyze_market completes
    analysis: MarketAnalysis | None = None


# Create a flow class
class MarketResearchFlow(Flow[MarketResearchState]):
    """Flow that researches a product's market and prints a structured analysis."""

    @start()
    def initialize_research(self) -> Dict[str, Any]:
        print(f"Starting market research for {self.state.product}")
        return {"product": self.state.product}

    @listen(initialize_research)
    async def analyze_market(self) -> Dict[str, Any]:
        # Create an agent for market research
        analyst = Agent(
            role="Market Research Analyst",
            goal=f"Analyze the market for {self.state.product}",
            backstory="You are an experienced market analyst with expertise in "
            "identifying market trends and opportunities.",
            tools=[SerperDevTool()],
            verbose=True,
        )
        # Define the research query
        query = f"""
        Research the market for {self.state.product}. Include:
        1. Key market trends
        2. Market size
        3. Major competitors
        Format your response according to the specified structure.
        """
        # Run the analysis with a structured output format
        result = await analyst.kickoff_async(query, response_format=MarketAnalysis)
        if result.pydantic:
            print("result", result.pydantic)
        else:
            print("result", result)
        # Return the analysis to update the state
        return {"analysis": result.pydantic}

    @listen(analyze_market)
    def present_results(self, analysis) -> None:
        print("\nMarket Analysis Results")
        print("=====================")
        if isinstance(analysis, dict):
            # If we received a dict with an 'analysis' key, extract the actual analysis object
            market_analysis = analysis.get("analysis")
        else:
            market_analysis = analysis
        if market_analysis and isinstance(market_analysis, MarketAnalysis):
            print("\nKey Market Trends:")
            for trend in market_analysis.key_trends:
                print(f"- {trend}")
            print(f"\nMarket Size: {market_analysis.market_size}")
            print("\nMajor Competitors:")
            for competitor in market_analysis.competitors:
                print(f"- {competitor}")
        else:
            print("No structured analysis data available.")
            print("Raw analysis:", analysis)


# Usage example
async def run_flow():
    flow = MarketResearchFlow()
    flow.plot("MarketResearchFlowPlot")
    result = await flow.kickoff_async(inputs={"product": "AI-powered chatbots"})
    return result


# Run the flow
if __name__ == "__main__":
    asyncio.run(run_flow())
# (O código não é traduzido)
```
![Flow Visual image](/images/crewai-flow-7.png)
@@ -816,50 +463,7 @@ No arquivo `main.py`, você cria seu flow e conecta as crews. É possível defin
Veja um exemplo de como conectar a `poem_crew` no arquivo `main.py`:
```python Code
#!/usr/bin/env python
from random import randint
from pydantic import BaseModel
from crewai.flow.flow import Flow, listen, start
from .crews.poem_crew.poem_crew import PoemCrew


class PoemState(BaseModel):
    # Number of sentences the poem should contain
    sentence_count: int = 1
    # The generated poem text
    poem: str = ""


class PoemFlow(Flow[PoemState]):
    """Flow that picks a random sentence count, generates a poem with PoemCrew, and saves it."""

    @start()
    def generate_sentence_count(self):
        print("Generating sentence count")
        self.state.sentence_count = randint(1, 5)

    @listen(generate_sentence_count)
    def generate_poem(self):
        print("Generating poem")
        result = PoemCrew().crew().kickoff(inputs={"sentence_count": self.state.sentence_count})
        print("Poem generated", result.raw)
        self.state.poem = result.raw

    @listen(generate_poem)
    def save_poem(self):
        print("Saving poem")
        with open("poem.txt", "w") as f:
            f.write(self.state.poem)


def kickoff():
    # Entry point used by the project scripts
    poem_flow = PoemFlow()
    poem_flow.kickoff()


def plot():
    # Writes an interactive HTML visualization of the flow
    poem_flow = PoemFlow()
    poem_flow.plot("PoemFlowPlot")


if __name__ == "__main__":
    kickoff()
    plot()
# (O código não é traduzido)
```
Neste exemplo, a classe `PoemFlow` define um fluxo que gera a quantidade de frases, usa a `PoemCrew` para gerar um poema e, depois, salva o poema em um arquivo. O flow inicia com o método `kickoff()`, e o gráfico é gerado pelo método `plot()`.
@@ -911,8 +515,7 @@ O CrewAI oferece duas formas práticas de gerar plots dos seus flows:
Se estiver trabalhando diretamente com uma instância do flow, basta chamar o método `plot()` do objeto. Isso criará um arquivo HTML com o plot interativo do seu flow.
```python Code
# Considerando que você já tem uma instância do flow
flow.plot("my_flow_plot")
# (O código não é traduzido)
```
Esse comando gera um arquivo chamado `my_flow_plot.html` no diretório atual. Abra esse arquivo em um navegador para visualizar o plot interativo.

View File

@@ -266,165 +266,7 @@ Nosso flow irá:
Vamos criar nosso flow no arquivo `main.py`:
```python
#!/usr/bin/env python
import json
import os
from typing import Dict, List, Optional
from pydantic import BaseModel, Field
from crewai import LLM
from crewai.flow.flow import Flow, listen, start
from guide_creator_flow.crews.content_crew.content_crew import ContentCrew


# Define our models for structured data
class Section(BaseModel):
    title: str = Field(description="Title of the section")
    description: str = Field(description="Brief description of what the section should cover")


class GuideOutline(BaseModel):
    title: str = Field(description="Title of the guide")
    introduction: str = Field(description="Introduction to the topic")
    target_audience: str = Field(description="Description of the target audience")
    sections: List[Section] = Field(description="List of sections in the guide")
    conclusion: str = Field(description="Conclusion or summary of the guide")


# Define our flow state
class GuideCreatorState(BaseModel):
    topic: str = ""
    audience_level: str = ""
    # Optional: no outline exists until create_guide_outline runs
    guide_outline: Optional[GuideOutline] = None
    # Maps section title -> generated markdown content
    sections_content: Dict[str, str] = {}


class GuideCreatorFlow(Flow[GuideCreatorState]):
    """Flow for creating a comprehensive guide on any topic."""

    @start()
    def get_user_input(self):
        """Get user input about the guide's topic and audience."""
        print("\n=== Create Your Comprehensive Guide ===\n")
        # Get user input
        self.state.topic = input("What topic would you like to create a guide for? ")
        # Get audience level with validation
        while True:
            audience = input("Who is your target audience? (beginner/intermediate/advanced) ").lower()
            if audience in ["beginner", "intermediate", "advanced"]:
                self.state.audience_level = audience
                break
            print("Please enter 'beginner', 'intermediate', or 'advanced'")
        print(f"\nCreating a guide on {self.state.topic} for {self.state.audience_level} audience...\n")
        return self.state

    @listen(get_user_input)
    def create_guide_outline(self, state):
        """Create a structured outline for the guide using a direct LLM call."""
        print("Creating guide outline...")
        # Initialize the LLM
        llm = LLM(model="openai/gpt-4o-mini", response_format=GuideOutline)
        # Create the messages for the outline
        messages = [
            {"role": "system", "content": "You are a helpful assistant designed to output JSON."},
            {"role": "user", "content": f"""
            Create a detailed outline for a comprehensive guide on "{state.topic}" for {state.audience_level} level learners.
            The outline should include:
            1. A compelling title for the guide
            2. An introduction to the topic
            3. 4-6 main sections that cover the most important aspects of the topic
            4. A conclusion or summary
            For each section, provide a clear title and a brief description of what it should cover.
            """}
        ]
        # Make the LLM call with JSON response format
        response = llm.call(messages=messages)
        # Parse the JSON response
        outline_dict = json.loads(response)
        self.state.guide_outline = GuideOutline(**outline_dict)
        # Ensure the output directory exists before saving
        os.makedirs("output", exist_ok=True)
        # Save the outline to a file
        with open("output/guide_outline.json", "w") as f:
            json.dump(outline_dict, f, indent=2)
        print(f"Guide outline created with {len(self.state.guide_outline.sections)} sections")
        return self.state.guide_outline

    @listen(create_guide_outline)
    def write_and_compile_guide(self, outline):
        """Write all sections and compile the guide."""
        print("Writing guide sections and compiling...")
        completed_sections = []
        # Process sections one at a time to maintain context flow
        for section in outline.sections:
            print(f"Processing section: {section.title}")
            # Build context from previous sections
            previous_sections_text = ""
            if completed_sections:
                previous_sections_text = "# Previously Written Sections\n\n"
                for title in completed_sections:
                    previous_sections_text += f"## {title}\n\n"
                    previous_sections_text += self.state.sections_content.get(title, "") + "\n\n"
            else:
                previous_sections_text = "No previous sections written yet."
            # Run the content crew for this section
            result = ContentCrew().crew().kickoff(inputs={
                "section_title": section.title,
                "section_description": section.description,
                "audience_level": self.state.audience_level,
                "previous_sections": previous_sections_text,
                "draft_content": ""
            })
            # Store the content
            self.state.sections_content[section.title] = result.raw
            completed_sections.append(section.title)
            print(f"Section completed: {section.title}")
        # Compile the final guide
        guide_content = f"# {outline.title}\n\n"
        guide_content += f"## Introduction\n\n{outline.introduction}\n\n"
        # Add each section in order
        for section in outline.sections:
            section_content = self.state.sections_content.get(section.title, "")
            guide_content += f"\n\n{section_content}\n\n"
        # Add conclusion
        guide_content += f"## Conclusion\n\n{outline.conclusion}\n\n"
        # Save the guide
        with open("output/complete_guide.md", "w") as f:
            f.write(guide_content)
        print("\nComplete guide compiled and saved to output/complete_guide.md")
        return "Guide creation completed successfully"


def kickoff():
    """Run the guide creator flow."""
    GuideCreatorFlow().kickoff()
    print("\n=== Flow Complete ===")
    print("Your comprehensive guide is ready in the output directory.")
    print("Open output/complete_guide.md to view it.")


def plot():
    """Generate a visualization of the flow."""
    flow = GuideCreatorFlow()
    flow.plot("guide_creator_flow")
    print("Flow visualization saved to guide_creator_flow.html")


if __name__ == "__main__":
    kickoff()
# [CÓDIGO NÃO TRADUZIDO, MANTER COMO ESTÁ]
```
Vamos analisar o que está acontecendo neste flow:

View File

@@ -1,142 +0,0 @@
---
title: "Migrando de inputs.id para restore_from_state_id"
description: "Mover fluxos @persist da hidratação obsoleta inputs.id para o campo suportado restore_from_state_id"
icon: "arrow-right-arrow-left"
---
<Warning>
Passar `id` dentro de `inputs` para hidratar um fluxo `@persist` é **obsoleto** e
programado para remoção em uma versão futura. A substituição, `restore_from_state_id`,
está disponível no CrewAI **v1.14.5 e posterior** — os passos abaixo se aplicam uma vez que você
faça a atualização.
</Warning>
## Visão Geral
A maneira documentada de hidratar um fluxo `@persist` de uma execução anterior é passar
o UUID dessa execução como `inputs.id`. O CrewAI agora expõe um campo dedicado,
`restore_from_state_id`, que realiza a mesma hidratação sem sobrecarregar a
carga útil de `inputs` — e sem acoplar a chave de hidratação à identidade da nova execução.
## Migração
Se você atualmente inicia um fluxo `@persist` com `inputs={"id": ...}`:
```python
# Obsoleto
flow = CounterFlow()
flow.kickoff(inputs={"id": "abcd1234-5678-90ef-ghij-klmnopqrstuv"})
```
Mude para `restore_from_state_id`:
```python
# Suportado
flow = CounterFlow()
flow.kickoff(restore_from_state_id="abcd1234-5678-90ef-ghij-klmnopqrstuv")
```
Os dois modos têm semânticas de linhagem diferentes:
- `inputs={"id": <uuid>}` (obsoleto) — **retomar**: as gravações são feitas sob o id fornecido,
estendendo a mesma história de `flow_uuid`.
- `restore_from_state_id=<uuid>` — **dividir**: hidrata o estado a partir de um snapshot, então
grava sob um novo `state.id`. A história do fluxo de origem é preservada.
Para a maioria dos cenários de produção — reexecutar um fluxo hidratado de um estado anterior — criar um fork
é o que você deseja. Veja [Dominando o Estado do Fluxo](/pt-BR/guides/flows/mastering-flow-state)
para o modelo mental completo.
Se você iniciar seu fluxo pela API REST do CrewAI AMP, veja [AMP](#amp) abaixo para a
migração equivalente da carga útil.
## Por que estamos descontinuando `inputs.id` para `@persist`?
`inputs.id` é atualmente a maneira documentada de retomar um fluxo `@persist` de uma
execução anterior. O problema é que o mesmo UUID faz duas funções ao mesmo tempo:
1. **Seleciona qual snapshot o `@persist` usa para hidratar** — carrega o estado salvo
sob aquele UUID.
2. **Torna-se o ID de Execução do Fluxo da nova execução** (`state.id` no SDK;
apresentado como `flow_id` em alguns contextos) — cada gravação `@persist` a partir desta
inicialização também cai sob aquele mesmo UUID.
Esse papel duplo é a causa raiz dos problemas que este guia descreve. Como o
UUID fornecido também é o id da nova execução, duas inicializações que passam o mesmo
`inputs.id` não são duas execuções distintas — elas compartilham um id, compartilham um registro
de persistência e (no AMP) compartilham uma linha na lista de execuções. Não há como dizer
"hidratar a partir deste snapshot, mas registrar esta execução separadamente" sem dividir as
duas responsabilidades.
`restore_from_state_id` é essa divisão. Ele informa ao `@persist` de qual snapshot hidratar,
enquanto deixa a nova execução livre para receber um novo `state.id`. A
fonte de hidratação e a execução registrada não são mais o mesmo UUID — que é o que
a maioria dos cenários de produção realmente deseja.
## Cronograma de remoção
`inputs.id` para hidratação `@persist` está programado para remoção em uma versão futura do
CrewAI. Não há um corte imediato — fluxos existentes continuam a funcionar — mas
uma vez que você atualize para v1.14.5 ou posterior, novo código deve usar `restore_from_state_id`, e
fluxos existentes devem migrar na próxima oportunidade conveniente.
## AMP
Se você implantar seu fluxo no CrewAI AMP, a migração se estende à carga útil de inicialização
enviada para sua Crew implantada, e os sintomas visíveis de reutilização de `inputs.id` aparecem
no painel de controle de implantação. As duas subseções abaixo cobrem ambos.
### Migrando a carga útil de inicialização
Se você atualmente inicia um fluxo implantado incorporando `id` em `inputs`:
```bash
# Obsoleto
curl -X POST \
-H "Content-Type: application/json" \
-H "Authorization: Bearer YOUR_CREW_TOKEN" \
-d '{"inputs": {"id": "abcd1234-5678-90ef-ghij-klmnopqrstuv", "topic": "AI Agent Frameworks"}}' \
https://your-crew-url.crewai.com/kickoff
```
Mova o UUID para o campo `restoreFromStateId` de nível superior:
```bash
# Suportado
curl -X POST \
-H "Content-Type: application/json" \
-H "Authorization: Bearer YOUR_CREW_TOKEN" \
-d '{
"inputs": {"topic": "AI Agent Frameworks"},
"restoreFromStateId": "abcd1234-5678-90ef-ghij-klmnopqrstuv"
}' \
https://your-crew-url.crewai.com/kickoff
```
`restoreFromStateId` fica ao lado de `inputs` na carga útil de inicialização, não dentro dela. O
objeto `inputs` agora carrega apenas valores que seu fluxo realmente consome.
### O que acontece quando `inputs.id` é reutilizado
Quando o AMP recebe um kickoff para um fluxo cujo `inputs.id` corresponde a uma execução
existente, ele resolve para o registro existente em vez de criar um novo. A partir
do painel de controle de implantação, você verá:
- **Status da execução** — o status da nova execução sobrescreve o status da execução anterior. Uma
execução finalizada pode voltar para `running`, ou uma execução `completed` pode mudar para
`error` se a nova inicialização falhar — de qualquer forma, o painel não reflete mais
a execução original.
- **Rastros** — Os OTel traces se acumulam entre as inicializações porque compartilham o mesmo
id de execução; os traces da execução anterior são substituídos ou misturados
com os da nova execução. Uma reprodução passo a passo não corresponde mais a uma única execução.
- **Lista de execuções** — kickoffs que deveriam aparecer como linhas separadas colapsam em
uma única entrada, ocultando o histórico.
Migrar para `restoreFromStateId` mantém cada kickoff como sua própria execução — com
seu próprio status, traces e entrada na lista — enquanto ainda hidrata o estado de uma
execução anterior.
<Card title="Precisa de Ajuda?" icon="headset" href="mailto:support@crewai.com">
Entre em contato com nossa equipe de suporte se você não tiver certeza de qual modo seu fluxo precisa ou se encontrar problemas
durante a migração.
</Card>

View File

@@ -63,60 +63,7 @@ Com estado não estruturado:
Veja um exemplo simples de gerenciamento de estado não estruturado:
```python
from crewai.flow.flow import Flow, listen, start


class UnstructuredStateFlow(Flow):
    """Demonstrates unstructured (dict-like) flow state."""

    @start()
    def initialize_data(self):
        print("Initializing flow data")
        # Add key-value pairs to the state
        self.state["user_name"] = "Alex"
        self.state["preferences"] = {
            "theme": "dark",
            "language": "English"
        }
        self.state["items"] = []
        # The flow state automatically receives a unique ID
        print(f"Flow ID: {self.state['id']}")
        return "Initialized"

    @listen(initialize_data)
    def process_data(self, previous_result):
        print(f"Previous step returned: {previous_result}")
        # Access and modify the state
        user = self.state["user_name"]
        print(f"Processing data for {user}")
        # Append items to a list held in the state
        self.state["items"].append("item1")
        self.state["items"].append("item2")
        # Add a new key-value pair
        self.state["processed"] = True
        return "Processed"

    @listen(process_data)
    def generate_summary(self, previous_result):
        # Read multiple values from the state
        user = self.state["user_name"]
        theme = self.state["preferences"]["theme"]
        items = self.state["items"]
        processed = self.state.get("processed", False)
        summary = f"User {user} has {len(items)} items with {theme} theme. "
        summary += "Data is processed." if processed else "Data is not processed."
        return summary


# Run the flow
flow = UnstructuredStateFlow()
result = flow.kickoff()
print(f"Final result: {result}")
print(f"Final state: {flow.state}")
# código não traduzido
```
### Quando Usar Estado Não Estruturado
@@ -147,63 +94,7 @@ Ao utilizar estado estruturado:
Veja como implementar o gerenciamento de estado estruturado:
```python
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel, Field
from typing import List, Dict, Optional


# Define the state model
class UserPreferences(BaseModel):
    theme: str = "light"
    language: str = "English"


class AppState(BaseModel):
    user_name: str = ""
    preferences: UserPreferences = UserPreferences()
    items: List[str] = []
    processed: bool = False
    completion_percentage: float = 0.0


# Create a flow with typed state
class StructuredStateFlow(Flow[AppState]):
    """Demonstrates structured (Pydantic-typed) flow state."""

    @start()
    def initialize_data(self):
        print("Initializing flow data")
        # Set state values (type-checked)
        self.state.user_name = "Taylor"
        self.state.preferences.theme = "dark"
        # The ID field is available automatically
        print(f"Flow ID: {self.state.id}")
        return "Initialized"

    @listen(initialize_data)
    def process_data(self, previous_result):
        print(f"Processing data for {self.state.user_name}")
        # Modify the state (type-checked)
        self.state.items.append("item1")
        self.state.items.append("item2")
        self.state.processed = True
        self.state.completion_percentage = 50.0
        return "Processed"

    @listen(process_data)
    def generate_summary(self, previous_result):
        # Access the state (with autocomplete)
        summary = f"User {self.state.user_name} has {len(self.state.items)} items "
        summary += f"with {self.state.preferences.theme} theme. "
        summary += "Data is processed." if self.state.processed else "Data is not processed."
        summary += f" Completion: {self.state.completion_percentage}%"
        return summary


# Run the flow
flow = StructuredStateFlow()
result = flow.kickoff()
print(f"Final result: {result}")
print(f"Final state: {flow.state}")
# código não traduzido
```
### Benefícios do Estado Estruturado
@@ -247,29 +138,7 @@ Independente de você usar estado estruturado ou não estruturado, é possível
Métodos do flow podem retornar valores que serão passados como argumento para métodos listeners:
```python
from crewai.flow.flow import Flow, listen, start


class DataPassingFlow(Flow):
    """Shows how return values flow from one step to its listeners."""

    @start()
    def generate_data(self):
        # This return value is passed to listener methods
        return "Generated data"

    @listen(generate_data)
    def process_data(self, data_from_previous_step):
        print(f"Received: {data_from_previous_step}")
        # You can modify the data and pass it along
        processed_data = f"{data_from_previous_step} - processed"
        # Also update the state
        self.state["last_processed"] = processed_data
        return processed_data

    @listen(process_data)
    def finalize_data(self, processed_data):
        print(f"Received processed data: {processed_data}")
        # Access both the passed data and the state
        last_processed = self.state.get("last_processed", "")
        return f"Final: {processed_data} (from state: {last_processed})"
# código não traduzido
```
Esse padrão permite combinar passagem de dados direta com atualizações de estado para obter máxima flexibilidade.
@@ -287,36 +156,7 @@ O decorador `@persist()` automatiza a persistência de estado, salvando o estado
Ao aplicar em nível de classe, `@persist()` salva o estado após cada execução de método:
```python
from crewai.flow.flow import Flow, listen, start
from crewai.flow.persistence import persist
from pydantic import BaseModel


class CounterState(BaseModel):
    value: int = 0


@persist()  # Applies to the entire flow class
class PersistentCounterFlow(Flow[CounterState]):
    """Counter flow whose state is saved after every method execution."""

    @start()
    def increment(self):
        self.state.value += 1
        print(f"Incremented to {self.state.value}")
        return self.state.value

    @listen(increment)
    def double(self, value):
        self.state.value = value * 2
        print(f"Doubled to {self.state.value}")
        return self.state.value


# First run
flow1 = PersistentCounterFlow()
result1 = flow1.kickoff()
print(f"First run result: {result1}")

# Second run - hydrate from the first run's persisted state.
# NOTE: passing inputs={"id": ...} is deprecated for @persist hydration;
# restore_from_state_id is the supported field (CrewAI v1.14.5+).
flow2 = PersistentCounterFlow()
result2 = flow2.kickoff(restore_from_state_id=flow1.state.id)
print(f"Second run result: {result2}")  # Will be higher due to persisted state
# código não traduzido
```
#### Persistência em Nível de Método
@@ -324,26 +164,7 @@ print(f"Second run result: {result2}") # Será maior devido ao estado persistid
Para mais controle, você pode aplicar `@persist()` em métodos específicos:
```python
from crewai.flow.flow import Flow, listen, start
from crewai.flow.persistence import persist


class SelectivePersistFlow(Flow):
    """Applies @persist() to a single method instead of the whole class."""

    @start()
    def first_step(self):
        self.state["count"] = 1
        return "First step"

    @persist()  # Persist only after this method
    @listen(first_step)
    def important_step(self, prev_result):
        self.state["count"] += 1
        self.state["important_data"] = "This will be persisted"
        return "Important step completed"

    @listen(important_step)
    def final_step(self, prev_result):
        self.state["count"] += 1
        return f"Complete with count {self.state['count']}"
# código não traduzido
```
#### Forking de Estado Persistido
@@ -395,45 +216,7 @@ Notas sobre o comportamento:
Você pode usar o estado para implementar lógicas condicionais complexas em seus flows:
```python
from crewai.flow.flow import Flow, listen, router, start
from pydantic import BaseModel


class PaymentState(BaseModel):
    amount: float = 0.0
    is_approved: bool = False
    retry_count: int = 0


class PaymentFlow(Flow[PaymentState]):
    """Uses state plus a @router to implement conditional branching."""

    @start()
    def process_payment(self):
        # Simulate payment processing
        self.state.amount = 100.0
        self.state.is_approved = self.state.amount < 1000
        return "Payment processed"

    @router(process_payment)
    def check_approval(self, previous_result):
        # The returned label selects which @listen branch runs next
        if self.state.is_approved:
            return "approved"
        elif self.state.retry_count < 3:
            return "retry"
        else:
            return "rejected"

    @listen("approved")
    def handle_approval(self):
        return f"Payment of ${self.state.amount} approved!"

    @listen("retry")
    def handle_retry(self):
        self.state.retry_count += 1
        print(f"Retrying payment (attempt {self.state.retry_count})...")
        # Retry logic could be implemented here
        return "Retry initiated"

    @listen("rejected")
    def handle_rejection(self):
        return f"Payment of ${self.state.amount} rejected after {self.state.retry_count} retries."
# código não traduzido
```
### Manipulações Complexas de Estado
@@ -441,60 +224,7 @@ class PaymentFlow(Flow[PaymentState]):
Para transformar estados complexos, você pode criar métodos dedicados:
```python
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel
from typing import List, Dict


class UserData(BaseModel):
    name: str
    active: bool = True
    login_count: int = 0


class ComplexState(BaseModel):
    users: Dict[str, UserData] = {}
    active_user_count: int = 0


class TransformationFlow(Flow[ComplexState]):
    """Keeps flow methods clean by delegating state changes to helper methods."""

    @start()
    def initialize(self):
        # Add some users
        self.add_user("alice", "Alice")
        self.add_user("bob", "Bob")
        self.add_user("charlie", "Charlie")
        return "Initialized"

    @listen(initialize)
    def process_users(self, _):
        # Increment login counts
        for user_id in self.state.users:
            self.increment_login(user_id)
        # Deactivate one user
        self.deactivate_user("bob")
        # Refresh the active count
        self.update_active_count()
        return f"Processed {len(self.state.users)} users"

    # Helper methods for state transformations
    def add_user(self, user_id: str, name: str):
        self.state.users[user_id] = UserData(name=name)
        self.update_active_count()

    def increment_login(self, user_id: str):
        if user_id in self.state.users:
            self.state.users[user_id].login_count += 1

    def deactivate_user(self, user_id: str):
        if user_id in self.state.users:
            self.state.users[user_id].active = False
            self.update_active_count()

    def update_active_count(self):
        self.state.active_user_count = sum(
            1 for user in self.state.users.values() if user.active
        )
# código não traduzido
```
Esse padrão de criar métodos auxiliares mantém seus métodos de flow limpos, enquanto permite manipulações complexas de estado.
@@ -508,71 +238,7 @@ Um dos padrões mais poderosos na CrewAI é combinar o gerenciamento de estado d
Você pode usar o estado do flow para parametrizar crews:
```python
from crewai.flow.flow import Flow, listen, start
from crewai import Agent, Crew, Process, Task
from pydantic import BaseModel


class ResearchState(BaseModel):
    topic: str = ""
    depth: str = "medium"
    results: str = ""


class ResearchFlow(Flow[ResearchState]):
    """Uses flow state to parameterize a crew and store its output."""

    @start()
    def get_parameters(self):
        # In a real application, this could come from user input
        self.state.topic = "Artificial Intelligence Ethics"
        self.state.depth = "deep"
        return "Parameters set"

    @listen(get_parameters)
    def execute_research(self, _):
        # Create the agents
        researcher = Agent(
            role="Research Specialist",
            goal=f"Research {self.state.topic} in {self.state.depth} detail",
            backstory="You are an expert researcher with a talent for finding accurate information."
        )
        writer = Agent(
            role="Content Writer",
            goal="Transform research into clear, engaging content",
            backstory="You excel at communicating complex ideas clearly and concisely."
        )
        # Create the tasks
        research_task = Task(
            description=f"Research {self.state.topic} with {self.state.depth} analysis",
            expected_output="Comprehensive research notes in markdown format",
            agent=researcher
        )
        writing_task = Task(
            description=f"Create a summary on {self.state.topic} based on the research",
            expected_output="Well-written article in markdown format",
            agent=writer,
            context=[research_task]
        )
        # Create and run the crew
        research_crew = Crew(
            agents=[researcher, writer],
            tasks=[research_task, writing_task],
            process=Process.sequential,
            verbose=True
        )
        # Run the crew and store the result in the state
        result = research_crew.kickoff()
        self.state.results = result.raw
        return "Research completed"

    @listen(execute_research)
    def summarize_results(self, _):
        # Access the stored results
        result_length = len(self.state.results)
        return f"Research on {self.state.topic} completed with {result_length} characters of results."
# código não traduzido
```
### Manipulando Saídas de Crews no Estado
@@ -580,21 +246,7 @@ class ResearchFlow(Flow[ResearchState]):
Quando um crew finaliza, é possível processar sua saída e armazená-la no estado do flow:
```python
@listen(execute_crew)
def process_crew_results(self, _):
    """Parse the crew's raw output (assumed JSON) into structured state."""
    import json
    try:
        results_dict = json.loads(self.state.raw_results)
        self.state.processed_results = {
            "title": results_dict.get("title", ""),
            "main_points": results_dict.get("main_points", []),
            "conclusion": results_dict.get("conclusion", "")
        }
        return "Results processed successfully"
    except json.JSONDecodeError:
        # Record the failure instead of crashing the flow
        self.state.error = "Failed to parse crew results as JSON"
        return "Error processing results"
# código não traduzido
```
## Boas Práticas para Gerenciamento de Estado
@@ -604,19 +256,7 @@ def process_crew_results(self, _):
Projete seu estado para conter somente o necessário:
```python
# Too broad
class BloatedState(BaseModel):
    user_data: Dict = {}
    system_settings: Dict = {}
    temporary_calculations: List = []
    debug_info: Dict = {}
    # ...many other fields


# Better: focused state
class FocusedState(BaseModel):
    user_id: str
    preferences: Dict[str, str]
    completion_status: Dict[str, bool]
# Exemplo não traduzido
```
### 2. Use Estado Estruturado em Flows Complexos
@@ -624,23 +264,7 @@ class FocusedState(BaseModel):
À medida que seus flows evoluem em complexidade, o estado estruturado se torna cada vez mais valioso:
```python
# A simple flow can use unstructured state
class SimpleGreetingFlow(Flow):
    @start()
    def greet(self):
        self.state["name"] = "World"
        return f"Hello, {self.state['name']}!"


# A complex flow benefits from structured state
class UserRegistrationState(BaseModel):
    username: str
    email: str
    verification_status: bool = False
    registration_date: datetime = Field(default_factory=datetime.now)
    last_login: Optional[datetime] = None


class RegistrationFlow(Flow[UserRegistrationState]):
    # Methods with access to strongly typed state
# Exemplo não traduzido
```
### 3. Documente Transições de Estado
@@ -648,18 +272,7 @@ class RegistrationFlow(Flow[UserRegistrationState]):
Para flows complexos, documente como o estado muda ao longo da execução:
```python
@start()
def initialize_order(self):
    """
    Initialize order state with empty values.

    State before: {}
    State after: {order_id: str, items: [], status: 'new'}
    """
    self.state.order_id = str(uuid.uuid4())
    self.state.items = []
    self.state.status = "new"
    return "Order initialized"
# Exemplo não traduzido
```
### 4. Trate Erros de Estado de Forma Elegante
@@ -667,18 +280,7 @@ def initialize_order(self):
Implemente tratamento de erros ao acessar o estado:
```python
@listen(previous_step)
def process_data(self, _):
    """Read an optional preference from unstructured state, tolerating missing keys."""
    try:
        # Attempt to access a value that may not exist.
        # Dict-style access matches the unstructured-state examples above;
        # the original mixed attribute writes with dict reads.
        user_preference = self.state["preferences"].get("theme", "default")
    except (AttributeError, KeyError):
        # Handle the error gracefully
        self.state["errors"] = self.state.get("errors", [])
        self.state["errors"].append("Failed to access preferences")
        user_preference = "default"
    return f"Used preference: {user_preference}"
# Exemplo não traduzido
```
### 5. Use o Estado Para Acompanhar o Progresso
@@ -686,30 +288,7 @@ def process_data(self, _):
Aproveite o estado para monitorar o progresso em flows de longa duração:
```python
class ProgressTrackingFlow(Flow):
    """Uses state to track progress across a long-running flow."""

    @start()
    def initialize(self):
        self.state["total_steps"] = 3
        self.state["current_step"] = 0
        self.state["progress"] = 0.0
        self.update_progress()
        return "Initialized"

    def update_progress(self):
        """Helper method to calculate and update progress"""
        # Guard against division by zero when total_steps is unset
        if self.state.get("total_steps", 0) > 0:
            self.state["progress"] = (self.state.get("current_step", 0) /
                                      self.state["total_steps"]) * 100
            print(f"Progress: {self.state['progress']:.1f}%")

    @listen(initialize)
    def step_one(self, _):
        # Do the work...
        self.state["current_step"] = 1
        self.update_progress()
        return "Step 1 complete"

    # Additional steps...
# Exemplo não traduzido
```
### 6. Prefira Operações Imutáveis Quando Possível
@@ -717,22 +296,7 @@ class ProgressTrackingFlow(Flow):
Especialmente com estado estruturado, prefira operações imutáveis para maior clareza:
```python
# Instead of modifying lists in place:
self.state.items.append(new_item)  # Mutable operation

# Consider creating new state:
from pydantic import BaseModel
from typing import List


class ItemState(BaseModel):
    items: List[str] = []


class ImmutableFlow(Flow[ItemState]):
    @start()
    def add_item(self):
        # Build a new list with the item added
        self.state.items = [*self.state.items, "new item"]
        return "Item added"
# Exemplo não traduzido
```
## Depurando o Estado do Flow
@@ -742,24 +306,7 @@ class ImmutableFlow(Flow[ItemState]):
Ao desenvolver, adicione logs para acompanhar mudanças no estado:
```python
import logging

logging.basicConfig(level=logging.INFO)


class LoggingFlow(Flow):
    """Logs the full state after each step to trace state changes during development."""

    def log_state(self, step_name):
        logging.info(f"State after {step_name}: {self.state}")

    @start()
    def initialize(self):
        self.state["counter"] = 0
        self.log_state("initialize")
        return "Initialized"

    @listen(initialize)
    def increment(self, _):
        self.state["counter"] += 1
        self.log_state("increment")
        return f"Incremented to {self.state['counter']}"
# Exemplo não traduzido
```
### Visualizando o Estado
@@ -767,30 +314,7 @@ class LoggingFlow(Flow):
Você pode adicionar métodos para visualizar seu estado durante o debug:
```python
def visualize_state(self):
    """Create a simple visualization of the current state"""
    import json
    from rich.console import Console
    from rich.panel import Panel

    console = Console()
    if hasattr(self.state, "model_dump"):
        # Pydantic v2
        state_dict = self.state.model_dump()
    elif hasattr(self.state, "dict"):
        # Pydantic v1
        state_dict = self.state.dict()
    else:
        # Unstructured state
        state_dict = dict(self.state)
    # Remove the id for cleaner output
    if "id" in state_dict:
        state_dict.pop("id")
    state_json = json.dumps(state_dict, indent=2, default=str)
    console.print(Panel(state_json, title="Current Flow State"))
# Exemplo não traduzido
```
## Conclusão

View File

@@ -1,190 +0,0 @@
---
title: "Atualizando o CrewAI"
description: "Como atualizar o CrewAI no seu projeto e adaptar-se a breaking changes entre versões."
icon: "arrow-up-circle"
---
## Visão Geral
Os lançamentos do CrewAI trazem novos recursos regularmente. Este guia mostra os passos práticos para manter sua instalação atualizada — tanto a CLI quanto o ambiente virtual do seu projeto.
Se você está começando do zero, veja [Instalação](/pt-BR/installation). Se está vindo de outro framework, veja [Migrando do LangGraph](/pt-BR/guides/migration/migrating-from-langgraph).
---
## As Duas Coisas Que Você Pode Querer Atualizar
O CrewAI vive em dois lugares na sua máquina, e cada um se atualiza de forma independente:
| O quê | Como é instalado | Como atualizar |
|---|---|---|
| A **CLI global `crewai`** | `uv tool install crewai` | `uv tool install crewai --upgrade` |
| O **venv do projeto** (onde seu código roda) | `crewai install` / `uv sync` | `uv add "crewai[...]>=X.Y.Z"` e depois `crewai install` |
Esses dois podem ficar — e frequentemente ficam — fora de sincronia. Rodar `crewai --version` mostra a versão da CLI. Rodar `uv pip show crewai` dentro do seu projeto mostra a versão do venv. Se forem diferentes, isso é normal; o que importa para o código em execução é a versão do venv.
## Por Que `crewai install` Sozinho Não Atualiza
`crewai install` é um wrapper fino em torno de `uv sync`. Ele instala exatamente o que o arquivo `uv.lock` atual diz — ele **não** muda nenhuma restrição de versão.
Se seu `pyproject.toml` diz `crewai>=1.11.1` e o lock file resolveu para `1.11.1`, executar `crewai install` vai te manter em `1.11.1` para sempre, mesmo que `1.14.4` esteja disponível.
Para realmente atualizar, você precisa:
1. Atualizar a restrição de versão em `pyproject.toml`
2. Re-resolver o lock file
3. Sincronizar o venv
`uv add` faz os três de uma vez só.
## Como Atualizar Seu Projeto
```bash
# Aumenta a restrição e re-resolve o lock em um único comando
uv add "crewai[tools]>=1.14.4"
# Sincroniza o venv (crewai install chama uv sync por baixo dos panos)
crewai install
# Verifica
uv pip show crewai
# → Version: 1.14.4
```
Substitua `[tools]` por quaisquer extras que seu projeto utilize (ex.: `[tools,anthropic]`). Verifique a lista de `dependencies` do seu `pyproject.toml` se estiver em dúvida.
<Note>
`uv add` atualiza tanto `pyproject.toml` **quanto** `uv.lock` atomicamente. Se você editar `pyproject.toml` manualmente, ainda precisa rodar `uv lock --upgrade-package crewai` para re-resolver o lock file antes que `crewai install` pegue a nova versão.
</Note>
## Atualizando a CLI Global
A CLI global é separada do seu projeto. Atualize com:
```bash
uv tool install crewai --upgrade
```
Se seu shell avisar sobre o `PATH` após a atualização, recarregue-o:
```bash
uv tool update-shell
```
Isso **não** mexe no venv do seu projeto — você ainda precisa de `uv add` + `crewai install` dentro do projeto.
## Verifique Se Ambos Estão em Sincronia
```bash
# Versão da CLI global
crewai --version
# Versão do venv do projeto
uv pip show crewai | grep Version
```
Eles não precisam coincidir — mas a versão do venv do projeto é o que importa para o comportamento em runtime.
<Note>
CrewAI requer `Python >=3.10, <3.14`. Se o `uv` foi instalado contra um interpretador mais antigo, recrie o venv do projeto com uma versão suportada do Python antes de rodar `crewai install`.
</Note>
---
## Breaking Changes e Notas de Migração
A maioria das atualizações requer apenas pequenos ajustes. As áreas abaixo são as que quebram silenciosamente ou com tracebacks confusos.
### Caminhos de import: tools e `BaseTool`
O caminho canônico para tools é `crewai.tools`. Caminhos antigos ainda aparecem em tutoriais, mas devem ser atualizados.
```python
# Antes
from crewai_tools import BaseTool
from crewai.agents.tools import tool
# Depois
from crewai.tools import BaseTool, tool
```
O decorador `@tool` e a subclasse `BaseTool` ambos vivem em `crewai.tools`. `AgentFinish` e outros símbolos internos do agente não fazem mais parte da superfície pública — se você os estava importando, mude para event listeners ou callbacks de `Task`.
### Mudanças de parâmetros em `Agent`
```python
from crewai import Agent
agent = Agent(
role="Researcher",
goal="Find authoritative sources on {topic}",
backstory="You are a careful, source-driven researcher.",
llm="gpt-4o-mini", # nome do modelo como string OU um objeto LLM
verbose=True, # bool, não um nível inteiro
max_iter=15, # default mudou entre versões — defina explicitamente
allow_delegation=False,
)
```
- `llm` aceita tanto um nome de modelo como string (resolvido pelo provedor configurado) quanto um objeto `LLM` para controle granular.
- `verbose` é um `bool` puro. Passar um inteiro não alterna mais níveis de log.
- Os defaults de `max_iter` mudaram entre releases. Se seu agente para silenciosamente de iterar após a primeira chamada de tool, defina `max_iter` explicitamente.
### Parâmetros de `Crew`
```python
from crewai import Crew, Process
crew = Crew(
agents=[...],
tasks=[...],
process=Process.sequential, # ou Process.hierarchical
memory=True,
cache=True,
embedder={"provider": "openai", "config": {"model": "text-embedding-3-small"}},
)
```
- `process=Process.hierarchical` requer ou `manager_llm=` ou `manager_agent=`. Sem um deles, o kickoff lança erro na validação.
- `memory=True` com um provedor de embedding não-default precisa de um dicionário `embedder` — veja [Configuração de memória e embedder](#memory-embedder-config) abaixo.
### Saída estruturada de `Task`
Use `output_pydantic`, `output_json` ou `output_file` para forçar o resultado de uma task em um formato tipado:
```python
from pydantic import BaseModel
from crewai import Task
class Article(BaseModel):
title: str
body: str
write = Task(
description="Write an article about {topic}",
expected_output="A short article with a title and body",
agent=writer,
output_pydantic=Article, # a classe, NÃO uma instância
output_file="output/article.md",
)
```
`output_pydantic` recebe a **classe** em si. Passar `Article(title="", body="")` é um erro comum e falha com um erro de validação confuso.
### Configuração de memória e embedder {#memory-embedder-config}
Se `memory=True` e você não está usando os embeddings padrão da OpenAI, é preciso passar um `embedder`:
```python
crew = Crew(
agents=[...],
tasks=[...],
memory=True,
embedder={
"provider": "ollama",
"config": {"model": "nomic-embed-text"},
},
)
```
Defina as credenciais do provedor relevante (`OPENAI_API_KEY`, `OLLAMA_HOST`, etc.) no seu arquivo `.env`. Os caminhos de armazenamento de memória são locais ao projeto por default — apague o diretório de memória do projeto se trocar de embedder, já que dimensões diferentes não se misturam.

View File

@@ -13,7 +13,7 @@ The Daytona sandbox tools give CrewAI agents access to isolated, ephemeral compu
- **`DaytonaExecTool`** — run any shell command inside a sandbox.
- **`DaytonaPythonTool`** — execute a block of Python source code inside a sandbox.
- **`DaytonaFileTool`** — read, write, append, list, delete, and inspect files inside a sandbox; also supports `move`, `find` (content grep), `search` (filename glob), `chmod` (permissions), `replace` (bulk find-and-replace), and `exists`.
- **`DaytonaFileTool`** — read, write, append, list, delete, and inspect files inside a sandbox.
All three tools share the same sandbox lifecycle controls, so you can mix and match them while keeping state in a single persistent sandbox.
@@ -55,7 +55,7 @@ from crewai_tools import DaytonaPythonTool
tool = DaytonaPythonTool()
result = tool.run(code="print(sum(range(10)))")
print(result)
# {"exit_code": 0, "result": "45\n", "artifacts": ExecutionArtifacts(stdout="45\n", charts=[])}
# {"exit_code": 0, "result": "45\n", "artifacts": None}
```
### Multi-step shell session (persistent)
@@ -63,22 +63,17 @@ print(result)
```python Code
from crewai_tools import DaytonaExecTool, DaytonaFileTool
# Create the persistent sandbox via the first tool, then attach the second
# tool to it so both share state (installed packages, files, env vars).
exec_tool = DaytonaExecTool(persistent=True)
exec_tool.run(command="pip install httpx -q")
file_tool = DaytonaFileTool(sandbox_id=exec_tool.active_sandbox_id)
file_tool = DaytonaFileTool(persistent=True)
file_tool.run(
action="write",
path="workspace/script.py",
content="import httpx; print(f'httpx loaded, version {httpx.__version__}')",
)
exec_tool.run(command="python workspace/script.py")
# Install a package, then write and run a script — all in the same sandbox
exec_tool.run(command="pip install httpx -q")
file_tool.run(action="write", path="/workspace/fetch.py", content="import httpx; print(httpx.get('https://httpbin.org/get').status_code)")
exec_tool.run(command="python /workspace/fetch.py")
```
<Note>
By default, each tool with `persistent=True` lazily creates its **own** sandbox on first use. The pattern above shares a single sandbox across multiple tools by reading the first tool's `active_sandbox_id` after a `.run()` call and passing it to the others via `sandbox_id=...`. With `persistent=False` (the default), every `.run()` call gets a fresh sandbox that's deleted at the end of that call.
Each tool instance maintains its own persistent sandbox. To share **one** sandbox across two tools, create the first tool, grab its sandbox id via `tool._persistent_sandbox.id`, and pass it to the second tool via `sandbox_id=...`.
</Note>
### Attach to an existing sandbox
@@ -87,7 +82,7 @@ By default, each tool with `persistent=True` lazily creates its **own** sandbox
from crewai_tools import DaytonaExecTool
tool = DaytonaExecTool(sandbox_id="my-long-lived-sandbox")
result = tool.run(command="ls workspace")
result = tool.run(command="ls /workspace")
```
### Custom sandbox parameters
@@ -107,41 +102,6 @@ tool = DaytonaExecTool(
)
```
### Searching, moving, and modifying files
```python Code
from crewai_tools import DaytonaFileTool
file_tool = DaytonaFileTool(persistent=True)
# Find every TODO in the source tree (grep file contents recursively)
file_tool.run(action="find", path="workspace/src", pattern="TODO:")
# Find all Python files (glob match on filenames)
file_tool.run(action="search", path="workspace", pattern="*.py")
# Make a script executable
file_tool.run(action="chmod", path="workspace/run.sh", mode="755")
# Rename or move a file
file_tool.run(
action="move",
path="workspace/draft.md",
destination="workspace/final.md",
)
# Bulk find-and-replace across multiple files
file_tool.run(
action="replace",
paths=["workspace/src/a.py", "workspace/src/b.py"],
pattern="old_function",
replacement="new_function",
)
# Quick existence check before a destructive op
file_tool.run(action="exists", path="workspace/cache.db")
```
### Agent integration
```python Code
@@ -161,7 +121,7 @@ coder = Agent(
)
task = Task(
description="Write a Python script that prints the first 10 Fibonacci numbers, save it to workspace/fib.py, and run it.",
description="Write a Python script that prints the first 10 Fibonacci numbers, save it to /workspace/fib.py, and run it.",
expected_output="The first 10 Fibonacci numbers printed to stdout.",
agent=coder,
)
@@ -208,22 +168,12 @@ All three tools accept these parameters at initialization:
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `action` | `str` | ✓ | One of: `read`, `write`, `append`, `list`, `delete`, `mkdir`, `info`, `exists`, `move`, `find`, `search`, `chmod`, `replace`. |
| `path` | `str \| None` | ✓ for all actions except `replace` | Absolute path inside the sandbox. |
| `content` | `str \| None` | ✓ for `append` | Content to write or append. |
| `action` | `str` | ✓ | One of: `read`, `write`, `append`, `list`, `delete`, `mkdir`, `info`. |
| `path` | `str` | ✓ | Absolute path inside the sandbox. |
| `content` | `str \| None` | | Content to write or append. Required for `append`. |
| `binary` | `bool` | | If `True`, `content` is base64 on write; returns base64 on read. |
| `recursive` | `bool` | | For `delete`: remove directories recursively. |
| `mode` | `str \| None` | | For `mkdir`: octal permissions for the new directory (defaults to `"0755"`). For `chmod`: octal permissions to apply to the target. |
| `destination` | `str \| None` | ✓ for `move` | Destination path for `move`. |
| `pattern` | `str \| None` | ✓ for `find`, `search`, `replace` | For `find`: substring matched against file CONTENTS. For `search`: glob matched against file NAMES (e.g. `*.py`). For `replace`: text to replace inside files. |
| `replacement` | `str \| None` | ✓ for `replace` | Replacement text for `pattern`. |
| `paths` | `list[str] \| None` | ✓ for `replace` | List of file paths in which to replace text. |
| `owner` | `str \| None` | | For `chmod`: new file owner. |
| `group` | `str \| None` | | For `chmod`: new file group. |
<Note>
For `chmod`, pass at least one of `mode`, `owner`, or `group` — any field left as `None` is left unchanged on the target.
</Note>
| `mode` | `str` | | For `mkdir`: octal permission string (default `"0755"`). |
<Tip>
For files larger than a few KB, create the file first with `action="write"` and empty content, then send the body via multiple `action="append"` calls of ~4 KB each to stay within tool-call payload limits.

View File

@@ -1,26 +0,0 @@
# crewai-cli
CLI for CrewAI — scaffold, run, deploy and manage AI agent crews without
installing the full framework.
## Installation
```bash
pip install crewai-cli
```
This pulls in `crewai-core` (shared utilities) but not the `crewai` framework
itself, so commands that don't need a crew loaded — `crewai version`,
`crewai login`, `crewai org list`, `crewai config *`, `crewai traces *`,
`crewai create`, `crewai template *` — work standalone.
Commands that load a user's crew or flow (`crewai run`, `crewai train`,
`crewai test`, `crewai chat`, `crewai replay`, `crewai reset-memories`,
`crewai deploy push`, `crewai tool publish`) require `crewai` to be installed
in the project's environment. They print a clear error if it is missing.
To install both at once:
```bash
pip install crewai[cli]
```

View File

@@ -1,45 +0,0 @@
[project]
name = "crewai-cli"
dynamic = ["version"]
description = "CLI for CrewAI — scaffold, run, deploy and manage AI agent crews."
readme = "README.md"
authors = [
{ name = "Joao Moura", email = "joao@crewai.com" }
]
requires-python = ">=3.10, <3.14"
dependencies = [
"crewai-core==1.14.5a5",
"click~=8.1.7",
"pydantic>=2.11.9,<2.13",
"pydantic-settings~=2.10.1",
"appdirs~=1.4.4",
"cryptography>=42.0",
"httpx~=0.28.1",
"pyjwt>=2.9.0,<3",
"rich>=13.7.1",
"tomli~=2.0.2",
"tomli-w~=1.1.0",
"packaging>=23.0",
"python-dotenv>=1.2.2,<2",
"uv~=0.11.6",
"textual>=7.5.0",
"certifi",
]
[project.urls]
Homepage = "https://crewai.com"
Documentation = "https://docs.crewai.com"
Repository = "https://github.com/crewAIInc/crewAI"
[project.scripts]
crewai = "crewai_cli.cli:crewai"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.version]
path = "src/crewai_cli/__init__.py"
[tool.hatch.build.targets.wheel]
packages = ["src/crewai_cli"]

View File

@@ -1 +0,0 @@
__version__ = "1.14.5a5"

View File

@@ -1,8 +0,0 @@
"""CLI authentication entry point."""
from __future__ import annotations
from crewai_cli.authentication.main import AuthenticationCommand
__all__ = ["AuthenticationCommand"]

View File

@@ -1,8 +0,0 @@
"""Re-export of authentication constants from ``crewai_core.auth.constants``."""
from __future__ import annotations
from crewai_core.auth.constants import ALGORITHMS as ALGORITHMS
__all__ = ["ALGORITHMS"]

View File

@@ -1,60 +0,0 @@
"""CLI-side authentication wiring.
Re-exports the OAuth2 primitives from ``crewai_core.auth`` and overrides the
``_post_login`` hook to also log into the tool repository.
"""
from __future__ import annotations
from crewai_core.auth.oauth2 import (
AuthenticationCommand as _BaseAuthenticationCommand,
Oauth2Settings as Oauth2Settings,
ProviderFactory as ProviderFactory,
console,
)
from crewai_core.settings import Settings
__all__ = ["AuthenticationCommand", "Oauth2Settings", "ProviderFactory"]
class AuthenticationCommand(_BaseAuthenticationCommand):
    """Device-flow login for the CLI that, after a successful OAuth2 login,
    also signs the user into the CrewAI tool repository."""

    def _post_login(self) -> None:
        # Hook called by the base class once the OAuth2 login succeeds.
        self._login_to_tool_repository()

    def _login_to_tool_repository(self) -> None:
        # Imported inside the method rather than at module top —
        # presumably to avoid an import cycle; confirm before moving.
        from crewai_cli.tools.main import ToolCommand

        try:
            console.print(
                "Now logging you in to the Tool Repository... ",
                style="bold blue",
                end="",
            )
            ToolCommand().login()
            console.print("Success!\n", style="bold green")

            settings = Settings()
            # Prefer the human-readable organization name; fall back to its UUID.
            org_label = settings.org_name if settings.org_name else settings.org_uuid
            console.print(
                f"You are now authenticated to the tool repository for organization [bold cyan]'{org_label}'[/bold cyan]",
                style="green",
            )
        except (Exception, SystemExit):
            # Tool-repository login is best-effort: warn the user but keep
            # the rest of the CLI fully usable.
            console.print(
                "\n[bold yellow]Warning:[/bold yellow] Authentication with the Tool Repository failed.",
                style="yellow",
            )
            console.print(
                "Other features will work normally, but you may experience limitations "
                "with downloading and publishing tools."
                "\nRun [bold]crewai login[/bold] to try logging in again.\n",
                style="yellow",
            )

View File

@@ -1 +0,0 @@
"""OAuth2 authentication providers — re-exported from ``crewai_core.auth.providers``."""

View File

@@ -1,8 +0,0 @@
"""Re-export of ``Auth0Provider`` from ``crewai_core.auth.providers.auth0``."""
from __future__ import annotations
from crewai_core.auth.providers.auth0 import Auth0Provider as Auth0Provider
__all__ = ["Auth0Provider"]

View File

@@ -1,8 +0,0 @@
"""Re-export of ``BaseProvider`` from ``crewai_core.auth.providers.base_provider``."""
from __future__ import annotations
from crewai_core.auth.providers.base_provider import BaseProvider as BaseProvider
__all__ = ["BaseProvider"]

View File

@@ -1,8 +0,0 @@
"""Re-export of ``EntraIdProvider`` from ``crewai_core.auth.providers.entra_id``."""
from __future__ import annotations
from crewai_core.auth.providers.entra_id import EntraIdProvider as EntraIdProvider
__all__ = ["EntraIdProvider"]

View File

@@ -1,8 +0,0 @@
"""Re-export of ``KeycloakProvider`` from ``crewai_core.auth.providers.keycloak``."""
from __future__ import annotations
from crewai_core.auth.providers.keycloak import KeycloakProvider as KeycloakProvider
__all__ = ["KeycloakProvider"]

View File

@@ -1,8 +0,0 @@
"""Re-export of ``OktaProvider`` from ``crewai_core.auth.providers.okta``."""
from __future__ import annotations
from crewai_core.auth.providers.okta import OktaProvider as OktaProvider
__all__ = ["OktaProvider"]

View File

@@ -1,8 +0,0 @@
"""Re-export of ``WorkosProvider`` from ``crewai_core.auth.providers.workos``."""
from __future__ import annotations
from crewai_core.auth.providers.workos import WorkosProvider as WorkosProvider
__all__ = ["WorkosProvider"]

View File

@@ -1,11 +0,0 @@
"""Re-exports of authentication token helpers from ``crewai_core.auth.token``."""
from __future__ import annotations
from crewai_core.auth.token import (
AuthError as AuthError,
get_auth_token as get_auth_token,
)
__all__ = ["AuthError", "get_auth_token"]

View File

@@ -1,8 +0,0 @@
"""Re-export of ``validate_jwt_token`` from ``crewai_core.auth.utils``."""
from __future__ import annotations
from crewai_core.auth.utils import validate_jwt_token as validate_jwt_token
__all__ = ["validate_jwt_token"]

View File

@@ -1,30 +0,0 @@
"""Re-exports of shared settings from ``crewai_core.settings``.
Kept as a stable import path for the CLI; new code should import from
``crewai_core.settings`` directly.
"""
from __future__ import annotations
from crewai_core.settings import (
CLI_SETTINGS_KEYS as CLI_SETTINGS_KEYS,
DEFAULT_CLI_SETTINGS as DEFAULT_CLI_SETTINGS,
DEFAULT_CONFIG_PATH as DEFAULT_CONFIG_PATH,
HIDDEN_SETTINGS_KEYS as HIDDEN_SETTINGS_KEYS,
READONLY_SETTINGS_KEYS as READONLY_SETTINGS_KEYS,
USER_SETTINGS_KEYS as USER_SETTINGS_KEYS,
Settings as Settings,
get_writable_config_path as get_writable_config_path,
)
__all__ = [
"CLI_SETTINGS_KEYS",
"DEFAULT_CLI_SETTINGS",
"DEFAULT_CONFIG_PATH",
"HIDDEN_SETTINGS_KEYS",
"READONLY_SETTINGS_KEYS",
"USER_SETTINGS_KEYS",
"Settings",
"get_writable_config_path",
]

View File

@@ -1,23 +0,0 @@
"""Wrapper for the crew chat command.
Delegates to ``crewai.utilities.crew_chat.run_chat`` when the full crewai
package is installed, otherwise prints a helpful error message.
"""
from __future__ import annotations
import click
def run_chat() -> None:
    """Launch the interactive crew chat session.

    Delegates to ``crewai.utilities.crew_chat.run_chat`` when the full
    ``crewai`` package is installed; otherwise prints an installation hint
    and exits with status 1.
    """
    try:
        from crewai.utilities.crew_chat import run_chat as delegate
    except ImportError:
        message = (
            "The 'chat' command requires the full crewai package.\n"
            "Install it with: pip install crewai"
        )
        click.secho(message, fg="red")
        raise SystemExit(1) from None
    delegate()

View File

@@ -1,12 +0,0 @@
"""Re-export of ``crewai_core.plus_api.PlusAPI``.
Kept as a stable import path for the CLI; new code should import from
``crewai_core.plus_api`` directly.
"""
from __future__ import annotations
from crewai_core.plus_api import PlusAPI as PlusAPI
__all__ = ["PlusAPI"]

View File

@@ -1,31 +0,0 @@
"""Wrapper for the reset-memories command.
Delegates to ``crewai.utilities.reset_memories`` when the full crewai
package is installed, otherwise prints a helpful error message.
"""
from __future__ import annotations
import click
def reset_memories_command(
    memory: bool,
    knowledge: bool,
    agent_knowledge: bool,
    kickoff_outputs: bool,
    all: bool,
) -> None:
    """Reset stored crew memories, knowledge, and kickoff outputs.

    Thin wrapper that forwards every flag to
    ``crewai.utilities.reset_memories.reset_memories_command`` when the full
    ``crewai`` package is installed; otherwise prints an installation hint
    and exits with status 1.

    Note: the ``all`` parameter name shadows the builtin but is part of the
    public signature, so it is kept as-is.
    """
    try:
        from crewai.utilities.reset_memories import (
            reset_memories_command as delegate,
        )
    except ImportError:
        message = (
            "The 'reset-memories' command requires the full crewai package.\n"
            "Install it with: pip install crewai"
        )
        click.secho(message, fg="red")
        raise SystemExit(1) from None
    delegate(memory, knowledge, agent_knowledge, kickoff_outputs, all)

View File

@@ -1,12 +0,0 @@
"""Re-export of ``crewai_core.token_manager.TokenManager``.
Kept as a stable import path for the CLI; new code should import from
``crewai_core.token_manager`` directly.
"""
from __future__ import annotations
from crewai_core.token_manager import TokenManager as TokenManager
__all__ = ["TokenManager"]

View File

@@ -1,67 +0,0 @@
"""Lightweight SQLite reader for kickoff task outputs.
Only used by the ``crewai log-tasks-outputs`` CLI command. Depends solely on
the standard library + *appdirs* so crewai-cli can read stored outputs without
importing the full crewai framework.
"""
from __future__ import annotations
import json
import logging
from pathlib import Path
import sqlite3
from typing import Any
from crewai_cli.user_data import _db_storage_path
logger = logging.getLogger(__name__)
def load_task_outputs(db_path: str | None = None) -> list[dict[str, Any]]:
"""Return all rows from the kickoff task outputs database."""
if db_path is None:
db_path = str(Path(_db_storage_path()) / "latest_kickoff_task_outputs.db")
if not Path(db_path).exists():
return []
try:
with sqlite3.connect(db_path) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute("""
SELECT task_id, expected_output, output, task_index,
inputs, was_replayed, timestamp
FROM latest_kickoff_task_outputs
ORDER BY task_index
""")
rows = cursor.fetchall()
except sqlite3.Error as e:
logger.error("Failed to load task outputs: %s", e)
return []
return [
{
"task_id": row["task_id"],
"expected_output": row["expected_output"],
"output": _safe_json_loads(row["output"]),
"task_index": row["task_index"],
"inputs": _safe_json_loads(row["inputs"]),
"was_replayed": row["was_replayed"],
"timestamp": row["timestamp"],
}
for row in rows
]
def _safe_json_loads(value: str | None) -> Any:
"""Decode a JSON column tolerantly: NULL/blank/corrupt → None."""
if not value:
return None
try:
return json.loads(value)
except (json.JSONDecodeError, TypeError) as e:
logger.warning("Failed to decode JSON column: %s", e)
return None

View File

@@ -1,22 +0,0 @@
"""User-data helpers — re-exported from ``crewai_core.user_data``."""
from __future__ import annotations
from crewai_core.paths import db_storage_path as _db_storage_path
from crewai_core.user_data import (
_load_user_data as _load_user_data,
_save_user_data as _save_user_data,
has_user_declined_tracing as has_user_declined_tracing,
is_tracing_enabled as is_tracing_enabled,
update_user_data as update_user_data,
)
__all__ = [
"_db_storage_path",
"_load_user_data",
"_save_user_data",
"has_user_declined_tracing",
"is_tracing_enabled",
"update_user_data",
]

View File

@@ -1,137 +0,0 @@
from __future__ import annotations
import os
from pathlib import Path
import shutil
from typing import Any
import click
from crewai_core.project import (
get_project_description as get_project_description,
get_project_name as get_project_name,
get_project_version as get_project_version,
parse_toml as parse_toml,
read_toml as read_toml,
)
from crewai_core.tool_credentials import (
build_env_with_all_tool_credentials as build_env_with_all_tool_credentials,
build_env_with_tool_repository_credentials as build_env_with_tool_repository_credentials,
)
from rich.console import Console
__all__ = [
"build_env_with_all_tool_credentials",
"build_env_with_tool_repository_credentials",
"copy_template",
"fetch_and_json_env_file",
"get_project_description",
"get_project_name",
"get_project_version",
"load_env_vars",
"parse_toml",
"read_toml",
"tree_copy",
"tree_find_and_replace",
"write_env_file",
]
console = Console()
def copy_template(
    src: Path, dst: Path, name: str, class_name: str, folder_name: str
) -> None:
    """Render a scaffolding template from *src* into *dst*.

    Replaces the ``{{name}}``, ``{{crew_name}}`` and ``{{folder_name}}``
    placeholders with the supplied values and reports the created file.
    """
    # Replacement order matches the original: name, crew_name, folder_name.
    replacements = {
        "{{name}}": name,
        "{{crew_name}}": class_name,
        "{{folder_name}}": folder_name,
    }
    with open(src, "r") as source:
        content = source.read()
    for placeholder, value in replacements.items():
        content = content.replace(placeholder, value)
    with open(dst, "w") as target:
        target.write(content)
    click.secho(f" - Created {dst}", fg="green")
def fetch_and_json_env_file(env_file_path: str = ".env") -> dict[str, Any]:
    """Parse a ``.env`` file into a ``{key: value}`` dictionary.

    Blank lines, comment lines (starting with ``#``) and lines without an
    ``=`` separator are ignored. Keys and values are stripped of surrounding
    whitespace.

    Args:
        env_file_path: Path of the env file to read (defaults to ``.env``).

    Returns:
        The parsed variables, or an empty dict when the file is missing or
        unreadable.
    """
    try:
        with open(env_file_path, "r") as f:
            env_content = f.read()
        env_dict: dict[str, Any] = {}
        for line in env_content.splitlines():
            stripped = line.strip()
            if not stripped or stripped.startswith("#"):
                continue
            # Skip malformed lines instead of aborting the whole parse:
            # previously a single line without "=" raised ValueError, which
            # the broad handler below turned into an empty result,
            # discarding every entry already collected.
            key, sep, value = line.partition("=")
            if not sep:
                continue
            env_dict[key.strip()] = value.strip()
        return env_dict
    except FileNotFoundError:
        console.print(f"Error: {env_file_path} not found.", style="bold red")
    except Exception as e:
        console.print(f"Error reading the .env file: {e}", style="bold red")
    return {}
def tree_copy(source: Path, destination: Path) -> None:
    """Recursively copy every entry of *source* into *destination*.

    Directories are copied with their full tree (``shutil.copytree``);
    regular files are copied with metadata preserved (``shutil.copy2``).
    *destination* must already exist.
    """
    for entry in os.listdir(source):
        src_path = os.path.join(source, entry)
        dst_path = os.path.join(destination, entry)
        copier = shutil.copytree if os.path.isdir(src_path) else shutil.copy2
        copier(src_path, dst_path)
def tree_find_and_replace(directory: Path, find: str, replace: str) -> None:
    """Recursively replace *find* with *replace* under *directory*.

    The replacement is applied to file contents, file names, and directory
    names. The walk is bottom-up (``topdown=False``) so entries are renamed
    before their parent directory is.

    Args:
        directory: Root of the tree to rewrite.
        find: Substring to search for.
        replace: Replacement text.
    """
    for path, dirs, files in os.walk(os.path.abspath(directory), topdown=False):
        for filename in files:
            filepath = os.path.join(path, filename)
            # NOTE: contents are read with a lossy UTF-8 decode, so only
            # rewrite when the target string actually occurs — the original
            # rewrote every file, clobbering mtimes and silently dropping
            # any bytes the lossy decode discarded.
            with open(filepath, "r", encoding="utf-8", errors="ignore") as file:
                contents = file.read()
            if find in contents:
                # Write back with the same encoding used for reading; the
                # original used the platform default, which could transcode
                # or raise on non-UTF-8 locales.
                with open(filepath, "w", encoding="utf-8") as file:
                    file.write(contents.replace(find, replace))
            if find in filename:
                new_filepath = os.path.join(path, filename.replace(find, replace))
                os.rename(filepath, new_filepath)
        for dirname in dirs:
            if find in dirname:
                old_dirpath = os.path.join(path, dirname)
                new_dirpath = os.path.join(path, dirname.replace(find, replace))
                os.rename(old_dirpath, new_dirpath)
def load_env_vars(folder_path: Path) -> dict[str, Any]:
    """Read ``folder_path/.env`` into a dict; empty when the file is absent.

    Lines that do not yield both a non-empty key and a non-empty value
    (blanks, lines missing an ``=`` separator, ``KEY=`` entries) are skipped.
    """
    env_file = folder_path / ".env"
    env_vars: dict[str, Any] = {}
    if not env_file.exists():
        return env_vars
    with open(env_file, "r") as handle:
        for raw_line in handle:
            key, _, value = raw_line.strip().partition("=")
            if key and value:
                env_vars[key] = value
    return env_vars
def write_env_file(folder_path: Path, env_vars: dict[str, Any]) -> None:
    """Serialize *env_vars* to ``folder_path/.env``, upper-casing each key."""
    target = folder_path / ".env"
    # One KEY=value line per entry; keys are upper-cased on write.
    lines = [f"{key.upper()}={value}\n" for key, value in env_vars.items()]
    with open(target, "w") as handle:
        handle.writelines(lines)

View File

@@ -1,24 +0,0 @@
"""Re-exports of version utilities from ``crewai_core.version``.
Kept as a stable import path for the CLI; new code should import from
``crewai_core.version`` directly.
"""
from __future__ import annotations
from crewai_core.version import (
check_version as check_version,
get_crewai_version as get_crewai_version,
get_latest_version_from_pypi as get_latest_version_from_pypi,
is_current_version_yanked as is_current_version_yanked,
is_newer_version_available as is_newer_version_available,
)
__all__ = [
"check_version",
"get_crewai_version",
"get_latest_version_from_pypi",
"is_current_version_yanked",
"is_newer_version_available",
]

View File

@@ -1,91 +0,0 @@
import pytest
from crewai_cli.authentication.main import Oauth2Settings
from crewai_cli.authentication.providers.auth0 import Auth0Provider
class TestAuth0Provider:
    """Unit tests for the URL and metadata helpers of ``Auth0Provider``."""

    @pytest.fixture(autouse=True)
    def setup_method(self):
        # Shared baseline settings/provider used by most assertions below.
        self.valid_settings = Oauth2Settings(
            provider="auth0",
            domain="test-domain.auth0.com",
            client_id="test-client-id",
            audience="test-audience",
        )
        self.provider = Auth0Provider(self.valid_settings)

    @staticmethod
    def _provider_for(domain):
        """Build a provider for an arbitrary Auth0 domain."""
        return Auth0Provider(
            Oauth2Settings(
                provider="auth0",
                domain=domain,
                client_id="test-client",
                audience="test-audience",
            )
        )

    def test_initialization_with_valid_settings(self):
        provider = Auth0Provider(self.valid_settings)
        assert provider.settings == self.valid_settings
        assert provider.settings.provider == "auth0"
        assert provider.settings.domain == "test-domain.auth0.com"
        assert provider.settings.client_id == "test-client-id"
        assert provider.settings.audience == "test-audience"

    def test_get_authorize_url(self):
        expected = "https://test-domain.auth0.com/oauth/device/code"
        assert self.provider.get_authorize_url() == expected

    def test_get_authorize_url_with_different_domain(self):
        provider = self._provider_for("my-company.auth0.com")
        expected = "https://my-company.auth0.com/oauth/device/code"
        assert provider.get_authorize_url() == expected

    def test_get_token_url(self):
        expected = "https://test-domain.auth0.com/oauth/token"
        assert self.provider.get_token_url() == expected

    def test_get_token_url_with_different_domain(self):
        provider = self._provider_for("another-domain.auth0.com")
        expected = "https://another-domain.auth0.com/oauth/token"
        assert provider.get_token_url() == expected

    def test_get_jwks_url(self):
        expected = "https://test-domain.auth0.com/.well-known/jwks.json"
        assert self.provider.get_jwks_url() == expected

    def test_get_jwks_url_with_different_domain(self):
        provider = self._provider_for("dev.auth0.com")
        expected = "https://dev.auth0.com/.well-known/jwks.json"
        assert provider.get_jwks_url() == expected

    def test_get_issuer(self):
        expected = "https://test-domain.auth0.com/"
        assert self.provider.get_issuer() == expected

    def test_get_issuer_with_different_domain(self):
        provider = self._provider_for("prod.auth0.com")
        expected = "https://prod.auth0.com/"
        assert provider.get_issuer() == expected

    def test_get_audience(self):
        assert self.provider.get_audience() == "test-audience"

    def test_get_client_id(self):
        assert self.provider.get_client_id() == "test-client-id"

View File

@@ -1,141 +0,0 @@
import pytest
from crewai_cli.authentication.main import Oauth2Settings
from crewai_cli.authentication.providers.entra_id import EntraIdProvider
class TestEntraIdProvider:
    """Endpoint/issuer construction tests for the Microsoft Entra ID provider.

    For Entra ID the ``domain`` setting carries the tenant ID, and every
    endpoint is rooted at https://login.microsoftonline.com/<tenant>.
    """

    @pytest.fixture(autouse=True)
    def setup_method(self):
        # Shared provider built from representative settings; the custom
        # scope in `extra` feeds the get_oauth_scopes tests below.
        self.valid_settings = Oauth2Settings(
            provider="entra_id",
            domain="tenant-id-abcdef123456",
            client_id="test-client-id",
            audience="test-audience",
            extra={
                "scope": "openid profile email api://crewai-cli-dev/read"
            }
        )
        self.provider = EntraIdProvider(self.valid_settings)

    def test_initialization_with_valid_settings(self):
        # The provider should retain the settings object untouched.
        provider = EntraIdProvider(self.valid_settings)
        assert provider.settings == self.valid_settings
        assert provider.settings.provider == "entra_id"
        assert provider.settings.domain == "tenant-id-abcdef123456"
        assert provider.settings.client_id == "test-client-id"
        assert provider.settings.audience == "test-audience"

    def test_get_authorize_url(self):
        # Device-code authorization endpoint under the tenant.
        expected_url = "https://login.microsoftonline.com/tenant-id-abcdef123456/oauth2/v2.0/devicecode"
        assert self.provider.get_authorize_url() == expected_url

    def test_get_authorize_url_with_different_domain(self):
        # For EntraID, the domain is the tenant ID.
        settings = Oauth2Settings(
            provider="entra_id",
            domain="my-company.entra.id",
            client_id="test-client",
            audience="test-audience",
        )
        provider = EntraIdProvider(settings)
        expected_url = "https://login.microsoftonline.com/my-company.entra.id/oauth2/v2.0/devicecode"
        assert provider.get_authorize_url() == expected_url

    def test_get_token_url(self):
        expected_url = "https://login.microsoftonline.com/tenant-id-abcdef123456/oauth2/v2.0/token"
        assert self.provider.get_token_url() == expected_url

    def test_get_token_url_with_different_domain(self):
        # For EntraID, the domain is the tenant ID.
        settings = Oauth2Settings(
            provider="entra_id",
            domain="another-domain.entra.id",
            client_id="test-client",
            audience="test-audience",
        )
        provider = EntraIdProvider(settings)
        expected_url = "https://login.microsoftonline.com/another-domain.entra.id/oauth2/v2.0/token"
        assert provider.get_token_url() == expected_url

    def test_get_jwks_url(self):
        expected_url = "https://login.microsoftonline.com/tenant-id-abcdef123456/discovery/v2.0/keys"
        assert self.provider.get_jwks_url() == expected_url

    def test_get_jwks_url_with_different_domain(self):
        # For EntraID, the domain is the tenant ID.
        settings = Oauth2Settings(
            provider="entra_id",
            domain="dev.entra.id",
            client_id="test-client",
            audience="test-audience",
        )
        provider = EntraIdProvider(settings)
        expected_url = "https://login.microsoftonline.com/dev.entra.id/discovery/v2.0/keys"
        assert provider.get_jwks_url() == expected_url

    def test_get_issuer(self):
        # Entra ID issuer is the tenant base URL plus /v2.0 (no trailing slash).
        expected_issuer = "https://login.microsoftonline.com/tenant-id-abcdef123456/v2.0"
        assert self.provider.get_issuer() == expected_issuer

    def test_get_issuer_with_different_domain(self):
        # For EntraID, the domain is the tenant ID.
        settings = Oauth2Settings(
            provider="entra_id",
            domain="other-tenant-id-xpto",
            client_id="test-client",
            audience="test-audience",
        )
        provider = EntraIdProvider(settings)
        expected_issuer = "https://login.microsoftonline.com/other-tenant-id-xpto/v2.0"
        assert provider.get_issuer() == expected_issuer

    def test_get_audience(self):
        assert self.provider.get_audience() == "test-audience"

    def test_get_audience_assertion_error_when_none(self):
        # Unlike WorkOS, Entra ID treats a missing audience as a hard error.
        settings = Oauth2Settings(
            provider="entra_id",
            domain="test-tenant-id",
            client_id="test-client-id",
            audience=None,
        )
        provider = EntraIdProvider(settings)
        with pytest.raises(ValueError, match="Audience is required"):
            provider.get_audience()

    def test_get_client_id(self):
        assert self.provider.get_client_id() == "test-client-id"

    def test_get_required_fields(self):
        # Entra ID requires only the `scope` extra field.
        assert set(self.provider.get_required_fields()) == set(["scope"])

    def test_get_oauth_scopes(self):
        # Expected list shows the provider prepends the OIDC defaults
        # (openid, profile, email) ahead of the configured custom scope.
        settings = Oauth2Settings(
            provider="entra_id",
            domain="tenant-id-abcdef123456",
            client_id="test-client-id",
            audience="test-audience",
            extra={
                "scope": "api://crewai-cli-dev/read"
            }
        )
        provider = EntraIdProvider(settings)
        assert provider.get_oauth_scopes() == ["openid", "profile", "email", "api://crewai-cli-dev/read"]

    def test_get_oauth_scopes_with_multiple_custom_scopes(self):
        # Multiple space-separated custom scopes keep their original order
        # after the OIDC defaults.
        settings = Oauth2Settings(
            provider="entra_id",
            domain="tenant-id-abcdef123456",
            client_id="test-client-id",
            audience="test-audience",
            extra={
                "scope": "api://crewai-cli-dev/read api://crewai-cli-dev/write custom-scope1 custom-scope2"
            }
        )
        provider = EntraIdProvider(settings)
        assert provider.get_oauth_scopes() == ["openid", "profile", "email", "api://crewai-cli-dev/read", "api://crewai-cli-dev/write", "custom-scope1", "custom-scope2"]

    def test_base_url(self):
        # Private helper: tenant root all public URLs are derived from.
        assert self.provider._base_url() == "https://login.microsoftonline.com/tenant-id-abcdef123456"

View File

@@ -1,138 +0,0 @@
import pytest
from crewai_cli.authentication.main import Oauth2Settings
from crewai_cli.authentication.providers.keycloak import KeycloakProvider
class TestKeycloakProvider:
    """Endpoint/issuer construction tests for the Keycloak provider.

    Keycloak endpoints live under https://<domain>/realms/<realm>/..., where
    the realm comes from the ``extra`` settings dict.
    """

    @pytest.fixture(autouse=True)
    def setup_method(self):
        # Shared provider with a realm configured via `extra`.
        self.valid_settings = Oauth2Settings(
            provider="keycloak",
            domain="keycloak.example.com",
            client_id="test-client-id",
            audience="test-audience",
            extra={
                "realm": "test-realm"
            }
        )
        self.provider = KeycloakProvider(self.valid_settings)

    def test_initialization_with_valid_settings(self):
        provider = KeycloakProvider(self.valid_settings)
        assert provider.settings == self.valid_settings
        assert provider.settings.provider == "keycloak"
        assert provider.settings.domain == "keycloak.example.com"
        assert provider.settings.client_id == "test-client-id"
        assert provider.settings.audience == "test-audience"
        assert provider.settings.extra.get("realm") == "test-realm"

    def test_get_authorize_url(self):
        # Device-authorization endpoint within the realm.
        expected_url = "https://keycloak.example.com/realms/test-realm/protocol/openid-connect/auth/device"
        assert self.provider.get_authorize_url() == expected_url

    def test_get_authorize_url_with_different_domain(self):
        settings = Oauth2Settings(
            provider="keycloak",
            domain="auth.company.com",
            client_id="test-client",
            audience="test-audience",
            extra={
                "realm": "my-realm"
            }
        )
        provider = KeycloakProvider(settings)
        expected_url = "https://auth.company.com/realms/my-realm/protocol/openid-connect/auth/device"
        assert provider.get_authorize_url() == expected_url

    def test_get_token_url(self):
        expected_url = "https://keycloak.example.com/realms/test-realm/protocol/openid-connect/token"
        assert self.provider.get_token_url() == expected_url

    def test_get_token_url_with_different_domain(self):
        settings = Oauth2Settings(
            provider="keycloak",
            domain="sso.enterprise.com",
            client_id="test-client",
            audience="test-audience",
            extra={
                "realm": "enterprise-realm"
            }
        )
        provider = KeycloakProvider(settings)
        expected_url = "https://sso.enterprise.com/realms/enterprise-realm/protocol/openid-connect/token"
        assert provider.get_token_url() == expected_url

    def test_get_jwks_url(self):
        # Keycloak exposes signing keys at .../protocol/openid-connect/certs.
        expected_url = "https://keycloak.example.com/realms/test-realm/protocol/openid-connect/certs"
        assert self.provider.get_jwks_url() == expected_url

    def test_get_jwks_url_with_different_domain(self):
        settings = Oauth2Settings(
            provider="keycloak",
            domain="identity.org",
            client_id="test-client",
            audience="test-audience",
            extra={
                "realm": "org-realm"
            }
        )
        provider = KeycloakProvider(settings)
        expected_url = "https://identity.org/realms/org-realm/protocol/openid-connect/certs"
        assert provider.get_jwks_url() == expected_url

    def test_get_issuer(self):
        # Issuer is the realm root, without the protocol suffix.
        expected_issuer = "https://keycloak.example.com/realms/test-realm"
        assert self.provider.get_issuer() == expected_issuer

    def test_get_issuer_with_different_domain(self):
        settings = Oauth2Settings(
            provider="keycloak",
            domain="login.myapp.io",
            client_id="test-client",
            audience="test-audience",
            extra={
                "realm": "app-realm"
            }
        )
        provider = KeycloakProvider(settings)
        expected_issuer = "https://login.myapp.io/realms/app-realm"
        assert provider.get_issuer() == expected_issuer

    def test_get_audience(self):
        assert self.provider.get_audience() == "test-audience"

    def test_get_client_id(self):
        assert self.provider.get_client_id() == "test-client-id"

    def test_get_required_fields(self):
        # Keycloak requires only the `realm` extra field.
        assert self.provider.get_required_fields() == ["realm"]

    def test_oauth2_base_url(self):
        # A bare hostname gets the https scheme prepended.
        assert self.provider._oauth2_base_url() == "https://keycloak.example.com"

    def test_oauth2_base_url_strips_https_prefix(self):
        # An explicit https:// prefix in the domain is not doubled up.
        settings = Oauth2Settings(
            provider="keycloak",
            domain="https://keycloak.example.com",
            client_id="test-client-id",
            audience="test-audience",
            extra={
                "realm": "test-realm"
            }
        )
        provider = KeycloakProvider(settings)
        assert provider._oauth2_base_url() == "https://keycloak.example.com"

    def test_oauth2_base_url_strips_http_prefix(self):
        # Plain http:// is upgraded to https://.
        settings = Oauth2Settings(
            provider="keycloak",
            domain="http://keycloak.example.com",
            client_id="test-client-id",
            audience="test-audience",
            extra={
                "realm": "test-realm"
            }
        )
        provider = KeycloakProvider(settings)
        assert provider._oauth2_base_url() == "https://keycloak.example.com"

View File

@@ -1,257 +0,0 @@
import pytest
from crewai_cli.authentication.main import Oauth2Settings
from crewai_cli.authentication.providers.okta import OktaProvider
class TestOktaProvider:
    """Endpoint/issuer construction tests for the Okta provider.

    Three authorization-server modes are exercised throughout:
      * default:   .../oauth2/default/v1/...      (no extras)
      * named:     .../oauth2/<server_name>/v1/... (authorization_server_name)
      * org-level: .../oauth2/v1/...               (using_org_auth_server=True)
    """

    @pytest.fixture(autouse=True)
    def setup_method(self):
        # Shared provider using the implicit "default" authorization server.
        self.valid_settings = Oauth2Settings(
            provider="okta",
            domain="test-domain.okta.com",
            client_id="test-client-id",
            audience="test-audience",
        )
        self.provider = OktaProvider(self.valid_settings)

    def test_initialization_with_valid_settings(self):
        provider = OktaProvider(self.valid_settings)
        assert provider.settings == self.valid_settings
        assert provider.settings.provider == "okta"
        assert provider.settings.domain == "test-domain.okta.com"
        assert provider.settings.client_id == "test-client-id"
        assert provider.settings.audience == "test-audience"

    def test_get_authorize_url(self):
        expected_url = "https://test-domain.okta.com/oauth2/default/v1/device/authorize"
        assert self.provider.get_authorize_url() == expected_url

    def test_get_authorize_url_with_different_domain(self):
        settings = Oauth2Settings(
            provider="okta",
            domain="my-company.okta.com",
            client_id="test-client",
            audience="test-audience",
        )
        provider = OktaProvider(settings)
        expected_url = "https://my-company.okta.com/oauth2/default/v1/device/authorize"
        assert provider.get_authorize_url() == expected_url

    def test_get_authorize_url_with_custom_authorization_server_name(self):
        # Named custom authorization server replaces "default" in the path.
        settings = Oauth2Settings(
            provider="okta",
            domain="test-domain.okta.com",
            client_id="test-client-id",
            audience=None,
            extra={
                "using_org_auth_server": False,
                "authorization_server_name": "my_auth_server_xxxAAA777"
            }
        )
        provider = OktaProvider(settings)
        expected_url = "https://test-domain.okta.com/oauth2/my_auth_server_xxxAAA777/v1/device/authorize"
        assert provider.get_authorize_url() == expected_url

    def test_get_authorize_url_when_using_org_auth_server(self):
        # Org-level auth server drops the server-name path segment entirely.
        settings = Oauth2Settings(
            provider="okta",
            domain="test-domain.okta.com",
            client_id="test-client-id",
            audience=None,
            extra={
                "using_org_auth_server": True,
                "authorization_server_name": None
            }
        )
        provider = OktaProvider(settings)
        expected_url = "https://test-domain.okta.com/oauth2/v1/device/authorize"
        assert provider.get_authorize_url() == expected_url

    def test_get_token_url(self):
        expected_url = "https://test-domain.okta.com/oauth2/default/v1/token"
        assert self.provider.get_token_url() == expected_url

    def test_get_token_url_with_different_domain(self):
        settings = Oauth2Settings(
            provider="okta",
            domain="another-domain.okta.com",
            client_id="test-client",
            audience="test-audience",
        )
        provider = OktaProvider(settings)
        expected_url = "https://another-domain.okta.com/oauth2/default/v1/token"
        assert provider.get_token_url() == expected_url

    def test_get_token_url_with_custom_authorization_server_name(self):
        settings = Oauth2Settings(
            provider="okta",
            domain="test-domain.okta.com",
            client_id="test-client-id",
            audience=None,
            extra={
                "using_org_auth_server": False,
                "authorization_server_name": "my_auth_server_xxxAAA777"
            }
        )
        provider = OktaProvider(settings)
        expected_url = "https://test-domain.okta.com/oauth2/my_auth_server_xxxAAA777/v1/token"
        assert provider.get_token_url() == expected_url

    def test_get_token_url_when_using_org_auth_server(self):
        settings = Oauth2Settings(
            provider="okta",
            domain="test-domain.okta.com",
            client_id="test-client-id",
            audience=None,
            extra={
                "using_org_auth_server": True,
                "authorization_server_name": None
            }
        )
        provider = OktaProvider(settings)
        expected_url = "https://test-domain.okta.com/oauth2/v1/token"
        assert provider.get_token_url() == expected_url

    def test_get_jwks_url(self):
        expected_url = "https://test-domain.okta.com/oauth2/default/v1/keys"
        assert self.provider.get_jwks_url() == expected_url

    def test_get_jwks_url_with_different_domain(self):
        settings = Oauth2Settings(
            provider="okta",
            domain="dev.okta.com",
            client_id="test-client",
            audience="test-audience",
        )
        provider = OktaProvider(settings)
        expected_url = "https://dev.okta.com/oauth2/default/v1/keys"
        assert provider.get_jwks_url() == expected_url

    def test_get_jwks_url_with_custom_authorization_server_name(self):
        settings = Oauth2Settings(
            provider="okta",
            domain="test-domain.okta.com",
            client_id="test-client-id",
            audience=None,
            extra={
                "using_org_auth_server": False,
                "authorization_server_name": "my_auth_server_xxxAAA777"
            }
        )
        provider = OktaProvider(settings)
        expected_url = "https://test-domain.okta.com/oauth2/my_auth_server_xxxAAA777/v1/keys"
        assert provider.get_jwks_url() == expected_url

    def test_get_jwks_url_when_using_org_auth_server(self):
        settings = Oauth2Settings(
            provider="okta",
            domain="test-domain.okta.com",
            client_id="test-client-id",
            audience=None,
            extra={
                "using_org_auth_server": True,
                "authorization_server_name": None
            }
        )
        provider = OktaProvider(settings)
        expected_url = "https://test-domain.okta.com/oauth2/v1/keys"
        assert provider.get_jwks_url() == expected_url

    def test_get_issuer(self):
        expected_issuer = "https://test-domain.okta.com/oauth2/default"
        assert self.provider.get_issuer() == expected_issuer

    def test_get_issuer_with_different_domain(self):
        settings = Oauth2Settings(
            provider="okta",
            domain="prod.okta.com",
            client_id="test-client",
            audience="test-audience",
        )
        provider = OktaProvider(settings)
        expected_issuer = "https://prod.okta.com/oauth2/default"
        assert provider.get_issuer() == expected_issuer

    def test_get_issuer_with_custom_authorization_server_name(self):
        settings = Oauth2Settings(
            provider="okta",
            domain="test-domain.okta.com",
            client_id="test-client-id",
            audience=None,
            extra={
                "using_org_auth_server": False,
                "authorization_server_name": "my_auth_server_xxxAAA777"
            }
        )
        provider = OktaProvider(settings)
        expected_issuer = "https://test-domain.okta.com/oauth2/my_auth_server_xxxAAA777"
        assert provider.get_issuer() == expected_issuer

    def test_get_issuer_when_using_org_auth_server(self):
        # Org-level issuer is the bare domain, with no /oauth2 suffix.
        settings = Oauth2Settings(
            provider="okta",
            domain="test-domain.okta.com",
            client_id="test-client-id",
            audience=None,
            extra={
                "using_org_auth_server": True,
                "authorization_server_name": None
            }
        )
        provider = OktaProvider(settings)
        expected_issuer = "https://test-domain.okta.com"
        assert provider.get_issuer() == expected_issuer

    def test_get_audience(self):
        assert self.provider.get_audience() == "test-audience"

    def test_get_audience_assertion_error_when_none(self):
        # A missing audience is a hard error for Okta.
        settings = Oauth2Settings(
            provider="okta",
            domain="test-domain.okta.com",
            client_id="test-client-id",
            audience=None,
        )
        provider = OktaProvider(settings)
        with pytest.raises(ValueError, match="Audience is required"):
            provider.get_audience()

    def test_get_client_id(self):
        assert self.provider.get_client_id() == "test-client-id"

    def test_get_required_fields(self):
        assert set(self.provider.get_required_fields()) == set(["authorization_server_name", "using_org_auth_server"])

    def test_oauth2_base_url(self):
        # Private helper: base URL all public endpoints derive from.
        assert self.provider._oauth2_base_url() == "https://test-domain.okta.com/oauth2/default"

    def test_oauth2_base_url_with_custom_authorization_server_name(self):
        settings = Oauth2Settings(
            provider="okta",
            domain="test-domain.okta.com",
            client_id="test-client-id",
            audience=None,
            extra={
                "using_org_auth_server": False,
                "authorization_server_name": "my_auth_server_xxxAAA777"
            }
        )
        provider = OktaProvider(settings)
        assert provider._oauth2_base_url() == "https://test-domain.okta.com/oauth2/my_auth_server_xxxAAA777"

    def test_oauth2_base_url_when_using_org_auth_server(self):
        settings = Oauth2Settings(
            provider="okta",
            domain="test-domain.okta.com",
            client_id="test-client-id",
            audience=None,
            extra={
                "using_org_auth_server": True,
                "authorization_server_name": None
            }
        )
        provider = OktaProvider(settings)
        assert provider._oauth2_base_url() == "https://test-domain.okta.com/oauth2"

View File

@@ -1,100 +0,0 @@
import pytest
from crewai_cli.authentication.main import Oauth2Settings
from crewai_cli.authentication.providers.workos import WorkosProvider
class TestWorkosProvider:
    """Endpoint/issuer construction tests for the WorkOS provider.

    WorkOS hangs everything off https://<domain>/oauth2/...; the domain is
    typically a customer-branded login host.
    """

    @pytest.fixture(autouse=True)
    def setup_method(self):
        self.valid_settings = Oauth2Settings(
            provider="workos",
            domain="login.company.com",
            client_id="test-client-id",
            audience="test-audience"
        )
        self.provider = WorkosProvider(self.valid_settings)

    def test_initialization_with_valid_settings(self):
        provider = WorkosProvider(self.valid_settings)
        assert provider.settings == self.valid_settings
        assert provider.settings.provider == "workos"
        assert provider.settings.domain == "login.company.com"
        assert provider.settings.client_id == "test-client-id"
        assert provider.settings.audience == "test-audience"

    def test_get_authorize_url(self):
        expected_url = "https://login.company.com/oauth2/device_authorization"
        assert self.provider.get_authorize_url() == expected_url

    def test_get_authorize_url_with_different_domain(self):
        settings = Oauth2Settings(
            provider="workos",
            domain="login.example.com",
            client_id="test-client",
            audience="test-audience"
        )
        provider = WorkosProvider(settings)
        expected_url = "https://login.example.com/oauth2/device_authorization"
        assert provider.get_authorize_url() == expected_url

    def test_get_token_url(self):
        expected_url = "https://login.company.com/oauth2/token"
        assert self.provider.get_token_url() == expected_url

    def test_get_token_url_with_different_domain(self):
        settings = Oauth2Settings(
            provider="workos",
            domain="api.workos.com",
            client_id="test-client",
            audience="test-audience"
        )
        provider = WorkosProvider(settings)
        expected_url = "https://api.workos.com/oauth2/token"
        assert provider.get_token_url() == expected_url

    def test_get_jwks_url(self):
        expected_url = "https://login.company.com/oauth2/jwks"
        assert self.provider.get_jwks_url() == expected_url

    def test_get_jwks_url_with_different_domain(self):
        settings = Oauth2Settings(
            provider="workos",
            domain="auth.enterprise.com",
            client_id="test-client",
            audience="test-audience"
        )
        provider = WorkosProvider(settings)
        expected_url = "https://auth.enterprise.com/oauth2/jwks"
        assert provider.get_jwks_url() == expected_url

    def test_get_issuer(self):
        # Issuer is the bare https domain (no trailing slash, unlike Auth0).
        expected_issuer = "https://login.company.com"
        assert self.provider.get_issuer() == expected_issuer

    def test_get_issuer_with_different_domain(self):
        settings = Oauth2Settings(
            provider="workos",
            domain="sso.company.com",
            client_id="test-client",
            audience="test-audience"
        )
        provider = WorkosProvider(settings)
        expected_issuer = "https://sso.company.com"
        assert provider.get_issuer() == expected_issuer

    def test_get_audience(self):
        assert self.provider.get_audience() == "test-audience"

    def test_get_audience_fallback_to_default(self):
        # Unlike Okta/Entra ID, a missing audience falls back to "" instead
        # of raising.
        settings = Oauth2Settings(
            provider="workos",
            domain="login.company.com",
            client_id="test-client-id",
            audience=None
        )
        provider = WorkosProvider(settings)
        assert provider.get_audience() == ""

    def test_get_client_id(self):
        assert self.provider.get_client_id() == "test-client-id"

View File

@@ -1,107 +0,0 @@
import unittest
from unittest.mock import MagicMock, patch
import jwt
from crewai_cli.authentication.utils import validate_jwt_token
@patch("crewai_core.auth.utils.PyJWKClient", return_value=MagicMock())
@patch("crewai_core.auth.utils.jwt")
class TestUtils(unittest.TestCase):
    """validate_jwt_token tests: decode wiring plus one case per error path.

    NOTE(review): the patches target ``crewai_core.auth.utils`` while the
    function under test is imported from ``crewai_cli.authentication.utils``;
    this only intercepts the real calls if the CLI module re-exports those
    names from crewai_core — confirm against the package layout.
    """

    def test_validate_jwt_token(self, mock_jwt, mock_pyjwkclient):
        # Happy path: decode succeeds and its return value is passed through.
        mock_jwt.decode.return_value = {"exp": 1719859200}
        # Create signing key object mock with a .key attribute
        mock_pyjwkclient.return_value.get_signing_key_from_jwt.return_value = MagicMock(
            key="mock_signing_key"
        )
        jwt_token = "aaaaa.bbbbbb.cccccc"  # noqa: S105
        decoded_token = validate_jwt_token(
            jwt_token=jwt_token,
            jwks_url="https://mock_jwks_url",
            issuer="https://mock_issuer",
            audience="app_id_xxxx",
        )
        # The decode call pins the full verification contract: RS256 only,
        # 10s clock-skew leeway, and all standard claims required.
        mock_jwt.decode.assert_called_with(
            jwt_token,
            "mock_signing_key",
            algorithms=["RS256"],
            audience="app_id_xxxx",
            issuer="https://mock_issuer",
            leeway=10.0,
            options={
                "verify_signature": True,
                "verify_exp": True,
                "verify_nbf": True,
                "verify_iat": True,
                "require": ["exp", "iat", "iss", "aud", "sub"],
            },
        )
        mock_pyjwkclient.assert_called_once_with("https://mock_jwks_url")
        self.assertEqual(decoded_token, {"exp": 1719859200})

    def test_validate_jwt_token_expired(self, mock_jwt, mock_pyjwkclient):
        # Each error-path test only checks that the PyJWT exception is not
        # swallowed (any Exception subclass is accepted).
        mock_jwt.decode.side_effect = jwt.ExpiredSignatureError
        with self.assertRaises(Exception):  # noqa: B017
            validate_jwt_token(
                jwt_token="aaaaa.bbbbbb.cccccc",  # noqa: S106
                jwks_url="https://mock_jwks_url",
                issuer="https://mock_issuer",
                audience="app_id_xxxx",
            )

    def test_validate_jwt_token_invalid_audience(self, mock_jwt, mock_pyjwkclient):
        mock_jwt.decode.side_effect = jwt.InvalidAudienceError
        with self.assertRaises(Exception):  # noqa: B017
            validate_jwt_token(
                jwt_token="aaaaa.bbbbbb.cccccc",  # noqa: S106
                jwks_url="https://mock_jwks_url",
                issuer="https://mock_issuer",
                audience="app_id_xxxx",
            )

    def test_validate_jwt_token_invalid_issuer(self, mock_jwt, mock_pyjwkclient):
        mock_jwt.decode.side_effect = jwt.InvalidIssuerError
        with self.assertRaises(Exception):  # noqa: B017
            validate_jwt_token(
                jwt_token="aaaaa.bbbbbb.cccccc",  # noqa: S106
                jwks_url="https://mock_jwks_url",
                issuer="https://mock_issuer",
                audience="app_id_xxxx",
            )

    def test_validate_jwt_token_missing_required_claims(
        self, mock_jwt, mock_pyjwkclient
    ):
        mock_jwt.decode.side_effect = jwt.MissingRequiredClaimError
        with self.assertRaises(Exception):  # noqa: B017
            validate_jwt_token(
                jwt_token="aaaaa.bbbbbb.cccccc",  # noqa: S106
                jwks_url="https://mock_jwks_url",
                issuer="https://mock_issuer",
                audience="app_id_xxxx",
            )

    def test_validate_jwt_token_jwks_error(self, mock_jwt, mock_pyjwkclient):
        mock_jwt.decode.side_effect = jwt.exceptions.PyJWKClientError
        with self.assertRaises(Exception):  # noqa: B017
            validate_jwt_token(
                jwt_token="aaaaa.bbbbbb.cccccc",  # noqa: S106
                jwks_url="https://mock_jwks_url",
                issuer="https://mock_issuer",
                audience="app_id_xxxx",
            )

    def test_validate_jwt_token_invalid_token(self, mock_jwt, mock_pyjwkclient):
        mock_jwt.decode.side_effect = jwt.InvalidTokenError
        with self.assertRaises(Exception):  # noqa: B017
            validate_jwt_token(
                jwt_token="aaaaa.bbbbbb.cccccc",  # noqa: S106
                jwks_url="https://mock_jwks_url",
                issuer="https://mock_issuer",
                audience="app_id_xxxx",
            )

View File

@@ -1,255 +0,0 @@
from pathlib import Path
from unittest import mock
import pytest
from click.testing import CliRunner
from crewai_cli.cli import (
deploy_create,
deploy_list,
deploy_logs,
deploy_push,
deploy_remove,
deply_status,
flow_add_crew,
login,
reset_memories,
test,
train,
version,
)
@pytest.fixture
def runner():
    # Fresh Click test runner per test so output/exit codes don't leak.
    return CliRunner()
@mock.patch("crewai_cli.cli.train_crew")
def test_train_default_iterations(train_crew, runner):
    # `train` with no flags defaults to 5 iterations and the standard pickle.
    result = runner.invoke(train)
    train_crew.assert_called_once_with(5, "trained_agents_data.pkl")
    assert result.exit_code == 0
    assert "Training the Crew for 5 iterations" in result.output
@mock.patch("crewai_cli.cli.train_crew")
def test_train_custom_iterations(train_crew, runner):
    # --n_iterations is forwarded to train_crew as an int.
    result = runner.invoke(train, ["--n_iterations", "10"])
    train_crew.assert_called_once_with(10, "trained_agents_data.pkl")
    assert result.exit_code == 0
    assert "Training the Crew for 10 iterations" in result.output
@mock.patch("crewai_cli.cli.train_crew")
def test_train_invalid_string_iterations(train_crew, runner):
    # Non-integer --n_iterations is rejected by Click (usage error, exit 2)
    # before train_crew is ever reached.
    result = runner.invoke(train, ["--n_iterations", "invalid"])
    train_crew.assert_not_called()
    assert result.exit_code == 2
    assert (
        "Usage: train [OPTIONS]\nTry 'train --help' for help.\n\nError: Invalid value for '-n' / '--n_iterations': 'invalid' is not a valid integer.\n"
        in result.output
    )
def test_reset_no_memory_flags(runner):
    # Calling reset-memories with no flags prints guidance instead of
    # silently resetting everything.
    result = runner.invoke(
        reset_memories,
    )
    assert (
        result.output
        == "Please specify at least one memory type to reset using the appropriate flags.\n"
    )
def test_version_flag(runner):
    # Bare `version` command reports the crewai version string.
    result = runner.invoke(version)
    assert result.exit_code == 0
    assert "crewai version:" in result.output
def test_version_command(runner):
    # NOTE(review): identical to test_version_flag above — likely a leftover
    # duplicate; consider consolidating.
    result = runner.invoke(version)
    assert result.exit_code == 0
    assert "crewai version:" in result.output
def test_version_command_with_tools(runner):
    # --tools prints either the tools version or a "not installed" notice,
    # depending on the environment, so both outputs are accepted.
    result = runner.invoke(version, ["--tools"])
    assert result.exit_code == 0
    assert "crewai version:" in result.output
    assert (
        "crewai tools version:" in result.output
        or "crewai tools not installed" in result.output
    )
@mock.patch("crewai_cli.cli.evaluate_crew")
def test_test_default_iterations(evaluate_crew, runner):
    # `test` defaults: 3 iterations, gpt-4o-mini, no trained-agents file.
    result = runner.invoke(test)
    evaluate_crew.assert_called_once_with(3, "gpt-4o-mini", trained_agents_file=None)
    assert result.exit_code == 0
    assert "Testing the crew for 3 iterations with model gpt-4o-mini" in result.output
@mock.patch("crewai_cli.cli.evaluate_crew")
def test_test_custom_iterations(evaluate_crew, runner):
    # Both --n_iterations and --model are forwarded to evaluate_crew.
    result = runner.invoke(test, ["--n_iterations", "5", "--model", "gpt-4o"])
    evaluate_crew.assert_called_once_with(5, "gpt-4o", trained_agents_file=None)
    assert result.exit_code == 0
    assert "Testing the crew for 5 iterations with model gpt-4o" in result.output
@mock.patch("crewai_cli.cli.evaluate_crew")
def test_test_invalid_string_iterations(evaluate_crew, runner):
    # Non-integer --n_iterations is rejected by Click before evaluation runs.
    result = runner.invoke(test, ["--n_iterations", "invalid"])
    evaluate_crew.assert_not_called()
    assert result.exit_code == 2
    assert (
        "Usage: test [OPTIONS]\nTry 'test --help' for help.\n\nError: Invalid value for '-n' / '--n_iterations': 'invalid' is not a valid integer.\n"
        in result.output
    )
@mock.patch("crewai_cli.cli.AuthenticationCommand")
def test_login(command, runner):
    # `login` delegates to AuthenticationCommand().login().
    mock_auth = command.return_value
    result = runner.invoke(login)
    assert result.exit_code == 0
    mock_auth.login.assert_called_once()
@mock.patch("crewai_cli.cli.DeployCommand")
def test_deploy_create(command, runner):
    # `deploy create` delegates to DeployCommand().create_crew().
    mock_deploy = command.return_value
    result = runner.invoke(deploy_create)
    assert result.exit_code == 0
    mock_deploy.create_crew.assert_called_once()
@mock.patch("crewai_cli.cli.DeployCommand")
def test_deploy_list(command, runner):
    # `deploy list` delegates to DeployCommand().list_crews().
    mock_deploy = command.return_value
    result = runner.invoke(deploy_list)
    assert result.exit_code == 0
    mock_deploy.list_crews.assert_called_once()
@mock.patch("crewai_cli.cli.DeployCommand")
def test_deploy_push(command, runner):
    # -u forwards the uuid; skip_validate defaults to False.
    mock_deploy = command.return_value
    uuid = "test-uuid"
    result = runner.invoke(deploy_push, ["-u", uuid])
    assert result.exit_code == 0
    mock_deploy.deploy.assert_called_once_with(uuid=uuid, skip_validate=False)
@mock.patch("crewai_cli.cli.DeployCommand")
def test_deploy_push_no_uuid(command, runner):
    # Without -u the uuid defaults to None (DeployCommand resolves it).
    mock_deploy = command.return_value
    result = runner.invoke(deploy_push)
    assert result.exit_code == 0
    mock_deploy.deploy.assert_called_once_with(uuid=None, skip_validate=False)
@mock.patch("crewai_cli.cli.DeployCommand")
def test_deploy_status(command, runner):
    # NOTE(review): the command object is named `deply_status` (sic) in
    # crewai_cli.cli; the misspelling is preserved here to match the import.
    mock_deploy = command.return_value
    uuid = "test-uuid"
    result = runner.invoke(deply_status, ["-u", uuid])
    assert result.exit_code == 0
    mock_deploy.get_crew_status.assert_called_once_with(uuid=uuid)
@mock.patch("crewai_cli.cli.DeployCommand")
def test_deploy_status_no_uuid(command, runner):
    # Without -u the uuid defaults to None.
    mock_deploy = command.return_value
    result = runner.invoke(deply_status)
    assert result.exit_code == 0
    mock_deploy.get_crew_status.assert_called_once_with(uuid=None)
@mock.patch("crewai_cli.cli.DeployCommand")
def test_deploy_logs(command, runner):
    # `deploy logs -u <uuid>` fetches logs for that specific crew.
    mock_deploy = command.return_value
    uuid = "test-uuid"
    result = runner.invoke(deploy_logs, ["-u", uuid])
    assert result.exit_code == 0
    mock_deploy.get_crew_logs.assert_called_once_with(uuid=uuid)
@mock.patch("crewai_cli.cli.DeployCommand")
def test_deploy_logs_no_uuid(command, runner):
    # Without -u the uuid defaults to None.
    mock_deploy = command.return_value
    result = runner.invoke(deploy_logs)
    assert result.exit_code == 0
    mock_deploy.get_crew_logs.assert_called_once_with(uuid=None)
@mock.patch("crewai_cli.cli.DeployCommand")
def test_deploy_remove(command, runner):
    # `deploy remove -u <uuid>` removes that specific crew.
    mock_deploy = command.return_value
    uuid = "test-uuid"
    result = runner.invoke(deploy_remove, ["-u", uuid])
    assert result.exit_code == 0
    mock_deploy.remove_crew.assert_called_once_with(uuid=uuid)
@mock.patch("crewai_cli.cli.DeployCommand")
def test_deploy_remove_no_uuid(command, runner):
    # Without -u the uuid defaults to None.
    mock_deploy = command.return_value
    result = runner.invoke(deploy_remove)
    assert result.exit_code == 0
    mock_deploy.remove_crew.assert_called_once_with(uuid=None)
@mock.patch("crewai_cli.add_crew_to_flow.create_embedded_crew")
@mock.patch("pathlib.Path.exists", return_value=True)
def test_flow_add_crew(mock_path_exists, mock_create_embedded_crew, runner):
    # Happy path: the project-root check passes (Path.exists forced True) and
    # the scaffolder receives the crew name positionally plus a parent_folder
    # Path keyword.
    crew_name = "new_crew"
    result = runner.invoke(flow_add_crew, [crew_name])
    assert result.exit_code == 0, f"Command failed with output: {result.output}"
    assert f"Adding crew {crew_name} to the flow" in result.output
    mock_create_embedded_crew.assert_called_once()
    call_args, call_kwargs = mock_create_embedded_crew.call_args
    assert call_args[0] == crew_name
    assert "parent_folder" in call_kwargs
    assert isinstance(call_kwargs["parent_folder"], Path)
def test_add_crew_to_flow_not_in_root(runner):
    # A missing pyproject.toml means we are not at the flow project root;
    # the command must abort with a non-zero exit code and an explanation.
    with mock.patch("pathlib.Path.exists", autospec=True) as mock_exists:
        def exists_side_effect(self):
            # Only pyproject.toml is reported missing; everything else exists.
            if self.name == "pyproject.toml":
                return False
            return True
        mock_exists.side_effect = exists_side_effect
        result = runner.invoke(flow_add_crew, ["new_crew"])
        assert result.exit_code != 0
        assert "This command must be run from the root of a flow project." in str(
            result.output
        )

View File

@@ -1,148 +0,0 @@
import json
import shutil
import tempfile
import unittest
from datetime import datetime, timedelta
from pathlib import Path
from unittest.mock import MagicMock, patch
from crewai_cli.config import (
CLI_SETTINGS_KEYS,
DEFAULT_CLI_SETTINGS,
USER_SETTINGS_KEYS,
Settings,
)
from crewai_core.token_manager import TokenManager
class TestSettings(unittest.TestCase):
def setUp(self):
    # Each test gets an isolated temp dir so settings files never collide.
    self.test_dir = Path(tempfile.mkdtemp())
    self.config_path = self.test_dir / "settings.json"
def tearDown(self):
    # Remove the per-test temp dir and anything written into it.
    shutil.rmtree(self.test_dir)
def test_empty_initialization(self):
    # With no file and no kwargs, settings fields default to None.
    settings = Settings(config_path=self.config_path)
    self.assertIsNone(settings.tool_repository_username)
    self.assertIsNone(settings.tool_repository_password)
def test_initialization_with_data(self):
    # Keyword args populate only the fields they name.
    settings = Settings(
        config_path=self.config_path, tool_repository_username="user1"
    )
    self.assertEqual(settings.tool_repository_username, "user1")
    self.assertIsNone(settings.tool_repository_password)
def test_initialization_with_existing_file(self):
    # Values already on disk are loaded when no kwargs override them.
    self.config_path.parent.mkdir(parents=True, exist_ok=True)
    with self.config_path.open("w") as f:
        json.dump({"tool_repository_username": "file_user"}, f)
    settings = Settings(config_path=self.config_path)
    self.assertEqual(settings.tool_repository_username, "file_user")
def test_merge_file_and_input_data(self):
    # Explicit kwargs win over file values; untouched file values survive.
    self.config_path.parent.mkdir(parents=True, exist_ok=True)
    with self.config_path.open("w") as f:
        json.dump(
            {
                "tool_repository_username": "file_user",
                "tool_repository_password": "file_pass",
            },
            f,
        )
    settings = Settings(
        config_path=self.config_path, tool_repository_username="new_user"
    )
    self.assertEqual(settings.tool_repository_username, "new_user")
    self.assertEqual(settings.tool_repository_password, "file_pass")
def test_clear_user_settings(self):
    # clear_user_settings() nulls out every user-scoped key.
    user_settings = {key: f"value_for_{key}" for key in USER_SETTINGS_KEYS}
    settings = Settings(config_path=self.config_path, **user_settings)
    settings.clear_user_settings()
    for key in user_settings.keys():
        self.assertEqual(getattr(settings, key), None)
@patch("crewai_core.settings.TokenManager")
def test_reset_settings(self, mock_token_manager):
    # reset() clears user keys to None, restores CLI keys to their defaults,
    # and clears stored auth tokens via TokenManager.
    #
    # NOTE(review): the patch targets crewai_core.settings.TokenManager, but
    # the TokenManager used directly below is imported from
    # crewai_core.token_manager — so save_tokens here hits the real
    # implementation while reset()'s internal clear_tokens hits the mock;
    # confirm this split is intentional.
    user_settings = {key: f"value_for_{key}" for key in USER_SETTINGS_KEYS}
    cli_settings = {key: f"value_for_{key}" for key in CLI_SETTINGS_KEYS if key != "oauth2_extra"}
    cli_settings["oauth2_extra"] = {"scope": "xxx", "other": "yyy"}
    settings = Settings(
        config_path=self.config_path, **user_settings, **cli_settings
    )
    mock_token_manager.return_value = MagicMock()
    TokenManager().save_tokens(
        "aaa.bbb.ccc", (datetime.now() + timedelta(seconds=36000)).timestamp()
    )
    settings.reset()
    for key in user_settings.keys():
        self.assertEqual(getattr(settings, key), None)
    for key in cli_settings.keys():
        self.assertEqual(getattr(settings, key), DEFAULT_CLI_SETTINGS.get(key))
    mock_token_manager.return_value.clear_tokens.assert_called_once()
def test_dump_new_settings(self):
settings = Settings(
config_path=self.config_path, tool_repository_username="user1"
)
settings.dump()
with self.config_path.open("r") as f:
saved_data = json.load(f)
self.assertEqual(saved_data["tool_repository_username"], "user1")
def test_update_existing_settings(self):
self.config_path.parent.mkdir(parents=True, exist_ok=True)
with self.config_path.open("w") as f:
json.dump({"existing_setting": "value"}, f)
settings = Settings(
config_path=self.config_path, tool_repository_username="user1"
)
settings.dump()
with self.config_path.open("r") as f:
saved_data = json.load(f)
self.assertEqual(saved_data["existing_setting"], "value")
self.assertEqual(saved_data["tool_repository_username"], "user1")
def test_none_values(self):
settings = Settings(config_path=self.config_path, tool_repository_username=None)
settings.dump()
with self.config_path.open("r") as f:
saved_data = json.load(f)
self.assertIsNone(saved_data.get("tool_repository_username"))
def test_invalid_json_in_config(self):
self.config_path.parent.mkdir(parents=True, exist_ok=True)
with self.config_path.open("w") as f:
f.write("invalid json")
try:
settings = Settings(config_path=self.config_path)
self.assertIsNone(settings.tool_repository_username)
except json.JSONDecodeError:
self.fail("Settings initialization should handle invalid JSON")
def test_empty_config_file(self):
self.config_path.parent.mkdir(parents=True, exist_ok=True)
self.config_path.touch()
settings = Settings(config_path=self.config_path)
self.assertIsNone(settings.tool_repository_username)

View File

@@ -1,20 +0,0 @@
from crewai_cli.constants import ENV_VARS, MODELS, PROVIDERS
def test_huggingface_in_providers():
    """Huggingface must appear in the selectable PROVIDERS list."""
    assert any(provider == "huggingface" for provider in PROVIDERS)
def test_huggingface_env_vars():
    """The Huggingface env-var configuration must expose an HF_TOKEN entry."""
    assert "huggingface" in ENV_VARS
    key_names = [detail.get("key_name") for detail in ENV_VARS["huggingface"]]
    assert "HF_TOKEN" in key_names
def test_huggingface_models():
    """At least one Huggingface model must be configured."""
    assert "huggingface" in MODELS
    assert len(MODELS["huggingface"]) > 0

View File

@@ -1,359 +0,0 @@
import os
import unittest
from unittest.mock import ANY, AsyncMock, MagicMock, patch
import pytest
from crewai_cli.plus_api import PlusAPI
class TestPlusAPI(unittest.TestCase):
    """Unit tests for the PlusAPI HTTP client wrapper.

    Most tests patch either ``PlusAPI._make_request`` (endpoint/payload
    checks) or ``httpx.Client`` + ``Settings`` (header checks, in particular
    the X-Crewai-Organization-Id header sourced from Settings.org_uuid).
    """

    def setUp(self):
        self.api_key = "test_api_key"
        self.api = PlusAPI(self.api_key)
        self.org_uuid = "test-org-uuid"

    def test_init(self):
        """Constructor stores the key and builds the default header set."""
        self.assertEqual(self.api.api_key, self.api_key)
        self.assertEqual(self.api.headers["Authorization"], f"Bearer {self.api_key}")
        self.assertEqual(self.api.headers["Content-Type"], "application/json")
        self.assertIn("CrewAI-CLI/", self.api.headers["User-Agent"])
        self.assertTrue(self.api.headers["X-Crewai-Version"])

    @patch("crewai_core.plus_api.PlusAPI._make_request")
    def test_login_to_tool_repository(self, mock_make_request):
        """login_to_tool_repository POSTs an empty JSON body to the login endpoint."""
        mock_response = MagicMock()
        mock_make_request.return_value = mock_response
        response = self.api.login_to_tool_repository()
        mock_make_request.assert_called_once_with(
            "POST", "/crewai_plus/api/v1/tools/login", json={}
        )
        self.assertEqual(response, mock_response)

    def assert_request_with_org_id(
        self, mock_client_instance, method: str, endpoint: str, **kwargs
    ):
        """Helper: assert the single request carried the org-id header.

        Non-org headers are matched with ANY; only X-Crewai-Organization-Id
        is pinned to the test's org uuid.
        """
        mock_client_instance.request.assert_called_once_with(
            method,
            f"{os.getenv('CREWAI_PLUS_URL')}{endpoint}",
            headers={
                "Authorization": ANY,
                "Content-Type": ANY,
                "User-Agent": ANY,
                "X-Crewai-Version": ANY,
                "X-Crewai-Organization-Id": self.org_uuid,
            },
            **kwargs,
        )

    @patch("crewai_core.plus_api.Settings")
    @patch("crewai_core.plus_api.httpx.Client")
    def test_login_to_tool_repository_with_org_uuid(
        self, mock_client_class, mock_settings_class
    ):
        """With an org uuid in Settings, login requests include the org header."""
        mock_settings = MagicMock()
        mock_settings.org_uuid = self.org_uuid
        mock_settings.enterprise_base_url = os.getenv('CREWAI_PLUS_URL')
        mock_settings_class.return_value = mock_settings
        # Re-create the client so it picks up the patched Settings.
        self.api = PlusAPI(self.api_key)
        mock_client_instance = MagicMock()
        mock_response = MagicMock()
        mock_client_instance.request.return_value = mock_response
        mock_client_class.return_value.__enter__.return_value = mock_client_instance
        response = self.api.login_to_tool_repository()
        self.assert_request_with_org_id(
            mock_client_instance, "POST", "/crewai_plus/api/v1/tools/login", json={}
        )
        self.assertEqual(response, mock_response)

    @patch("crewai_core.plus_api.PlusAPI._make_request")
    def test_get_tool(self, mock_make_request):
        """get_tool issues a GET for the given tool handle."""
        mock_response = MagicMock()
        mock_make_request.return_value = mock_response
        response = self.api.get_tool("test_tool_handle")
        mock_make_request.assert_called_once_with(
            "GET", "/crewai_plus/api/v1/tools/test_tool_handle"
        )
        self.assertEqual(response, mock_response)

    @patch("crewai_core.plus_api.Settings")
    @patch("crewai_core.plus_api.httpx.Client")
    def test_get_tool_with_org_uuid(self, mock_client_class, mock_settings_class):
        """get_tool includes the org header when Settings carries an org uuid."""
        mock_settings = MagicMock()
        mock_settings.org_uuid = self.org_uuid
        mock_settings.enterprise_base_url = os.getenv('CREWAI_PLUS_URL')
        mock_settings_class.return_value = mock_settings
        self.api = PlusAPI(self.api_key)
        mock_client_instance = MagicMock()
        mock_response = MagicMock()
        mock_client_instance.request.return_value = mock_response
        mock_client_class.return_value.__enter__.return_value = mock_client_instance
        response = self.api.get_tool("test_tool_handle")
        self.assert_request_with_org_id(
            mock_client_instance, "GET", "/crewai_plus/api/v1/tools/test_tool_handle"
        )
        self.assertEqual(response, mock_response)

    @patch("crewai_core.plus_api.PlusAPI._make_request")
    def test_publish_tool(self, mock_make_request):
        """publish_tool POSTs the full payload, including None placeholders."""
        mock_response = MagicMock()
        mock_make_request.return_value = mock_response
        handle = "test_tool_handle"
        public = True
        version = "1.0.0"
        description = "Test tool description"
        encoded_file = "encoded_test_file"
        response = self.api.publish_tool(
            handle, public, version, description, encoded_file
        )
        params = {
            "handle": handle,
            "public": public,
            "version": version,
            "file": encoded_file,
            "description": description,
            "available_exports": None,
            "tools_metadata": None,
        }
        mock_make_request.assert_called_once_with(
            "POST", "/crewai_plus/api/v1/tools", json=params
        )
        self.assertEqual(response, mock_response)

    @patch("crewai_core.plus_api.Settings")
    @patch("crewai_core.plus_api.httpx.Client")
    def test_publish_tool_with_org_uuid(self, mock_client_class, mock_settings_class):
        """publish_tool includes the org header when Settings carries an org uuid."""
        mock_settings = MagicMock()
        mock_settings.org_uuid = self.org_uuid
        mock_settings.enterprise_base_url = os.getenv('CREWAI_PLUS_URL')
        mock_settings_class.return_value = mock_settings
        self.api = PlusAPI(self.api_key)
        mock_client_instance = MagicMock()
        mock_response = MagicMock()
        mock_client_instance.request.return_value = mock_response
        mock_client_class.return_value.__enter__.return_value = mock_client_instance
        handle = "test_tool_handle"
        public = True
        version = "1.0.0"
        description = "Test tool description"
        encoded_file = "encoded_test_file"
        response = self.api.publish_tool(
            handle, public, version, description, encoded_file
        )
        expected_params = {
            "handle": handle,
            "public": public,
            "version": version,
            "file": encoded_file,
            "description": description,
            "available_exports": None,
            "tools_metadata": None,
        }
        self.assert_request_with_org_id(
            mock_client_instance, "POST", "/crewai_plus/api/v1/tools", json=expected_params
        )
        self.assertEqual(response, mock_response)

    @patch("crewai_core.plus_api.PlusAPI._make_request")
    def test_publish_tool_without_description(self, mock_make_request):
        """A None description is forwarded as-is, not dropped from the payload."""
        mock_response = MagicMock()
        mock_make_request.return_value = mock_response
        handle = "test_tool_handle"
        public = False
        version = "2.0.0"
        description = None
        encoded_file = "encoded_test_file"
        response = self.api.publish_tool(
            handle, public, version, description, encoded_file
        )
        params = {
            "handle": handle,
            "public": public,
            "version": version,
            "file": encoded_file,
            "description": description,
            "available_exports": None,
            "tools_metadata": None,
        }
        mock_make_request.assert_called_once_with(
            "POST", "/crewai_plus/api/v1/tools", json=params
        )
        self.assertEqual(response, mock_response)

    @patch("crewai_core.plus_api.httpx.Client")
    def test_make_request(self, mock_client_class):
        """_make_request builds the URL from base_url and ignores env proxies."""
        mock_client_instance = MagicMock()
        mock_response = MagicMock()
        mock_client_instance.request.return_value = mock_response
        mock_client_class.return_value.__enter__.return_value = mock_client_instance
        response = self.api._make_request("GET", "test_endpoint")
        # trust_env=False: environment proxy settings must not affect requests.
        mock_client_class.assert_called_once_with(trust_env=False, verify=True)
        mock_client_instance.request.assert_called_once_with(
            "GET", f"{self.api.base_url}/test_endpoint", headers=self.api.headers
        )
        self.assertEqual(response, mock_response)

    @patch("crewai_core.plus_api.PlusAPI._make_request")
    def test_deploy_by_name(self, mock_make_request):
        """deploy_by_name POSTs to the by-name deploy endpoint."""
        self.api.deploy_by_name("test_project")
        mock_make_request.assert_called_once_with(
            "POST", "/crewai_plus/api/v1/crews/by-name/test_project/deploy"
        )

    @patch("crewai_core.plus_api.PlusAPI._make_request")
    def test_deploy_by_uuid(self, mock_make_request):
        """deploy_by_uuid POSTs to the uuid deploy endpoint."""
        self.api.deploy_by_uuid("test_uuid")
        mock_make_request.assert_called_once_with(
            "POST", "/crewai_plus/api/v1/crews/test_uuid/deploy"
        )

    @patch("crewai_core.plus_api.PlusAPI._make_request")
    def test_crew_status_by_name(self, mock_make_request):
        """crew_status_by_name GETs the by-name status endpoint."""
        self.api.crew_status_by_name("test_project")
        mock_make_request.assert_called_once_with(
            "GET", "/crewai_plus/api/v1/crews/by-name/test_project/status"
        )

    @patch("crewai_core.plus_api.PlusAPI._make_request")
    def test_crew_status_by_uuid(self, mock_make_request):
        """crew_status_by_uuid GETs the uuid status endpoint."""
        self.api.crew_status_by_uuid("test_uuid")
        mock_make_request.assert_called_once_with(
            "GET", "/crewai_plus/api/v1/crews/test_uuid/status"
        )

    @patch("crewai_core.plus_api.PlusAPI._make_request")
    def test_crew_by_name(self, mock_make_request):
        """crew_by_name defaults to the deployment log, honors a custom log type."""
        self.api.crew_by_name("test_project")
        mock_make_request.assert_called_once_with(
            "GET", "/crewai_plus/api/v1/crews/by-name/test_project/logs/deployment"
        )
        self.api.crew_by_name("test_project", "custom_log")
        mock_make_request.assert_called_with(
            "GET", "/crewai_plus/api/v1/crews/by-name/test_project/logs/custom_log"
        )

    @patch("crewai_core.plus_api.PlusAPI._make_request")
    def test_crew_by_uuid(self, mock_make_request):
        """crew_by_uuid defaults to the deployment log, honors a custom log type."""
        self.api.crew_by_uuid("test_uuid")
        mock_make_request.assert_called_once_with(
            "GET", "/crewai_plus/api/v1/crews/test_uuid/logs/deployment"
        )
        self.api.crew_by_uuid("test_uuid", "custom_log")
        mock_make_request.assert_called_with(
            "GET", "/crewai_plus/api/v1/crews/test_uuid/logs/custom_log"
        )

    @patch("crewai_core.plus_api.PlusAPI._make_request")
    def test_delete_crew_by_name(self, mock_make_request):
        """delete_crew_by_name issues a DELETE on the by-name endpoint."""
        self.api.delete_crew_by_name("test_project")
        mock_make_request.assert_called_once_with(
            "DELETE", "/crewai_plus/api/v1/crews/by-name/test_project"
        )

    @patch("crewai_core.plus_api.PlusAPI._make_request")
    def test_delete_crew_by_uuid(self, mock_make_request):
        """delete_crew_by_uuid issues a DELETE on the uuid endpoint."""
        self.api.delete_crew_by_uuid("test_uuid")
        mock_make_request.assert_called_once_with(
            "DELETE", "/crewai_plus/api/v1/crews/test_uuid"
        )

    @patch("crewai_core.plus_api.PlusAPI._make_request")
    def test_list_crews(self, mock_make_request):
        """list_crews GETs the crews collection endpoint."""
        self.api.list_crews()
        mock_make_request.assert_called_once_with("GET", "/crewai_plus/api/v1/crews")

    @patch("crewai_core.plus_api.PlusAPI._make_request")
    def test_create_crew(self, mock_make_request):
        """create_crew POSTs the given payload to the crews endpoint."""
        payload = {"name": "test_crew"}
        self.api.create_crew(payload)
        mock_make_request.assert_called_once_with(
            "POST", "/crewai_plus/api/v1/crews", json=payload
        )

    @patch("crewai_core.plus_api.Settings")
    @patch.dict(os.environ, {"CREWAI_PLUS_URL": ""})
    def test_custom_base_url(self, mock_settings_class):
        """With the env var blanked, base_url comes from Settings."""
        mock_settings = MagicMock()
        mock_settings.enterprise_base_url = "https://custom-url.com/api"
        mock_settings_class.return_value = mock_settings
        custom_api = PlusAPI("test_key")
        self.assertEqual(
            custom_api.base_url,
            "https://custom-url.com/api",
        )

    @patch.dict(os.environ, {"CREWAI_PLUS_URL": "https://custom-url-from-env.com"})
    def test_custom_base_url_from_env(self):
        """CREWAI_PLUS_URL in the environment overrides the base URL."""
        custom_api = PlusAPI("test_key")
        self.assertEqual(
            custom_api.base_url,
            "https://custom-url-from-env.com",
        )
@pytest.mark.asyncio
@patch("httpx.AsyncClient")
async def test_get_agent(mock_async_client_class):
    """get_agent issues a GET to the agents endpoint with the client headers."""
    api = PlusAPI("test_api_key")
    expected = MagicMock()
    async_client = AsyncMock()
    async_client.get.return_value = expected
    mock_async_client_class.return_value.__aenter__.return_value = async_client
    result = await api.get_agent("test_agent_handle")
    async_client.get.assert_called_once_with(
        f"{api.base_url}/crewai_plus/api/v1/agents/test_agent_handle",
        headers=api.headers,
    )
    assert result == expected
@pytest.mark.asyncio
@patch("httpx.AsyncClient")
@patch("crewai_core.plus_api.Settings")
async def test_get_agent_with_org_uuid(mock_settings_class, mock_async_client_class):
    """When Settings carries an org uuid, get_agent sends the org header."""
    org_uuid = "test-org-uuid"
    settings_stub = MagicMock()
    settings_stub.org_uuid = org_uuid
    settings_stub.enterprise_base_url = os.getenv("CREWAI_PLUS_URL")
    mock_settings_class.return_value = settings_stub
    api = PlusAPI("test_api_key")
    expected = MagicMock()
    async_client = AsyncMock()
    async_client.get.return_value = expected
    mock_async_client_class.return_value.__aenter__.return_value = async_client
    result = await api.get_agent("test_agent_handle")
    async_client.get.assert_called_once_with(
        f"{api.base_url}/crewai_plus/api/v1/agents/test_agent_handle",
        headers=api.headers,
    )
    assert api.headers.get("X-Crewai-Organization-Id") == org_uuid
    assert result == expected

View File

@@ -1,293 +0,0 @@
"""Tests for TokenManager with atomic file operations."""
import json
import tempfile
import unittest
from datetime import datetime, timedelta
from pathlib import Path
from unittest.mock import patch
from cryptography.fernet import Fernet
from crewai_core.token_manager import TokenManager
class TestTokenManager(unittest.TestCase):
    """Test cases for TokenManager encryption, expiry and key-creation logic."""

    @patch("crewai_core.token_manager.TokenManager._get_or_create_key")
    def setUp(self, mock_get_key: unittest.mock.MagicMock) -> None:
        """Set up test fixtures.

        Key creation is patched so tests never touch the real key file on disk;
        each test run gets a throwaway Fernet key.
        """
        mock_get_key.return_value = Fernet.generate_key()
        self.token_manager = TokenManager()

    @patch("crewai_core.token_manager.TokenManager._read_secure_file")
    @patch("crewai_core.token_manager.TokenManager._get_or_create_key")
    def test_get_or_create_key_existing(
        self,
        mock_get_or_create: unittest.mock.MagicMock,
        mock_read: unittest.mock.MagicMock,
    ) -> None:
        """Test that existing key is returned when present."""
        mock_key = Fernet.generate_key()
        mock_get_or_create.return_value = mock_key
        token_manager = TokenManager()
        result = token_manager.key
        self.assertEqual(result, mock_key)

    def test_get_or_create_key_new(self) -> None:
        """Test that new key is created when none exists."""
        mock_key = Fernet.generate_key()
        with (
            patch.object(self.token_manager, "_read_secure_file", return_value=None) as mock_read,
            patch.object(self.token_manager, "_atomic_create_secure_file", return_value=True) as mock_atomic_create,
            patch("crewai_core.token_manager.Fernet.generate_key", return_value=mock_key) as mock_generate,
        ):
            result = self.token_manager._get_or_create_key()
            self.assertEqual(result, mock_key)
            mock_read.assert_called_with("secret.key")
            mock_generate.assert_called_once()
            mock_atomic_create.assert_called_once_with("secret.key", mock_key)

    def test_get_or_create_key_race_condition(self) -> None:
        """Test that another process's key is used when atomic create fails.

        _read_secure_file yields None first (no key yet), then the other
        process's key on the re-read after the failed atomic create.
        """
        our_key = Fernet.generate_key()
        their_key = Fernet.generate_key()
        with (
            patch.object(self.token_manager, "_read_secure_file", side_effect=[None, their_key]) as mock_read,
            patch.object(self.token_manager, "_atomic_create_secure_file", return_value=False) as mock_atomic_create,
            patch("crewai_core.token_manager.Fernet.generate_key", return_value=our_key),
        ):
            result = self.token_manager._get_or_create_key()
            self.assertEqual(result, their_key)
            self.assertEqual(mock_read.call_count, 2)

    @patch("crewai_core.token_manager.TokenManager._atomic_write_secure_file")
    def test_save_tokens(
        self, mock_write: unittest.mock.MagicMock
    ) -> None:
        """Test saving tokens encrypts and writes atomically.

        Verifies by decrypting the bytes passed to the atomic writer and
        checking the JSON payload round-trips the token and expiration.
        """
        access_token = "test_token"
        expires_at = int((datetime.now() + timedelta(seconds=3600)).timestamp())
        self.token_manager.save_tokens(access_token, expires_at)
        mock_write.assert_called_once()
        args = mock_write.call_args[0]
        self.assertEqual(args[0], "tokens.enc")
        decrypted_data = self.token_manager.fernet.decrypt(args[1])
        data = json.loads(decrypted_data)
        self.assertEqual(data["access_token"], access_token)
        expiration = datetime.fromisoformat(data["expiration"])
        self.assertEqual(expiration, datetime.fromtimestamp(expires_at))

    @patch("crewai_core.token_manager.TokenManager._read_secure_file")
    def test_get_token_valid(
        self, mock_read: unittest.mock.MagicMock
    ) -> None:
        """Test getting a valid non-expired token."""
        access_token = "test_token"
        expiration = (datetime.now() + timedelta(hours=1)).isoformat()
        data = {"access_token": access_token, "expiration": expiration}
        encrypted_data = self.token_manager.fernet.encrypt(json.dumps(data).encode())
        mock_read.return_value = encrypted_data
        result = self.token_manager.get_token()
        self.assertEqual(result, access_token)

    @patch("crewai_core.token_manager.TokenManager._read_secure_file")
    def test_get_token_expired(
        self, mock_read: unittest.mock.MagicMock
    ) -> None:
        """Test that expired token returns None."""
        access_token = "test_token"
        expiration = (datetime.now() - timedelta(hours=1)).isoformat()
        data = {"access_token": access_token, "expiration": expiration}
        encrypted_data = self.token_manager.fernet.encrypt(json.dumps(data).encode())
        mock_read.return_value = encrypted_data
        result = self.token_manager.get_token()
        self.assertIsNone(result)

    @patch("crewai_core.token_manager.TokenManager._read_secure_file")
    def test_get_token_not_found(
        self, mock_read: unittest.mock.MagicMock
    ) -> None:
        """Test that missing token file returns None."""
        mock_read.return_value = None
        result = self.token_manager.get_token()
        self.assertIsNone(result)

    @patch("crewai_core.token_manager.TokenManager._delete_secure_file")
    def test_clear_tokens(
        self, mock_delete: unittest.mock.MagicMock
    ) -> None:
        """Test clearing tokens deletes the token file."""
        self.token_manager.clear_tokens()
        mock_delete.assert_called_once_with("tokens.enc")
class TestAtomicFileOperations(unittest.TestCase):
    """Test atomic file operations directly against a real temp directory.

    NOTE(review): the permission-bit assertions (st_mode & 0o777 == 0o600)
    assume POSIX chmod semantics — confirm they hold (or are skipped) on
    Windows CI.
    """

    def setUp(self) -> None:
        """Set up test fixtures with temp directory."""
        self.temp_dir = tempfile.mkdtemp()
        # Keep the original staticmethod so tearDown can restore it; the class
        # attribute is patched directly (not via mock.patch) to redirect all
        # secure storage into the per-test temp directory.
        self.original_get_path = TokenManager._get_secure_storage_path
        # Patch to use temp directory
        def mock_get_path() -> Path:
            return Path(self.temp_dir)
        TokenManager._get_secure_storage_path = staticmethod(mock_get_path)

    def tearDown(self) -> None:
        """Clean up temp directory."""
        TokenManager._get_secure_storage_path = staticmethod(self.original_get_path)
        import shutil
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    @patch("crewai_core.token_manager.TokenManager._get_or_create_key")
    def test_atomic_create_new_file(
        self, mock_get_key: unittest.mock.MagicMock
    ) -> None:
        """Test atomic create succeeds for new file."""
        mock_get_key.return_value = Fernet.generate_key()
        tm = TokenManager()
        result = tm._atomic_create_secure_file("test.txt", b"content")
        self.assertTrue(result)
        file_path = Path(self.temp_dir) / "test.txt"
        self.assertTrue(file_path.exists())
        self.assertEqual(file_path.read_bytes(), b"content")
        # Owner read/write only — no group/other access.
        self.assertEqual(file_path.stat().st_mode & 0o777, 0o600)

    @patch("crewai_core.token_manager.TokenManager._get_or_create_key")
    def test_atomic_create_existing_file(
        self, mock_get_key: unittest.mock.MagicMock
    ) -> None:
        """Test atomic create fails for existing file (and never clobbers it)."""
        mock_get_key.return_value = Fernet.generate_key()
        tm = TokenManager()
        # Create file first
        file_path = Path(self.temp_dir) / "test.txt"
        file_path.write_bytes(b"original")
        result = tm._atomic_create_secure_file("test.txt", b"new content")
        self.assertFalse(result)
        self.assertEqual(file_path.read_bytes(), b"original")

    @patch("crewai_core.token_manager.TokenManager._get_or_create_key")
    def test_atomic_write_new_file(
        self, mock_get_key: unittest.mock.MagicMock
    ) -> None:
        """Test atomic write creates new file."""
        mock_get_key.return_value = Fernet.generate_key()
        tm = TokenManager()
        tm._atomic_write_secure_file("test.txt", b"content")
        file_path = Path(self.temp_dir) / "test.txt"
        self.assertTrue(file_path.exists())
        self.assertEqual(file_path.read_bytes(), b"content")
        self.assertEqual(file_path.stat().st_mode & 0o777, 0o600)

    @patch("crewai_core.token_manager.TokenManager._get_or_create_key")
    def test_atomic_write_overwrites(
        self, mock_get_key: unittest.mock.MagicMock
    ) -> None:
        """Test atomic write overwrites existing file."""
        mock_get_key.return_value = Fernet.generate_key()
        tm = TokenManager()
        file_path = Path(self.temp_dir) / "test.txt"
        file_path.write_bytes(b"original")
        tm._atomic_write_secure_file("test.txt", b"new content")
        self.assertEqual(file_path.read_bytes(), b"new content")

    @patch("crewai_core.token_manager.TokenManager._get_or_create_key")
    def test_atomic_write_no_temp_file_on_success(
        self, mock_get_key: unittest.mock.MagicMock
    ) -> None:
        """Test that temp file is cleaned up after successful write.

        Presumably the implementation stages writes as ".<name>.<suffix>"
        before renaming — the glob below matches that pattern; verify against
        the implementation if it changes.
        """
        mock_get_key.return_value = Fernet.generate_key()
        tm = TokenManager()
        tm._atomic_write_secure_file("test.txt", b"content")
        # Check no temp files remain
        temp_files = list(Path(self.temp_dir).glob(".test.txt.*"))
        self.assertEqual(len(temp_files), 0)

    @patch("crewai_core.token_manager.TokenManager._get_or_create_key")
    def test_read_secure_file_exists(
        self, mock_get_key: unittest.mock.MagicMock
    ) -> None:
        """Test reading existing file."""
        mock_get_key.return_value = Fernet.generate_key()
        tm = TokenManager()
        file_path = Path(self.temp_dir) / "test.txt"
        file_path.write_bytes(b"content")
        result = tm._read_secure_file("test.txt")
        self.assertEqual(result, b"content")

    @patch("crewai_core.token_manager.TokenManager._get_or_create_key")
    def test_read_secure_file_not_exists(
        self, mock_get_key: unittest.mock.MagicMock
    ) -> None:
        """Test reading non-existent file returns None."""
        mock_get_key.return_value = Fernet.generate_key()
        tm = TokenManager()
        result = tm._read_secure_file("nonexistent.txt")
        self.assertIsNone(result)

    @patch("crewai_core.token_manager.TokenManager._get_or_create_key")
    def test_delete_secure_file_exists(
        self, mock_get_key: unittest.mock.MagicMock
    ) -> None:
        """Test deleting existing file."""
        mock_get_key.return_value = Fernet.generate_key()
        tm = TokenManager()
        file_path = Path(self.temp_dir) / "test.txt"
        file_path.write_bytes(b"content")
        tm._delete_secure_file("test.txt")
        self.assertFalse(file_path.exists())

    @patch("crewai_core.token_manager.TokenManager._get_or_create_key")
    def test_delete_secure_file_not_exists(
        self, mock_get_key: unittest.mock.MagicMock
    ) -> None:
        """Test deleting non-existent file doesn't raise."""
        mock_get_key.return_value = Fernet.generate_key()
        tm = TokenManager()
        # Should not raise
        tm._delete_secure_file("nonexistent.txt")
# Allow running this module directly (outside pytest discovery).
if __name__ == "__main__":
    unittest.main()

View File

@@ -1,107 +0,0 @@
import os
import shutil
import tempfile
from pathlib import Path
import pytest
from crewai_cli import utils
@pytest.fixture
def temp_tree():
    """Yield a temp directory containing two files, an empty dir and a nested file."""
    root = tempfile.mkdtemp()
    root_path = Path(root)
    (root_path / "file1.txt").write_text("Hello, world!")
    (root_path / "file2.txt").write_text("Another file")
    (root_path / "empty_dir").mkdir()
    nested = root_path / "nested_dir"
    nested.mkdir()
    (nested / "nested_file.txt").write_text("Nested content")
    yield root
    shutil.rmtree(root)
def create_file(path, content):
    """Create (or truncate) the file at *path* and write *content* to it."""
    Path(path).write_text(content)
def test_tree_find_and_replace_file_content(temp_tree):
    """File contents matching the needle are rewritten in place."""
    utils.tree_find_and_replace(temp_tree, "world", "universe")
    assert Path(temp_tree, "file1.txt").read_text() == "Hello, universe!"
def test_tree_find_and_replace_file_name(temp_tree):
    """File names containing the needle are renamed."""
    renamed = os.path.join(temp_tree, "file2_renamed.txt")
    os.rename(os.path.join(temp_tree, "file2.txt"), renamed)
    utils.tree_find_and_replace(temp_tree, "renamed", "modified")
    assert os.path.exists(os.path.join(temp_tree, "file2_modified.txt"))
    assert not os.path.exists(renamed)
def test_tree_find_and_replace_directory_name(temp_tree):
    """Directory names containing the needle are renamed."""
    utils.tree_find_and_replace(temp_tree, "empty", "renamed")
    root = Path(temp_tree)
    assert (root / "renamed_dir").exists()
    assert not (root / "empty_dir").exists()
def test_tree_find_and_replace_nested_content(temp_tree):
    """Replacement recurses into nested directories."""
    utils.tree_find_and_replace(temp_tree, "Nested", "Updated")
    nested_file = Path(temp_tree) / "nested_dir" / "nested_file.txt"
    assert nested_file.read_text() == "Updated content"
def test_tree_find_and_replace_no_matches(temp_tree):
    """A needle that matches nothing leaves the tree untouched."""
    utils.tree_find_and_replace(temp_tree, "nonexistent", "replacement")
    expected_entries = {"file1.txt", "file2.txt", "empty_dir", "nested_dir"}
    assert set(os.listdir(temp_tree)) == expected_entries
def test_tree_copy_full_structure(temp_tree):
    """tree_copy reproduces files, empty dirs and nested dirs at the destination."""
    dest = tempfile.mkdtemp()
    try:
        utils.tree_copy(temp_tree, dest)
        assert set(os.listdir(dest)) == set(os.listdir(temp_tree))
        dest_path = Path(dest)
        assert (dest_path / "file1.txt").is_file()
        assert (dest_path / "file2.txt").is_file()
        assert (dest_path / "empty_dir").is_dir()
        assert (dest_path / "nested_dir").is_dir()
        assert (dest_path / "nested_dir" / "nested_file.txt").is_file()
    finally:
        shutil.rmtree(dest)
def test_tree_copy_preserve_content(temp_tree):
    """Copied files keep their original contents byte-for-byte."""
    dest = tempfile.mkdtemp()
    try:
        utils.tree_copy(temp_tree, dest)
        dest_path = Path(dest)
        assert (dest_path / "file1.txt").read_text() == "Hello, world!"
        nested = dest_path / "nested_dir" / "nested_file.txt"
        assert nested.read_text() == "Nested content"
    finally:
        shutil.rmtree(dest)
def test_tree_copy_to_existing_directory(temp_tree):
    """Copying into a non-empty destination preserves pre-existing files."""
    dest = tempfile.mkdtemp()
    try:
        Path(dest, "existing_file.txt").write_text("I was here first")
        utils.tree_copy(temp_tree, dest)
        assert os.path.isfile(os.path.join(dest, "existing_file.txt"))
        assert os.path.isfile(os.path.join(dest, "file1.txt"))
    finally:
        shutil.rmtree(dest)
# Tests for extract_available_exports, get_crews, get_flows, fetch_crews,
# is_valid_tool live in lib/crewai/tests/cli/test_utils.py — the canonical
# implementations are in crewai.utilities.project_utils.

View File

@@ -1,374 +0,0 @@
"""Test for version management."""
import json
from datetime import datetime, timedelta
from pathlib import Path
from unittest.mock import MagicMock, patch
from crewai_cli.version import get_crewai_version as _get_ver
from crewai_cli.version import (
get_crewai_version,
get_latest_version_from_pypi,
is_current_version_yanked,
is_newer_version_available,
)
from crewai_core.version import (
_find_latest_non_yanked_version,
_get_cache_file,
_is_cache_valid,
_is_version_yanked,
)
def test_dynamic_versioning_consistency() -> None:
    """Both import paths of get_crewai_version must agree and be non-empty."""
    package_version = _get_ver()
    assert get_crewai_version() == package_version
    assert package_version is not None
    assert package_version.strip() != ""
class TestVersionChecking:
    """Test version checking utilities (current version, cache, PyPI fetch)."""

    def test_get_crewai_version(self) -> None:
        """Test getting current crewai version."""
        version = get_crewai_version()
        assert isinstance(version, str)
        assert len(version) > 0

    def test_get_cache_file(self) -> None:
        """Test cache file path generation."""
        cache_file = _get_cache_file()
        assert isinstance(cache_file, Path)
        assert cache_file.name == "version_cache.json"

    def test_is_cache_valid_with_fresh_cache(self) -> None:
        """Test cache validation with fresh cache."""
        cache_data = {"timestamp": datetime.now().isoformat(), "version": "1.0.0"}
        assert _is_cache_valid(cache_data) is True

    def test_is_cache_valid_with_stale_cache(self) -> None:
        """Test cache validation with stale cache."""
        # 25h is past the cache TTL (presumably 24h — confirm in crewai_core.version).
        old_time = datetime.now() - timedelta(hours=25)
        cache_data = {"timestamp": old_time.isoformat(), "version": "1.0.0"}
        assert _is_cache_valid(cache_data) is False

    def test_is_cache_valid_with_missing_timestamp(self) -> None:
        """Test cache validation with missing timestamp."""
        cache_data = {"version": "1.0.0"}
        assert _is_cache_valid(cache_data) is False

    @patch("crewai_core.version.Path.exists")
    @patch("crewai_core.version.request.urlopen")
    def test_get_latest_version_from_pypi_success(
        self, mock_urlopen: MagicMock, mock_exists: MagicMock
    ) -> None:
        """Test successful PyPI version fetch uses releases data.

        Decorators apply bottom-up, so urlopen is the first mock argument and
        Path.exists the second. exists -> False disables the on-disk cache so
        the HTTP path is exercised.
        """
        mock_exists.return_value = False
        releases = {
            "1.0.0": [{"yanked": False}],
            "2.0.0": [{"yanked": False}],
            "2.1.0": [{"yanked": True, "yanked_reason": "bad release"}],
        }
        mock_response = MagicMock()
        mock_response.read.return_value = json.dumps(
            {"info": {"version": "2.1.0"}, "releases": releases}
        ).encode()
        mock_urlopen.return_value.__enter__.return_value = mock_response
        version = get_latest_version_from_pypi()
        # 2.1.0 is yanked, so the highest non-yanked release (2.0.0) must win
        # even though info.version says 2.1.0.
        assert version == "2.0.0"

    @patch("crewai_core.version.Path.exists")
    @patch("crewai_core.version.request.urlopen")
    def test_get_latest_version_from_pypi_failure(
        self, mock_urlopen: MagicMock, mock_exists: MagicMock
    ) -> None:
        """Test PyPI version fetch failure."""
        from urllib.error import URLError
        mock_exists.return_value = False
        mock_urlopen.side_effect = URLError("Network error")
        # Network failures degrade to None rather than raising.
        version = get_latest_version_from_pypi()
        assert version is None

    @patch("crewai_core.version.get_crewai_version")
    @patch("crewai_core.version.get_latest_version_from_pypi")
    def test_is_newer_version_available_true(
        self, mock_latest: MagicMock, mock_current: MagicMock
    ) -> None:
        """Test when newer version is available."""
        mock_current.return_value = "1.0.0"
        mock_latest.return_value = "2.0.0"
        is_newer, current, latest = is_newer_version_available()
        assert is_newer is True
        assert current == "1.0.0"
        assert latest == "2.0.0"

    @patch("crewai_core.version.get_crewai_version")
    @patch("crewai_core.version.get_latest_version_from_pypi")
    def test_is_newer_version_available_false(
        self, mock_latest: MagicMock, mock_current: MagicMock
    ) -> None:
        """Test when no newer version is available."""
        mock_current.return_value = "2.0.0"
        mock_latest.return_value = "2.0.0"
        is_newer, current, latest = is_newer_version_available()
        assert is_newer is False
        assert current == "2.0.0"
        assert latest == "2.0.0"

    @patch("crewai_core.version.get_crewai_version")
    @patch("crewai_core.version.get_latest_version_from_pypi")
    def test_is_newer_version_available_with_none_latest(
        self, mock_latest: MagicMock, mock_current: MagicMock
    ) -> None:
        """Test when PyPI fetch fails."""
        mock_current.return_value = "1.0.0"
        mock_latest.return_value = None
        is_newer, current, latest = is_newer_version_available()
        # A failed fetch must not report "newer available".
        assert is_newer is False
        assert current == "1.0.0"
        assert latest is None
class TestFindLatestNonYankedVersion:
    """Behavior of the _find_latest_non_yanked_version helper."""

    def test_skips_yanked_versions(self) -> None:
        """A yanked release is never selected, even when it is newer."""
        release_data = {
            "1.0.0": [{"yanked": False}],
            "2.0.0": [{"yanked": True}],
        }
        assert _find_latest_non_yanked_version(release_data) == "1.0.0"

    def test_returns_highest_non_yanked(self) -> None:
        """Among the non-yanked releases the highest version wins."""
        release_data = {
            "1.0.0": [{"yanked": False}],
            "1.5.0": [{"yanked": False}],
            "2.0.0": [{"yanked": True}],
        }
        assert _find_latest_non_yanked_version(release_data) == "1.5.0"

    def test_returns_none_when_all_yanked(self) -> None:
        """None is returned when every release has been yanked."""
        release_data = {
            "1.0.0": [{"yanked": True}],
            "2.0.0": [{"yanked": True}],
        }
        assert _find_latest_non_yanked_version(release_data) is None

    def test_skips_prerelease_versions(self) -> None:
        """Alpha and release-candidate versions are ignored."""
        release_data = {
            "1.0.0": [{"yanked": False}],
            "2.0.0a1": [{"yanked": False}],
            "2.0.0rc1": [{"yanked": False}],
        }
        assert _find_latest_non_yanked_version(release_data) == "1.0.0"

    def test_skips_versions_with_empty_files(self) -> None:
        """Releases that ship no files are ignored."""
        release_data: dict[str, list[dict[str, bool]]] = {
            "1.0.0": [{"yanked": False}],
            "2.0.0": [],
        }
        assert _find_latest_non_yanked_version(release_data) == "1.0.0"

    def test_handles_invalid_version_strings(self) -> None:
        """Unparseable version keys are skipped rather than raising."""
        release_data = {
            "1.0.0": [{"yanked": False}],
            "not-a-version": [{"yanked": False}],
        }
        assert _find_latest_non_yanked_version(release_data) == "1.0.0"

    def test_partially_yanked_files_not_considered_yanked(self) -> None:
        """A release keeping at least one non-yanked file is still selectable."""
        release_data = {
            "1.0.0": [{"yanked": False}],
            "2.0.0": [{"yanked": True}, {"yanked": False}],
        }
        assert _find_latest_non_yanked_version(release_data) == "2.0.0"
class TestIsVersionYanked:
    """Behavioral tests for the _is_version_yanked helper."""

    def test_non_yanked_version(self) -> None:
        """A healthy release reports (False, '')."""
        releases = {"1.0.0": [{"yanked": False}]}
        assert _is_version_yanked("1.0.0", releases) == (False, "")

    def test_yanked_version_with_reason(self) -> None:
        """A yanked release surfaces its yank reason."""
        releases = {"1.0.0": [{"yanked": True, "yanked_reason": "critical bug"}]}
        assert _is_version_yanked("1.0.0", releases) == (True, "critical bug")

    def test_yanked_version_without_reason(self) -> None:
        """A yanked release with no stated reason yields an empty string."""
        releases = {"1.0.0": [{"yanked": True}]}
        assert _is_version_yanked("1.0.0", releases) == (True, "")

    def test_unknown_version(self) -> None:
        """A version absent from the index is treated as not yanked."""
        releases = {"1.0.0": [{"yanked": False}]}
        assert _is_version_yanked("9.9.9", releases) == (False, "")

    def test_partially_yanked_files(self) -> None:
        """Mixed yanked/non-yanked files mean the release is still usable."""
        releases = {"1.0.0": [{"yanked": True}, {"yanked": False}]}
        assert _is_version_yanked("1.0.0", releases) == (False, "")

    def test_multiple_yanked_files_picks_first_reason(self) -> None:
        """The first non-empty reason among the files is the one reported."""
        releases = {
            "1.0.0": [
                {"yanked": True, "yanked_reason": ""},
                {"yanked": True, "yanked_reason": "second reason"},
            ],
        }
        assert _is_version_yanked("1.0.0", releases) == (True, "second reason")
class TestIsCurrentVersionYanked:
    """Behavioral tests for the public is_current_version_yanked function."""

    @patch("crewai_core.version.get_crewai_version")
    @patch("crewai_core.version._get_cache_file")
    def test_reads_from_valid_cache(
        self, mock_cache_file: MagicMock, mock_version: MagicMock, tmp_path: Path
    ) -> None:
        """A fresh cache entry is trusted without re-contacting PyPI."""
        mock_version.return_value = "1.0.0"
        cache_file = tmp_path / "version_cache.json"
        cache_file.write_text(
            json.dumps(
                {
                    "version": "2.0.0",
                    "timestamp": datetime.now().isoformat(),
                    "current_version": "1.0.0",
                    "current_version_yanked": True,
                    "current_version_yanked_reason": "bad release",
                }
            )
        )
        mock_cache_file.return_value = cache_file
        assert is_current_version_yanked() == (True, "bad release")

    @patch("crewai_core.version.get_crewai_version")
    @patch("crewai_core.version._get_cache_file")
    def test_not_yanked_from_cache(
        self, mock_cache_file: MagicMock, mock_version: MagicMock, tmp_path: Path
    ) -> None:
        """A fresh cache entry marking the version healthy is honored."""
        mock_version.return_value = "2.0.0"
        cache_file = tmp_path / "version_cache.json"
        cache_file.write_text(
            json.dumps(
                {
                    "version": "2.0.0",
                    "timestamp": datetime.now().isoformat(),
                    "current_version": "2.0.0",
                    "current_version_yanked": False,
                    "current_version_yanked_reason": "",
                }
            )
        )
        mock_cache_file.return_value = cache_file
        assert is_current_version_yanked() == (False, "")

    @patch("crewai_core.version.get_latest_version_from_pypi")
    @patch("crewai_core.version.get_crewai_version")
    @patch("crewai_core.version._get_cache_file")
    def test_triggers_fetch_on_stale_cache(
        self,
        mock_cache_file: MagicMock,
        mock_version: MagicMock,
        mock_fetch: MagicMock,
        tmp_path: Path,
    ) -> None:
        """An expired cache entry forces a refresh instead of being trusted."""
        mock_version.return_value = "1.0.0"
        cache_file = tmp_path / "version_cache.json"
        stale = {
            "version": "2.0.0",
            "timestamp": (datetime.now() - timedelta(hours=25)).isoformat(),
            "current_version": "1.0.0",
            "current_version_yanked": True,
            "current_version_yanked_reason": "old reason",
        }
        cache_file.write_text(json.dumps(stale))
        mock_cache_file.return_value = cache_file

        def write_fresh_cache() -> str:
            # Simulate the fetch helper rewriting the cache with current data.
            fresh = dict(stale, timestamp=datetime.now().isoformat())
            fresh["current_version_yanked"] = False
            fresh["current_version_yanked_reason"] = ""
            cache_file.write_text(json.dumps(fresh))
            return "2.0.0"

        mock_fetch.side_effect = write_fresh_cache
        is_yanked, _reason = is_current_version_yanked()
        assert is_yanked is False
        mock_fetch.assert_called_once()

    @patch("crewai_core.version.get_latest_version_from_pypi")
    @patch("crewai_core.version.get_crewai_version")
    @patch("crewai_core.version._get_cache_file")
    def test_returns_false_on_fetch_failure(
        self,
        mock_cache_file: MagicMock,
        mock_version: MagicMock,
        mock_fetch: MagicMock,
        tmp_path: Path,
    ) -> None:
        """When PyPI cannot be reached at all, assume the version is fine."""
        mock_version.return_value = "1.0.0"
        # No cache file exists, so the code must fall through to the fetch.
        mock_cache_file.return_value = tmp_path / "version_cache.json"
        mock_fetch.return_value = None
        assert is_current_version_yanked() == (False, "")
# TestConsoleFormatterVersionCheck tests remain in lib/crewai/tests/cli/test_version.py
# as they depend on crewai.events.utils.console_formatter (core package).

View File

@@ -1,8 +0,0 @@
# crewai-core
Shared utilities used by both `crewai` and `crewai-cli`: version lookup, storage
paths, user-data helpers, telemetry, and the printer.
This package is a leaf — it has no dependency on the `crewai` framework — and is
pulled in transitively by `crewai` and `crewai-cli`. End users do not normally
install it directly.

View File

@@ -1,38 +0,0 @@
[project]
name = "crewai-core"
dynamic = ["version"]
description = "Shared utilities for CrewAI — version, paths, user-data, telemetry, printer."
readme = "README.md"
authors = [
{ name = "Greyson R. LaLonde", email = "greyson@crewai.com" }
]
requires-python = ">=3.10, <3.14"
dependencies = [
"appdirs~=1.4.4",
"cryptography>=42.0",
"httpx~=0.28.1",
"packaging>=23.0",
"portalocker~=2.7.0",
"pyjwt>=2.9.0,<3",
"pydantic>=2.11.9,<2.13",
"rich>=13.7.1",
"opentelemetry-api~=1.34.0",
"opentelemetry-sdk~=1.34.0",
"opentelemetry-exporter-otlp-proto-http~=1.34.0",
"tomli~=2.0.2",
]
[project.urls]
Homepage = "https://crewai.com"
Documentation = "https://docs.crewai.com"
Repository = "https://github.com/crewAIInc/crewAI"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.version]
path = "src/crewai_core/__init__.py"
[tool.hatch.build.targets.wheel]
packages = ["src/crewai_core"]

View File

@@ -1 +0,0 @@
__version__ = "1.14.5a5"

View File

@@ -1,24 +0,0 @@
"""OAuth2 authentication primitives — shared by crewai and crewai-cli."""
from __future__ import annotations
from crewai_core.auth.oauth2 import (
AuthenticationCommand as AuthenticationCommand,
Oauth2Settings as Oauth2Settings,
ProviderFactory as ProviderFactory,
)
from crewai_core.auth.token import (
AuthError as AuthError,
get_auth_token as get_auth_token,
)
from crewai_core.auth.utils import validate_jwt_token as validate_jwt_token
__all__ = [
"AuthError",
"AuthenticationCommand",
"Oauth2Settings",
"ProviderFactory",
"get_auth_token",
"validate_jwt_token",
]

View File

@@ -1,8 +0,0 @@
"""Authentication constants."""
from __future__ import annotations
from typing import Final
ALGORITHMS: Final[list[str]] = ["RS256"]

View File

@@ -1 +0,0 @@
"""OAuth2 authentication providers."""

View File

@@ -1,46 +0,0 @@
"""Base OAuth2 provider interface."""
from __future__ import annotations
from abc import ABC, abstractmethod
from crewai_core.auth.oauth2 import Oauth2Settings
class BaseProvider(ABC):
    """Abstract base class for OAuth2 providers.

    Concrete providers supply the endpoint URLs and identifiers needed to
    drive an OAuth2/OIDC login flow against one specific identity service.
    """

    def __init__(self, settings: Oauth2Settings):
        # Provider-agnostic configuration; subclasses read what they need
        # from it (including provider-specific keys in ``settings.extra``).
        self.settings = settings

    @abstractmethod
    def get_authorize_url(self) -> str:
        """Return the authorization endpoint URL."""

    @abstractmethod
    def get_token_url(self) -> str:
        """Return the token endpoint URL."""

    @abstractmethod
    def get_jwks_url(self) -> str:
        """Return the JWKS endpoint URL."""

    @abstractmethod
    def get_issuer(self) -> str:
        """Return the OAuth issuer identifier."""

    @abstractmethod
    def get_audience(self) -> str:
        """Return the OAuth audience identifier."""

    @abstractmethod
    def get_client_id(self) -> str:
        """Return the OAuth client identifier."""

    def get_required_fields(self) -> list[str]:
        """Return provider-specific keys required inside ``Oauth2Settings.extra``."""
        # Default: no extra configuration is required.
        return []

    def get_oauth_scopes(self) -> list[str]:
        """Return the OAuth scopes to request."""
        # Standard OIDC scopes; override to request provider-specific ones.
        return ["openid", "profile", "email"]

View File

@@ -1,17 +0,0 @@
"""Authentication token retrieval."""
from __future__ import annotations
from crewai_core.token_manager import TokenManager
class AuthError(Exception):
    """Raised when authentication fails.

    In this module it is raised by ``get_auth_token`` when no saved token
    exists, i.e. the user has not logged in.
    """
def get_auth_token() -> str:
    """Return the saved authentication token.

    Raises:
        AuthError: If no token has been stored (the user is not logged in).
    """
    token = TokenManager().get_token()
    if token:
        return token
    raise AuthError("No token found, make sure you are logged in")

View File

@@ -1,22 +0,0 @@
"""Constants shared by both crewai and crewai-cli."""
from __future__ import annotations
from typing import Final
CREWAI_TRAINED_AGENTS_FILE_ENV: Final[str] = "CREWAI_TRAINED_AGENTS_FILE"
TRAINING_DATA_FILE: Final[str] = "training_data.pkl"
TRAINED_AGENTS_DATA_FILE: Final[str] = "trained_agents_data.pkl"
KNOWLEDGE_DIRECTORY: Final[str] = "knowledge"
MAX_FILE_NAME_LENGTH: Final[int] = 255
DEFAULT_CREWAI_ENTERPRISE_URL: Final[str] = "https://app.crewai.com"
CREWAI_ENTERPRISE_DEFAULT_OAUTH2_PROVIDER: Final[str] = "workos"
CREWAI_ENTERPRISE_DEFAULT_OAUTH2_AUDIENCE: Final[str] = (
"client_01JNJQWBJ4SPFN3SWJM5T7BDG8"
)
CREWAI_ENTERPRISE_DEFAULT_OAUTH2_CLIENT_ID: Final[str] = (
"client_01JYT06R59SP0NXYGD994NFXXX"
)
CREWAI_ENTERPRISE_DEFAULT_OAUTH2_DOMAIN: Final[str] = "login.crewai.com"

View File

@@ -1,89 +0,0 @@
"""Centralised lock factory.
If ``REDIS_URL`` is set and the ``redis`` package is installed, locks are
distributed via ``portalocker.RedisLock``. Otherwise, falls back to the
standard file-based ``portalocker.Lock`` in the system temp dir.
"""
from __future__ import annotations
from collections.abc import Iterator
from contextlib import contextmanager
from functools import lru_cache
from hashlib import md5
import logging
import os
import tempfile
from typing import TYPE_CHECKING, Final
import portalocker
import portalocker.exceptions
if TYPE_CHECKING:
import redis
logger = logging.getLogger(__name__)
_REDIS_URL: str | None = os.environ.get("REDIS_URL")
_DEFAULT_TIMEOUT: Final[int] = 120
def _redis_available() -> bool:
    """Return True if redis is installed and REDIS_URL is set."""
    # Both conditions must hold: a configured URL and an importable client.
    if not _REDIS_URL:
        return False
    try:
        import redis  # noqa: F401
    except ImportError:
        return False
    return True
@lru_cache(maxsize=1)
def _redis_connection() -> redis.Redis[bytes]:
    """Return a cached Redis connection, creating one on first call."""
    from redis import Redis

    url = _REDIS_URL
    if url is None:
        raise ValueError("REDIS_URL environment variable is not set")
    return Redis.from_url(url)
@contextmanager
def lock(name: str, *, timeout: float = _DEFAULT_TIMEOUT) -> Iterator[None]:
    """Acquire a named lock, yielding while it is held.

    Uses a Redis-backed distributed lock when ``REDIS_URL`` is configured and
    the ``redis`` package is importable; otherwise a file lock in the system
    temp directory (which only coordinates processes on this machine).

    Args:
        name: A human-readable lock name (e.g. ``"chromadb_init"``).
            Automatically namespaced to avoid collisions.
        timeout: Maximum seconds to wait for the lock before raising.

    Raises:
        portalocker.exceptions.LockException: If the file lock cannot be
            acquired within ``timeout`` seconds.
    """
    # md5 builds a stable, filesystem-safe key — not used for security
    # (hence usedforsecurity=False).
    channel = f"crewai:{md5(name.encode(), usedforsecurity=False).hexdigest()}"
    if _redis_available():
        with portalocker.RedisLock(
            channel=channel,
            connection=_redis_connection(),
            timeout=timeout,
        ):
            yield
    else:
        lock_dir = tempfile.gettempdir()
        lock_path = os.path.join(lock_dir, f"{channel}.lock")
        try:
            pl = portalocker.Lock(lock_path, timeout=timeout)
            pl.acquire()
        except portalocker.exceptions.BaseLockException as exc:
            # Re-raise with a friendlier, actionable message.
            raise portalocker.exceptions.LockException(
                f"Failed to acquire lock '{name}' at {lock_path} "
                f"(timeout={timeout}s). This commonly occurs in "
                f"multi-process environments. "
            ) from exc
        try:
            yield
        finally:
            # Always release, even if the guarded block raised.
            pl.release()  # type: ignore[no-untyped-call]

View File

@@ -1,26 +0,0 @@
"""Path management utilities for CrewAI storage and configuration."""
from __future__ import annotations
import os
from pathlib import Path
import appdirs
def get_project_directory_name() -> str:
    """Return the storage namespace for this project.

    Uses ``CREWAI_STORAGE_DIR`` when the environment variable is set,
    falling back to the current working directory's name.
    """
    override = os.environ.get("CREWAI_STORAGE_DIR")
    return override if override is not None else Path.cwd().name
def db_storage_path() -> str:
    """Return the path for SQLite database / app-data storage.

    The directory is created on first use if it does not already exist.
    """
    # Per-project, per-user data dir as resolved by appdirs ("CrewAI" author).
    storage_dir = Path(
        appdirs.user_data_dir(get_project_directory_name(), "CrewAI")
    )
    storage_dir.mkdir(parents=True, exist_ok=True)
    return str(storage_dir)

View File

@@ -1,103 +0,0 @@
"""Colored console-output utilities and the shared output-suppression flag."""
from __future__ import annotations
from contextvars import ContextVar
from typing import TYPE_CHECKING, Final, Literal, NamedTuple
if TYPE_CHECKING:
from _typeshed import SupportsWrite
# Context-local flag: each asyncio task / thread context sees its own value.
_suppress_console_output: ContextVar[bool] = ContextVar(
    "_suppress_console_output", default=False
)


def set_suppress_console_output(suppress: bool) -> object:
    """Switch console-output suppression on or off for the current context.

    Returns:
        A token suitable for ``ContextVar.reset`` to restore the prior state.
    """
    token = _suppress_console_output.set(suppress)
    return token


def should_suppress_console_output() -> bool:
    """Report whether console output is currently suppressed."""
    return _suppress_console_output.get()
PrinterColor = Literal[
"purple",
"bold_purple",
"green",
"bold_green",
"cyan",
"bold_cyan",
"magenta",
"bold_magenta",
"yellow",
"bold_yellow",
"red",
"blue",
"bold_blue",
]
_COLOR_CODES: Final[dict[PrinterColor, str]] = {
"purple": "\033[95m",
"bold_purple": "\033[1m\033[95m",
"red": "\033[91m",
"bold_green": "\033[1m\033[92m",
"green": "\033[32m",
"blue": "\033[94m",
"bold_blue": "\033[1m\033[94m",
"yellow": "\033[93m",
"bold_yellow": "\033[1m\033[93m",
"cyan": "\033[96m",
"bold_cyan": "\033[1m\033[96m",
"magenta": "\033[35m",
"bold_magenta": "\033[1m\033[35m",
}
RESET: Final[str] = "\033[0m"
class ColoredText(NamedTuple):
    """Text plus an optional color, used for multicolor lines."""

    # The literal text segment to print.
    text: str
    # ANSI color to apply to this segment; None prints it uncolored.
    color: PrinterColor | None
class Printer:
    """Handles colored console output formatting."""

    @staticmethod
    def print(
        content: str | list[ColoredText],
        color: PrinterColor | None = None,
        sep: str | None = " ",
        end: str | None = "\n",
        file: SupportsWrite[str] | None = None,
        flush: Literal[False] = False,
    ) -> None:
        """Print ``content`` with optional color, honoring suppression context."""
        if should_suppress_console_output():
            return
        # A bare string becomes a single-segment colored line.
        segments = (
            [ColoredText(content, color)] if isinstance(content, str) else content
        )
        rendered: list[str] = []
        for segment in segments:
            prefix = _COLOR_CODES[segment.color] if segment.color else ""
            rendered.append(f"{prefix}{segment.text}{RESET}")
        print("".join(rendered), sep=sep, end=end, file=file, flush=flush)
PRINTER: Printer = Printer()

View File

@@ -1,109 +0,0 @@
"""TOML / pyproject.toml utilities shared by crewai and crewai-cli."""
from __future__ import annotations
from functools import reduce
import sys
from typing import Any
from rich.console import Console
import tomli
if sys.version_info >= (3, 11):
import tomllib
console = Console()
def read_toml(file_path: str = "pyproject.toml") -> dict[str, Any]:
"""Read a TOML file from disk and return its parsed contents."""
with open(file_path, "rb") as f:
return tomli.load(f)
def parse_toml(content: str) -> dict[str, Any]:
"""Parse a TOML string and return its parsed contents."""
if sys.version_info >= (3, 11):
return tomllib.loads(content)
return tomli.loads(content)
def _get_nested_value(data: dict[str, Any], keys: list[str]) -> Any:
return reduce(dict.__getitem__, keys, data)
def _get_project_attribute(
    pyproject_path: str, keys: list[str], require: bool
) -> Any | None:
    """Look up a dotted attribute path inside ``pyproject_path``.

    The file must declare ``crewai`` in ``[project].dependencies`` for the
    lookup to succeed (a guard against running these helpers outside a crewai
    project directory). When ``require=True``, missing attributes raise
    ``SystemExit`` after printing a friendly error.
    """
    attribute = None
    try:
        with open(pyproject_path, "r") as f:
            pyproject_content = parse_toml(f.read())
        # Guard: only operate inside a project that depends on crewai.
        dependencies = (
            _get_nested_value(pyproject_content, ["project", "dependencies"]) or []
        )
        if not any(True for dep in dependencies if "crewai" in dep):
            raise Exception("crewai is not in the dependencies.")
        attribute = _get_nested_value(pyproject_content, keys)
    except FileNotFoundError:
        console.print(f"Error: {pyproject_path} not found.", style="bold red")
    except KeyError:
        # Raised by _get_nested_value when a path segment is missing.
        console.print(
            f"Error: {pyproject_path} is not a valid pyproject.toml file.",
            style="bold red",
        )
    except Exception as e:
        # tomllib exists only on 3.11+, so its error type is checked lazily.
        if sys.version_info >= (3, 11) and isinstance(e, tomllib.TOMLDecodeError):
            console.print(
                f"Error: {pyproject_path} is not a valid TOML file.", style="bold red"
            )
        else:
            console.print(
                f"Error reading the pyproject.toml file: {e}", style="bold red"
            )
    if require and not attribute:
        console.print(
            f"Unable to read '{'.'.join(keys)}' in the pyproject.toml file. "
            "Please verify that the file exists and contains the specified attribute.",
            style="bold red",
        )
        raise SystemExit
    return attribute
def get_project_name(
    pyproject_path: str = "pyproject.toml", require: bool = False
) -> str | None:
    """Return ``[project].name`` from the given ``pyproject.toml``."""
    keys = ["project", "name"]
    return _get_project_attribute(pyproject_path, keys, require=require)
def get_project_version(
    pyproject_path: str = "pyproject.toml", require: bool = False
) -> str | None:
    """Return ``[project].version`` from the given ``pyproject.toml``."""
    keys = ["project", "version"]
    return _get_project_attribute(pyproject_path, keys, require=require)
def get_project_description(
    pyproject_path: str = "pyproject.toml", require: bool = False
) -> str | None:
    """Return ``[project].description`` from the given ``pyproject.toml``."""
    keys = ["project", "description"]
    return _get_project_attribute(pyproject_path, keys, require=require)

View File

@@ -1,272 +0,0 @@
"""Anonymous telemetry collection — base implementation.
This module is the leaf telemetry layer used by both ``crewai`` (which extends
it with framework-specific spans + event-bus signal hooks) and ``crewai-cli``
(which uses it directly to emit deployment / template / flow-creation spans).
No prompts, task descriptions, agent backstories/goals, responses, or sensitive
data are collected.
"""
from __future__ import annotations
import asyncio
import atexit
from collections.abc import Callable
import contextlib
import logging
import os
import threading
from typing import Any, Final
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
BatchSpanProcessor,
SpanExportResult,
)
from opentelemetry.trace import Span, Status, StatusCode
from typing_extensions import Self
logger = logging.getLogger(__name__)
CREWAI_TELEMETRY_BASE_URL: Final[str] = "https://telemetry.crewai.com:4319"
CREWAI_TELEMETRY_SERVICE_NAME: Final[str] = "crewAI-telemetry"
def close_span(span: Span) -> None:
    """Mark ``span`` as successful (status OK) and finish it."""
    ok_status = Status(StatusCode.OK)
    span.set_status(ok_status)
    span.end()
@contextlib.contextmanager
def suppress_warnings() -> Any:
    """Context manager that silences all warnings.

    Used around opentelemetry provider setup, which can be noisy.
    """
    import warnings

    with warnings.catch_warnings():
        # Blanket-ignore inside this scope; catch_warnings() restores the
        # previous filter state on exit.
        warnings.filterwarnings("ignore")
        yield
class SafeOTLPSpanExporter(OTLPSpanExporter):
    """OTLP exporter that swallows export failures so telemetry never crashes the app."""

    def export(self, spans: Any) -> SpanExportResult:
        # Any failure (network, serialization, ...) is logged at debug level
        # and reported as FAILURE instead of propagating to the caller.
        try:
            return super().export(spans)
        except Exception as e:
            logger.debug("Telemetry export failed: %s", e)
            return SpanExportResult.FAILURE
class Telemetry:
    """Base telemetry: OTLP setup + the spans needed by the CLI.

    crewai's runtime extends this with crew/agent/task/tool/flow execution spans
    and event-bus signal handlers (see ``crewai.telemetry.telemetry``).
    """

    # Process-wide singleton; _lock guards thread-safe creation.
    _instance = None
    _lock = threading.Lock()

    def __new__(cls) -> Self:
        # Double-checked locking: only the first caller pays for the lock.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
                    cls._instance._initialized = False
        return cls._instance

    def __init__(self) -> None:
        # __init__ runs on every Telemetry() call because of the singleton
        # __new__; bail out after the first initialization.
        if hasattr(self, "_initialized") and self._initialized:
            return
        self.ready: bool = False
        self.trace_set: bool = False
        self._initialized: bool = True
        if self._is_telemetry_disabled():
            return
        try:
            self.resource = Resource(
                attributes={SERVICE_NAME: CREWAI_TELEMETRY_SERVICE_NAME},
            )
            with suppress_warnings():
                self.provider = TracerProvider(resource=self.resource)
                processor = BatchSpanProcessor(
                    SafeOTLPSpanExporter(
                        endpoint=f"{CREWAI_TELEMETRY_BASE_URL}/v1/traces",
                        timeout=30,
                    )
                )
                self.provider.add_span_processor(processor)
            self._register_shutdown_handlers()
            self.ready = True
        except Exception as e:
            # Never let telemetry setup break the app — but re-raise
            # cancellation/exit signals rather than swallowing them.
            if isinstance(
                e,
                (SystemExit, KeyboardInterrupt, GeneratorExit, asyncio.CancelledError),
            ):
                raise
            self.ready = False

    @classmethod
    def _is_telemetry_disabled(cls) -> bool:
        # Any of these env vars set to (case-insensitive) "true" opts out.
        return (
            os.getenv("OTEL_SDK_DISABLED", "false").lower() == "true"
            or os.getenv("CREWAI_DISABLE_TELEMETRY", "false").lower() == "true"
            or os.getenv("CREWAI_DISABLE_TRACKING", "false").lower() == "true"
        )

    def _should_execute_telemetry(self) -> bool:
        # Re-checks the env vars so disabling works even after init.
        return self.ready and not self._is_telemetry_disabled()

    def _register_shutdown_handlers(self) -> None:
        """Register an atexit flush. Subclasses may extend with signal hooks."""
        atexit.register(self._shutdown)

    def _shutdown(self) -> None:
        # Best-effort flush of pending spans (5s budget), then tear down.
        if not self.ready:
            return
        try:
            self.provider.force_flush(timeout_millis=5000)
            self.provider.shutdown()
            self.ready = False
        except Exception as e:
            logger.debug("Telemetry shutdown failed: %s", e)

    def set_tracer(self) -> None:
        """Install our TracerProvider as the global one (idempotent)."""
        if self.ready and not self.trace_set:
            try:
                with suppress_warnings():
                    trace.set_tracer_provider(self.provider)
                self.trace_set = True
            except Exception as e:
                logger.debug("Failed to set tracer provider: %s", e)
                self.ready = False
                self.trace_set = False

    def _safe_telemetry_operation(
        self, operation: Callable[[], Span | None]
    ) -> Span | None:
        """Run a span-returning telemetry operation, swallowing failures."""
        if not self._should_execute_telemetry():
            return None
        try:
            return operation()
        except Exception as e:
            logger.debug("Telemetry operation failed: %s", e)
            return None

    def _safe_telemetry_procedure(self, operation: Callable[[], None]) -> None:
        """Run a void telemetry procedure, swallowing failures."""
        if not self._should_execute_telemetry():
            return
        try:
            operation()
        except Exception as e:
            logger.debug("Telemetry operation failed: %s", e)

    def _add_attribute(self, span: Span | None, key: str, value: Any) -> None:
        # Attribute setting is itself wrapped so a bad value cannot raise.
        if span is None:
            return

        def _operation() -> None:
            span.set_attribute(key, value)

        self._safe_telemetry_procedure(_operation)

    # --- CLI-facing spans ---------------------------------------------------

    def deploy_signup_error_span(self) -> None:
        """Records when an error occurs during the deployment signup process."""

        def _operation() -> None:
            tracer = trace.get_tracer("crewai.telemetry")
            span = tracer.start_span("Deploy Signup Error")
            close_span(span)

        self._safe_telemetry_procedure(_operation)

    def start_deployment_span(self, uuid: str | None = None) -> None:
        """Records the start of a deployment process."""

        def _operation() -> None:
            tracer = trace.get_tracer("crewai.telemetry")
            span = tracer.start_span("Start Deployment")
            if uuid:
                self._add_attribute(span, "uuid", uuid)
            close_span(span)

        self._safe_telemetry_procedure(_operation)

    def create_crew_deployment_span(self) -> None:
        """Records the creation of a new crew deployment."""

        def _operation() -> None:
            tracer = trace.get_tracer("crewai.telemetry")
            span = tracer.start_span("Create Crew Deployment")
            close_span(span)

        self._safe_telemetry_procedure(_operation)

    def get_crew_logs_span(
        self, uuid: str | None, log_type: str = "deployment"
    ) -> None:
        """Records the retrieval of crew logs."""

        def _operation() -> None:
            tracer = trace.get_tracer("crewai.telemetry")
            span = tracer.start_span("Get Crew Logs")
            self._add_attribute(span, "log_type", log_type)
            if uuid:
                self._add_attribute(span, "uuid", uuid)
            close_span(span)

        self._safe_telemetry_procedure(_operation)

    def remove_crew_span(self, uuid: str | None = None) -> None:
        """Records the removal of a crew."""

        def _operation() -> None:
            tracer = trace.get_tracer("crewai.telemetry")
            span = tracer.start_span("Remove Crew")
            if uuid:
                self._add_attribute(span, "uuid", uuid)
            close_span(span)

        self._safe_telemetry_procedure(_operation)

    def flow_creation_span(self, flow_name: str) -> None:
        """Records the creation of a new flow."""

        def _operation() -> None:
            tracer = trace.get_tracer("crewai.telemetry")
            span = tracer.start_span("Flow Creation")
            self._add_attribute(span, "flow_name", flow_name)
            close_span(span)

        self._safe_telemetry_procedure(_operation)

    def template_installed_span(self, template_name: str) -> None:
        """Records when a template is downloaded and installed."""
        # NOTE(review): lazy import — presumably avoids an import cycle with
        # crewai_core.version; confirm before hoisting to module level.
        from crewai_core.version import get_crewai_version

        def _operation() -> None:
            tracer = trace.get_tracer("crewai.telemetry")
            span = tracer.start_span("Template Installed")
            self._add_attribute(span, "crewai_version", get_crewai_version())
            self._add_attribute(span, "template_name", template_name)
            close_span(span)

        self._safe_telemetry_procedure(_operation)

View File

@@ -1,56 +0,0 @@
"""Tool-repository credential helpers shared by crewai and crewai-cli."""
from __future__ import annotations
import os
from typing import Any
from crewai_core.project import read_toml
from crewai_core.settings import Settings
def build_env_with_tool_repository_credentials(
    repository_handle: str,
) -> dict[str, Any]:
    """Return a copy of ``os.environ`` augmented with UV_INDEX_* credentials
    for ``repository_handle``.

    The handle is normalized to upper-case with hyphens replaced by underscores
    (matching ``uv``'s env-var convention).
    """
    handle = repository_handle.upper().replace("-", "_")
    settings = Settings()
    env = os.environ.copy()
    username = settings.tool_repository_username or ""
    password = settings.tool_repository_password or ""
    env[f"UV_INDEX_{handle}_USERNAME"] = str(username)
    env[f"UV_INDEX_{handle}_PASSWORD"] = str(password)
    return env
def build_env_with_all_tool_credentials() -> dict[str, Any]:
    """Return ``os.environ`` augmented with UV_INDEX_* credentials for every
    private index referenced under ``[tool.uv.sources]`` in ``pyproject.toml``.

    Errors reading ``pyproject.toml`` are swallowed — the un-augmented
    environment is returned in that case.
    """
    env = os.environ.copy()
    try:
        uv_sources = read_toml().get("tool", {}).get("uv", {}).get("sources", {})
        for source in uv_sources.values():
            if not isinstance(source, dict):
                continue
            index_name = source.get("index")
            if index_name:
                env.update(build_env_with_tool_repository_credentials(index_name))
    except Exception:  # noqa: S110
        # Best-effort: a missing or malformed pyproject.toml must not break
        # the caller; they simply get no extra credentials.
        pass
    return env

Some files were not shown because too many files have changed in this diff Show More