Compare commits

..

1 Commits

Author SHA1 Message Date
Devin AI
ee8b3be8e5 fix(tracing): stop nagging users who declined tracing (#5665)
- When user explicitly declined tracing, skip the 'Tracing is disabled'
  message instead of showing it on every crew/flow execution
- Add CREWAI_SUPPRESS_TRACING_MESSAGES env var to let users fully
  suppress the message
- Remove duplicate identical if/else branches in all four
  _show_tracing_disabled_message implementations
- Add 24 tests covering suppression via env var, context var, and
  user-declined scenarios

Co-Authored-By: João <joao@crewai.com>
2026-04-30 04:52:51 +00:00
319 changed files with 2884 additions and 10857 deletions

5
.github/security.md vendored
View File

@@ -5,10 +5,7 @@ CrewAI ecosystem.
### How to Report
Please submit reports through one of the following channels:
- **crewai-vdp-ess@submit.bugcrowd.com**
- https://security.crewai.com
Please submit reports to **crewai-vdp-ess@submit.bugcrowd.com**
- **Please do not** disclose vulnerabilities via public GitHub issues, pull requests,
or social media

View File

@@ -19,7 +19,7 @@ repos:
language: system
pass_filenames: true
types: [python]
exclude: ^(lib/crewai/src/crewai/cli/templates/|lib/cli/src/crewai_cli/templates/|lib/cli/tests/|lib/crewai/tests/|lib/crewai-tools/tests/|lib/crewai-files/tests/|lib/devtools/tests/)
exclude: ^(lib/crewai/src/crewai/cli/templates/|lib/crewai/tests/|lib/crewai-tools/tests/|lib/crewai-files/tests/)
- repo: https://github.com/astral-sh/uv-pre-commit
rev: 0.11.3
hooks:

View File

@@ -54,13 +54,12 @@ _original_from_serialized_response = getattr(
)
if _original_from_serialized_response is not None:
_from_serialized: Any = _original_from_serialized_response
def _patched_from_serialized_response(
request: Any, serialized_response: Any, history: Any = None
) -> Any:
"""Patched version that ensures response._content is properly set."""
response = _from_serialized(request, serialized_response, history)
response = _original_from_serialized_response(request, serialized_response, history)
# Explicitly set _content to avoid ResponseNotRead errors
# The content was passed to the constructor but the mocked read() prevents
# proper initialization of the internal state
@@ -256,8 +255,7 @@ def vcr_cassette_dir(request: Any) -> str:
for parent in test_file.parents:
if (
parent.name
in ("crewai", "crewai-tools", "crewai-files", "cli", "crewai-core")
parent.name in ("crewai", "crewai-tools", "crewai-files")
and parent.parent.name == "lib"
):
package_root = parent

View File

@@ -26,7 +26,7 @@ mode: "wide"
</Step>
<Step title="مراقبة التقدم">
استخدم `GET /status/{kickoff_id}` للتحقق من حالة التنفيذ واسترجاع النتائج.
استخدم `GET /{kickoff_id}/status` للتحقق من حالة التنفيذ واسترجاع النتائج.
</Step>
</Steps>
@@ -65,7 +65,7 @@ https://your-crew-name.crewai.com
1. **الاكتشاف**: استدعِ `GET /inputs` لفهم ما يحتاجه طاقمك
2. **التنفيذ**: أرسل المدخلات عبر `POST /kickoff` لبدء المعالجة
3. **المراقبة**: استعلم عن `GET /status/{kickoff_id}` حتى الاكتمال
3. **المراقبة**: استعلم عن `GET /{kickoff_id}/status` حتى الاكتمال
4. **النتائج**: استخرج المخرجات النهائية من الاستجابة المكتملة
## معالجة الأخطاء

View File

@@ -1,6 +1,6 @@
---
title: "GET /status/{kickoff_id}"
title: "GET /{kickoff_id}/status"
description: "الحصول على حالة التنفيذ"
openapi: "/enterprise-api.en.yaml GET /status/{kickoff_id}"
openapi: "/enterprise-api.en.yaml GET /{kickoff_id}/status"
mode: "wide"
---

View File

@@ -4,99 +4,6 @@ description: "تحديثات المنتج والتحسينات وإصلاحات
icon: "clock"
mode: "wide"
---
<Update label="4 مايو 2026">
## v1.14.5a2
[عرض الإصدار على GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.5a2)
## ما الذي تغير
### إصلاحات الأخطاء
- إصلاح استعادة مخرجات المهام في كتلة finally
- تضمين `thoughts_token_count` في رموز الإكمال
- الحفاظ على مخرجات المهام عبر تفريغ دفعات غير متزامنة
- تمرير kwargs إلى استدعاءات المحمل في `CrewAIRagAdapter`
- منع `result_as_answer` من إرجاع رسالة كتلة الخطاف كإجابة نهائية
- منع `result_as_answer` من إرجاع خطأ كإجابة نهائية
- استخدام `acall` لتحويل المخرجات في المسارات غير المتزامنة
- منع تغيير كلمات التوقف المشتركة في LLM عبر الوكلاء
- التعامل مع مدخلات `BaseModel` في `convert_to_model`
### الوثائق
- توثيق متغيرات البيئة الإضافية
- تحديث سجل التغييرات والإصدار لـ v1.14.5a1
## المساهمون
@NIK-TIGER-BILL, @greysonlalonde, @lorenzejay, @minasami-pr, @theCyberTech, @wishhyt
</Update>
<Update label="1 مايو 2026">
## v1.14.5a1
[عرض الإصدار على GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.5a1)
## ما الذي تغير
### الميزات
- إضافة معلمة بدء `restore_from_state_id`
- إضافة تسليط الضوء على ExaSearchTool وإعادة تسميته من EXASearchTool
### إصلاحات الأخطاء
- إصلاح المواقع المفقودة لـ crewai في تدفق الإصدار
- ضمان تحميل أحداث المهارات للآثار
### الوثائق
- تحديث سجل التغييرات والإصدار لـ v1.14.4
## المساهمون
@akaKuruma, @github-actions[bot], @greysonlalonde, @lorenzejay, @theishangoswami
</Update>
<Update label="1 مايو 2026">
## v1.14.4
[عرض الإصدار على GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.4)
## ما الذي تغير
### الميزات
- إضافة دعم لمفتاح الاستمرارية المخصص في @persist
- إضافة دعم واجهة برمجة التطبيقات للردود لمزود Azure OpenAI
- تمرير credential_scopes إلى عميل Azure AI Inference
- إضافة دليل إعداد هوية عبء العمل لـ Vertex AI
- إضافة Tavily Research والحصول على Research
- إضافة أدوات MCP من You.com للبحث، البحث، واستخراج المحتوى
### إصلاحات الأخطاء
- إصلاح مشكلة السقوط عند عدم تطابق تعبير JSON regex مع JSON صالح
- إصلاح للحفاظ على tool_calls عندما تحتوي الاستجابة أيضًا على نص
- إصلاح لتمرير base_url و api_key إلى instructor.from_provider
- إصلاح لتحذير وإرجاع فارغ عندما لا يُرجع خادم MCP الأصلي أي أدوات
- إصلاح لاستخدام متغير الرسائل الموثقة في معالجات غير البث
- إصلاح لحماية مساعدي وصف دردشة الطاقم ضد فشل LLM
- إصلاح لإعادة تعيين الرسائل والتكرارات بين الاستدعاءات
- إصلاح لتمرير ملف trained-agents من خلال replay و test
- إصلاح لاحترام ملف trained-agents المخصص في الاستدلال
- إصلاح لربط الوكلاء المخصصين بالمهام فقط بالطاقم لملفات الإدخال متعددة الأنماط
- إصلاح لتسلسل callable الحواجز كـ null لتسجيل JSON
- إصلاح إعادة تسمية force_final_answer لتجنب توجيه ذاتي
- إصلاح زيادة litellm لإصلاح SSTI؛ تجاهل CVE غير القابل للإصلاح في pip
### الوثائق
- تحديث سجل التغييرات والإصدار لـ v1.14.4a1
- إضافة صفحة أدوات E2B Sandbox
- إضافة وثائق أدوات صندوق Daytona
## المساهمون
@EdwardIrby, @dependabot[bot], @factory-droid-oss, @factory-droid[bot], @greysonlalonde, @kunalk16, @lorenzejay, @lucasgomide, @manisrinivasan2k1, @mattatcha, @vinibrsl
</Update>
<Update label="29 أبريل 2026">
## v1.14.4a1

View File

@@ -380,41 +380,32 @@ class AnotherFlow(Flow[dict]):
print("Method-level persisted runs:", self.state["runs"])
```
### تفرع الحالة المستمرة
### مفتاح استمرارية مخصص
يدعم `@persist` نمطين متميزين للترطيب في `kickoff` / `kickoff_async`:
- `kickoff(inputs={"id": <uuid>})` — **استئناف**: يحمّل أحدث لقطة لـ UUID المقدم ويستمر في الكتابة تحت نفس `flow_uuid`. يمتد التاريخ.
- `kickoff(restore_from_state_id=<uuid>)` — **تفرع**: يحمّل أحدث لقطة لـ UUID المقدم، يرطّب حالة التشغيل الجديد منها، ثم يعيّن `state.id` جديدًا (مولّدًا تلقائيًا، أو `inputs["id"]` إذا تم تثبيته). تذهب كتابات `@persist` للتشغيل الجديد تحت `state.id` الجديد؛ يتم الحفاظ على تاريخ تدفق المصدر.
افتراضيًا، يستخدم `@persist` الحقل `state.id` المُولّد تلقائيًا كمفتاح للاستمرارية. إذا كان لتدفقك معرّف خاص به — مثل `conversation_id` مشترك بين عدة جلسات — يمكنك تمرير الوسيط `key` ليستخدم `@persist` تلك السمة كـ UUID للتدفق:
```python
from crewai.flow.flow import Flow, start
from crewai.flow.flow import Flow, listen, start
from crewai.flow.persistence import persist
from pydantic import BaseModel
class CounterState(BaseModel):
id: str = ""
counter: int = 0
class ConversationState(BaseModel):
conversation_id: str
turn: int = 0
@persist
class CounterFlow(Flow[CounterState]):
@persist(key="conversation_id") # استخدام حقل مخصص كمفتاح للاستمرارية
class ConversationFlow(Flow[ConversationState]):
@start()
def step(self):
self.state.counter += 1
print(f"[id={self.state.id}] counter={self.state.counter}")
def begin(self):
self.state.turn += 1
print(f"Conversation {self.state.conversation_id} turn {self.state.turn}")
# التشغيل 1: حالة جديدة، العداد 0 -> 1، محفوظ تحت flow_1.state.id
flow_1 = CounterFlow()
flow_1.kickoff()
# التفرع: ترطيب من أحدث لقطة لـ flow_1، لكن باستخدام state.id جديد
flow_2 = CounterFlow()
flow_2.kickoff(restore_from_state_id=flow_1.state.id)
# يبدأ flow_2.state.counter بـ 1 (مرطّب)، ثم تزيده step() إلى 2.
# flow_2.state.id != flow_1.state.id؛ تاريخ flow_1 لم يتغيّر.
# إعادة تشغيل المحادثة بنفس conversation_id يُعيد تحميل الحالة السابقة
flow = ConversationFlow(conversation_id="user-42")
flow.kickoff()
```
إذا لم يطابق `restore_from_state_id` المقدم أي حالة مستمرة، يعود kickoff بصمت إلى السلوك الافتراضي — نفس سلوك `inputs["id"]` عند عدم العثور عليه. الجمع بين `restore_from_state_id` و `from_checkpoint` يطلق `ValueError`؛ اختر مصدر ترطيب واحدًا. تثبيت `inputs["id"]` أثناء التفرع يشارك مفتاح الاستمرارية مع تدفق آخر — عادةً ما تريد استخدام `restore_from_state_id` فقط.
يقرأ المزخرف القيمة من `state[key]` للحالات من نوع dict، ومن `getattr(state, key)` للحالات من نوع Pydantic / كائن. إذا كانت السمة المحددة غير موجودة أو قيمتها falsy عند الحفظ، يُطلق `@persist` خطأ `ValueError` مثل `Flow state is missing required persistence key 'conversation_id'`. عند حذف `key`، يظل السلوك الأصلي قائمًا ويُستخدم `state.id`.
### كيف تعمل

View File

@@ -146,14 +146,15 @@ class ProductionFlow(Flow[AppState]):
# ...
```
افتراضيًا، يستأنف `@persist` تدفقًا عند توفير `kickoff(inputs={"id": <uuid>})`، مما يمدّ نفس تاريخ `flow_uuid`. لـ **تفرع** تدفق مستمر إلى نسبٍ جديد — ترطيب الحالة من تشغيل سابق ولكن الكتابة تحت `state.id` جديد — مرّر `restore_from_state_id`:
افتراضيًا، يستخدم `@persist` الحقل `state.id` المُولّد تلقائيًا كمفتاح للحالة المحفوظة. إذا كان تطبيقك يمتلك معرّفًا طبيعيًا بالفعل — مثل `conversation_id` يربط عدة تشغيلات بنفس جلسة المستخدم — مرّره كـ `key` ليستخدمه المزخرف كـ UUID للتدفق. يُطلق `ValueError` إذا كانت السمة المحددة غير موجودة أو قيمتها falsy عند الحفظ.
```python
flow.kickoff(restore_from_state_id="<previous-run-state-id>")
@persist(key="conversation_id")
class ProductionFlow(Flow[AppState]):
# يجب أن يحتوي AppState على conversation_id؛ استئناف الجلسة يُعيد تحميل الحالة السابقة
...
```
يحصل التشغيل الجديد على `state.id` جديد (مولّد تلقائيًا، أو `inputs["id"]` إذا تم تثبيته) لذا لا تمتد كتابات `@persist` الخاصة به إلى تاريخ المصدر. الجمع مع `from_checkpoint` يطلق `ValueError`؛ اختر مصدر ترطيب واحدًا.
## الخلاصة
- **ابدأ بتدفق.**

View File

@@ -133,7 +133,7 @@ crew.kickoff()
| **DirectorySearchTool** | أداة RAG للبحث في المجلدات، مفيدة للتنقل في أنظمة الملفات. |
| **DOCXSearchTool** | أداة RAG للبحث في مستندات DOCX، مثالية لمعالجة ملفات Word. |
| **DirectoryReadTool** | تسهّل قراءة ومعالجة هياكل المجلدات ومحتوياتها. |
| **ExaSearchTool** | أداة مصممة لإجراء عمليات بحث شاملة عبر مصادر بيانات متنوعة. |
| **EXASearchTool** | أداة مصممة لإجراء عمليات بحث شاملة عبر مصادر بيانات متنوعة. |
| **FileReadTool** | تُمكّن قراءة واستخراج البيانات من الملفات، مع دعم تنسيقات ملفات متنوعة. |
| **FirecrawlSearchTool** | أداة للبحث في صفحات الويب باستخدام Firecrawl وإرجاع النتائج. |
| **FirecrawlCrawlWebsiteTool** | أداة لزحف صفحات الويب باستخدام Firecrawl. |

View File

@@ -116,47 +116,32 @@ class PersistentCounterFlow(Flow[CounterState]):
return self.state.value
```
#### تفرع الحالة المستمرة
### استخدام مفتاح استمرارية مخصص
يدعم `@persist` نمطين متميزين للترطيب في `kickoff` / `kickoff_async`. استخدم **استئناف** (`inputs["id"]`) لمواصلة نفس النسب؛ استخدم **تفرع** (`restore_from_state_id`) لبدء نسبٍ جديد من لقطة:
| | `state.id` بعد kickoff | كتابات `@persist` تذهب إلى |
|---|---|---|
| `inputs["id"]` (استئناف) | المعرّف المقدم | المعرّف المقدم (يمد التاريخ) |
| `restore_from_state_id` (تفرع) | معرّف جديد، أو `inputs["id"]` إذا ثُبّت | المعرّف الجديد (المصدر محفوظ) |
افتراضيًا، يستخدم `@persist()` الحقل `state.id` المُولّد تلقائيًا كمفتاح للحالة المحفوظة. عندما يكون لمجالك معرّف طبيعي بالفعل — مثل `conversation_id` يربط عدة تشغيلات للتدفق بنفس جلسة المستخدم — مرّره كوسيط `key` ليستخدمه `@persist` كـ UUID للتدفق بدلًا من `id`:
```python
from crewai.flow.flow import Flow, start
from crewai.flow.flow import Flow, listen, start
from crewai.flow.persistence import persist
from pydantic import BaseModel
class CounterState(BaseModel):
id: str = ""
counter: int = 0
class ConversationState(BaseModel):
conversation_id: str
history: list[str] = []
@persist
class CounterFlow(Flow[CounterState]):
@persist(key="conversation_id")
class ConversationFlow(Flow[ConversationState]):
@start()
def step(self):
self.state.counter += 1
def greet(self):
self.state.history.append("hello")
return self.state.history
# التشغيل 1: حالة جديدة، العداد 0 -> 1
flow_1 = CounterFlow()
flow_1.kickoff()
# التفرع: الترطيب من أحدث لقطة لـ flow_1، لكن الكتابة تحت state.id جديد
flow_2 = CounterFlow()
flow_2.kickoff(restore_from_state_id=flow_1.state.id)
# يبدأ flow_2 بـ counter=1 (مرطّب)، ثم تزيده step() إلى 2.
# تاريخ flow_uuid لـ flow_1 لم يتغيّر.
# تشغيل ثانٍ بنفس conversation_id يُعيد تحميل الحالة السابقة
flow = ConversationFlow(conversation_id="user-42")
flow.kickoff()
```
ملاحظات السلوك:
- `restore_from_state_id` غير موجود في الاستمرارية → يعود kickoff بصمت إلى السلوك الافتراضي (يعكس سلوك `inputs["id"]` عند عدم العثور عليه). لا يُطلق أي استثناء.
- الجمع بين `restore_from_state_id` و `from_checkpoint` يطلق `ValueError` — يستهدفان نظامي حالة مختلفين (`@persist` مقابل Checkpointing) ولا يمكن الجمع بينهما.
- `restore_from_state_id=None` (افتراضي) متطابق بايت ببايت مع kickoff بدون المعامل.
- تثبيت `inputs["id"]` أثناء التفرع يعني أن التشغيل الجديد يشارك مفتاح الاستمرارية مع تدفق آخر — عادةً ما تريد فقط `restore_from_state_id`.
بالنسبة للحالات من نوع dict يقرأ `@persist` القيمة من `state[key]`، ولحالات Pydantic / الكائنات يقرأها من `getattr(state, key)`. إذا كانت السمة المحددة غير موجودة أو قيمتها falsy عند حفظ الحالة، يُطلق `@persist` خطأ `ValueError` مثل `Flow state is missing required persistence key 'conversation_id'`، فيظهر الفشل فورًا بدلًا من فقد بيانات الاستمرارية بصمت. استدعاء `@persist()` بدون `key` يحافظ على السلوك الأصلي ويستخدم `state.id`.
## أنماط حالة متقدمة

View File

@@ -1,11 +1,11 @@
---
title: "أداة بحث Exa"
description: "ابحث في الويب باستخدام Exa Search API للعثور على النتائج الأكثر صلة لأي استعلام، مع خيارات لمحتوى الصفحة الكامل والمقتطفات."
description: "ابحث في الويب باستخدام Exa Search API للعثور على النتائج الأكثر صلة لأي استعلام، مع خيارات لمحتوى الصفحة الكامل والمقتطفات والملخصات."
icon: "magnifying-glass"
mode: "wide"
---
تتيح أداة `ExaSearchTool` لوكلاء CrewAI البحث في الويب باستخدام [Exa](https://exa.ai/) search API. تُرجع النتائج الأكثر صلة لأي استعلام، مع خيارات لمحتوى الصفحة الكامل والمقتطفات الموفرة للرموز.
تتيح أداة `EXASearchTool` لوكلاء CrewAI البحث في الويب باستخدام [Exa](https://exa.ai/) search API. تُرجع النتائج الأكثر صلة لأي استعلام، مع خيارات لمحتوى الصفحة الكامل والملخصات المولّدة بالذكاء الاصطناعي.
## التثبيت
@@ -27,15 +27,15 @@ export EXA_API_KEY='your_exa_api_key'
## مثال على الاستخدام
إليك كيفية استخدام `ExaSearchTool` مع وكيل CrewAI:
إليك كيفية استخدام `EXASearchTool` مع وكيل CrewAI:
```python
import os
from crewai import Agent, Task, Crew
from crewai_tools import ExaSearchTool
from crewai_tools import EXASearchTool
# Initialize the tool
exa_tool = ExaSearchTool()
exa_tool = EXASearchTool()
# Create an agent that uses the tool
researcher = Agent(
@@ -66,11 +66,11 @@ print(result)
## خيارات التكوين
تقبل أداة `ExaSearchTool` المعاملات التالية أثناء التهيئة:
تقبل أداة `EXASearchTool` المعاملات التالية أثناء التهيئة:
- `type` (str، اختياري): نوع البحث المستخدم. الافتراضي هو `"auto"`. الخيارات: `"auto"`، `"instant"`، `"fast"`، `"deep"`.
- `highlights` (bool أو dict، اختياري): إرجاع مقتطفات موفرة للرموز أكثر صلة بالاستعلام بدلاً من الصفحة الكاملة. الافتراضي هو `True`. مرر قاموسًا مثل `{"max_characters": 4000}` للتكوين، أو `False` للتعطيل.
- `content` (bool، اختياري): ما إذا كان يجب تضمين محتوى الصفحة الكامل في النتائج. الافتراضي هو `False`.
- `summary` (bool، اختياري): ما إذا كان يجب تضمين ملخصات مولّدة بالذكاء الاصطناعي لكل نتيجة. يتطلب `content=True`. الافتراضي هو `False`.
- `api_key` (str، اختياري): مفتاح Exa API الخاص بك. يعود إلى متغير البيئة `EXA_API_KEY` إذا لم يتم تقديمه.
- `base_url` (str، اختياري): عنوان URL مخصص لخادم API. يعود إلى متغير البيئة `EXA_BASE_URL` إذا لم يتم تقديمه.
@@ -86,52 +86,25 @@ print(result)
يمكنك تكوين الأداة بمعاملات مخصصة للحصول على نتائج أغنى:
```python
# Use 'deep' for thorough, multi-step searches
exa_tool = ExaSearchTool(
highlights=True,
# Get full page content with AI summaries
exa_tool = EXASearchTool(
content=True,
summary=True,
type="deep"
)
# Use it in an agent
agent = Agent(
role="Deep Researcher",
goal="Conduct thorough research",
goal="Conduct thorough research with full content and summaries",
tools=[exa_tool]
)
```
## استخدام Exa عبر MCP
يمكنك أيضًا ربط وكيلك بخادم MCP المستضاف من Exa. مرّر مفتاح API الخاص بك عبر ترويسة `x-api-key`:
```python
from crewai import Agent
from crewai.mcp import MCPServerHTTP
agent = Agent(
role="Research Analyst",
goal="Find and analyze information on the web",
backstory="Expert researcher with access to Exa's tools",
mcps=[
MCPServerHTTP(
url="https://mcp.exa.ai/mcp",
headers={"x-api-key": "YOUR_EXA_API_KEY"},
),
],
)
```
احصل على مفتاح API من [لوحة تحكم Exa](https://dashboard.exa.ai/api-keys). لمزيد من المعلومات حول MCP في CrewAI، راجع [نظرة عامة على MCP](/ar/mcp/overview).
## الميزات
- **مقتطفات موفرة للرموز**: الحصول على المقتطفات الأكثر صلة من كل نتيجة، باستخدام رموز أقل بكثير من النص الكامل
- **البحث الدلالي**: العثور على نتائج بناءً على المعنى، وليس الكلمات المفتاحية فقط
- **استرجاع المحتوى الكامل**: الحصول على النص الكامل لصفحات الويب مع نتائج البحث
- **ملخصات الذكاء الاصطناعي**: الحصول على ملخصات موجزة مولّدة بالذكاء الاصطناعي لكل نتيجة
- **تصفية التاريخ**: تقييد النتائج لفترات زمنية محددة باستخدام فلاتر تاريخ النشر
- **تصفية النطاقات**: تقييد عمليات البحث على نطاقات محددة
## موارد
- [توثيق Exa](https://exa.ai/docs)
- [لوحة تحكم Exa — إدارة مفاتيح API والاستخدام](https://dashboard.exa.ai)
- **تصفية النطاقات**: تقييد عمليات البحث على نطاقات محددة

File diff suppressed because it is too large Load Diff

View File

@@ -26,7 +26,7 @@ Welcome to the CrewAI AMP API reference. This API allows you to programmatically
</Step>
<Step title="Monitor Progress">
Use `GET /status/{kickoff_id}` to check execution status and retrieve results.
Use `GET /{kickoff_id}/status` to check execution status and retrieve results.
</Step>
</Steps>
@@ -65,7 +65,7 @@ Replace `your-crew-name` with your actual crew's URL from the dashboard.
1. **Discovery**: Call `GET /inputs` to understand what your crew needs
2. **Execution**: Submit inputs via `POST /kickoff` to start processing
3. **Monitoring**: Poll `GET /status/{kickoff_id}` until completion
3. **Monitoring**: Poll `GET /{kickoff_id}/status` until completion
4. **Results**: Extract the final output from the completed response
## Error Handling

View File

@@ -1,6 +1,6 @@
---
title: "GET /status/{kickoff_id}"
title: "GET /{kickoff_id}/status"
description: "Get execution status"
openapi: "/enterprise-api.en.yaml GET /status/{kickoff_id}"
openapi: "/enterprise-api.en.yaml GET /{kickoff_id}/status"
mode: "wide"
---

View File

@@ -4,99 +4,6 @@ description: "Product updates, improvements, and bug fixes for CrewAI"
icon: "clock"
mode: "wide"
---
<Update label="May 04, 2026">
## v1.14.5a2
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.5a2)
## What's Changed
### Bug Fixes
- Fix task output restoration in finally block
- Include `thoughts_token_count` in completion tokens
- Preserve task outputs across async batch flush
- Forward kwargs to loader calls in `CrewAIRagAdapter`
- Prevent `result_as_answer` from returning hook-block message as final answer
- Prevent `result_as_answer` from returning error as final answer
- Use `acall` for output conversion in async paths
- Prevent shared LLM stop words mutation across agents
- Handle `BaseModel` input in `convert_to_model`
### Documentation
- Document additional environment variables
- Update changelog and version for v1.14.5a1
## Contributors
@NIK-TIGER-BILL, @greysonlalonde, @lorenzejay, @minasami-pr, @theCyberTech, @wishhyt
</Update>
<Update label="May 01, 2026">
## v1.14.5a1
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.5a1)
## What's Changed
### Features
- Add `restore_from_state_id` kickoff parameter
- Add highlights to ExaSearchTool and rename from EXASearchTool
### Bug Fixes
- Fix missing crewai pin sites in release flow
- Ensure skills loading events for traces
### Documentation
- Update changelog and version for v1.14.4
## Contributors
@akaKuruma, @github-actions[bot], @greysonlalonde, @lorenzejay, @theishangoswami
</Update>
<Update label="May 01, 2026">
## v1.14.4
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.4)
## What's Changed
### Features
- Add support for custom persistence key in @persist
- Add Responses API support for Azure OpenAI provider
- Forward credential_scopes to Azure AI Inference client
- Add Vertex AI workload identity setup guide
- Add Tavily Research and get Research
- Add You.com MCP tools for search, research, and content extraction
### Bug Fixes
- Fix fall through when JSON regex match isn't valid JSON
- Fix to preserve tool_calls when response also contains text
- Fix to forward base_url and api_key to instructor.from_provider
- Fix to warn and return empty when native MCP server returns no tools
- Fix to use validated messages variable in non-streaming handlers
- Fix to guard crew chat description helpers against LLM failures
- Fix to reset messages and iterations between invocations
- Fix to forward trained-agents file through replay and test
- Fix to honor custom trained-agents file at inference
- Fix to bind task-only agents to crew for multimodal input_files
- Fix to serialize guardrail callables as null for JSON checkpointing
- Fix renaming of force_final_answer to avoid self-referential router
- Fix bump of litellm for SSTI fix; ignore unfixable pip CVE
### Documentation
- Update changelog and version for v1.14.4a1
- Add E2B Sandbox Tools page
- Add Daytona sandbox tools documentation
## Contributors
@EdwardIrby, @dependabot[bot], @factory-droid-oss, @factory-droid[bot], @greysonlalonde, @kunalk16, @lorenzejay, @lucasgomide, @manisrinivasan2k1, @mattatcha, @vinibrsl
</Update>
<Update label="Apr 29, 2026">
## v1.14.4a1

View File

@@ -380,41 +380,32 @@ class AnotherFlow(Flow[dict]):
print("Method-level persisted runs:", self.state["runs"])
```
### Forking Persisted State
### Custom Persistence Key
`@persist` supports two distinct hydration modes on `kickoff` / `kickoff_async`:
- `kickoff(inputs={"id": <uuid>})` — **resume**: load the latest snapshot for the supplied UUID and continue writing under the same `flow_uuid`. The history extends.
- `kickoff(restore_from_state_id=<uuid>)` — **fork**: load the latest snapshot for the supplied UUID, hydrate the new run's state from it, and assign a fresh `state.id` (auto-generated, or `inputs["id"]` if pinned). The new run's `@persist` writes land under the new `state.id`; the source flow's history is preserved.
By default, `@persist` uses the auto-generated `state.id` field as the persistence key. If your flow models its own identifier — for example a `conversation_id` shared across sessions — you can pass a `key` argument and `@persist` will use that attribute as the flow UUID instead:
```python
from crewai.flow.flow import Flow, start
from crewai.flow.flow import Flow, listen, start
from crewai.flow.persistence import persist
from pydantic import BaseModel
class CounterState(BaseModel):
id: str = ""
counter: int = 0
class ConversationState(BaseModel):
conversation_id: str
turn: int = 0
@persist
class CounterFlow(Flow[CounterState]):
@persist(key="conversation_id") # Use a custom field as the persistence key
class ConversationFlow(Flow[ConversationState]):
@start()
def step(self):
self.state.counter += 1
print(f"[id={self.state.id}] counter={self.state.counter}")
def begin(self):
self.state.turn += 1
print(f"Conversation {self.state.conversation_id} turn {self.state.turn}")
# Run 1: fresh state, counter 0 -> 1, persisted under flow_1.state.id
flow_1 = CounterFlow()
flow_1.kickoff()
# Fork: hydrate from flow_1's latest snapshot, but use a NEW state.id
flow_2 = CounterFlow()
flow_2.kickoff(restore_from_state_id=flow_1.state.id)
# flow_2.state.counter starts at 1 (hydrated), then step() bumps it to 2.
# flow_2.state.id != flow_1.state.id; flow_1's history is unchanged.
# Resuming the same conversation reloads its prior state by conversation_id
flow = ConversationFlow(conversation_id="user-42")
flow.kickoff()
```
If the supplied `restore_from_state_id` does not match any persisted state, the kickoff falls back silently — same as the existing `inputs["id"]` resume not-found behavior. Combining `restore_from_state_id` with `from_checkpoint` raises a `ValueError`; pick one hydration source. Pinning `inputs["id"]` while forking shares a persistence key with another flow — usually you want only `restore_from_state_id`.
The decorator reads the value at `state[key]` for dict states, or `getattr(state, key)` for Pydantic / object states. If the named attribute is missing or falsy at save time, `@persist` raises a `ValueError` such as `Flow state is missing required persistence key 'conversation_id'`. When `key` is omitted, the existing behavior is preserved and `state.id` is used.
### How It Works

View File

@@ -146,14 +146,15 @@ class ProductionFlow(Flow[AppState]):
# ...
```
By default, `@persist` resumes a flow when `kickoff(inputs={"id": <uuid>})` is supplied, extending the same `flow_uuid` history. To **fork** a persisted flow into a new lineage — hydrate state from a previous run but write under a fresh `state.id` — pass `restore_from_state_id`:
By default `@persist` keys saved state by the auto-generated `state.id`. If your application already has a natural identifier — for example a `conversation_id` that ties multiple runs to the same user session — pass it as `key` and the decorator will use that attribute as the flow UUID. A `ValueError` is raised if the named attribute is missing or falsy at save time.
```python
flow.kickoff(restore_from_state_id="<previous-run-state-id>")
@persist(key="conversation_id")
class ProductionFlow(Flow[AppState]):
# AppState must expose conversation_id; resuming a session reloads its prior state
...
```
The new run gets a fresh `state.id` (auto-generated, or `inputs["id"]` if pinned) so its `@persist` writes don't extend the source's history. Combining with `from_checkpoint` raises a `ValueError`; pick one hydration source.
## Summary
- **Start with a Flow.**

View File

@@ -133,7 +133,7 @@ Here is a list of the available tools and their descriptions:
| **DirectorySearchTool** | A RAG tool for searching within directories, useful for navigating through file systems. |
| **DOCXSearchTool** | A RAG tool aimed at searching within DOCX documents, ideal for processing Word files. |
| **DirectoryReadTool** | Facilitates reading and processing of directory structures and their contents. |
| **ExaSearchTool** | Search the web with Exa, the fastest and most accurate web search API. Supports token-efficient highlights and full page content. |
| **EXASearchTool** | A tool designed for performing exhaustive searches across various data sources. |
| **FileReadTool** | Enables reading and extracting data from files, supporting various file formats. |
| **FirecrawlSearchTool** | A tool to search webpages using Firecrawl and return the results. |
| **FirecrawlCrawlWebsiteTool** | A tool for crawling webpages using Firecrawl. |

View File

@@ -346,47 +346,32 @@ class SelectivePersistFlow(Flow):
return f"Complete with count {self.state['count']}"
```
#### Forking Persisted State
#### Using a Custom Persistence Key
`@persist` supports two distinct hydration modes on `kickoff` / `kickoff_async`. Use **resume** (`inputs["id"]`) to continue the same lineage; use **fork** (`restore_from_state_id`) to start a new lineage seeded from a snapshot:
| | `state.id` after kickoff | `@persist` writes land under |
|---|---|---|
| `inputs["id"]` (resume) | supplied id | supplied id (extends history) |
| `restore_from_state_id` (fork) | fresh id, or `inputs["id"]` if pinned | new id (source preserved) |
By default, `@persist()` keys persisted state by the flow's auto-generated `state.id`. When your domain already has a natural identifier — for example a `conversation_id` that ties multiple flow runs to the same user session — pass it as the `key` argument and `@persist` will use that attribute as the flow UUID instead of `id`:
```python
from crewai.flow.flow import Flow, start
from crewai.flow.flow import Flow, listen, start
from crewai.flow.persistence import persist
from pydantic import BaseModel
class CounterState(BaseModel):
id: str = ""
counter: int = 0
class ConversationState(BaseModel):
conversation_id: str
history: list[str] = []
@persist
class CounterFlow(Flow[CounterState]):
@persist(key="conversation_id")
class ConversationFlow(Flow[ConversationState]):
@start()
def step(self):
self.state.counter += 1
def greet(self):
self.state.history.append("hello")
return self.state.history
# Run 1: fresh state, counter 0 -> 1
flow_1 = CounterFlow()
flow_1.kickoff()
# Fork: hydrate from flow_1's latest snapshot, but write under a NEW state.id
flow_2 = CounterFlow()
flow_2.kickoff(restore_from_state_id=flow_1.state.id)
# flow_2 starts with counter=1 (hydrated), then step() bumps it to 2.
# flow_1's flow_uuid history is unchanged.
# A second run with the same conversation_id reloads the prior state
flow = ConversationFlow(conversation_id="user-42")
flow.kickoff()
```
Behavior notes:
- `restore_from_state_id` not found in persistence → the kickoff falls back silently to default behavior (mirrors the existing `inputs["id"]` resume not-found behavior). No exception is raised.
- Combining `restore_from_state_id` with `from_checkpoint` raises a `ValueError` — they target different state systems (`@persist` vs. Checkpointing) and cannot be combined.
- `restore_from_state_id=None` (default) is byte-identical to a kickoff without the parameter.
- Pinning `inputs["id"]` while forking means the new run shares a persistence key with another flow — usually you want only `restore_from_state_id`.
For dict-based states `@persist` reads `state[key]`, and for Pydantic / object states it reads `getattr(state, key)`. If the named attribute is missing or falsy when state is being saved, `@persist` raises a `ValueError` like `Flow state is missing required persistence key 'conversation_id'`, so the failure surfaces immediately rather than silently dropping persisted data. Calling `@persist()` without `key` keeps the original behavior of using `state.id`.
## Advanced State Patterns

View File

@@ -1,11 +1,11 @@
---
title: "Exa Search Tool"
description: "Search the web with Exa, the fastest and most accurate web search API. Get token-efficient highlights and full page content."
description: "Search the web using the Exa Search API to find the most relevant results for any query, with options for full page content, highlights, and summaries."
icon: "magnifying-glass"
mode: "wide"
---
The `ExaSearchTool` lets CrewAI agents search the web using [Exa](https://exa.ai/), the fastest and most accurate web search API. It returns the most relevant results for any query, with options for token-efficient highlights and full page content.
The `EXASearchTool` lets CrewAI agents search the web using the [Exa](https://exa.ai/) search API. It returns the most relevant results for any query, with options for full page content and AI-generated summaries.
## Installation
@@ -27,15 +27,15 @@ Get an API key from the [Exa dashboard](https://dashboard.exa.ai/api-keys).
## Example Usage
Here's how to use the `ExaSearchTool` within a CrewAI agent:
Here's how to use the `EXASearchTool` within a CrewAI agent:
```python
import os
from crewai import Agent, Task, Crew
from crewai_tools import ExaSearchTool
from crewai_tools import EXASearchTool
# Initialize the tool
exa_tool = ExaSearchTool()
exa_tool = EXASearchTool()
# Create an agent that uses the tool
researcher = Agent(
@@ -66,11 +66,11 @@ print(result)
## Configuration Options
The `ExaSearchTool` accepts the following parameters during initialization:
The `EXASearchTool` accepts the following parameters during initialization:
- `type` (str, optional): The search type to use. Defaults to `"auto"`. Options: `"auto"`, `"instant"`, `"fast"`, `"deep"`.
- `highlights` (bool or dict, optional): Return token-efficient excerpts most relevant to the query instead of the full page. Defaults to `True`. Pass a dict like `{"max_characters": 4000}` to configure, or `False` to disable.
- `content` (bool, optional): Whether to include full page content in results. Defaults to `False`.
- `summary` (bool, optional): Whether to include AI-generated summaries of each result. Requires `content=True`. Defaults to `False`.
- `api_key` (str, optional): Your Exa API key. Falls back to the `EXA_API_KEY` environment variable if not provided.
- `base_url` (str, optional): Custom API server URL. Falls back to the `EXA_BASE_URL` environment variable if not provided.
@@ -83,70 +83,28 @@ When calling the tool (or when an agent invokes it), the following search parame
## Advanced Usage
For most agent workflows we recommend `highlights` — it returns the most relevant excerpts from each result and uses far fewer tokens than full page content:
You can configure the tool with custom parameters for richer results:
```python
# Get token-efficient excerpts most relevant to the query
exa_tool = ExaSearchTool(
highlights=True,
type="auto",
# Get full page content with AI summaries
exa_tool = EXASearchTool(
content=True,
summary=True,
type="deep"
)
# Use it in an agent
agent = Agent(
role="Researcher",
goal="Answer questions with current web data",
role="Deep Researcher",
goal="Conduct thorough research with full content and summaries",
tools=[exa_tool]
)
```
For thorough, multi-step searches, use `type="deep"`:
```python
exa_tool = ExaSearchTool(
highlights=True,
type="deep",
)
```
For more on choosing between highlights and full content, see the [Exa search best practices](https://exa.ai/docs/reference/search-best-practices).
## Using Exa via MCP
You can also connect your agent to Exa's hosted MCP server. Pass your API key with the `x-api-key` header:
```python
from crewai import Agent
from crewai.mcp import MCPServerHTTP
agent = Agent(
role="Research Analyst",
goal="Find and analyze information on the web",
backstory="Expert researcher with access to Exa's tools",
mcps=[
MCPServerHTTP(
url="https://mcp.exa.ai/mcp",
headers={"x-api-key": "YOUR_EXA_API_KEY"},
),
],
)
```
Get your API key from the [Exa dashboard](https://dashboard.exa.ai/api-keys). For more on MCP in CrewAI, see the [MCP overview](/en/mcp/overview).
## Features
- **Token-Efficient Highlights**: Get the most relevant excerpts from each result, ~10x fewer tokens than full text
- **Semantic Search**: Find results based on meaning, not just keywords
- **Full Content Retrieval**: Get the full text of web pages alongside search results
- **AI Summaries**: Get concise, AI-generated summaries of each result
- **Date Filtering**: Limit results to specific time periods with published date filters
- **Domain Filtering**: Restrict searches to specific domains
<Note>
`EXASearchTool` is a deprecated alias for `ExaSearchTool`. Existing imports continue to work but will emit a deprecation warning; please migrate to `ExaSearchTool`.
</Note>
## Resources
- [Exa documentation](https://exa.ai/docs)
- [Exa dashboard — manage API keys and usage](https://dashboard.exa.ai)

View File

@@ -35,7 +35,7 @@ info:
1. **Discover inputs** using `GET /inputs`
2. **Start execution** using `POST /kickoff`
3. **Monitor progress** using `GET /status/{kickoff_id}`
3. **Monitor progress** using `GET /{kickoff_id}/status`
version: 1.0.0
contact:
name: CrewAI Support
@@ -207,7 +207,7 @@ paths:
"500":
$ref: "#/components/responses/ServerError"
/status/{kickoff_id}:
/{kickoff_id}/status:
get:
summary: Get Execution Status
description: |

View File

@@ -35,7 +35,7 @@ info:
1. **Discover inputs** using `GET /inputs`
2. **Start execution** using `POST /kickoff`
3. **Monitor progress** using `GET /status/{kickoff_id}`
3. **Monitor progress** using `GET /{kickoff_id}/status`
version: 1.0.0
contact:
name: CrewAI Support
@@ -207,7 +207,7 @@ paths:
"500":
$ref: "#/components/responses/ServerError"
/status/{kickoff_id}:
/{kickoff_id}/status:
get:
summary: Get Execution Status
description: |

View File

@@ -84,7 +84,7 @@ paths:
'500':
$ref: '#/components/responses/ServerError'
/status/{kickoff_id}:
/{kickoff_id}/status:
get:
summary: 실행 상태 조회
description: |

View File

@@ -35,7 +35,7 @@ info:
1. **Descubra os inputs** usando `GET /inputs`
2. **Inicie a execução** usando `POST /kickoff`
3. **Monitore o progresso** usando `GET /status/{kickoff_id}`
3. **Monitore o progresso** usando `GET /{kickoff_id}/status`
version: 1.0.0
contact:
name: CrewAI Suporte
@@ -120,7 +120,7 @@ paths:
"500":
$ref: "#/components/responses/ServerError"
/status/{kickoff_id}:
/{kickoff_id}/status:
get:
summary: Obter Status da Execução
description: |

View File

@@ -26,7 +26,7 @@ CrewAI 엔터프라이즈 API 참고 자료에 오신 것을 환영합니다.
</Step>
<Step title="진행 상황 모니터링">
`GET /status/{kickoff_id}`를 사용하여 실행 상태를 확인하고 결과를 조회하세요.
`GET /{kickoff_id}/status`를 사용하여 실행 상태를 확인하고 결과를 조회하세요.
</Step>
</Steps>
@@ -65,7 +65,7 @@ https://your-crew-name.crewai.com
1. **탐색**: `GET /inputs`를 호출하여 crew가 필요한 것을 파악합니다.
2. **실행**: `POST /kickoff`를 통해 입력값을 제출하여 처리를 시작합니다.
3. **모니터링**: 완료될 때까지 `GET /status/{kickoff_id}`를 주기적으로 조회합니다.
3. **모니터링**: 완료될 때까지 `GET /{kickoff_id}/status`를 주기적으로 조회합니다.
4. **결과**: 완료된 응답에서 최종 출력을 추출합니다.
## 오류 처리

View File

@@ -1,6 +1,6 @@
---
title: "GET /status/{kickoff_id}"
title: "GET /{kickoff_id}/status"
description: "실행 상태 조회"
openapi: "/enterprise-api.ko.yaml GET /status/{kickoff_id}"
openapi: "/enterprise-api.ko.yaml GET /{kickoff_id}/status"
mode: "wide"
---

View File

@@ -4,99 +4,6 @@ description: "CrewAI의 제품 업데이트, 개선 사항 및 버그 수정"
icon: "clock"
mode: "wide"
---
<Update label="2026년 5월 4일">
## v1.14.5a2
[GitHub 릴리스 보기](https://github.com/crewAIInc/crewAI/releases/tag/1.14.5a2)
## 변경 사항
### 버그 수정
- finally 블록에서 작업 출력 복원 수정
- 완료 토큰에 `thoughts_token_count` 포함
- 비동기 배치 플러시 간 작업 출력 보존
- `CrewAIRagAdapter`의 로더 호출에 kwargs 전달
- `result_as_answer`가 후크 차단 메시지를 최종 답변으로 반환하지 않도록 방지
- `result_as_answer`가 오류를 최종 답변으로 반환하지 않도록 방지
- 비동기 경로에서 출력 변환을 위해 `acall` 사용
- 에이전트 간 공유 LLM 중지 단어 변형 방지
- `convert_to_model`에서 `BaseModel` 입력 처리
### 문서화
- 추가 환경 변수 문서화
- v1.14.5a1에 대한 변경 로그 및 버전 업데이트
## 기여자
@NIK-TIGER-BILL, @greysonlalonde, @lorenzejay, @minasami-pr, @theCyberTech, @wishhyt
</Update>
<Update label="2026년 5월 1일">
## v1.14.5a1
[GitHub 릴리스 보기](https://github.com/crewAIInc/crewAI/releases/tag/1.14.5a1)
## 변경 사항
### 기능
- `restore_from_state_id` 시작 매개변수 추가
- ExaSearchTool에 하이라이트 추가 및 EXASearchTool에서 이름 변경
### 버그 수정
- 릴리스 흐름에서 crewai 핀 사이트 누락 수정
- 트레이스를 위한 기술 로딩 이벤트 보장
### 문서
- v1.14.4에 대한 변경 로그 및 버전 업데이트
## 기여자
@akaKuruma, @github-actions[bot], @greysonlalonde, @lorenzejay, @theishangoswami
</Update>
<Update label="2026년 5월 1일">
## v1.14.4
[GitHub 릴리스 보기](https://github.com/crewAIInc/crewAI/releases/tag/1.14.4)
## 변경 사항
### 기능
- @persist에서 사용자 정의 지속성 키 지원 추가
- Azure OpenAI 공급자를 위한 응답 API 지원 추가
- Azure AI 추론 클라이언트에 credential_scopes 전달
- Vertex AI 작업 부하 신원 설정 가이드 추가
- Tavily Research 및 Research 가져오기 추가
- 검색, 연구 및 콘텐츠 추출을 위한 You.com MCP 도구 추가
### 버그 수정
- JSON 정규 표현식이 유효한 JSON이 아닐 때의 fall through 수정
- 응답에 텍스트가 포함될 때 tool_calls를 보존하도록 수정
- instructor.from_provider에 base_url 및 api_key를 전달하도록 수정
- 기본 MCP 서버가 도구를 반환하지 않을 때 경고하고 빈 값을 반환하도록 수정
- 비스트리밍 핸들러에서 검증된 메시지 변수를 사용하도록 수정
- LLM 실패에 대한 크루 채팅 설명 도우미를 보호하도록 수정
- 호출 간 메시지 및 반복을 재설정하도록 수정
- replay 및 test를 통해 훈련된 에이전트 파일을 전달하도록 수정
- 추론 시 사용자 정의 훈련된 에이전트 파일을 존중하도록 수정
- 다중 모드 input_files에 대해 작업 전용 에이전트를 크루에 바인딩하도록 수정
- JSON 체크포인팅을 위해 가드레일 호출 가능 항목을 null로 직렬화하도록 수정
- 자기 참조 라우터를 피하기 위해 force_final_answer의 이름 변경 수정
- SSTI 수정을 위한 litellm 버전 증가; 수정할 수 없는 pip CVE 무시
### 문서
- v1.14.4a1에 대한 변경 로그 및 버전 업데이트
- E2B 샌드박스 도구 페이지 추가
- Daytona 샌드박스 도구 문서 추가
## 기여자
@EdwardIrby, @dependabot[bot], @factory-droid-oss, @factory-droid[bot], @greysonlalonde, @kunalk16, @lorenzejay, @lucasgomide, @manisrinivasan2k1, @mattatcha, @vinibrsl
</Update>
<Update label="2026년 4월 29일">
## v1.14.4a1

View File

@@ -373,41 +373,32 @@ class AnotherFlow(Flow[dict]):
print("Method-level persisted runs:", self.state["runs"])
```
### 영속 상태 포크하기
### 사용자 지정 영속성 키
`@persist`는 `kickoff` / `kickoff_async`에서 두 가지 별개의 하이드레이션 모드를 지원합니다:
- `kickoff(inputs={"id": <uuid>})` — **재개(resume)**: 제공된 UUID에 대한 최신 스냅샷을 로드하고 동일한 `flow_uuid` 아래에서 계속 기록합니다. 기록이 확장됩니다.
- `kickoff(restore_from_state_id=<uuid>)` — **포크(fork)**: 제공된 UUID에 대한 최신 스냅샷을 로드하고 새 실행의 상태를 하이드레이트한 후, 새로운 `state.id`(자동 생성, 또는 `inputs["id"]`가 고정된 경우 그 값)를 할당합니다. 새 실행의 `@persist` 기록은 새로운 `state.id` 아래에 저장되며, 원본 플로우의 기록은 보존됩니다.
기본적으로 `@persist`는 자동 생성된 `state.id` 필드를 영속성 키로 사용합니다. 여러 세션에 걸쳐 공유되는 `conversation_id`처럼 플로우에 자체 식별자가 있는 경우, `key` 인자를 전달하면 `@persist`가 해당 속성을 플로우 UUID로 사용합니다:
```python
from crewai.flow.flow import Flow, start
from crewai.flow.flow import Flow, listen, start
from crewai.flow.persistence import persist
from pydantic import BaseModel
class CounterState(BaseModel):
id: str = ""
counter: int = 0
class ConversationState(BaseModel):
conversation_id: str
turn: int = 0
@persist
class CounterFlow(Flow[CounterState]):
@persist(key="conversation_id") # 사용자 지정 필드를 영속성 키로 사용
class ConversationFlow(Flow[ConversationState]):
@start()
def step(self):
self.state.counter += 1
print(f"[id={self.state.id}] counter={self.state.counter}")
def begin(self):
self.state.turn += 1
print(f"Conversation {self.state.conversation_id} turn {self.state.turn}")
# 실행 1: 새 상태, counter 0 -> 1, flow_1.state.id 아래에 저장됨
flow_1 = CounterFlow()
flow_1.kickoff()
# 포크: flow_1의 최신 스냅샷에서 하이드레이트하지만, 새 state.id를 사용
flow_2 = CounterFlow()
flow_2.kickoff(restore_from_state_id=flow_1.state.id)
# flow_2.state.counter는 1(하이드레이트)로 시작하고, step()이 2로 증가시킵니다.
# flow_2.state.id != flow_1.state.id; flow_1의 기록은 변경되지 않습니다.
# 동일한 conversation_id로 다시 실행하면 이전 상태가 다시 로드됩니다
flow = ConversationFlow(conversation_id="user-42")
flow.kickoff()
```
제공된 `restore_from_state_id`가 어떤 영속 상태와도 일치하지 않으면, kickoff는 조용히 기본 동작으로 폴백됩니다 — 기존 `inputs["id"]`의 미발견 동작과 동일합니다. `restore_from_state_id`를 `from_checkpoint`와 결합하면 `ValueError` 발생합니다; 하나의 하이드레이션 소스를 선택하세요. 포크 중 `inputs["id"]`를 고정하면 다른 플로우와 영속 키를 공유하게 됩니다 — 일반적으로 `restore_from_state_id` 사용하는 것이 좋습니다.
이 데코레이터는 dict 상태의 경우 `state[key]`에서, Pydantic / 객체 상태의 경우 `getattr(state, key)`에서 값을 읽습니다. 저장 시점에 지정된 속성이 없거나 falsy 값이면, `@persist`는 `Flow state is missing required persistence key 'conversation_id'`와 같은 `ValueError` 발생시킵니다. `key`를 생략하면 기존 동작이 유지되어 `state.id` 사용니다.
### 작동 방식

View File

@@ -146,14 +146,15 @@ class ProductionFlow(Flow[AppState]):
# ...
```
기본적으로, `@persist`는 `kickoff(inputs={"id": <uuid>})`가 제공될 때 플로우를 재개하여 동일한 `flow_uuid` 기록을 확장합니다. 영속된 플로우를 새 계보로 **포크**하려면 — 이전 실행에서 상태를 하이드레이트하지만 새로운 `state.id` 아래에 기록 — `restore_from_state_id`를 전달하세요:
기본적으로 `@persist`는 자동 생성된 `state.id`를 저장된 상태의 키로 사용합니다. 애플리케이션에 이미 자연스러운 식별자가 있는 경우 — 예를 들어 같은 사용자 세션에 속한 여러 실행을 묶는 `conversation_id` — `key`로 전달하면 데코레이터가 해당 속성을 플로우 UUID로 사용합니다. 저장 시점에 지정된 속성이 없거나 falsy 값이면 `ValueError`가 발생합니다.
```python
flow.kickoff(restore_from_state_id="<previous-run-state-id>")
@persist(key="conversation_id")
class ProductionFlow(Flow[AppState]):
# AppState는 conversation_id를 노출해야 합니다; 세션을 재개하면 이전 상태가 다시 로드됩니다
...
```
새 실행은 새로운 `state.id`(자동 생성, 또는 `inputs["id"]`가 고정된 경우 그 값)를 받아 `@persist` 기록이 원본의 기록을 확장하지 않도록 합니다. `from_checkpoint`와 결합하면 `ValueError`가 발생합니다; 하나의 하이드레이션 소스를 선택하세요.
## 요약
- **Flow로 시작하세요.**

View File

@@ -132,7 +132,7 @@ crew.kickoff()
| **DirectorySearchTool** | 디렉터리 내에서 검색하는 RAG 도구로, 파일 시스템을 탐색할 때 유용합니다. |
| **DOCXSearchTool** | DOCX 문서 내에서 검색하는 데 특화된 RAG 도구로, Word 파일을 처리할 때 이상적입니다. |
| **DirectoryReadTool** | 디렉터리 구조와 그 내용을 읽고 처리하도록 지원하는 도구입니다. |
| **ExaSearchTool** | 다양한 데이터 소스를 폭넓게 검색하기 위해 설계된 도구입니다. |
| **EXASearchTool** | 다양한 데이터 소스를 폭넓게 검색하기 위해 설계된 도구입니다. |
| **FileReadTool** | 다양한 파일 형식을 지원하며 파일에서 데이터를 읽고 추출할 수 있는 도구입니다. |
| **FirecrawlSearchTool** | Firecrawl을 이용해 웹페이지를 검색하고 결과를 반환하는 도구입니다. |
| **FirecrawlCrawlWebsiteTool** | Firecrawl을 사용해 웹페이지를 크롤링하는 도구입니다. |

View File

@@ -346,47 +346,32 @@ class SelectivePersistFlow(Flow):
return f"Complete with count {self.state['count']}"
```
#### 영속 상태 포크하기
#### 사용자 지정 영속성 키 사용하기
`@persist`는 `kickoff` / `kickoff_async`에서 두 가지 별개의 하이드레이션 모드를 지원합니다. 동일한 계보를 계속하려면 **재개**(`inputs["id"]`)를 사용하고, 스냅샷에서 시작하는 새 계보를 시작하려면 **포크**(`restore_from_state_id`)를 사용하세요:
| | kickoff 후 `state.id` | `@persist` 기록 위치 |
|---|---|---|
| `inputs["id"]` (재개) | 제공된 id | 제공된 id (기록 확장) |
| `restore_from_state_id` (포크) | 새 id, 또는 고정 시 `inputs["id"]` | 새 id (원본 보존) |
기본적으로 `@persist()`는 자동 생성된 `state.id`를 영속 상태의 키로 사용합니다. 도메인에 이미 자연스러운 식별자가 있는 경우 — 예를 들어 같은 사용자 세션에 속한 여러 플로우 실행을 묶는 `conversation_id` — `key` 인자로 전달하면 `@persist`는 `id` 대신 해당 속성을 플로우 UUID로 사용합니다:
```python
from crewai.flow.flow import Flow, start
from crewai.flow.flow import Flow, listen, start
from crewai.flow.persistence import persist
from pydantic import BaseModel
class CounterState(BaseModel):
id: str = ""
counter: int = 0
class ConversationState(BaseModel):
conversation_id: str
history: list[str] = []
@persist
class CounterFlow(Flow[CounterState]):
@persist(key="conversation_id")
class ConversationFlow(Flow[ConversationState]):
@start()
def step(self):
self.state.counter += 1
def greet(self):
self.state.history.append("hello")
return self.state.history
# 실행 1: 새 상태, counter 0 -> 1
flow_1 = CounterFlow()
flow_1.kickoff()
# 포크: flow_1의 최신 스냅샷에서 하이드레이트, 단 새 state.id에 기록
flow_2 = CounterFlow()
flow_2.kickoff(restore_from_state_id=flow_1.state.id)
# flow_2는 counter=1(하이드레이트)로 시작하고, step()이 2로 증가시킵니다.
# flow_1의 flow_uuid 기록은 변경되지 않습니다.
# 동일한 conversation_id로 두 번째 실행 시 이전 상태가 다시 로드됩니다
flow = ConversationFlow(conversation_id="user-42")
flow.kickoff()
```
동작 노트:
- `restore_from_state_id`가 영속에서 발견되지 않음 → kickoff는 조용히 기본 동작으로 폴백됩니다 (기존 `inputs["id"]`의 미발견 동작 미러링). 예외는 발생하지 않습니다.
- `restore_from_state_id`를 `from_checkpoint`와 결합하면 `ValueError`가 발생합니다 — 서로 다른 상태 시스템(`@persist` 대 Checkpointing)을 대상으로 하므로 결합할 수 없습니다.
- `restore_from_state_id=None`(기본값)은 매개변수 없는 kickoff와 바이트 단위로 동일합니다.
- 포크 중 `inputs["id"]`를 고정하면 새 실행이 다른 플로우와 영속 키를 공유함을 의미합니다 — 일반적으로 `restore_from_state_id`만 사용하는 것이 좋습니다.
dict 기반 상태의 경우 `@persist`는 `state[key]`를 읽고, Pydantic / 객체 상태의 경우 `getattr(state, key)`를 읽습니다. 상태가 저장될 때 지정된 속성이 없거나 falsy 값이면 `@persist`는 `Flow state is missing required persistence key 'conversation_id'`와 같은 `ValueError`를 발생시켜, 영속 데이터가 조용히 손실되는 대신 즉시 실패가 드러나도록 합니다. `key` 없이 `@persist()`를 호출하면 기존 동작대로 `state.id`가 사용됩니다.
## 고급 상태 패턴

View File

@@ -1,15 +1,15 @@
---
title: EXA 검색 웹 로더
description: ExaSearchTool은 인터넷 전반에 걸쳐 텍스트의 내용에서 지정된 쿼리에 대한 시맨틱 검색을 수행하도록 설계되었습니다.
description: EXASearchTool은 인터넷 전반에 걸쳐 텍스트의 내용에서 지정된 쿼리에 대한 시맨틱 검색을 수행하도록 설계되었습니다.
icon: globe-pointer
mode: "wide"
---
# `ExaSearchTool`
# `EXASearchTool`
## 설명
ExaSearchTool은 텍스트의 내용을 기반으로 지정된 쿼리를 인터넷 전반에 걸쳐 의미론적으로 검색하도록 설계되었습니다.
EXASearchTool은 텍스트의 내용을 기반으로 지정된 쿼리를 인터넷 전반에 걸쳐 의미론적으로 검색하도록 설계되었습니다.
사용자가 제공한 쿼리를 기반으로 가장 관련성 높은 검색 결과를 가져오고 표시하기 위해 [exa.ai](https://exa.ai/) API를 활용합니다.
## 설치
@@ -25,15 +25,15 @@ pip install 'crewai[tools]'
다음 예제는 도구를 초기화하고 주어진 쿼리로 검색을 실행하는 방법을 보여줍니다:
```python Code
from crewai_tools import ExaSearchTool
from crewai_tools import EXASearchTool
# Initialize the tool for internet searching capabilities
tool = ExaSearchTool()
tool = EXASearchTool()
```
## 시작 단계
ExaSearchTool을 효과적으로 사용하려면 다음 단계를 따르세요:
EXASearchTool을 효과적으로 사용하려면 다음 단계를 따르세요:
<Steps>
<Step title="패키지 설치">
@@ -47,35 +47,7 @@ ExaSearchTool을 효과적으로 사용하려면 다음 단계를 따르세요:
</Step>
</Steps>
## MCP를 통한 Exa 사용
Exa가 호스팅하는 MCP 서버에 에이전트를 연결할 수도 있습니다. API 키는 `x-api-key` 헤더로 전달하세요:
```python
from crewai import Agent
from crewai.mcp import MCPServerHTTP
agent = Agent(
role="Research Analyst",
goal="Find and analyze information on the web",
backstory="Expert researcher with access to Exa's tools",
mcps=[
MCPServerHTTP(
url="https://mcp.exa.ai/mcp",
headers={"x-api-key": "YOUR_EXA_API_KEY"},
),
],
)
```
API 키는 [Exa 대시보드](https://dashboard.exa.ai/api-keys)에서 발급받을 수 있습니다. CrewAI에서의 MCP 사용에 대한 자세한 내용은 [MCP 개요](/ko/mcp/overview)를 참고하세요.
## 결론
`ExaSearchTool`을 Python 프로젝트에 통합함으로써, 사용자는 애플리케이션 내에서 실시간으로 인터넷을 직접 검색할 수 있는 능력을 얻게 됩니다.
`EXASearchTool`을 Python 프로젝트에 통합함으로써, 사용자는 애플리케이션 내에서 실시간으로 인터넷을 직접 검색할 수 있는 능력을 얻게 됩니다.
제공된 설정 및 사용 지침을 따르면, 이 도구를 프로젝트에 포함하는 과정이 간편하고 직관적입니다.
## 참고 자료
- [Exa 공식 문서](https://exa.ai/docs)
- [Exa 대시보드 — API 키 및 사용량 관리](https://dashboard.exa.ai)

View File

@@ -26,7 +26,7 @@ Bem-vindo à referência da API do CrewAI AMP. Esta API permite que você intera
</Step>
<Step title="Monitore o Progresso">
Use `GET /status/{kickoff_id}` para checar o status da execução e recuperar os resultados.
Use `GET /{kickoff_id}/status` para checar o status da execução e recuperar os resultados.
</Step>
</Steps>
@@ -65,7 +65,7 @@ Substitua `your-crew-name` pela URL real do seu crew no painel.
1. **Descoberta**: Chame `GET /inputs` para entender o que seu crew precisa
2. **Execução**: Envie os inputs via `POST /kickoff` para iniciar o processamento
3. **Monitoramento**: Faça polling em `GET /status/{kickoff_id}` até a conclusão
3. **Monitoramento**: Faça polling em `GET /{kickoff_id}/status` até a conclusão
4. **Resultados**: Extraia o output final da resposta concluída
## Tratamento de Erros

View File

@@ -1,6 +1,6 @@
---
title: "GET /status/{kickoff_id}"
title: "GET /{kickoff_id}/status"
description: "Obter o status da execução"
openapi: "/enterprise-api.pt-BR.yaml GET /status/{kickoff_id}"
openapi: "/enterprise-api.pt-BR.yaml GET /{kickoff_id}/status"
mode: "wide"
---

View File

@@ -4,99 +4,6 @@ description: "Atualizações de produto, melhorias e correções do CrewAI"
icon: "clock"
mode: "wide"
---
<Update label="04 mai 2026">
## v1.14.5a2
[Ver release no GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.5a2)
## O que Mudou
### Correções de Bugs
- Corrigir a restauração da saída da tarefa no bloco finally
- Incluir `thoughts_token_count` nos tokens de conclusão
- Preservar as saídas das tarefas durante o descarregamento assíncrono em lote
- Encaminhar kwargs para chamadas de carregador em `CrewAIRagAdapter`
- Impedir que `result_as_answer` retorne mensagem de bloqueio de hook como resposta final
- Impedir que `result_as_answer` retorne erro como resposta final
- Usar `acall` para conversão de saída em caminhos assíncronos
- Prevenir a mutação de palavras de parada compartilhadas do LLM entre agentes
- Lidar com entrada `BaseModel` em `convert_to_model`
### Documentação
- Documentar variáveis de ambiente adicionais
- Atualizar changelog e versão para v1.14.5a1
## Contribuidores
@NIK-TIGER-BILL, @greysonlalonde, @lorenzejay, @minasami-pr, @theCyberTech, @wishhyt
</Update>
<Update label="01 mai 2026">
## v1.14.5a1
[Ver release no GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.5a1)
## O que Mudou
### Recursos
- Adicionar parâmetro de início `restore_from_state_id`
- Adicionar destaques ao ExaSearchTool e renomear de EXASearchTool
### Correções de Bugs
- Corrigir sites de pinos do crewai ausentes no fluxo de lançamento
- Garantir eventos de carregamento de habilidades para rastros
### Documentação
- Atualizar changelog e versão para v1.14.4
## Contribuidores
@akaKuruma, @github-actions[bot], @greysonlalonde, @lorenzejay, @theishangoswami
</Update>
<Update label="01 mai 2026">
## v1.14.4
[Ver release no GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.4)
## O que mudou
### Recursos
- Adicionar suporte para chave de persistência personalizada em @persist
- Adicionar suporte à API de Respostas para o provedor Azure OpenAI
- Encaminhar credential_scopes para o cliente de Inferência da Azure AI
- Adicionar guia de configuração de identidade de carga de trabalho do Vertex AI
- Adicionar Tavily Research e obter Pesquisa
- Adicionar ferramentas MCP do You.com para pesquisa, pesquisa e extração de conteúdo
### Correções de Bugs
- Corrigir falha quando a correspondência de regex JSON não é um JSON válido
- Corrigir para preservar tool_calls quando a resposta também contém texto
- Corrigir para encaminhar base_url e api_key para instructor.from_provider
- Corrigir para avisar e retornar vazio quando o servidor MCP nativo não retorna ferramentas
- Corrigir para usar a variável de mensagens validadas em manipuladores não-streaming
- Corrigir para proteger os ajudantes de descrição do chat da equipe contra falhas do LLM
- Corrigir para redefinir mensagens e iterações entre invocações
- Corrigir para encaminhar o arquivo de agentes treinados através de replay e teste
- Corrigir para honrar o arquivo de agentes treinados personalizados na inferência
- Corrigir para vincular agentes apenas de tarefa à equipe para arquivos de entrada multimodal
- Corrigir para serializar chamadas de guardrail como nulas para checkpointing JSON
- Corrigir renomeação de force_final_answer para evitar roteador autorreferencial
- Corrigir aumento de litellm para correção de SSTI; ignorar CVE pip não corrigível
### Documentação
- Atualizar changelog e versão para v1.14.4a1
- Adicionar página de Ferramentas do Sandbox E2B
- Adicionar documentação de ferramentas do sandbox Daytona
## Contributors
@EdwardIrby, @dependabot[bot], @factory-droid-oss, @factory-droid[bot], @greysonlalonde, @kunalk16, @lorenzejay, @lucasgomide, @manisrinivasan2k1, @mattatcha, @vinibrsl
</Update>
<Update label="29 abr 2026">
## v1.14.4a1

View File

@@ -193,41 +193,32 @@ Para um controle mais granular, você pode aplicar @persist em métodos específ
# (O código não é traduzido)
```
### Forking de Estado Persistido
### Chave de Persistência Personalizada
`@persist` suporta dois modos distintos de hidratação em `kickoff` / `kickoff_async`:
- `kickoff(inputs={"id": <uuid>})` — **resume**: carrega o snapshot mais recente do UUID informado e continua escrevendo sob o mesmo `flow_uuid`. O histórico se estende.
- `kickoff(restore_from_state_id=<uuid>)` — **fork**: carrega o snapshot mais recente do UUID informado, hidrata o estado da nova execução a partir dele, e atribui um novo `state.id` (auto-gerado, ou `inputs["id"]` se fixado). As escritas do `@persist` da nova execução vão para o novo `state.id`; o histórico do flow de origem é preservado.
Por padrão, `@persist` usa o campo `state.id` gerado automaticamente como chave de persistência. Se o seu flow já possui um identificador natural — por exemplo um `conversation_id` compartilhado entre sessões — você pode passar o argumento `key` e `@persist` usará esse atributo como UUID do flow:
```python
from crewai.flow.flow import Flow, start
from crewai.flow.flow import Flow, listen, start
from crewai.flow.persistence import persist
from pydantic import BaseModel
class CounterState(BaseModel):
id: str = ""
counter: int = 0
class ConversationState(BaseModel):
conversation_id: str
turn: int = 0
@persist
class CounterFlow(Flow[CounterState]):
@persist(key="conversation_id") # Usa um campo personalizado como chave de persistência
class ConversationFlow(Flow[ConversationState]):
@start()
def step(self):
self.state.counter += 1
print(f"[id={self.state.id}] counter={self.state.counter}")
def begin(self):
self.state.turn += 1
print(f"Conversa {self.state.conversation_id} turno {self.state.turn}")
# Execução 1: estado novo, counter 0 -> 1, persistido sob flow_1.state.id
flow_1 = CounterFlow()
flow_1.kickoff()
# Fork: hidrata do snapshot mais recente de flow_1, mas usa um state.id NOVO
flow_2 = CounterFlow()
flow_2.kickoff(restore_from_state_id=flow_1.state.id)
# flow_2.state.counter começa em 1 (hidratado), e step() incrementa para 2.
# flow_2.state.id != flow_1.state.id; o histórico de flow_1 não é alterado.
# Retomar a mesma conversa recarrega o estado anterior pelo conversation_id
flow = ConversationFlow(conversation_id="user-42")
flow.kickoff()
```
Se o `restore_from_state_id` informado não corresponder a nenhum estado persistido, o kickoff retorna silenciosamente ao comportamento padrão — o mesmo comportamento do `inputs["id"]` quando não encontrado. Combinar `restore_from_state_id` com `from_checkpoint` lança um `ValueError`; escolha uma única fonte de hidratação. Fixar `inputs["id"]` durante o fork compartilha uma chave de persistência com outro flow — geralmente você quer apenas `restore_from_state_id`.
O decorador lê o valor em `state[key]` para estados do tipo dicionário ou `getattr(state, key)` para estados Pydantic / objetos. Se o atributo informado estiver ausente ou for *falsy* no momento de salvar, `@persist` lança um `ValueError` como `Flow state is missing required persistence key 'conversation_id'`. Quando `key` é omitido, o comportamento original é preservado e `state.id` continua sendo usado.
### Como Funciona

View File

@@ -146,14 +146,15 @@ class ProductionFlow(Flow[AppState]):
# ...
```
Por padrão, `@persist` retoma um flow quando `kickoff(inputs={"id": <uuid>})` é informado, estendendo o mesmo histórico do `flow_uuid`. Para **forkar** um flow persistido em uma nova linhagem — hidratar o estado a partir de uma execução anterior mas escrever sob um novo `state.id` — passe `restore_from_state_id`:
Por padrão, `@persist` usa o `state.id` gerado automaticamente como chave do estado salvo. Se a sua aplicação já tem um identificador natural — por exemplo um `conversation_id` que liga várias execuções à mesma sessão de usuário passe-o como `key` e o decorador usará esse atributo como UUID do flow. Um `ValueError` é lançado se o atributo informado estiver ausente ou for *falsy* no momento de salvar.
```python
flow.kickoff(restore_from_state_id="<previous-run-state-id>")
@persist(key="conversation_id")
class ProductionFlow(Flow[AppState]):
# AppState precisa expor conversation_id; retomar a sessão recarrega o estado anterior
...
```
A nova execução recebe um novo `state.id` (auto-gerado, ou `inputs["id"]` se fixado), então suas escritas do `@persist` não estendem o histórico da origem. Combinar com `from_checkpoint` lança um `ValueError`; escolha uma única fonte de hidratação.
## Resumo
- **Comece com um Flow.**

View File

@@ -133,7 +133,7 @@ Aqui está uma lista das ferramentas disponíveis e suas descrições:
| **DirectorySearchTool** | Ferramenta RAG para busca em diretórios, útil para navegação em sistemas de arquivos. |
| **DOCXSearchTool** | Ferramenta RAG voltada para busca em documentos DOCX, ideal para processar arquivos Word. |
| **DirectoryReadTool** | Facilita a leitura e processamento de estruturas de diretórios e seus conteúdos. |
| **ExaSearchTool** | Ferramenta projetada para buscas exaustivas em diversas fontes de dados. |
| **EXASearchTool** | Ferramenta projetada para buscas exaustivas em diversas fontes de dados. |
| **FileReadTool** | Permite a leitura e extração de dados de arquivos, suportando diversos formatos. |
| **FirecrawlSearchTool** | Ferramenta para buscar páginas web usando Firecrawl e retornar os resultados. |
| **FirecrawlCrawlWebsiteTool** | Ferramenta para rastrear páginas web utilizando o Firecrawl. |

View File

@@ -167,47 +167,32 @@ Para mais controle, você pode aplicar `@persist()` em métodos específicos:
# código não traduzido
```
#### Forking de Estado Persistido
#### Usando uma Chave de Persistência Personalizada
`@persist` suporta dois modos distintos de hidratação em `kickoff` / `kickoff_async`. Use **resume** (`inputs["id"]`) para continuar a mesma linhagem; use **fork** (`restore_from_state_id`) para iniciar uma nova linhagem a partir de um snapshot:
| | `state.id` após o kickoff | Escritas do `@persist` vão para |
|---|---|---|
| `inputs["id"]` (resume) | id informado | id informado (estende o histórico) |
| `restore_from_state_id` (fork) | id novo, ou `inputs["id"]` se fixado | id novo (origem preservada) |
Por padrão, `@persist()` usa o `state.id` gerado automaticamente como chave do estado persistido. Quando seu domínio já possui um identificador natural — por exemplo um `conversation_id` que liga várias execuções do flow à mesma sessão de usuário — passe-o como argumento `key` e `@persist` usará esse atributo como UUID do flow em vez de `id`:
```python
from crewai.flow.flow import Flow, start
from crewai.flow.flow import Flow, listen, start
from crewai.flow.persistence import persist
from pydantic import BaseModel
class CounterState(BaseModel):
id: str = ""
counter: int = 0
class ConversationState(BaseModel):
conversation_id: str
history: list[str] = []
@persist
class CounterFlow(Flow[CounterState]):
@persist(key="conversation_id")
class ConversationFlow(Flow[ConversationState]):
@start()
def step(self):
self.state.counter += 1
def greet(self):
self.state.history.append("hello")
return self.state.history
# Execução 1: estado novo, counter 0 -> 1
flow_1 = CounterFlow()
flow_1.kickoff()
# Fork: hidrata do snapshot mais recente de flow_1, mas escreve sob um state.id NOVO
flow_2 = CounterFlow()
flow_2.kickoff(restore_from_state_id=flow_1.state.id)
# flow_2 começa com counter=1 (hidratado), e step() incrementa para 2.
# O histórico do flow_uuid de flow_1 não é alterado.
# Uma segunda execução com o mesmo conversation_id recarrega o estado anterior
flow = ConversationFlow(conversation_id="user-42")
flow.kickoff()
```
Notas sobre o comportamento:
- `restore_from_state_id` não encontrado na persistência → o kickoff retorna silenciosamente ao comportamento padrão (espelha o comportamento de `inputs["id"]` quando não encontrado). Nenhuma exceção é lançada.
- Combinar `restore_from_state_id` com `from_checkpoint` lança um `ValueError` — eles miram sistemas de estado diferentes (`@persist` vs. Checkpointing) e não podem ser combinados.
- `restore_from_state_id=None` (padrão) é byte-idêntico a um kickoff sem o parâmetro.
- Fixar `inputs["id"]` durante o fork significa que a nova execução compartilha uma chave de persistência com outro flow — geralmente você quer apenas `restore_from_state_id`.
Para estados baseados em dicionário `@persist` lê `state[key]`, e para estados Pydantic / objetos lê `getattr(state, key)`. Se o atributo informado estiver ausente ou for *falsy* no momento em que o estado for salvo, `@persist` lança um `ValueError` como `Flow state is missing required persistence key 'conversation_id'`, fazendo com que a falha apareça imediatamente em vez de descartar silenciosamente os dados persistidos. Chamar `@persist()` sem `key` mantém o comportamento original de usar `state.id`.
## Padrões Avançados de Estado

View File

@@ -1,15 +1,15 @@
---
title: Carregador Web EXA Search
description: O `ExaSearchTool` foi projetado para realizar uma busca semântica para uma consulta especificada a partir do conteúdo de um texto em toda a internet.
description: O `EXASearchTool` foi projetado para realizar uma busca semântica para uma consulta especificada a partir do conteúdo de um texto em toda a internet.
icon: globe-pointer
mode: "wide"
---
# `ExaSearchTool`
# `EXASearchTool`
## Descrição
O ExaSearchTool foi projetado para realizar uma busca semântica para uma consulta especificada a partir do conteúdo de um texto em toda a internet.
O EXASearchTool foi projetado para realizar uma busca semântica para uma consulta especificada a partir do conteúdo de um texto em toda a internet.
Ele utiliza a API da [exa.ai](https://exa.ai/) para buscar e exibir os resultados de pesquisa mais relevantes com base na consulta fornecida pelo usuário.
## Instalação
@@ -25,15 +25,15 @@ pip install 'crewai[tools]'
O exemplo a seguir demonstra como inicializar a ferramenta e executar uma busca com uma consulta determinada:
```python Code
from crewai_tools import ExaSearchTool
from crewai_tools import EXASearchTool
# Initialize the tool for internet searching capabilities
tool = ExaSearchTool()
tool = EXASearchTool()
```
## Etapas para Começar
Para usar o ExaSearchTool de forma eficaz, siga estas etapas:
Para usar o EXASearchTool de forma eficaz, siga estas etapas:
<Steps>
<Step title="Instalação do Pacote">
@@ -47,35 +47,7 @@ Para usar o ExaSearchTool de forma eficaz, siga estas etapas:
</Step>
</Steps>
## Usando o Exa via MCP
Você também pode conectar seu agente ao servidor MCP hospedado pelo Exa. Passe sua chave de API no cabeçalho `x-api-key`:
```python
from crewai import Agent
from crewai.mcp import MCPServerHTTP
agent = Agent(
role="Research Analyst",
goal="Find and analyze information on the web",
backstory="Expert researcher with access to Exa's tools",
mcps=[
MCPServerHTTP(
url="https://mcp.exa.ai/mcp",
headers={"x-api-key": "YOUR_EXA_API_KEY"},
),
],
)
```
Obtenha sua chave de API no [painel da Exa](https://dashboard.exa.ai/api-keys). Para mais informações sobre MCP no CrewAI, consulte a [visão geral do MCP](/pt-BR/mcp/overview).
## Conclusão
Ao integrar o `ExaSearchTool` em projetos Python, os usuários ganham a capacidade de realizar buscas relevantes e em tempo real pela internet diretamente de suas aplicações.
Seguindo as orientações de configuração e uso fornecidas, a incorporação desta ferramenta em projetos torna-se simples e direta.
## Recursos
- [Documentação do Exa](https://exa.ai/docs)
- [Painel do Exa — gerenciar chaves de API e uso](https://dashboard.exa.ai)
Ao integrar o `EXASearchTool` em projetos Python, os usuários ganham a capacidade de realizar buscas relevantes e em tempo real pela internet diretamente de suas aplicações.
Seguindo as orientações de configuração e uso fornecidas, a incorporação desta ferramenta em projetos torna-se simples e direta.

View File

@@ -1,26 +0,0 @@
# crewai-cli
CLI for CrewAI — scaffold, run, deploy and manage AI agent crews without
installing the full framework.
## Installation
```bash
pip install crewai-cli
```
This pulls in `crewai-core` (shared utilities) but not the `crewai` framework
itself, so commands that don't need a crew loaded — `crewai version`,
`crewai login`, `crewai org list`, `crewai config *`, `crewai traces *`,
`crewai create`, `crewai template *` — work standalone.
Commands that load a user's crew or flow (`crewai run`, `crewai train`,
`crewai test`, `crewai chat`, `crewai replay`, `crewai reset-memories`,
`crewai deploy push`, `crewai tool publish`) require `crewai` to be installed
in the project's environment. They print a clear error if it is missing.
To install both at once:
```bash
pip install crewai[cli]
```

View File

@@ -1,43 +0,0 @@
[project]
name = "crewai-cli"
dynamic = ["version"]
description = "CLI for CrewAI — scaffold, run, deploy and manage AI agent crews."
readme = "README.md"
authors = [
{ name = "Joao Moura", email = "joao@crewai.com" }
]
requires-python = ">=3.10, <3.14"
dependencies = [
"crewai-core>=1.14.5a2",
"click~=8.1.7",
"pydantic>=2.11.9,<2.13",
"pydantic-settings~=2.10.1",
"appdirs~=1.4.4",
"cryptography>=42.0",
"httpx~=0.28.1",
"pyjwt>=2.9.0,<3",
"rich>=13.7.1",
"tomli~=2.0.2",
"tomli-w~=1.1.0",
"packaging>=23.0",
"python-dotenv>=1.2.2,<2",
"uv~=0.11.6",
]
[project.urls]
Homepage = "https://crewai.com"
Documentation = "https://docs.crewai.com"
Repository = "https://github.com/crewAIInc/crewAI"
[project.scripts]
crewai = "crewai_cli.cli:crewai"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.version]
path = "src/crewai_cli/__init__.py"
[tool.hatch.build.targets.wheel]
packages = ["src/crewai_cli"]

View File

@@ -1 +0,0 @@
__version__ = "1.14.5a2"

View File

@@ -1,8 +0,0 @@
"""CLI authentication entry point."""
from __future__ import annotations
from crewai_cli.authentication.main import AuthenticationCommand
__all__ = ["AuthenticationCommand"]

View File

@@ -1,8 +0,0 @@
"""Re-export of authentication constants from ``crewai_core.auth.constants``."""
from __future__ import annotations
from crewai_core.auth.constants import ALGORITHMS as ALGORITHMS
__all__ = ["ALGORITHMS"]

View File

@@ -1,60 +0,0 @@
"""CLI-side authentication wiring.
Re-exports the OAuth2 primitives from ``crewai_core.auth`` and overrides the
``_post_login`` hook to also log into the tool repository.
"""
from __future__ import annotations
from crewai_core.auth.oauth2 import (
AuthenticationCommand as _BaseAuthenticationCommand,
Oauth2Settings as Oauth2Settings,
ProviderFactory as ProviderFactory,
console,
)
from crewai_core.settings import Settings
__all__ = ["AuthenticationCommand", "Oauth2Settings", "ProviderFactory"]
class AuthenticationCommand(_BaseAuthenticationCommand):
"""CLI-side login that also signs the user into the tool repository."""
def _post_login(self) -> None:
self._login_to_tool_repository()
def _login_to_tool_repository(self) -> None:
from crewai_cli.tools.main import ToolCommand
try:
console.print(
"Now logging you in to the Tool Repository... ",
style="bold blue",
end="",
)
ToolCommand().login()
console.print(
"Success!\n",
style="bold green",
)
settings = Settings()
console.print(
f"You are now authenticated to the tool repository for organization [bold cyan]'{settings.org_name if settings.org_name else settings.org_uuid}'[/bold cyan]",
style="green",
)
except (Exception, SystemExit):
console.print(
"\n[bold yellow]Warning:[/bold yellow] Authentication with the Tool Repository failed.",
style="yellow",
)
console.print(
"Other features will work normally, but you may experience limitations "
"with downloading and publishing tools."
"\nRun [bold]crewai login[/bold] to try logging in again.\n",
style="yellow",
)

View File

@@ -1 +0,0 @@
"""OAuth2 authentication providers — re-exported from ``crewai_core.auth.providers``."""

View File

@@ -1,8 +0,0 @@
"""Re-export of ``Auth0Provider`` from ``crewai_core.auth.providers.auth0``."""
from __future__ import annotations
from crewai_core.auth.providers.auth0 import Auth0Provider as Auth0Provider
__all__ = ["Auth0Provider"]

View File

@@ -1,8 +0,0 @@
"""Re-export of ``BaseProvider`` from ``crewai_core.auth.providers.base_provider``."""
from __future__ import annotations
from crewai_core.auth.providers.base_provider import BaseProvider as BaseProvider
__all__ = ["BaseProvider"]

View File

@@ -1,8 +0,0 @@
"""Re-export of ``EntraIdProvider`` from ``crewai_core.auth.providers.entra_id``."""
from __future__ import annotations
from crewai_core.auth.providers.entra_id import EntraIdProvider as EntraIdProvider
__all__ = ["EntraIdProvider"]

View File

@@ -1,8 +0,0 @@
"""Re-export of ``KeycloakProvider`` from ``crewai_core.auth.providers.keycloak``."""
from __future__ import annotations
from crewai_core.auth.providers.keycloak import KeycloakProvider as KeycloakProvider
__all__ = ["KeycloakProvider"]

View File

@@ -1,8 +0,0 @@
"""Re-export of ``OktaProvider`` from ``crewai_core.auth.providers.okta``."""
from __future__ import annotations
from crewai_core.auth.providers.okta import OktaProvider as OktaProvider
__all__ = ["OktaProvider"]

View File

@@ -1,8 +0,0 @@
"""Re-export of ``WorkosProvider`` from ``crewai_core.auth.providers.workos``."""
from __future__ import annotations
from crewai_core.auth.providers.workos import WorkosProvider as WorkosProvider
__all__ = ["WorkosProvider"]

View File

@@ -1,11 +0,0 @@
"""Re-exports of authentication token helpers from ``crewai_core.auth.token``."""
from __future__ import annotations
from crewai_core.auth.token import (
AuthError as AuthError,
get_auth_token as get_auth_token,
)
__all__ = ["AuthError", "get_auth_token"]

View File

@@ -1,8 +0,0 @@
"""Re-export of ``validate_jwt_token`` from ``crewai_core.auth.utils``."""
from __future__ import annotations
from crewai_core.auth.utils import validate_jwt_token as validate_jwt_token
__all__ = ["validate_jwt_token"]

View File

@@ -1,30 +0,0 @@
"""Re-exports of shared settings from ``crewai_core.settings``.
Kept as a stable import path for the CLI; new code should import from
``crewai_core.settings`` directly.
"""
from __future__ import annotations
from crewai_core.settings import (
CLI_SETTINGS_KEYS as CLI_SETTINGS_KEYS,
DEFAULT_CLI_SETTINGS as DEFAULT_CLI_SETTINGS,
DEFAULT_CONFIG_PATH as DEFAULT_CONFIG_PATH,
HIDDEN_SETTINGS_KEYS as HIDDEN_SETTINGS_KEYS,
READONLY_SETTINGS_KEYS as READONLY_SETTINGS_KEYS,
USER_SETTINGS_KEYS as USER_SETTINGS_KEYS,
Settings as Settings,
get_writable_config_path as get_writable_config_path,
)
__all__ = [
"CLI_SETTINGS_KEYS",
"DEFAULT_CLI_SETTINGS",
"DEFAULT_CONFIG_PATH",
"HIDDEN_SETTINGS_KEYS",
"READONLY_SETTINGS_KEYS",
"USER_SETTINGS_KEYS",
"Settings",
"get_writable_config_path",
]

View File

@@ -1,23 +0,0 @@
"""Wrapper for the crew chat command.
Delegates to ``crewai.utilities.crew_chat.run_chat`` when the full crewai
package is installed, otherwise prints a helpful error message.
"""
from __future__ import annotations
import click
def run_chat() -> None:
try:
from crewai.utilities.crew_chat import run_chat as _run_chat
except ImportError:
click.secho(
"The 'chat' command requires the full crewai package.\n"
"Install it with: pip install crewai",
fg="red",
)
raise SystemExit(1) from None
_run_chat()

View File

@@ -1,12 +0,0 @@
"""Re-export of ``crewai_core.plus_api.PlusAPI``.
Kept as a stable import path for the CLI; new code should import from
``crewai_core.plus_api`` directly.
"""
from __future__ import annotations
from crewai_core.plus_api import PlusAPI as PlusAPI
__all__ = ["PlusAPI"]

View File

@@ -1,31 +0,0 @@
"""Wrapper for the reset-memories command.
Delegates to ``crewai.utilities.reset_memories`` when the full crewai
package is installed, otherwise prints a helpful error message.
"""
from __future__ import annotations
import click
def reset_memories_command(
memory: bool,
knowledge: bool,
agent_knowledge: bool,
kickoff_outputs: bool,
all: bool,
) -> None:
try:
from crewai.utilities.reset_memories import (
reset_memories_command as _reset,
)
except ImportError:
click.secho(
"The 'reset-memories' command requires the full crewai package.\n"
"Install it with: pip install crewai",
fg="red",
)
raise SystemExit(1) from None
_reset(memory, knowledge, agent_knowledge, kickoff_outputs, all)

View File

@@ -1,12 +0,0 @@
"""Re-export of ``crewai_core.token_manager.TokenManager``.
Kept as a stable import path for the CLI; new code should import from
``crewai_core.token_manager`` directly.
"""
from __future__ import annotations
from crewai_core.token_manager import TokenManager as TokenManager
__all__ = ["TokenManager"]

View File

@@ -1,67 +0,0 @@
"""Lightweight SQLite reader for kickoff task outputs.
Only used by the ``crewai log-tasks-outputs`` CLI command. Depends solely on
the standard library + *appdirs* so crewai-cli can read stored outputs without
importing the full crewai framework.
"""
from __future__ import annotations
import json
import logging
from pathlib import Path
import sqlite3
from typing import Any
from crewai_cli.user_data import _db_storage_path
logger = logging.getLogger(__name__)
def load_task_outputs(db_path: str | None = None) -> list[dict[str, Any]]:
"""Return all rows from the kickoff task outputs database."""
if db_path is None:
db_path = str(Path(_db_storage_path()) / "latest_kickoff_task_outputs.db")
if not Path(db_path).exists():
return []
try:
with sqlite3.connect(db_path) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute("""
SELECT task_id, expected_output, output, task_index,
inputs, was_replayed, timestamp
FROM latest_kickoff_task_outputs
ORDER BY task_index
""")
rows = cursor.fetchall()
except sqlite3.Error as e:
logger.error("Failed to load task outputs: %s", e)
return []
return [
{
"task_id": row["task_id"],
"expected_output": row["expected_output"],
"output": _safe_json_loads(row["output"]),
"task_index": row["task_index"],
"inputs": _safe_json_loads(row["inputs"]),
"was_replayed": row["was_replayed"],
"timestamp": row["timestamp"],
}
for row in rows
]
def _safe_json_loads(value: str | None) -> Any:
"""Decode a JSON column tolerantly: NULL/blank/corrupt → None."""
if not value:
return None
try:
return json.loads(value)
except (json.JSONDecodeError, TypeError) as e:
logger.warning("Failed to decode JSON column: %s", e)
return None

View File

@@ -1,22 +0,0 @@
"""User-data helpers — re-exported from ``crewai_core.user_data``."""
from __future__ import annotations
from crewai_core.paths import db_storage_path as _db_storage_path
from crewai_core.user_data import (
_load_user_data as _load_user_data,
_save_user_data as _save_user_data,
has_user_declined_tracing as has_user_declined_tracing,
is_tracing_enabled as is_tracing_enabled,
update_user_data as update_user_data,
)
__all__ = [
"_db_storage_path",
"_load_user_data",
"_save_user_data",
"has_user_declined_tracing",
"is_tracing_enabled",
"update_user_data",
]

View File

@@ -1,137 +0,0 @@
from __future__ import annotations
import os
from pathlib import Path
import shutil
from typing import Any
import click
from crewai_core.project import (
get_project_description as get_project_description,
get_project_name as get_project_name,
get_project_version as get_project_version,
parse_toml as parse_toml,
read_toml as read_toml,
)
from crewai_core.tool_credentials import (
build_env_with_all_tool_credentials as build_env_with_all_tool_credentials,
build_env_with_tool_repository_credentials as build_env_with_tool_repository_credentials,
)
from rich.console import Console
__all__ = [
"build_env_with_all_tool_credentials",
"build_env_with_tool_repository_credentials",
"copy_template",
"fetch_and_json_env_file",
"get_project_description",
"get_project_name",
"get_project_version",
"load_env_vars",
"parse_toml",
"read_toml",
"tree_copy",
"tree_find_and_replace",
"write_env_file",
]
console = Console()
def copy_template(
src: Path, dst: Path, name: str, class_name: str, folder_name: str
) -> None:
"""Copy a file from src to dst."""
with open(src, "r") as file:
content = file.read()
content = content.replace("{{name}}", name)
content = content.replace("{{crew_name}}", class_name)
content = content.replace("{{folder_name}}", folder_name)
with open(dst, "w") as file:
file.write(content)
click.secho(f" - Created {dst}", fg="green")
def fetch_and_json_env_file(env_file_path: str = ".env") -> dict[str, Any]:
"""Fetch the environment variables from a .env file and return them as a dictionary."""
try:
with open(env_file_path, "r") as f:
env_content = f.read()
env_dict = {}
for line in env_content.splitlines():
if line.strip() and not line.strip().startswith("#"):
key, value = line.split("=", 1)
env_dict[key.strip()] = value.strip()
return env_dict
except FileNotFoundError:
console.print(f"Error: {env_file_path} not found.", style="bold red")
except Exception as e:
console.print(f"Error reading the .env file: {e}", style="bold red")
return {}
def tree_copy(source: Path, destination: Path) -> None:
"""Copies the entire directory structure from the source to the destination."""
for item in os.listdir(source):
source_item = os.path.join(source, item)
destination_item = os.path.join(destination, item)
if os.path.isdir(source_item):
shutil.copytree(source_item, destination_item)
else:
shutil.copy2(source_item, destination_item)
def tree_find_and_replace(directory: Path, find: str, replace: str) -> None:
"""Recursively searches through a directory, replacing a target string in
both file contents and filenames with a specified replacement string.
"""
for path, dirs, files in os.walk(os.path.abspath(directory), topdown=False):
for filename in files:
filepath = os.path.join(path, filename)
with open(filepath, "r", encoding="utf-8", errors="ignore") as file:
contents = file.read()
with open(filepath, "w") as file:
file.write(contents.replace(find, replace))
if find in filename:
new_filename = filename.replace(find, replace)
new_filepath = os.path.join(path, new_filename)
os.rename(filepath, new_filepath)
for dirname in dirs:
if find in dirname:
new_dirname = dirname.replace(find, replace)
new_dirpath = os.path.join(path, new_dirname)
old_dirpath = os.path.join(path, dirname)
os.rename(old_dirpath, new_dirpath)
def load_env_vars(folder_path: Path) -> dict[str, Any]:
"""Loads environment variables from a .env file in the specified folder path."""
env_file_path = folder_path / ".env"
env_vars = {}
if env_file_path.exists():
with open(env_file_path, "r") as file:
for line in file:
key, _, value = line.strip().partition("=")
if key and value:
env_vars[key] = value
return env_vars
def write_env_file(folder_path: Path, env_vars: dict[str, Any]) -> None:
"""Writes environment variables to a .env file in the specified folder."""
env_file_path = folder_path / ".env"
with open(env_file_path, "w") as file:
for key, value in env_vars.items():
file.write(f"{key.upper()}={value}\n")

View File

@@ -1,24 +0,0 @@
"""Re-exports of version utilities from ``crewai_core.version``.
Kept as a stable import path for the CLI; new code should import from
``crewai_core.version`` directly.
"""
from __future__ import annotations
from crewai_core.version import (
check_version as check_version,
get_crewai_version as get_crewai_version,
get_latest_version_from_pypi as get_latest_version_from_pypi,
is_current_version_yanked as is_current_version_yanked,
is_newer_version_available as is_newer_version_available,
)
__all__ = [
"check_version",
"get_crewai_version",
"get_latest_version_from_pypi",
"is_current_version_yanked",
"is_newer_version_available",
]

View File

@@ -1,91 +0,0 @@
import pytest
from crewai_cli.authentication.main import Oauth2Settings
from crewai_cli.authentication.providers.auth0 import Auth0Provider
class TestAuth0Provider:
@pytest.fixture(autouse=True)
def setup_method(self):
self.valid_settings = Oauth2Settings(
provider="auth0",
domain="test-domain.auth0.com",
client_id="test-client-id",
audience="test-audience"
)
self.provider = Auth0Provider(self.valid_settings)
def test_initialization_with_valid_settings(self):
provider = Auth0Provider(self.valid_settings)
assert provider.settings == self.valid_settings
assert provider.settings.provider == "auth0"
assert provider.settings.domain == "test-domain.auth0.com"
assert provider.settings.client_id == "test-client-id"
assert provider.settings.audience == "test-audience"
def test_get_authorize_url(self):
expected_url = "https://test-domain.auth0.com/oauth/device/code"
assert self.provider.get_authorize_url() == expected_url
def test_get_authorize_url_with_different_domain(self):
settings = Oauth2Settings(
provider="auth0",
domain="my-company.auth0.com",
client_id="test-client",
audience="test-audience"
)
provider = Auth0Provider(settings)
expected_url = "https://my-company.auth0.com/oauth/device/code"
assert provider.get_authorize_url() == expected_url
def test_get_token_url(self):
expected_url = "https://test-domain.auth0.com/oauth/token"
assert self.provider.get_token_url() == expected_url
def test_get_token_url_with_different_domain(self):
settings = Oauth2Settings(
provider="auth0",
domain="another-domain.auth0.com",
client_id="test-client",
audience="test-audience"
)
provider = Auth0Provider(settings)
expected_url = "https://another-domain.auth0.com/oauth/token"
assert provider.get_token_url() == expected_url
def test_get_jwks_url(self):
expected_url = "https://test-domain.auth0.com/.well-known/jwks.json"
assert self.provider.get_jwks_url() == expected_url
def test_get_jwks_url_with_different_domain(self):
settings = Oauth2Settings(
provider="auth0",
domain="dev.auth0.com",
client_id="test-client",
audience="test-audience"
)
provider = Auth0Provider(settings)
expected_url = "https://dev.auth0.com/.well-known/jwks.json"
assert provider.get_jwks_url() == expected_url
def test_get_issuer(self):
expected_issuer = "https://test-domain.auth0.com/"
assert self.provider.get_issuer() == expected_issuer
def test_get_issuer_with_different_domain(self):
settings = Oauth2Settings(
provider="auth0",
domain="prod.auth0.com",
client_id="test-client",
audience="test-audience"
)
provider = Auth0Provider(settings)
expected_issuer = "https://prod.auth0.com/"
assert provider.get_issuer() == expected_issuer
def test_get_audience(self):
assert self.provider.get_audience() == "test-audience"
def test_get_client_id(self):
assert self.provider.get_client_id() == "test-client-id"

View File

@@ -1,141 +0,0 @@
import pytest
from crewai_cli.authentication.main import Oauth2Settings
from crewai_cli.authentication.providers.entra_id import EntraIdProvider
class TestEntraIdProvider:
@pytest.fixture(autouse=True)
def setup_method(self):
self.valid_settings = Oauth2Settings(
provider="entra_id",
domain="tenant-id-abcdef123456",
client_id="test-client-id",
audience="test-audience",
extra={
"scope": "openid profile email api://crewai-cli-dev/read"
}
)
self.provider = EntraIdProvider(self.valid_settings)
def test_initialization_with_valid_settings(self):
provider = EntraIdProvider(self.valid_settings)
assert provider.settings == self.valid_settings
assert provider.settings.provider == "entra_id"
assert provider.settings.domain == "tenant-id-abcdef123456"
assert provider.settings.client_id == "test-client-id"
assert provider.settings.audience == "test-audience"
def test_get_authorize_url(self):
expected_url = "https://login.microsoftonline.com/tenant-id-abcdef123456/oauth2/v2.0/devicecode"
assert self.provider.get_authorize_url() == expected_url
def test_get_authorize_url_with_different_domain(self):
# For EntraID, the domain is the tenant ID.
settings = Oauth2Settings(
provider="entra_id",
domain="my-company.entra.id",
client_id="test-client",
audience="test-audience",
)
provider = EntraIdProvider(settings)
expected_url = "https://login.microsoftonline.com/my-company.entra.id/oauth2/v2.0/devicecode"
assert provider.get_authorize_url() == expected_url
def test_get_token_url(self):
expected_url = "https://login.microsoftonline.com/tenant-id-abcdef123456/oauth2/v2.0/token"
assert self.provider.get_token_url() == expected_url
def test_get_token_url_with_different_domain(self):
# For EntraID, the domain is the tenant ID.
settings = Oauth2Settings(
provider="entra_id",
domain="another-domain.entra.id",
client_id="test-client",
audience="test-audience",
)
provider = EntraIdProvider(settings)
expected_url = "https://login.microsoftonline.com/another-domain.entra.id/oauth2/v2.0/token"
assert provider.get_token_url() == expected_url
def test_get_jwks_url(self):
expected_url = "https://login.microsoftonline.com/tenant-id-abcdef123456/discovery/v2.0/keys"
assert self.provider.get_jwks_url() == expected_url
def test_get_jwks_url_with_different_domain(self):
# For EntraID, the domain is the tenant ID.
settings = Oauth2Settings(
provider="entra_id",
domain="dev.entra.id",
client_id="test-client",
audience="test-audience",
)
provider = EntraIdProvider(settings)
expected_url = "https://login.microsoftonline.com/dev.entra.id/discovery/v2.0/keys"
assert provider.get_jwks_url() == expected_url
def test_get_issuer(self):
expected_issuer = "https://login.microsoftonline.com/tenant-id-abcdef123456/v2.0"
assert self.provider.get_issuer() == expected_issuer
def test_get_issuer_with_different_domain(self):
# For EntraID, the domain is the tenant ID.
settings = Oauth2Settings(
provider="entra_id",
domain="other-tenant-id-xpto",
client_id="test-client",
audience="test-audience",
)
provider = EntraIdProvider(settings)
expected_issuer = "https://login.microsoftonline.com/other-tenant-id-xpto/v2.0"
assert provider.get_issuer() == expected_issuer
def test_get_audience(self):
assert self.provider.get_audience() == "test-audience"
def test_get_audience_assertion_error_when_none(self):
settings = Oauth2Settings(
provider="entra_id",
domain="test-tenant-id",
client_id="test-client-id",
audience=None,
)
provider = EntraIdProvider(settings)
with pytest.raises(ValueError, match="Audience is required"):
provider.get_audience()
def test_get_client_id(self):
assert self.provider.get_client_id() == "test-client-id"
def test_get_required_fields(self):
assert set(self.provider.get_required_fields()) == set(["scope"])
def test_get_oauth_scopes(self):
settings = Oauth2Settings(
provider="entra_id",
domain="tenant-id-abcdef123456",
client_id="test-client-id",
audience="test-audience",
extra={
"scope": "api://crewai-cli-dev/read"
}
)
provider = EntraIdProvider(settings)
assert provider.get_oauth_scopes() == ["openid", "profile", "email", "api://crewai-cli-dev/read"]
def test_get_oauth_scopes_with_multiple_custom_scopes(self):
settings = Oauth2Settings(
provider="entra_id",
domain="tenant-id-abcdef123456",
client_id="test-client-id",
audience="test-audience",
extra={
"scope": "api://crewai-cli-dev/read api://crewai-cli-dev/write custom-scope1 custom-scope2"
}
)
provider = EntraIdProvider(settings)
assert provider.get_oauth_scopes() == ["openid", "profile", "email", "api://crewai-cli-dev/read", "api://crewai-cli-dev/write", "custom-scope1", "custom-scope2"]
def test_base_url(self):
assert self.provider._base_url() == "https://login.microsoftonline.com/tenant-id-abcdef123456"

View File

@@ -1,138 +0,0 @@
import pytest
from crewai_cli.authentication.main import Oauth2Settings
from crewai_cli.authentication.providers.keycloak import KeycloakProvider
class TestKeycloakProvider:
@pytest.fixture(autouse=True)
def setup_method(self):
self.valid_settings = Oauth2Settings(
provider="keycloak",
domain="keycloak.example.com",
client_id="test-client-id",
audience="test-audience",
extra={
"realm": "test-realm"
}
)
self.provider = KeycloakProvider(self.valid_settings)
def test_initialization_with_valid_settings(self):
provider = KeycloakProvider(self.valid_settings)
assert provider.settings == self.valid_settings
assert provider.settings.provider == "keycloak"
assert provider.settings.domain == "keycloak.example.com"
assert provider.settings.client_id == "test-client-id"
assert provider.settings.audience == "test-audience"
assert provider.settings.extra.get("realm") == "test-realm"
def test_get_authorize_url(self):
expected_url = "https://keycloak.example.com/realms/test-realm/protocol/openid-connect/auth/device"
assert self.provider.get_authorize_url() == expected_url
def test_get_authorize_url_with_different_domain(self):
settings = Oauth2Settings(
provider="keycloak",
domain="auth.company.com",
client_id="test-client",
audience="test-audience",
extra={
"realm": "my-realm"
}
)
provider = KeycloakProvider(settings)
expected_url = "https://auth.company.com/realms/my-realm/protocol/openid-connect/auth/device"
assert provider.get_authorize_url() == expected_url
def test_get_token_url(self):
expected_url = "https://keycloak.example.com/realms/test-realm/protocol/openid-connect/token"
assert self.provider.get_token_url() == expected_url
def test_get_token_url_with_different_domain(self):
settings = Oauth2Settings(
provider="keycloak",
domain="sso.enterprise.com",
client_id="test-client",
audience="test-audience",
extra={
"realm": "enterprise-realm"
}
)
provider = KeycloakProvider(settings)
expected_url = "https://sso.enterprise.com/realms/enterprise-realm/protocol/openid-connect/token"
assert provider.get_token_url() == expected_url
def test_get_jwks_url(self):
expected_url = "https://keycloak.example.com/realms/test-realm/protocol/openid-connect/certs"
assert self.provider.get_jwks_url() == expected_url
def test_get_jwks_url_with_different_domain(self):
settings = Oauth2Settings(
provider="keycloak",
domain="identity.org",
client_id="test-client",
audience="test-audience",
extra={
"realm": "org-realm"
}
)
provider = KeycloakProvider(settings)
expected_url = "https://identity.org/realms/org-realm/protocol/openid-connect/certs"
assert provider.get_jwks_url() == expected_url
def test_get_issuer(self):
expected_issuer = "https://keycloak.example.com/realms/test-realm"
assert self.provider.get_issuer() == expected_issuer
def test_get_issuer_with_different_domain(self):
settings = Oauth2Settings(
provider="keycloak",
domain="login.myapp.io",
client_id="test-client",
audience="test-audience",
extra={
"realm": "app-realm"
}
)
provider = KeycloakProvider(settings)
expected_issuer = "https://login.myapp.io/realms/app-realm"
assert provider.get_issuer() == expected_issuer
def test_get_audience(self):
assert self.provider.get_audience() == "test-audience"
def test_get_client_id(self):
assert self.provider.get_client_id() == "test-client-id"
def test_get_required_fields(self):
assert self.provider.get_required_fields() == ["realm"]
def test_oauth2_base_url(self):
assert self.provider._oauth2_base_url() == "https://keycloak.example.com"
def test_oauth2_base_url_strips_https_prefix(self):
settings = Oauth2Settings(
provider="keycloak",
domain="https://keycloak.example.com",
client_id="test-client-id",
audience="test-audience",
extra={
"realm": "test-realm"
}
)
provider = KeycloakProvider(settings)
assert provider._oauth2_base_url() == "https://keycloak.example.com"
def test_oauth2_base_url_strips_http_prefix(self):
settings = Oauth2Settings(
provider="keycloak",
domain="http://keycloak.example.com",
client_id="test-client-id",
audience="test-audience",
extra={
"realm": "test-realm"
}
)
provider = KeycloakProvider(settings)
assert provider._oauth2_base_url() == "https://keycloak.example.com"

View File

@@ -1,257 +0,0 @@
import pytest
from crewai_cli.authentication.main import Oauth2Settings
from crewai_cli.authentication.providers.okta import OktaProvider
class TestOktaProvider:
@pytest.fixture(autouse=True)
def setup_method(self):
self.valid_settings = Oauth2Settings(
provider="okta",
domain="test-domain.okta.com",
client_id="test-client-id",
audience="test-audience",
)
self.provider = OktaProvider(self.valid_settings)
def test_initialization_with_valid_settings(self):
provider = OktaProvider(self.valid_settings)
assert provider.settings == self.valid_settings
assert provider.settings.provider == "okta"
assert provider.settings.domain == "test-domain.okta.com"
assert provider.settings.client_id == "test-client-id"
assert provider.settings.audience == "test-audience"
def test_get_authorize_url(self):
expected_url = "https://test-domain.okta.com/oauth2/default/v1/device/authorize"
assert self.provider.get_authorize_url() == expected_url
def test_get_authorize_url_with_different_domain(self):
settings = Oauth2Settings(
provider="okta",
domain="my-company.okta.com",
client_id="test-client",
audience="test-audience",
)
provider = OktaProvider(settings)
expected_url = "https://my-company.okta.com/oauth2/default/v1/device/authorize"
assert provider.get_authorize_url() == expected_url
def test_get_authorize_url_with_custom_authorization_server_name(self):
settings = Oauth2Settings(
provider="okta",
domain="test-domain.okta.com",
client_id="test-client-id",
audience=None,
extra={
"using_org_auth_server": False,
"authorization_server_name": "my_auth_server_xxxAAA777"
}
)
provider = OktaProvider(settings)
expected_url = "https://test-domain.okta.com/oauth2/my_auth_server_xxxAAA777/v1/device/authorize"
assert provider.get_authorize_url() == expected_url
def test_get_authorize_url_when_using_org_auth_server(self):
settings = Oauth2Settings(
provider="okta",
domain="test-domain.okta.com",
client_id="test-client-id",
audience=None,
extra={
"using_org_auth_server": True,
"authorization_server_name": None
}
)
provider = OktaProvider(settings)
expected_url = "https://test-domain.okta.com/oauth2/v1/device/authorize"
assert provider.get_authorize_url() == expected_url
def test_get_token_url(self):
expected_url = "https://test-domain.okta.com/oauth2/default/v1/token"
assert self.provider.get_token_url() == expected_url
def test_get_token_url_with_different_domain(self):
settings = Oauth2Settings(
provider="okta",
domain="another-domain.okta.com",
client_id="test-client",
audience="test-audience",
)
provider = OktaProvider(settings)
expected_url = "https://another-domain.okta.com/oauth2/default/v1/token"
assert provider.get_token_url() == expected_url
def test_get_token_url_with_custom_authorization_server_name(self):
settings = Oauth2Settings(
provider="okta",
domain="test-domain.okta.com",
client_id="test-client-id",
audience=None,
extra={
"using_org_auth_server": False,
"authorization_server_name": "my_auth_server_xxxAAA777"
}
)
provider = OktaProvider(settings)
expected_url = "https://test-domain.okta.com/oauth2/my_auth_server_xxxAAA777/v1/token"
assert provider.get_token_url() == expected_url
def test_get_token_url_when_using_org_auth_server(self):
settings = Oauth2Settings(
provider="okta",
domain="test-domain.okta.com",
client_id="test-client-id",
audience=None,
extra={
"using_org_auth_server": True,
"authorization_server_name": None
}
)
provider = OktaProvider(settings)
expected_url = "https://test-domain.okta.com/oauth2/v1/token"
assert provider.get_token_url() == expected_url
def test_get_jwks_url(self):
expected_url = "https://test-domain.okta.com/oauth2/default/v1/keys"
assert self.provider.get_jwks_url() == expected_url
def test_get_jwks_url_with_different_domain(self):
settings = Oauth2Settings(
provider="okta",
domain="dev.okta.com",
client_id="test-client",
audience="test-audience",
)
provider = OktaProvider(settings)
expected_url = "https://dev.okta.com/oauth2/default/v1/keys"
assert provider.get_jwks_url() == expected_url
def test_get_jwks_url_with_custom_authorization_server_name(self):
settings = Oauth2Settings(
provider="okta",
domain="test-domain.okta.com",
client_id="test-client-id",
audience=None,
extra={
"using_org_auth_server": False,
"authorization_server_name": "my_auth_server_xxxAAA777"
}
)
provider = OktaProvider(settings)
expected_url = "https://test-domain.okta.com/oauth2/my_auth_server_xxxAAA777/v1/keys"
assert provider.get_jwks_url() == expected_url
def test_get_jwks_url_when_using_org_auth_server(self):
settings = Oauth2Settings(
provider="okta",
domain="test-domain.okta.com",
client_id="test-client-id",
audience=None,
extra={
"using_org_auth_server": True,
"authorization_server_name": None
}
)
provider = OktaProvider(settings)
expected_url = "https://test-domain.okta.com/oauth2/v1/keys"
assert provider.get_jwks_url() == expected_url
def test_get_issuer(self):
expected_issuer = "https://test-domain.okta.com/oauth2/default"
assert self.provider.get_issuer() == expected_issuer
def test_get_issuer_with_different_domain(self):
settings = Oauth2Settings(
provider="okta",
domain="prod.okta.com",
client_id="test-client",
audience="test-audience",
)
provider = OktaProvider(settings)
expected_issuer = "https://prod.okta.com/oauth2/default"
assert provider.get_issuer() == expected_issuer
def test_get_issuer_with_custom_authorization_server_name(self):
settings = Oauth2Settings(
provider="okta",
domain="test-domain.okta.com",
client_id="test-client-id",
audience=None,
extra={
"using_org_auth_server": False,
"authorization_server_name": "my_auth_server_xxxAAA777"
}
)
provider = OktaProvider(settings)
expected_issuer = "https://test-domain.okta.com/oauth2/my_auth_server_xxxAAA777"
assert provider.get_issuer() == expected_issuer
def test_get_issuer_when_using_org_auth_server(self):
settings = Oauth2Settings(
provider="okta",
domain="test-domain.okta.com",
client_id="test-client-id",
audience=None,
extra={
"using_org_auth_server": True,
"authorization_server_name": None
}
)
provider = OktaProvider(settings)
expected_issuer = "https://test-domain.okta.com"
assert provider.get_issuer() == expected_issuer
def test_get_audience(self):
assert self.provider.get_audience() == "test-audience"
def test_get_audience_assertion_error_when_none(self):
settings = Oauth2Settings(
provider="okta",
domain="test-domain.okta.com",
client_id="test-client-id",
audience=None,
)
provider = OktaProvider(settings)
with pytest.raises(ValueError, match="Audience is required"):
provider.get_audience()
def test_get_client_id(self):
assert self.provider.get_client_id() == "test-client-id"
def test_get_required_fields(self):
assert set(self.provider.get_required_fields()) == set(["authorization_server_name", "using_org_auth_server"])
def test_oauth2_base_url(self):
assert self.provider._oauth2_base_url() == "https://test-domain.okta.com/oauth2/default"
def test_oauth2_base_url_with_custom_authorization_server_name(self):
settings = Oauth2Settings(
provider="okta",
domain="test-domain.okta.com",
client_id="test-client-id",
audience=None,
extra={
"using_org_auth_server": False,
"authorization_server_name": "my_auth_server_xxxAAA777"
}
)
provider = OktaProvider(settings)
assert provider._oauth2_base_url() == "https://test-domain.okta.com/oauth2/my_auth_server_xxxAAA777"
def test_oauth2_base_url_when_using_org_auth_server(self):
settings = Oauth2Settings(
provider="okta",
domain="test-domain.okta.com",
client_id="test-client-id",
audience=None,
extra={
"using_org_auth_server": True,
"authorization_server_name": None
}
)
provider = OktaProvider(settings)
assert provider._oauth2_base_url() == "https://test-domain.okta.com/oauth2"

View File

@@ -1,100 +0,0 @@
import pytest
from crewai_cli.authentication.main import Oauth2Settings
from crewai_cli.authentication.providers.workos import WorkosProvider
class TestWorkosProvider:
@pytest.fixture(autouse=True)
def setup_method(self):
self.valid_settings = Oauth2Settings(
provider="workos",
domain="login.company.com",
client_id="test-client-id",
audience="test-audience"
)
self.provider = WorkosProvider(self.valid_settings)
def test_initialization_with_valid_settings(self):
provider = WorkosProvider(self.valid_settings)
assert provider.settings == self.valid_settings
assert provider.settings.provider == "workos"
assert provider.settings.domain == "login.company.com"
assert provider.settings.client_id == "test-client-id"
assert provider.settings.audience == "test-audience"
def test_get_authorize_url(self):
expected_url = "https://login.company.com/oauth2/device_authorization"
assert self.provider.get_authorize_url() == expected_url
def test_get_authorize_url_with_different_domain(self):
settings = Oauth2Settings(
provider="workos",
domain="login.example.com",
client_id="test-client",
audience="test-audience"
)
provider = WorkosProvider(settings)
expected_url = "https://login.example.com/oauth2/device_authorization"
assert provider.get_authorize_url() == expected_url
def test_get_token_url(self):
expected_url = "https://login.company.com/oauth2/token"
assert self.provider.get_token_url() == expected_url
def test_get_token_url_with_different_domain(self):
settings = Oauth2Settings(
provider="workos",
domain="api.workos.com",
client_id="test-client",
audience="test-audience"
)
provider = WorkosProvider(settings)
expected_url = "https://api.workos.com/oauth2/token"
assert provider.get_token_url() == expected_url
def test_get_jwks_url(self):
expected_url = "https://login.company.com/oauth2/jwks"
assert self.provider.get_jwks_url() == expected_url
def test_get_jwks_url_with_different_domain(self):
settings = Oauth2Settings(
provider="workos",
domain="auth.enterprise.com",
client_id="test-client",
audience="test-audience"
)
provider = WorkosProvider(settings)
expected_url = "https://auth.enterprise.com/oauth2/jwks"
assert provider.get_jwks_url() == expected_url
def test_get_issuer(self):
expected_issuer = "https://login.company.com"
assert self.provider.get_issuer() == expected_issuer
def test_get_issuer_with_different_domain(self):
settings = Oauth2Settings(
provider="workos",
domain="sso.company.com",
client_id="test-client",
audience="test-audience"
)
provider = WorkosProvider(settings)
expected_issuer = "https://sso.company.com"
assert provider.get_issuer() == expected_issuer
def test_get_audience(self):
assert self.provider.get_audience() == "test-audience"
def test_get_audience_fallback_to_default(self):
settings = Oauth2Settings(
provider="workos",
domain="login.company.com",
client_id="test-client-id",
audience=None
)
provider = WorkosProvider(settings)
assert provider.get_audience() == ""
def test_get_client_id(self):
assert self.provider.get_client_id() == "test-client-id"

View File

@@ -1,107 +0,0 @@
import unittest
from unittest.mock import MagicMock, patch
import jwt
from crewai_cli.authentication.utils import validate_jwt_token
@patch("crewai_core.auth.utils.PyJWKClient", return_value=MagicMock())
@patch("crewai_core.auth.utils.jwt")
class TestUtils(unittest.TestCase):
def test_validate_jwt_token(self, mock_jwt, mock_pyjwkclient):
mock_jwt.decode.return_value = {"exp": 1719859200}
# Create signing key object mock with a .key attribute
mock_pyjwkclient.return_value.get_signing_key_from_jwt.return_value = MagicMock(
key="mock_signing_key"
)
jwt_token = "aaaaa.bbbbbb.cccccc" # noqa: S105
decoded_token = validate_jwt_token(
jwt_token=jwt_token,
jwks_url="https://mock_jwks_url",
issuer="https://mock_issuer",
audience="app_id_xxxx",
)
mock_jwt.decode.assert_called_with(
jwt_token,
"mock_signing_key",
algorithms=["RS256"],
audience="app_id_xxxx",
issuer="https://mock_issuer",
leeway=10.0,
options={
"verify_signature": True,
"verify_exp": True,
"verify_nbf": True,
"verify_iat": True,
"require": ["exp", "iat", "iss", "aud", "sub"],
},
)
mock_pyjwkclient.assert_called_once_with("https://mock_jwks_url")
self.assertEqual(decoded_token, {"exp": 1719859200})
def test_validate_jwt_token_expired(self, mock_jwt, mock_pyjwkclient):
mock_jwt.decode.side_effect = jwt.ExpiredSignatureError
with self.assertRaises(Exception): # noqa: B017
validate_jwt_token(
jwt_token="aaaaa.bbbbbb.cccccc", # noqa: S106
jwks_url="https://mock_jwks_url",
issuer="https://mock_issuer",
audience="app_id_xxxx",
)
def test_validate_jwt_token_invalid_audience(self, mock_jwt, mock_pyjwkclient):
mock_jwt.decode.side_effect = jwt.InvalidAudienceError
with self.assertRaises(Exception): # noqa: B017
validate_jwt_token(
jwt_token="aaaaa.bbbbbb.cccccc", # noqa: S106
jwks_url="https://mock_jwks_url",
issuer="https://mock_issuer",
audience="app_id_xxxx",
)
def test_validate_jwt_token_invalid_issuer(self, mock_jwt, mock_pyjwkclient):
mock_jwt.decode.side_effect = jwt.InvalidIssuerError
with self.assertRaises(Exception): # noqa: B017
validate_jwt_token(
jwt_token="aaaaa.bbbbbb.cccccc", # noqa: S106
jwks_url="https://mock_jwks_url",
issuer="https://mock_issuer",
audience="app_id_xxxx",
)
def test_validate_jwt_token_missing_required_claims(
self, mock_jwt, mock_pyjwkclient
):
mock_jwt.decode.side_effect = jwt.MissingRequiredClaimError
with self.assertRaises(Exception): # noqa: B017
validate_jwt_token(
jwt_token="aaaaa.bbbbbb.cccccc", # noqa: S106
jwks_url="https://mock_jwks_url",
issuer="https://mock_issuer",
audience="app_id_xxxx",
)
def test_validate_jwt_token_jwks_error(self, mock_jwt, mock_pyjwkclient):
mock_jwt.decode.side_effect = jwt.exceptions.PyJWKClientError
with self.assertRaises(Exception): # noqa: B017
validate_jwt_token(
jwt_token="aaaaa.bbbbbb.cccccc", # noqa: S106
jwks_url="https://mock_jwks_url",
issuer="https://mock_issuer",
audience="app_id_xxxx",
)
def test_validate_jwt_token_invalid_token(self, mock_jwt, mock_pyjwkclient):
mock_jwt.decode.side_effect = jwt.InvalidTokenError
with self.assertRaises(Exception): # noqa: B017
validate_jwt_token(
jwt_token="aaaaa.bbbbbb.cccccc", # noqa: S106
jwks_url="https://mock_jwks_url",
issuer="https://mock_issuer",
audience="app_id_xxxx",
)

View File

@@ -1,255 +0,0 @@
from pathlib import Path
from unittest import mock
import pytest
from click.testing import CliRunner
from crewai_cli.cli import (
deploy_create,
deploy_list,
deploy_logs,
deploy_push,
deploy_remove,
deply_status,
flow_add_crew,
login,
reset_memories,
test,
train,
version,
)
@pytest.fixture
def runner():
return CliRunner()
@mock.patch("crewai_cli.cli.train_crew")
def test_train_default_iterations(train_crew, runner):
result = runner.invoke(train)
train_crew.assert_called_once_with(5, "trained_agents_data.pkl")
assert result.exit_code == 0
assert "Training the Crew for 5 iterations" in result.output
@mock.patch("crewai_cli.cli.train_crew")
def test_train_custom_iterations(train_crew, runner):
result = runner.invoke(train, ["--n_iterations", "10"])
train_crew.assert_called_once_with(10, "trained_agents_data.pkl")
assert result.exit_code == 0
assert "Training the Crew for 10 iterations" in result.output
@mock.patch("crewai_cli.cli.train_crew")
def test_train_invalid_string_iterations(train_crew, runner):
result = runner.invoke(train, ["--n_iterations", "invalid"])
train_crew.assert_not_called()
assert result.exit_code == 2
assert (
"Usage: train [OPTIONS]\nTry 'train --help' for help.\n\nError: Invalid value for '-n' / '--n_iterations': 'invalid' is not a valid integer.\n"
in result.output
)
def test_reset_no_memory_flags(runner):
result = runner.invoke(
reset_memories,
)
assert (
result.output
== "Please specify at least one memory type to reset using the appropriate flags.\n"
)
def test_version_flag(runner):
result = runner.invoke(version)
assert result.exit_code == 0
assert "crewai version:" in result.output
def test_version_command(runner):
result = runner.invoke(version)
assert result.exit_code == 0
assert "crewai version:" in result.output
def test_version_command_with_tools(runner):
result = runner.invoke(version, ["--tools"])
assert result.exit_code == 0
assert "crewai version:" in result.output
assert (
"crewai tools version:" in result.output
or "crewai tools not installed" in result.output
)
@mock.patch("crewai_cli.cli.evaluate_crew")
def test_test_default_iterations(evaluate_crew, runner):
result = runner.invoke(test)
evaluate_crew.assert_called_once_with(3, "gpt-4o-mini", trained_agents_file=None)
assert result.exit_code == 0
assert "Testing the crew for 3 iterations with model gpt-4o-mini" in result.output
@mock.patch("crewai_cli.cli.evaluate_crew")
def test_test_custom_iterations(evaluate_crew, runner):
result = runner.invoke(test, ["--n_iterations", "5", "--model", "gpt-4o"])
evaluate_crew.assert_called_once_with(5, "gpt-4o", trained_agents_file=None)
assert result.exit_code == 0
assert "Testing the crew for 5 iterations with model gpt-4o" in result.output
@mock.patch("crewai_cli.cli.evaluate_crew")
def test_test_invalid_string_iterations(evaluate_crew, runner):
result = runner.invoke(test, ["--n_iterations", "invalid"])
evaluate_crew.assert_not_called()
assert result.exit_code == 2
assert (
"Usage: test [OPTIONS]\nTry 'test --help' for help.\n\nError: Invalid value for '-n' / '--n_iterations': 'invalid' is not a valid integer.\n"
in result.output
)
@mock.patch("crewai_cli.cli.AuthenticationCommand")
def test_login(command, runner):
mock_auth = command.return_value
result = runner.invoke(login)
assert result.exit_code == 0
mock_auth.login.assert_called_once()
@mock.patch("crewai_cli.cli.DeployCommand")
def test_deploy_create(command, runner):
mock_deploy = command.return_value
result = runner.invoke(deploy_create)
assert result.exit_code == 0
mock_deploy.create_crew.assert_called_once()
@mock.patch("crewai_cli.cli.DeployCommand")
def test_deploy_list(command, runner):
mock_deploy = command.return_value
result = runner.invoke(deploy_list)
assert result.exit_code == 0
mock_deploy.list_crews.assert_called_once()
@mock.patch("crewai_cli.cli.DeployCommand")
def test_deploy_push(command, runner):
mock_deploy = command.return_value
uuid = "test-uuid"
result = runner.invoke(deploy_push, ["-u", uuid])
assert result.exit_code == 0
mock_deploy.deploy.assert_called_once_with(uuid=uuid, skip_validate=False)
@mock.patch("crewai_cli.cli.DeployCommand")
def test_deploy_push_no_uuid(command, runner):
mock_deploy = command.return_value
result = runner.invoke(deploy_push)
assert result.exit_code == 0
mock_deploy.deploy.assert_called_once_with(uuid=None, skip_validate=False)
@mock.patch("crewai_cli.cli.DeployCommand")
def test_deploy_status(command, runner):
mock_deploy = command.return_value
uuid = "test-uuid"
result = runner.invoke(deply_status, ["-u", uuid])
assert result.exit_code == 0
mock_deploy.get_crew_status.assert_called_once_with(uuid=uuid)
@mock.patch("crewai_cli.cli.DeployCommand")
def test_deploy_status_no_uuid(command, runner):
mock_deploy = command.return_value
result = runner.invoke(deply_status)
assert result.exit_code == 0
mock_deploy.get_crew_status.assert_called_once_with(uuid=None)
@mock.patch("crewai_cli.cli.DeployCommand")
def test_deploy_logs(command, runner):
mock_deploy = command.return_value
uuid = "test-uuid"
result = runner.invoke(deploy_logs, ["-u", uuid])
assert result.exit_code == 0
mock_deploy.get_crew_logs.assert_called_once_with(uuid=uuid)
@mock.patch("crewai_cli.cli.DeployCommand")
def test_deploy_logs_no_uuid(command, runner):
mock_deploy = command.return_value
result = runner.invoke(deploy_logs)
assert result.exit_code == 0
mock_deploy.get_crew_logs.assert_called_once_with(uuid=None)
@mock.patch("crewai_cli.cli.DeployCommand")
def test_deploy_remove(command, runner):
mock_deploy = command.return_value
uuid = "test-uuid"
result = runner.invoke(deploy_remove, ["-u", uuid])
assert result.exit_code == 0
mock_deploy.remove_crew.assert_called_once_with(uuid=uuid)
@mock.patch("crewai_cli.cli.DeployCommand")
def test_deploy_remove_no_uuid(command, runner):
mock_deploy = command.return_value
result = runner.invoke(deploy_remove)
assert result.exit_code == 0
mock_deploy.remove_crew.assert_called_once_with(uuid=None)
@mock.patch("crewai_cli.add_crew_to_flow.create_embedded_crew")
@mock.patch("pathlib.Path.exists", return_value=True)
def test_flow_add_crew(mock_path_exists, mock_create_embedded_crew, runner):
crew_name = "new_crew"
result = runner.invoke(flow_add_crew, [crew_name])
assert result.exit_code == 0, f"Command failed with output: {result.output}"
assert f"Adding crew {crew_name} to the flow" in result.output
mock_create_embedded_crew.assert_called_once()
call_args, call_kwargs = mock_create_embedded_crew.call_args
assert call_args[0] == crew_name
assert "parent_folder" in call_kwargs
assert isinstance(call_kwargs["parent_folder"], Path)
def test_add_crew_to_flow_not_in_root(runner):
with mock.patch("pathlib.Path.exists", autospec=True) as mock_exists:
def exists_side_effect(self):
if self.name == "pyproject.toml":
return False
return True
mock_exists.side_effect = exists_side_effect
result = runner.invoke(flow_add_crew, ["new_crew"])
assert result.exit_code != 0
assert "This command must be run from the root of a flow project." in str(
result.output
)

View File

@@ -1,148 +0,0 @@
import json
import shutil
import tempfile
import unittest
from datetime import datetime, timedelta
from pathlib import Path
from unittest.mock import MagicMock, patch
from crewai_cli.config import (
CLI_SETTINGS_KEYS,
DEFAULT_CLI_SETTINGS,
USER_SETTINGS_KEYS,
Settings,
)
from crewai_core.token_manager import TokenManager
class TestSettings(unittest.TestCase):
def setUp(self):
self.test_dir = Path(tempfile.mkdtemp())
self.config_path = self.test_dir / "settings.json"
def tearDown(self):
shutil.rmtree(self.test_dir)
def test_empty_initialization(self):
settings = Settings(config_path=self.config_path)
self.assertIsNone(settings.tool_repository_username)
self.assertIsNone(settings.tool_repository_password)
def test_initialization_with_data(self):
settings = Settings(
config_path=self.config_path, tool_repository_username="user1"
)
self.assertEqual(settings.tool_repository_username, "user1")
self.assertIsNone(settings.tool_repository_password)
def test_initialization_with_existing_file(self):
self.config_path.parent.mkdir(parents=True, exist_ok=True)
with self.config_path.open("w") as f:
json.dump({"tool_repository_username": "file_user"}, f)
settings = Settings(config_path=self.config_path)
self.assertEqual(settings.tool_repository_username, "file_user")
def test_merge_file_and_input_data(self):
self.config_path.parent.mkdir(parents=True, exist_ok=True)
with self.config_path.open("w") as f:
json.dump(
{
"tool_repository_username": "file_user",
"tool_repository_password": "file_pass",
},
f,
)
settings = Settings(
config_path=self.config_path, tool_repository_username="new_user"
)
self.assertEqual(settings.tool_repository_username, "new_user")
self.assertEqual(settings.tool_repository_password, "file_pass")
def test_clear_user_settings(self):
user_settings = {key: f"value_for_{key}" for key in USER_SETTINGS_KEYS}
settings = Settings(config_path=self.config_path, **user_settings)
settings.clear_user_settings()
for key in user_settings.keys():
self.assertEqual(getattr(settings, key), None)
@patch("crewai_core.settings.TokenManager")
def test_reset_settings(self, mock_token_manager):
user_settings = {key: f"value_for_{key}" for key in USER_SETTINGS_KEYS}
cli_settings = {key: f"value_for_{key}" for key in CLI_SETTINGS_KEYS if key != "oauth2_extra"}
cli_settings["oauth2_extra"] = {"scope": "xxx", "other": "yyy"}
settings = Settings(
config_path=self.config_path, **user_settings, **cli_settings
)
mock_token_manager.return_value = MagicMock()
TokenManager().save_tokens(
"aaa.bbb.ccc", (datetime.now() + timedelta(seconds=36000)).timestamp()
)
settings.reset()
for key in user_settings.keys():
self.assertEqual(getattr(settings, key), None)
for key in cli_settings.keys():
self.assertEqual(getattr(settings, key), DEFAULT_CLI_SETTINGS.get(key))
mock_token_manager.return_value.clear_tokens.assert_called_once()
def test_dump_new_settings(self):
settings = Settings(
config_path=self.config_path, tool_repository_username="user1"
)
settings.dump()
with self.config_path.open("r") as f:
saved_data = json.load(f)
self.assertEqual(saved_data["tool_repository_username"], "user1")
def test_update_existing_settings(self):
self.config_path.parent.mkdir(parents=True, exist_ok=True)
with self.config_path.open("w") as f:
json.dump({"existing_setting": "value"}, f)
settings = Settings(
config_path=self.config_path, tool_repository_username="user1"
)
settings.dump()
with self.config_path.open("r") as f:
saved_data = json.load(f)
self.assertEqual(saved_data["existing_setting"], "value")
self.assertEqual(saved_data["tool_repository_username"], "user1")
def test_none_values(self):
settings = Settings(config_path=self.config_path, tool_repository_username=None)
settings.dump()
with self.config_path.open("r") as f:
saved_data = json.load(f)
self.assertIsNone(saved_data.get("tool_repository_username"))
def test_invalid_json_in_config(self):
self.config_path.parent.mkdir(parents=True, exist_ok=True)
with self.config_path.open("w") as f:
f.write("invalid json")
try:
settings = Settings(config_path=self.config_path)
self.assertIsNone(settings.tool_repository_username)
except json.JSONDecodeError:
self.fail("Settings initialization should handle invalid JSON")
def test_empty_config_file(self):
self.config_path.parent.mkdir(parents=True, exist_ok=True)
self.config_path.touch()
settings = Settings(config_path=self.config_path)
self.assertIsNone(settings.tool_repository_username)

View File

@@ -1,20 +0,0 @@
from crewai_cli.constants import ENV_VARS, MODELS, PROVIDERS
def test_huggingface_in_providers():
"""Test that Huggingface is in the PROVIDERS list."""
assert "huggingface" in PROVIDERS
def test_huggingface_env_vars():
"""Test that Huggingface environment variables are properly configured."""
assert "huggingface" in ENV_VARS
assert any(
detail.get("key_name") == "HF_TOKEN" for detail in ENV_VARS["huggingface"]
)
def test_huggingface_models():
"""Test that Huggingface models are properly configured."""
assert "huggingface" in MODELS
assert len(MODELS["huggingface"]) > 0

View File

@@ -1,359 +0,0 @@
import os
import unittest
from unittest.mock import ANY, AsyncMock, MagicMock, patch
import pytest
from crewai_cli.plus_api import PlusAPI
class TestPlusAPI(unittest.TestCase):
def setUp(self):
self.api_key = "test_api_key"
self.api = PlusAPI(self.api_key)
self.org_uuid = "test-org-uuid"
def test_init(self):
self.assertEqual(self.api.api_key, self.api_key)
self.assertEqual(self.api.headers["Authorization"], f"Bearer {self.api_key}")
self.assertEqual(self.api.headers["Content-Type"], "application/json")
self.assertIn("CrewAI-CLI/", self.api.headers["User-Agent"])
self.assertTrue(self.api.headers["X-Crewai-Version"])
@patch("crewai_core.plus_api.PlusAPI._make_request")
def test_login_to_tool_repository(self, mock_make_request):
mock_response = MagicMock()
mock_make_request.return_value = mock_response
response = self.api.login_to_tool_repository()
mock_make_request.assert_called_once_with(
"POST", "/crewai_plus/api/v1/tools/login", json={}
)
self.assertEqual(response, mock_response)
def assert_request_with_org_id(
self, mock_client_instance, method: str, endpoint: str, **kwargs
):
mock_client_instance.request.assert_called_once_with(
method,
f"{os.getenv('CREWAI_PLUS_URL')}{endpoint}",
headers={
"Authorization": ANY,
"Content-Type": ANY,
"User-Agent": ANY,
"X-Crewai-Version": ANY,
"X-Crewai-Organization-Id": self.org_uuid,
},
**kwargs,
)
@patch("crewai_core.plus_api.Settings")
@patch("crewai_core.plus_api.httpx.Client")
def test_login_to_tool_repository_with_org_uuid(
self, mock_client_class, mock_settings_class
):
mock_settings = MagicMock()
mock_settings.org_uuid = self.org_uuid
mock_settings.enterprise_base_url = os.getenv('CREWAI_PLUS_URL')
mock_settings_class.return_value = mock_settings
self.api = PlusAPI(self.api_key)
mock_client_instance = MagicMock()
mock_response = MagicMock()
mock_client_instance.request.return_value = mock_response
mock_client_class.return_value.__enter__.return_value = mock_client_instance
response = self.api.login_to_tool_repository()
self.assert_request_with_org_id(
mock_client_instance, "POST", "/crewai_plus/api/v1/tools/login", json={}
)
self.assertEqual(response, mock_response)
@patch("crewai_core.plus_api.PlusAPI._make_request")
def test_get_tool(self, mock_make_request):
mock_response = MagicMock()
mock_make_request.return_value = mock_response
response = self.api.get_tool("test_tool_handle")
mock_make_request.assert_called_once_with(
"GET", "/crewai_plus/api/v1/tools/test_tool_handle"
)
self.assertEqual(response, mock_response)
@patch("crewai_core.plus_api.Settings")
@patch("crewai_core.plus_api.httpx.Client")
def test_get_tool_with_org_uuid(self, mock_client_class, mock_settings_class):
mock_settings = MagicMock()
mock_settings.org_uuid = self.org_uuid
mock_settings.enterprise_base_url = os.getenv('CREWAI_PLUS_URL')
mock_settings_class.return_value = mock_settings
self.api = PlusAPI(self.api_key)
mock_client_instance = MagicMock()
mock_response = MagicMock()
mock_client_instance.request.return_value = mock_response
mock_client_class.return_value.__enter__.return_value = mock_client_instance
response = self.api.get_tool("test_tool_handle")
self.assert_request_with_org_id(
mock_client_instance, "GET", "/crewai_plus/api/v1/tools/test_tool_handle"
)
self.assertEqual(response, mock_response)
@patch("crewai_core.plus_api.PlusAPI._make_request")
def test_publish_tool(self, mock_make_request):
mock_response = MagicMock()
mock_make_request.return_value = mock_response
handle = "test_tool_handle"
public = True
version = "1.0.0"
description = "Test tool description"
encoded_file = "encoded_test_file"
response = self.api.publish_tool(
handle, public, version, description, encoded_file
)
params = {
"handle": handle,
"public": public,
"version": version,
"file": encoded_file,
"description": description,
"available_exports": None,
"tools_metadata": None,
}
mock_make_request.assert_called_once_with(
"POST", "/crewai_plus/api/v1/tools", json=params
)
self.assertEqual(response, mock_response)
@patch("crewai_core.plus_api.Settings")
@patch("crewai_core.plus_api.httpx.Client")
def test_publish_tool_with_org_uuid(self, mock_client_class, mock_settings_class):
mock_settings = MagicMock()
mock_settings.org_uuid = self.org_uuid
mock_settings.enterprise_base_url = os.getenv('CREWAI_PLUS_URL')
mock_settings_class.return_value = mock_settings
self.api = PlusAPI(self.api_key)
mock_client_instance = MagicMock()
mock_response = MagicMock()
mock_client_instance.request.return_value = mock_response
mock_client_class.return_value.__enter__.return_value = mock_client_instance
handle = "test_tool_handle"
public = True
version = "1.0.0"
description = "Test tool description"
encoded_file = "encoded_test_file"
response = self.api.publish_tool(
handle, public, version, description, encoded_file
)
expected_params = {
"handle": handle,
"public": public,
"version": version,
"file": encoded_file,
"description": description,
"available_exports": None,
"tools_metadata": None,
}
self.assert_request_with_org_id(
mock_client_instance, "POST", "/crewai_plus/api/v1/tools", json=expected_params
)
self.assertEqual(response, mock_response)
@patch("crewai_core.plus_api.PlusAPI._make_request")
def test_publish_tool_without_description(self, mock_make_request):
mock_response = MagicMock()
mock_make_request.return_value = mock_response
handle = "test_tool_handle"
public = False
version = "2.0.0"
description = None
encoded_file = "encoded_test_file"
response = self.api.publish_tool(
handle, public, version, description, encoded_file
)
params = {
"handle": handle,
"public": public,
"version": version,
"file": encoded_file,
"description": description,
"available_exports": None,
"tools_metadata": None,
}
mock_make_request.assert_called_once_with(
"POST", "/crewai_plus/api/v1/tools", json=params
)
self.assertEqual(response, mock_response)
@patch("crewai_core.plus_api.httpx.Client")
def test_make_request(self, mock_client_class):
mock_client_instance = MagicMock()
mock_response = MagicMock()
mock_client_instance.request.return_value = mock_response
mock_client_class.return_value.__enter__.return_value = mock_client_instance
response = self.api._make_request("GET", "test_endpoint")
mock_client_class.assert_called_once_with(trust_env=False, verify=True)
mock_client_instance.request.assert_called_once_with(
"GET", f"{self.api.base_url}/test_endpoint", headers=self.api.headers
)
self.assertEqual(response, mock_response)
@patch("crewai_core.plus_api.PlusAPI._make_request")
def test_deploy_by_name(self, mock_make_request):
self.api.deploy_by_name("test_project")
mock_make_request.assert_called_once_with(
"POST", "/crewai_plus/api/v1/crews/by-name/test_project/deploy"
)
@patch("crewai_core.plus_api.PlusAPI._make_request")
def test_deploy_by_uuid(self, mock_make_request):
self.api.deploy_by_uuid("test_uuid")
mock_make_request.assert_called_once_with(
"POST", "/crewai_plus/api/v1/crews/test_uuid/deploy"
)
@patch("crewai_core.plus_api.PlusAPI._make_request")
def test_crew_status_by_name(self, mock_make_request):
self.api.crew_status_by_name("test_project")
mock_make_request.assert_called_once_with(
"GET", "/crewai_plus/api/v1/crews/by-name/test_project/status"
)
@patch("crewai_core.plus_api.PlusAPI._make_request")
def test_crew_status_by_uuid(self, mock_make_request):
self.api.crew_status_by_uuid("test_uuid")
mock_make_request.assert_called_once_with(
"GET", "/crewai_plus/api/v1/crews/test_uuid/status"
)
@patch("crewai_core.plus_api.PlusAPI._make_request")
def test_crew_by_name(self, mock_make_request):
self.api.crew_by_name("test_project")
mock_make_request.assert_called_once_with(
"GET", "/crewai_plus/api/v1/crews/by-name/test_project/logs/deployment"
)
self.api.crew_by_name("test_project", "custom_log")
mock_make_request.assert_called_with(
"GET", "/crewai_plus/api/v1/crews/by-name/test_project/logs/custom_log"
)
@patch("crewai_core.plus_api.PlusAPI._make_request")
def test_crew_by_uuid(self, mock_make_request):
self.api.crew_by_uuid("test_uuid")
mock_make_request.assert_called_once_with(
"GET", "/crewai_plus/api/v1/crews/test_uuid/logs/deployment"
)
self.api.crew_by_uuid("test_uuid", "custom_log")
mock_make_request.assert_called_with(
"GET", "/crewai_plus/api/v1/crews/test_uuid/logs/custom_log"
)
@patch("crewai_core.plus_api.PlusAPI._make_request")
def test_delete_crew_by_name(self, mock_make_request):
self.api.delete_crew_by_name("test_project")
mock_make_request.assert_called_once_with(
"DELETE", "/crewai_plus/api/v1/crews/by-name/test_project"
)
@patch("crewai_core.plus_api.PlusAPI._make_request")
def test_delete_crew_by_uuid(self, mock_make_request):
self.api.delete_crew_by_uuid("test_uuid")
mock_make_request.assert_called_once_with(
"DELETE", "/crewai_plus/api/v1/crews/test_uuid"
)
@patch("crewai_core.plus_api.PlusAPI._make_request")
def test_list_crews(self, mock_make_request):
self.api.list_crews()
mock_make_request.assert_called_once_with("GET", "/crewai_plus/api/v1/crews")
@patch("crewai_core.plus_api.PlusAPI._make_request")
def test_create_crew(self, mock_make_request):
payload = {"name": "test_crew"}
self.api.create_crew(payload)
mock_make_request.assert_called_once_with(
"POST", "/crewai_plus/api/v1/crews", json=payload
)
@patch("crewai_core.plus_api.Settings")
@patch.dict(os.environ, {"CREWAI_PLUS_URL": ""})
def test_custom_base_url(self, mock_settings_class):
mock_settings = MagicMock()
mock_settings.enterprise_base_url = "https://custom-url.com/api"
mock_settings_class.return_value = mock_settings
custom_api = PlusAPI("test_key")
self.assertEqual(
custom_api.base_url,
"https://custom-url.com/api",
)
@patch.dict(os.environ, {"CREWAI_PLUS_URL": "https://custom-url-from-env.com"})
def test_custom_base_url_from_env(self):
custom_api = PlusAPI("test_key")
self.assertEqual(
custom_api.base_url,
"https://custom-url-from-env.com",
)
@pytest.mark.asyncio
@patch("httpx.AsyncClient")
async def test_get_agent(mock_async_client_class):
api = PlusAPI("test_api_key")
mock_response = MagicMock()
mock_client_instance = AsyncMock()
mock_client_instance.get.return_value = mock_response
mock_async_client_class.return_value.__aenter__.return_value = mock_client_instance
response = await api.get_agent("test_agent_handle")
mock_client_instance.get.assert_called_once_with(
f"{api.base_url}/crewai_plus/api/v1/agents/test_agent_handle",
headers=api.headers,
)
assert response == mock_response
@pytest.mark.asyncio
@patch("httpx.AsyncClient")
@patch("crewai_core.plus_api.Settings")
async def test_get_agent_with_org_uuid(mock_settings_class, mock_async_client_class):
org_uuid = "test-org-uuid"
mock_settings = MagicMock()
mock_settings.org_uuid = org_uuid
mock_settings.enterprise_base_url = os.getenv("CREWAI_PLUS_URL")
mock_settings_class.return_value = mock_settings
api = PlusAPI("test_api_key")
mock_response = MagicMock()
mock_client_instance = AsyncMock()
mock_client_instance.get.return_value = mock_response
mock_async_client_class.return_value.__aenter__.return_value = mock_client_instance
response = await api.get_agent("test_agent_handle")
mock_client_instance.get.assert_called_once_with(
f"{api.base_url}/crewai_plus/api/v1/agents/test_agent_handle",
headers=api.headers,
)
assert "X-Crewai-Organization-Id" in api.headers
assert api.headers["X-Crewai-Organization-Id"] == org_uuid
assert response == mock_response

View File

@@ -1,293 +0,0 @@
"""Tests for TokenManager with atomic file operations."""
import json
import tempfile
import unittest
from datetime import datetime, timedelta
from pathlib import Path
from unittest.mock import patch
from cryptography.fernet import Fernet
from crewai_core.token_manager import TokenManager
class TestTokenManager(unittest.TestCase):
"""Test cases for TokenManager."""
@patch("crewai_core.token_manager.TokenManager._get_or_create_key")
def setUp(self, mock_get_key: unittest.mock.MagicMock) -> None:
"""Set up test fixtures."""
mock_get_key.return_value = Fernet.generate_key()
self.token_manager = TokenManager()
@patch("crewai_core.token_manager.TokenManager._read_secure_file")
@patch("crewai_core.token_manager.TokenManager._get_or_create_key")
def test_get_or_create_key_existing(
self,
mock_get_or_create: unittest.mock.MagicMock,
mock_read: unittest.mock.MagicMock,
) -> None:
"""Test that existing key is returned when present."""
mock_key = Fernet.generate_key()
mock_get_or_create.return_value = mock_key
token_manager = TokenManager()
result = token_manager.key
self.assertEqual(result, mock_key)
def test_get_or_create_key_new(self) -> None:
"""Test that new key is created when none exists."""
mock_key = Fernet.generate_key()
with (
patch.object(self.token_manager, "_read_secure_file", return_value=None) as mock_read,
patch.object(self.token_manager, "_atomic_create_secure_file", return_value=True) as mock_atomic_create,
patch("crewai_core.token_manager.Fernet.generate_key", return_value=mock_key) as mock_generate,
):
result = self.token_manager._get_or_create_key()
self.assertEqual(result, mock_key)
mock_read.assert_called_with("secret.key")
mock_generate.assert_called_once()
mock_atomic_create.assert_called_once_with("secret.key", mock_key)
def test_get_or_create_key_race_condition(self) -> None:
"""Test that another process's key is used when atomic create fails."""
our_key = Fernet.generate_key()
their_key = Fernet.generate_key()
with (
patch.object(self.token_manager, "_read_secure_file", side_effect=[None, their_key]) as mock_read,
patch.object(self.token_manager, "_atomic_create_secure_file", return_value=False) as mock_atomic_create,
patch("crewai_core.token_manager.Fernet.generate_key", return_value=our_key),
):
result = self.token_manager._get_or_create_key()
self.assertEqual(result, their_key)
self.assertEqual(mock_read.call_count, 2)
@patch("crewai_core.token_manager.TokenManager._atomic_write_secure_file")
def test_save_tokens(
self, mock_write: unittest.mock.MagicMock
) -> None:
"""Test saving tokens encrypts and writes atomically."""
access_token = "test_token"
expires_at = int((datetime.now() + timedelta(seconds=3600)).timestamp())
self.token_manager.save_tokens(access_token, expires_at)
mock_write.assert_called_once()
args = mock_write.call_args[0]
self.assertEqual(args[0], "tokens.enc")
decrypted_data = self.token_manager.fernet.decrypt(args[1])
data = json.loads(decrypted_data)
self.assertEqual(data["access_token"], access_token)
expiration = datetime.fromisoformat(data["expiration"])
self.assertEqual(expiration, datetime.fromtimestamp(expires_at))
@patch("crewai_core.token_manager.TokenManager._read_secure_file")
def test_get_token_valid(
self, mock_read: unittest.mock.MagicMock
) -> None:
"""Test getting a valid non-expired token."""
access_token = "test_token"
expiration = (datetime.now() + timedelta(hours=1)).isoformat()
data = {"access_token": access_token, "expiration": expiration}
encrypted_data = self.token_manager.fernet.encrypt(json.dumps(data).encode())
mock_read.return_value = encrypted_data
result = self.token_manager.get_token()
self.assertEqual(result, access_token)
@patch("crewai_core.token_manager.TokenManager._read_secure_file")
def test_get_token_expired(
self, mock_read: unittest.mock.MagicMock
) -> None:
"""Test that expired token returns None."""
access_token = "test_token"
expiration = (datetime.now() - timedelta(hours=1)).isoformat()
data = {"access_token": access_token, "expiration": expiration}
encrypted_data = self.token_manager.fernet.encrypt(json.dumps(data).encode())
mock_read.return_value = encrypted_data
result = self.token_manager.get_token()
self.assertIsNone(result)
@patch("crewai_core.token_manager.TokenManager._read_secure_file")
def test_get_token_not_found(
self, mock_read: unittest.mock.MagicMock
) -> None:
"""Test that missing token file returns None."""
mock_read.return_value = None
result = self.token_manager.get_token()
self.assertIsNone(result)
@patch("crewai_core.token_manager.TokenManager._delete_secure_file")
def test_clear_tokens(
self, mock_delete: unittest.mock.MagicMock
) -> None:
"""Test clearing tokens deletes the token file."""
self.token_manager.clear_tokens()
mock_delete.assert_called_once_with("tokens.enc")
class TestAtomicFileOperations(unittest.TestCase):
"""Test atomic file operations directly."""
def setUp(self) -> None:
"""Set up test fixtures with temp directory."""
self.temp_dir = tempfile.mkdtemp()
self.original_get_path = TokenManager._get_secure_storage_path
# Patch to use temp directory
def mock_get_path() -> Path:
return Path(self.temp_dir)
TokenManager._get_secure_storage_path = staticmethod(mock_get_path)
def tearDown(self) -> None:
"""Clean up temp directory."""
TokenManager._get_secure_storage_path = staticmethod(self.original_get_path)
import shutil
shutil.rmtree(self.temp_dir, ignore_errors=True)
@patch("crewai_core.token_manager.TokenManager._get_or_create_key")
def test_atomic_create_new_file(
self, mock_get_key: unittest.mock.MagicMock
) -> None:
"""Test atomic create succeeds for new file."""
mock_get_key.return_value = Fernet.generate_key()
tm = TokenManager()
result = tm._atomic_create_secure_file("test.txt", b"content")
self.assertTrue(result)
file_path = Path(self.temp_dir) / "test.txt"
self.assertTrue(file_path.exists())
self.assertEqual(file_path.read_bytes(), b"content")
self.assertEqual(file_path.stat().st_mode & 0o777, 0o600)
@patch("crewai_core.token_manager.TokenManager._get_or_create_key")
def test_atomic_create_existing_file(
self, mock_get_key: unittest.mock.MagicMock
) -> None:
"""Test atomic create fails for existing file."""
mock_get_key.return_value = Fernet.generate_key()
tm = TokenManager()
# Create file first
file_path = Path(self.temp_dir) / "test.txt"
file_path.write_bytes(b"original")
result = tm._atomic_create_secure_file("test.txt", b"new content")
self.assertFalse(result)
self.assertEqual(file_path.read_bytes(), b"original")
@patch("crewai_core.token_manager.TokenManager._get_or_create_key")
def test_atomic_write_new_file(
self, mock_get_key: unittest.mock.MagicMock
) -> None:
"""Test atomic write creates new file."""
mock_get_key.return_value = Fernet.generate_key()
tm = TokenManager()
tm._atomic_write_secure_file("test.txt", b"content")
file_path = Path(self.temp_dir) / "test.txt"
self.assertTrue(file_path.exists())
self.assertEqual(file_path.read_bytes(), b"content")
self.assertEqual(file_path.stat().st_mode & 0o777, 0o600)
@patch("crewai_core.token_manager.TokenManager._get_or_create_key")
def test_atomic_write_overwrites(
self, mock_get_key: unittest.mock.MagicMock
) -> None:
"""Test atomic write overwrites existing file."""
mock_get_key.return_value = Fernet.generate_key()
tm = TokenManager()
file_path = Path(self.temp_dir) / "test.txt"
file_path.write_bytes(b"original")
tm._atomic_write_secure_file("test.txt", b"new content")
self.assertEqual(file_path.read_bytes(), b"new content")
@patch("crewai_core.token_manager.TokenManager._get_or_create_key")
def test_atomic_write_no_temp_file_on_success(
self, mock_get_key: unittest.mock.MagicMock
) -> None:
"""Test that temp file is cleaned up after successful write."""
mock_get_key.return_value = Fernet.generate_key()
tm = TokenManager()
tm._atomic_write_secure_file("test.txt", b"content")
# Check no temp files remain
temp_files = list(Path(self.temp_dir).glob(".test.txt.*"))
self.assertEqual(len(temp_files), 0)
@patch("crewai_core.token_manager.TokenManager._get_or_create_key")
def test_read_secure_file_exists(
self, mock_get_key: unittest.mock.MagicMock
) -> None:
"""Test reading existing file."""
mock_get_key.return_value = Fernet.generate_key()
tm = TokenManager()
file_path = Path(self.temp_dir) / "test.txt"
file_path.write_bytes(b"content")
result = tm._read_secure_file("test.txt")
self.assertEqual(result, b"content")
@patch("crewai_core.token_manager.TokenManager._get_or_create_key")
def test_read_secure_file_not_exists(
self, mock_get_key: unittest.mock.MagicMock
) -> None:
"""Test reading non-existent file returns None."""
mock_get_key.return_value = Fernet.generate_key()
tm = TokenManager()
result = tm._read_secure_file("nonexistent.txt")
self.assertIsNone(result)
@patch("crewai_core.token_manager.TokenManager._get_or_create_key")
def test_delete_secure_file_exists(
self, mock_get_key: unittest.mock.MagicMock
) -> None:
"""Test deleting existing file."""
mock_get_key.return_value = Fernet.generate_key()
tm = TokenManager()
file_path = Path(self.temp_dir) / "test.txt"
file_path.write_bytes(b"content")
tm._delete_secure_file("test.txt")
self.assertFalse(file_path.exists())
@patch("crewai_core.token_manager.TokenManager._get_or_create_key")
def test_delete_secure_file_not_exists(
self, mock_get_key: unittest.mock.MagicMock
) -> None:
"""Test deleting non-existent file doesn't raise."""
mock_get_key.return_value = Fernet.generate_key()
tm = TokenManager()
# Should not raise
tm._delete_secure_file("nonexistent.txt")
if __name__ == "__main__":
unittest.main()

View File

@@ -1,107 +0,0 @@
import os
import shutil
import tempfile
from pathlib import Path
import pytest
from crewai_cli import utils
@pytest.fixture
def temp_tree():
root_dir = tempfile.mkdtemp()
create_file(os.path.join(root_dir, "file1.txt"), "Hello, world!")
create_file(os.path.join(root_dir, "file2.txt"), "Another file")
os.mkdir(os.path.join(root_dir, "empty_dir"))
nested_dir = os.path.join(root_dir, "nested_dir")
os.mkdir(nested_dir)
create_file(os.path.join(nested_dir, "nested_file.txt"), "Nested content")
yield root_dir
shutil.rmtree(root_dir)
def create_file(path, content):
with open(path, "w") as f:
f.write(content)
def test_tree_find_and_replace_file_content(temp_tree):
utils.tree_find_and_replace(temp_tree, "world", "universe")
with open(os.path.join(temp_tree, "file1.txt"), "r") as f:
assert f.read() == "Hello, universe!"
def test_tree_find_and_replace_file_name(temp_tree):
old_path = os.path.join(temp_tree, "file2.txt")
new_path = os.path.join(temp_tree, "file2_renamed.txt")
os.rename(old_path, new_path)
utils.tree_find_and_replace(temp_tree, "renamed", "modified")
assert os.path.exists(os.path.join(temp_tree, "file2_modified.txt"))
assert not os.path.exists(new_path)
def test_tree_find_and_replace_directory_name(temp_tree):
utils.tree_find_and_replace(temp_tree, "empty", "renamed")
assert os.path.exists(os.path.join(temp_tree, "renamed_dir"))
assert not os.path.exists(os.path.join(temp_tree, "empty_dir"))
def test_tree_find_and_replace_nested_content(temp_tree):
utils.tree_find_and_replace(temp_tree, "Nested", "Updated")
with open(os.path.join(temp_tree, "nested_dir", "nested_file.txt"), "r") as f:
assert f.read() == "Updated content"
def test_tree_find_and_replace_no_matches(temp_tree):
utils.tree_find_and_replace(temp_tree, "nonexistent", "replacement")
assert set(os.listdir(temp_tree)) == {
"file1.txt",
"file2.txt",
"empty_dir",
"nested_dir",
}
def test_tree_copy_full_structure(temp_tree):
dest_dir = tempfile.mkdtemp()
try:
utils.tree_copy(temp_tree, dest_dir)
assert set(os.listdir(dest_dir)) == set(os.listdir(temp_tree))
assert os.path.isfile(os.path.join(dest_dir, "file1.txt"))
assert os.path.isfile(os.path.join(dest_dir, "file2.txt"))
assert os.path.isdir(os.path.join(dest_dir, "empty_dir"))
assert os.path.isdir(os.path.join(dest_dir, "nested_dir"))
assert os.path.isfile(os.path.join(dest_dir, "nested_dir", "nested_file.txt"))
finally:
shutil.rmtree(dest_dir)
def test_tree_copy_preserve_content(temp_tree):
dest_dir = tempfile.mkdtemp()
try:
utils.tree_copy(temp_tree, dest_dir)
with open(os.path.join(dest_dir, "file1.txt"), "r") as f:
assert f.read() == "Hello, world!"
with open(os.path.join(dest_dir, "nested_dir", "nested_file.txt"), "r") as f:
assert f.read() == "Nested content"
finally:
shutil.rmtree(dest_dir)
def test_tree_copy_to_existing_directory(temp_tree):
dest_dir = tempfile.mkdtemp()
try:
create_file(os.path.join(dest_dir, "existing_file.txt"), "I was here first")
utils.tree_copy(temp_tree, dest_dir)
assert os.path.isfile(os.path.join(dest_dir, "existing_file.txt"))
assert os.path.isfile(os.path.join(dest_dir, "file1.txt"))
finally:
shutil.rmtree(dest_dir)
# Tests for extract_available_exports, get_crews, get_flows, fetch_crews,
# is_valid_tool live in lib/crewai/tests/cli/test_utils.py — the canonical
# implementations are in crewai.utilities.project_utils.

View File

@@ -1,374 +0,0 @@
"""Test for version management."""
import json
from datetime import datetime, timedelta
from pathlib import Path
from unittest.mock import MagicMock, patch
from crewai_cli.version import get_crewai_version as _get_ver
from crewai_cli.version import (
get_crewai_version,
get_latest_version_from_pypi,
is_current_version_yanked,
is_newer_version_available,
)
from crewai_core.version import (
_find_latest_non_yanked_version,
_get_cache_file,
_is_cache_valid,
_is_version_yanked,
)
def test_dynamic_versioning_consistency() -> None:
"""Test that dynamic versioning provides consistent version across all access methods."""
cli_version = get_crewai_version()
package_version = _get_ver()
assert cli_version == package_version
assert package_version is not None
assert len(package_version.strip()) > 0
class TestVersionChecking:
"""Test version checking utilities."""
def test_get_crewai_version(self) -> None:
"""Test getting current crewai version."""
version = get_crewai_version()
assert isinstance(version, str)
assert len(version) > 0
def test_get_cache_file(self) -> None:
"""Test cache file path generation."""
cache_file = _get_cache_file()
assert isinstance(cache_file, Path)
assert cache_file.name == "version_cache.json"
def test_is_cache_valid_with_fresh_cache(self) -> None:
"""Test cache validation with fresh cache."""
cache_data = {"timestamp": datetime.now().isoformat(), "version": "1.0.0"}
assert _is_cache_valid(cache_data) is True
def test_is_cache_valid_with_stale_cache(self) -> None:
"""Test cache validation with stale cache."""
old_time = datetime.now() - timedelta(hours=25)
cache_data = {"timestamp": old_time.isoformat(), "version": "1.0.0"}
assert _is_cache_valid(cache_data) is False
def test_is_cache_valid_with_missing_timestamp(self) -> None:
"""Test cache validation with missing timestamp."""
cache_data = {"version": "1.0.0"}
assert _is_cache_valid(cache_data) is False
@patch("crewai_core.version.Path.exists")
@patch("crewai_core.version.request.urlopen")
def test_get_latest_version_from_pypi_success(
self, mock_urlopen: MagicMock, mock_exists: MagicMock
) -> None:
"""Test successful PyPI version fetch uses releases data."""
mock_exists.return_value = False
releases = {
"1.0.0": [{"yanked": False}],
"2.0.0": [{"yanked": False}],
"2.1.0": [{"yanked": True, "yanked_reason": "bad release"}],
}
mock_response = MagicMock()
mock_response.read.return_value = json.dumps(
{"info": {"version": "2.1.0"}, "releases": releases}
).encode()
mock_urlopen.return_value.__enter__.return_value = mock_response
version = get_latest_version_from_pypi()
assert version == "2.0.0"
@patch("crewai_core.version.Path.exists")
@patch("crewai_core.version.request.urlopen")
def test_get_latest_version_from_pypi_failure(
self, mock_urlopen: MagicMock, mock_exists: MagicMock
) -> None:
"""Test PyPI version fetch failure."""
from urllib.error import URLError
mock_exists.return_value = False
mock_urlopen.side_effect = URLError("Network error")
version = get_latest_version_from_pypi()
assert version is None
@patch("crewai_core.version.get_crewai_version")
@patch("crewai_core.version.get_latest_version_from_pypi")
def test_is_newer_version_available_true(
self, mock_latest: MagicMock, mock_current: MagicMock
) -> None:
"""Test when newer version is available."""
mock_current.return_value = "1.0.0"
mock_latest.return_value = "2.0.0"
is_newer, current, latest = is_newer_version_available()
assert is_newer is True
assert current == "1.0.0"
assert latest == "2.0.0"
@patch("crewai_core.version.get_crewai_version")
@patch("crewai_core.version.get_latest_version_from_pypi")
def test_is_newer_version_available_false(
self, mock_latest: MagicMock, mock_current: MagicMock
) -> None:
"""Test when no newer version is available."""
mock_current.return_value = "2.0.0"
mock_latest.return_value = "2.0.0"
is_newer, current, latest = is_newer_version_available()
assert is_newer is False
assert current == "2.0.0"
assert latest == "2.0.0"
@patch("crewai_core.version.get_crewai_version")
@patch("crewai_core.version.get_latest_version_from_pypi")
def test_is_newer_version_available_with_none_latest(
self, mock_latest: MagicMock, mock_current: MagicMock
) -> None:
"""Test when PyPI fetch fails."""
mock_current.return_value = "1.0.0"
mock_latest.return_value = None
is_newer, current, latest = is_newer_version_available()
assert is_newer is False
assert current == "1.0.0"
assert latest is None
class TestFindLatestNonYankedVersion:
"""Test _find_latest_non_yanked_version helper."""
def test_skips_yanked_versions(self) -> None:
"""Test that yanked versions are skipped."""
releases = {
"1.0.0": [{"yanked": False}],
"2.0.0": [{"yanked": True}],
}
assert _find_latest_non_yanked_version(releases) == "1.0.0"
def test_returns_highest_non_yanked(self) -> None:
"""Test that the highest non-yanked version is returned."""
releases = {
"1.0.0": [{"yanked": False}],
"1.5.0": [{"yanked": False}],
"2.0.0": [{"yanked": True}],
}
assert _find_latest_non_yanked_version(releases) == "1.5.0"
def test_returns_none_when_all_yanked(self) -> None:
"""Test that None is returned when all versions are yanked."""
releases = {
"1.0.0": [{"yanked": True}],
"2.0.0": [{"yanked": True}],
}
assert _find_latest_non_yanked_version(releases) is None
def test_skips_prerelease_versions(self) -> None:
"""Test that pre-release versions are skipped."""
releases = {
"1.0.0": [{"yanked": False}],
"2.0.0a1": [{"yanked": False}],
"2.0.0rc1": [{"yanked": False}],
}
assert _find_latest_non_yanked_version(releases) == "1.0.0"
def test_skips_versions_with_empty_files(self) -> None:
"""Test that versions with no files are skipped."""
releases: dict[str, list[dict[str, bool]]] = {
"1.0.0": [{"yanked": False}],
"2.0.0": [],
}
assert _find_latest_non_yanked_version(releases) == "1.0.0"
def test_handles_invalid_version_strings(self) -> None:
"""Test that invalid version strings are skipped."""
releases = {
"1.0.0": [{"yanked": False}],
"not-a-version": [{"yanked": False}],
}
assert _find_latest_non_yanked_version(releases) == "1.0.0"
def test_partially_yanked_files_not_considered_yanked(self) -> None:
"""Test that a version with some non-yanked files is not yanked."""
releases = {
"1.0.0": [{"yanked": False}],
"2.0.0": [{"yanked": True}, {"yanked": False}],
}
assert _find_latest_non_yanked_version(releases) == "2.0.0"
class TestIsVersionYanked:
"""Test _is_version_yanked helper."""
def test_non_yanked_version(self) -> None:
"""Test a non-yanked version returns False."""
releases = {"1.0.0": [{"yanked": False}]}
is_yanked, reason = _is_version_yanked("1.0.0", releases)
assert is_yanked is False
assert reason == ""
def test_yanked_version_with_reason(self) -> None:
"""Test a yanked version returns True with reason."""
releases = {
"1.0.0": [{"yanked": True, "yanked_reason": "critical bug"}],
}
is_yanked, reason = _is_version_yanked("1.0.0", releases)
assert is_yanked is True
assert reason == "critical bug"
def test_yanked_version_without_reason(self) -> None:
"""Test a yanked version returns True with empty reason."""
releases = {"1.0.0": [{"yanked": True}]}
is_yanked, reason = _is_version_yanked("1.0.0", releases)
assert is_yanked is True
assert reason == ""
def test_unknown_version(self) -> None:
"""Test an unknown version returns False."""
releases = {"1.0.0": [{"yanked": False}]}
is_yanked, reason = _is_version_yanked("9.9.9", releases)
assert is_yanked is False
assert reason == ""
def test_partially_yanked_files(self) -> None:
"""Test a version with mixed yanked/non-yanked files is not yanked."""
releases = {
"1.0.0": [{"yanked": True}, {"yanked": False}],
}
is_yanked, reason = _is_version_yanked("1.0.0", releases)
assert is_yanked is False
assert reason == ""
def test_multiple_yanked_files_picks_first_reason(self) -> None:
"""Test that the first available reason is returned."""
releases = {
"1.0.0": [
{"yanked": True, "yanked_reason": ""},
{"yanked": True, "yanked_reason": "second reason"},
],
}
is_yanked, reason = _is_version_yanked("1.0.0", releases)
assert is_yanked is True
assert reason == "second reason"
class TestIsCurrentVersionYanked:
"""Test is_current_version_yanked public function."""
@patch("crewai_core.version.get_crewai_version")
@patch("crewai_core.version._get_cache_file")
def test_reads_from_valid_cache(
self, mock_cache_file: MagicMock, mock_version: MagicMock, tmp_path: Path
) -> None:
"""Test reading yanked status from a valid cache."""
mock_version.return_value = "1.0.0"
cache_file = tmp_path / "version_cache.json"
cache_data = {
"version": "2.0.0",
"timestamp": datetime.now().isoformat(),
"current_version": "1.0.0",
"current_version_yanked": True,
"current_version_yanked_reason": "bad release",
}
cache_file.write_text(json.dumps(cache_data))
mock_cache_file.return_value = cache_file
is_yanked, reason = is_current_version_yanked()
assert is_yanked is True
assert reason == "bad release"
@patch("crewai_core.version.get_crewai_version")
@patch("crewai_core.version._get_cache_file")
def test_not_yanked_from_cache(
self, mock_cache_file: MagicMock, mock_version: MagicMock, tmp_path: Path
) -> None:
"""Test non-yanked status from a valid cache."""
mock_version.return_value = "2.0.0"
cache_file = tmp_path / "version_cache.json"
cache_data = {
"version": "2.0.0",
"timestamp": datetime.now().isoformat(),
"current_version": "2.0.0",
"current_version_yanked": False,
"current_version_yanked_reason": "",
}
cache_file.write_text(json.dumps(cache_data))
mock_cache_file.return_value = cache_file
is_yanked, reason = is_current_version_yanked()
assert is_yanked is False
assert reason == ""
@patch("crewai_core.version.get_latest_version_from_pypi")
@patch("crewai_core.version.get_crewai_version")
@patch("crewai_core.version._get_cache_file")
def test_triggers_fetch_on_stale_cache(
self,
mock_cache_file: MagicMock,
mock_version: MagicMock,
mock_fetch: MagicMock,
tmp_path: Path,
) -> None:
"""Test that a stale cache triggers a re-fetch."""
mock_version.return_value = "1.0.0"
cache_file = tmp_path / "version_cache.json"
old_time = datetime.now() - timedelta(hours=25)
cache_data = {
"version": "2.0.0",
"timestamp": old_time.isoformat(),
"current_version": "1.0.0",
"current_version_yanked": True,
"current_version_yanked_reason": "old reason",
}
cache_file.write_text(json.dumps(cache_data))
mock_cache_file.return_value = cache_file
fresh_cache = {
"version": "2.0.0",
"timestamp": datetime.now().isoformat(),
"current_version": "1.0.0",
"current_version_yanked": False,
"current_version_yanked_reason": "",
}
def write_fresh_cache() -> str:
cache_file.write_text(json.dumps(fresh_cache))
return "2.0.0"
mock_fetch.side_effect = lambda: write_fresh_cache()
is_yanked, reason = is_current_version_yanked()
assert is_yanked is False
mock_fetch.assert_called_once()
@patch("crewai_core.version.get_latest_version_from_pypi")
@patch("crewai_core.version.get_crewai_version")
@patch("crewai_core.version._get_cache_file")
def test_returns_false_on_fetch_failure(
self,
mock_cache_file: MagicMock,
mock_version: MagicMock,
mock_fetch: MagicMock,
tmp_path: Path,
) -> None:
"""Test that fetch failure returns not yanked."""
mock_version.return_value = "1.0.0"
cache_file = tmp_path / "version_cache.json"
mock_cache_file.return_value = cache_file
mock_fetch.return_value = None
is_yanked, reason = is_current_version_yanked()
assert is_yanked is False
assert reason == ""
# TestConsoleFormatterVersionCheck tests remain in lib/crewai/tests/cli/test_version.py
# as they depend on crewai.events.utils.console_formatter (core package).

View File

@@ -1,8 +0,0 @@
# crewai-core
Shared utilities used by both `crewai` and `crewai-cli`: version lookup, storage
paths, user-data helpers, telemetry, and the printer.
This package is a leaf — it has no dependency on the `crewai` framework — and is
pulled in transitively by `crewai` and `crewai-cli`. End users do not normally
install it directly.

View File

@@ -1,38 +0,0 @@
[project]
name = "crewai-core"
dynamic = ["version"]
description = "Shared utilities for CrewAI — version, paths, user-data, telemetry, printer."
readme = "README.md"
authors = [
{ name = "Greyson R. LaLonde", email = "greyson@crewai.com" }
]
requires-python = ">=3.10, <3.14"
dependencies = [
"appdirs~=1.4.4",
"cryptography>=42.0",
"httpx~=0.28.1",
"packaging>=23.0",
"portalocker~=2.7.0",
"pyjwt>=2.9.0,<3",
"pydantic>=2.11.9,<2.13",
"rich>=13.7.1",
"opentelemetry-api~=1.34.0",
"opentelemetry-sdk~=1.34.0",
"opentelemetry-exporter-otlp-proto-http~=1.34.0",
"tomli~=2.0.2",
]
[project.urls]
Homepage = "https://crewai.com"
Documentation = "https://docs.crewai.com"
Repository = "https://github.com/crewAIInc/crewAI"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.version]
path = "src/crewai_core/__init__.py"
[tool.hatch.build.targets.wheel]
packages = ["src/crewai_core"]

View File

@@ -1 +0,0 @@
__version__ = "1.14.5a2"

View File

@@ -1,24 +0,0 @@
"""OAuth2 authentication primitives — shared by crewai and crewai-cli."""
from __future__ import annotations
from crewai_core.auth.oauth2 import (
AuthenticationCommand as AuthenticationCommand,
Oauth2Settings as Oauth2Settings,
ProviderFactory as ProviderFactory,
)
from crewai_core.auth.token import (
AuthError as AuthError,
get_auth_token as get_auth_token,
)
from crewai_core.auth.utils import validate_jwt_token as validate_jwt_token
__all__ = [
"AuthError",
"AuthenticationCommand",
"Oauth2Settings",
"ProviderFactory",
"get_auth_token",
"validate_jwt_token",
]

View File

@@ -1,8 +0,0 @@
"""Authentication constants."""
from __future__ import annotations
from typing import Final
ALGORITHMS: Final[list[str]] = ["RS256"]

View File

@@ -1 +0,0 @@
"""OAuth2 authentication providers."""

View File

@@ -1,46 +0,0 @@
"""Base OAuth2 provider interface."""
from __future__ import annotations
from abc import ABC, abstractmethod
from crewai_core.auth.oauth2 import Oauth2Settings
class BaseProvider(ABC):
"""Abstract base class for OAuth2 providers."""
def __init__(self, settings: Oauth2Settings):
self.settings = settings
@abstractmethod
def get_authorize_url(self) -> str:
"""Return the authorization endpoint URL."""
@abstractmethod
def get_token_url(self) -> str:
"""Return the token endpoint URL."""
@abstractmethod
def get_jwks_url(self) -> str:
"""Return the JWKS endpoint URL."""
@abstractmethod
def get_issuer(self) -> str:
"""Return the OAuth issuer identifier."""
@abstractmethod
def get_audience(self) -> str:
"""Return the OAuth audience identifier."""
@abstractmethod
def get_client_id(self) -> str:
"""Return the OAuth client identifier."""
def get_required_fields(self) -> list[str]:
"""Return provider-specific keys required inside ``Oauth2Settings.extra``."""
return []
def get_oauth_scopes(self) -> list[str]:
"""Return the OAuth scopes to request."""
return ["openid", "profile", "email"]

View File

@@ -1,17 +0,0 @@
"""Authentication token retrieval."""
from __future__ import annotations
from crewai_core.token_manager import TokenManager
class AuthError(Exception):
"""Raised when authentication fails."""
def get_auth_token() -> str:
"""Return the saved authentication token; raise ``AuthError`` if missing."""
access_token = TokenManager().get_token()
if not access_token:
raise AuthError("No token found, make sure you are logged in")
return access_token

View File

@@ -1,22 +0,0 @@
"""Constants shared by both crewai and crewai-cli."""
from __future__ import annotations
from typing import Final
CREWAI_TRAINED_AGENTS_FILE_ENV: Final[str] = "CREWAI_TRAINED_AGENTS_FILE"
TRAINING_DATA_FILE: Final[str] = "training_data.pkl"
TRAINED_AGENTS_DATA_FILE: Final[str] = "trained_agents_data.pkl"
KNOWLEDGE_DIRECTORY: Final[str] = "knowledge"
MAX_FILE_NAME_LENGTH: Final[int] = 255
DEFAULT_CREWAI_ENTERPRISE_URL: Final[str] = "https://app.crewai.com"
CREWAI_ENTERPRISE_DEFAULT_OAUTH2_PROVIDER: Final[str] = "workos"
CREWAI_ENTERPRISE_DEFAULT_OAUTH2_AUDIENCE: Final[str] = (
"client_01JNJQWBJ4SPFN3SWJM5T7BDG8"
)
CREWAI_ENTERPRISE_DEFAULT_OAUTH2_CLIENT_ID: Final[str] = (
"client_01JYT06R59SP0NXYGD994NFXXX"
)
CREWAI_ENTERPRISE_DEFAULT_OAUTH2_DOMAIN: Final[str] = "login.crewai.com"

View File

@@ -1,89 +0,0 @@
"""Centralised lock factory.
If ``REDIS_URL`` is set and the ``redis`` package is installed, locks are
distributed via ``portalocker.RedisLock``. Otherwise, falls back to the
standard file-based ``portalocker.Lock`` in the system temp dir.
"""
from __future__ import annotations
from collections.abc import Iterator
from contextlib import contextmanager
from functools import lru_cache
from hashlib import md5
import logging
import os
import tempfile
from typing import TYPE_CHECKING, Final
import portalocker
import portalocker.exceptions
if TYPE_CHECKING:
import redis
logger = logging.getLogger(__name__)
_REDIS_URL: str | None = os.environ.get("REDIS_URL")
_DEFAULT_TIMEOUT: Final[int] = 120
def _redis_available() -> bool:
"""Return True if redis is installed and REDIS_URL is set."""
if not _REDIS_URL:
return False
try:
import redis # noqa: F401
return True
except ImportError:
return False
@lru_cache(maxsize=1)
def _redis_connection() -> redis.Redis[bytes]:
"""Return a cached Redis connection, creating one on first call."""
from redis import Redis
if _REDIS_URL is None:
raise ValueError("REDIS_URL environment variable is not set")
return Redis.from_url(_REDIS_URL)
@contextmanager
def lock(name: str, *, timeout: float = _DEFAULT_TIMEOUT) -> Iterator[None]:
"""Acquire a named lock, yielding while it is held.
Args:
name: A human-readable lock name (e.g. ``"chromadb_init"``).
Automatically namespaced to avoid collisions.
timeout: Maximum seconds to wait for the lock before raising.
"""
channel = f"crewai:{md5(name.encode(), usedforsecurity=False).hexdigest()}"
if _redis_available():
with portalocker.RedisLock(
channel=channel,
connection=_redis_connection(),
timeout=timeout,
):
yield
else:
lock_dir = tempfile.gettempdir()
lock_path = os.path.join(lock_dir, f"{channel}.lock")
try:
pl = portalocker.Lock(lock_path, timeout=timeout)
pl.acquire()
except portalocker.exceptions.BaseLockException as exc:
raise portalocker.exceptions.LockException(
f"Failed to acquire lock '{name}' at {lock_path} "
f"(timeout={timeout}s). This commonly occurs in "
f"multi-process environments. "
) from exc
try:
yield
finally:
pl.release() # type: ignore[no-untyped-call]

View File

@@ -1,26 +0,0 @@
"""Path management utilities for CrewAI storage and configuration."""
from __future__ import annotations
import os
from pathlib import Path
import appdirs
def get_project_directory_name() -> str:
"""Return the current project directory name (or ``CREWAI_STORAGE_DIR``)."""
return os.environ.get("CREWAI_STORAGE_DIR", Path.cwd().name)
def db_storage_path() -> str:
"""Return the path for SQLite database / app-data storage.
Creates the directory if it does not exist.
"""
app_name = get_project_directory_name()
app_author = "CrewAI"
data_dir = Path(appdirs.user_data_dir(app_name, app_author))
data_dir.mkdir(parents=True, exist_ok=True)
return str(data_dir)

View File

@@ -1,103 +0,0 @@
"""Colored console-output utilities and the shared output-suppression flag."""
from __future__ import annotations
from contextvars import ContextVar
from typing import TYPE_CHECKING, Final, Literal, NamedTuple
if TYPE_CHECKING:
from _typeshed import SupportsWrite
_suppress_console_output: ContextVar[bool] = ContextVar(
"_suppress_console_output", default=False
)
def set_suppress_console_output(suppress: bool) -> object:
"""Toggle suppression of console output for the current context.
Returns a token that can be passed to ``ContextVar.reset`` to restore the
previous value.
"""
return _suppress_console_output.set(suppress)
def should_suppress_console_output() -> bool:
"""Return True if console output should currently be suppressed."""
return _suppress_console_output.get()
PrinterColor = Literal[
"purple",
"bold_purple",
"green",
"bold_green",
"cyan",
"bold_cyan",
"magenta",
"bold_magenta",
"yellow",
"bold_yellow",
"red",
"blue",
"bold_blue",
]
_COLOR_CODES: Final[dict[PrinterColor, str]] = {
"purple": "\033[95m",
"bold_purple": "\033[1m\033[95m",
"red": "\033[91m",
"bold_green": "\033[1m\033[92m",
"green": "\033[32m",
"blue": "\033[94m",
"bold_blue": "\033[1m\033[94m",
"yellow": "\033[93m",
"bold_yellow": "\033[1m\033[93m",
"cyan": "\033[96m",
"bold_cyan": "\033[1m\033[96m",
"magenta": "\033[35m",
"bold_magenta": "\033[1m\033[35m",
}
RESET: Final[str] = "\033[0m"
class ColoredText(NamedTuple):
"""Text plus an optional color, used for multicolor lines."""
text: str
color: PrinterColor | None
class Printer:
"""Handles colored console output formatting."""
@staticmethod
def print(
content: str | list[ColoredText],
color: PrinterColor | None = None,
sep: str | None = " ",
end: str | None = "\n",
file: SupportsWrite[str] | None = None,
flush: Literal[False] = False,
) -> None:
"""Print ``content`` with optional color, honoring suppression context."""
if should_suppress_console_output():
return
if isinstance(content, str):
content = [ColoredText(content, color)]
print(
"".join(
f"{_COLOR_CODES[c.color] if c.color else ''}{c.text}{RESET}"
for c in content
),
sep=sep,
end=end,
file=file,
flush=flush,
)
PRINTER: Printer = Printer()

View File

@@ -1,109 +0,0 @@
"""TOML / pyproject.toml utilities shared by crewai and crewai-cli."""
from __future__ import annotations
from functools import reduce
import sys
from typing import Any
from rich.console import Console
import tomli
if sys.version_info >= (3, 11):
import tomllib
console = Console()
def read_toml(file_path: str = "pyproject.toml") -> dict[str, Any]:
"""Read a TOML file from disk and return its parsed contents."""
with open(file_path, "rb") as f:
return tomli.load(f)
def parse_toml(content: str) -> dict[str, Any]:
"""Parse a TOML string and return its parsed contents."""
if sys.version_info >= (3, 11):
return tomllib.loads(content)
return tomli.loads(content)
def _get_nested_value(data: dict[str, Any], keys: list[str]) -> Any:
return reduce(dict.__getitem__, keys, data)
def _get_project_attribute(
pyproject_path: str, keys: list[str], require: bool
) -> Any | None:
"""Look up a dotted attribute path inside ``pyproject_path``.
The file must declare ``crewai`` in ``[project].dependencies`` for the
lookup to succeed (a guard against running these helpers outside a crewai
project directory). When ``require=True``, missing attributes raise
``SystemExit`` after printing a friendly error.
"""
attribute = None
try:
with open(pyproject_path, "r") as f:
pyproject_content = parse_toml(f.read())
dependencies = (
_get_nested_value(pyproject_content, ["project", "dependencies"]) or []
)
if not any(True for dep in dependencies if "crewai" in dep):
raise Exception("crewai is not in the dependencies.")
attribute = _get_nested_value(pyproject_content, keys)
except FileNotFoundError:
console.print(f"Error: {pyproject_path} not found.", style="bold red")
except KeyError:
console.print(
f"Error: {pyproject_path} is not a valid pyproject.toml file.",
style="bold red",
)
except Exception as e:
if sys.version_info >= (3, 11) and isinstance(e, tomllib.TOMLDecodeError):
console.print(
f"Error: {pyproject_path} is not a valid TOML file.", style="bold red"
)
else:
console.print(
f"Error reading the pyproject.toml file: {e}", style="bold red"
)
if require and not attribute:
console.print(
f"Unable to read '{'.'.join(keys)}' in the pyproject.toml file. "
"Please verify that the file exists and contains the specified attribute.",
style="bold red",
)
raise SystemExit
return attribute
def get_project_name(
pyproject_path: str = "pyproject.toml", require: bool = False
) -> str | None:
"""Return the project name from ``pyproject.toml``."""
return _get_project_attribute(pyproject_path, ["project", "name"], require=require)
def get_project_version(
pyproject_path: str = "pyproject.toml", require: bool = False
) -> str | None:
"""Return the project version from ``pyproject.toml``."""
return _get_project_attribute(
pyproject_path, ["project", "version"], require=require
)
def get_project_description(
pyproject_path: str = "pyproject.toml", require: bool = False
) -> str | None:
"""Return the project description from ``pyproject.toml``."""
return _get_project_attribute(
pyproject_path, ["project", "description"], require=require
)

View File

@@ -1,272 +0,0 @@
"""Anonymous telemetry collection — base implementation.
This module is the leaf telemetry layer used by both ``crewai`` (which extends
it with framework-specific spans + event-bus signal hooks) and ``crewai-cli``
(which uses it directly to emit deployment / template / flow-creation spans).
No prompts, task descriptions, agent backstories/goals, responses, or sensitive
data are collected.
"""
from __future__ import annotations
import asyncio
import atexit
from collections.abc import Callable
import contextlib
import logging
import os
import threading
from typing import Any, Final
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
BatchSpanProcessor,
SpanExportResult,
)
from opentelemetry.trace import Span, Status, StatusCode
from typing_extensions import Self
logger = logging.getLogger(__name__)
CREWAI_TELEMETRY_BASE_URL: Final[str] = "https://telemetry.crewai.com:4319"
CREWAI_TELEMETRY_SERVICE_NAME: Final[str] = "crewAI-telemetry"
def close_span(span: Span) -> None:
"""Set span status to OK and end it."""
span.set_status(Status(StatusCode.OK))
span.end()
@contextlib.contextmanager
def suppress_warnings() -> Any:
"""Suppress noisy warnings during otel provider setup."""
import warnings
with warnings.catch_warnings():
warnings.simplefilter("ignore")
yield
class SafeOTLPSpanExporter(OTLPSpanExporter):
"""OTLP exporter that swallows export failures so telemetry never crashes the app."""
def export(self, spans: Any) -> SpanExportResult:
try:
return super().export(spans)
except Exception as e:
logger.debug("Telemetry export failed: %s", e)
return SpanExportResult.FAILURE
class Telemetry:
"""Base telemetry: OTLP setup + the spans needed by the CLI.
crewai's runtime extends this with crew/agent/task/tool/flow execution spans
and event-bus signal handlers (see ``crewai.telemetry.telemetry``).
"""
_instance = None
_lock = threading.Lock()
def __new__(cls) -> Self:
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self) -> None:
if hasattr(self, "_initialized") and self._initialized:
return
self.ready: bool = False
self.trace_set: bool = False
self._initialized: bool = True
if self._is_telemetry_disabled():
return
try:
self.resource = Resource(
attributes={SERVICE_NAME: CREWAI_TELEMETRY_SERVICE_NAME},
)
with suppress_warnings():
self.provider = TracerProvider(resource=self.resource)
processor = BatchSpanProcessor(
SafeOTLPSpanExporter(
endpoint=f"{CREWAI_TELEMETRY_BASE_URL}/v1/traces",
timeout=30,
)
)
self.provider.add_span_processor(processor)
self._register_shutdown_handlers()
self.ready = True
except Exception as e:
if isinstance(
e,
(SystemExit, KeyboardInterrupt, GeneratorExit, asyncio.CancelledError),
):
raise
self.ready = False
@classmethod
def _is_telemetry_disabled(cls) -> bool:
return (
os.getenv("OTEL_SDK_DISABLED", "false").lower() == "true"
or os.getenv("CREWAI_DISABLE_TELEMETRY", "false").lower() == "true"
or os.getenv("CREWAI_DISABLE_TRACKING", "false").lower() == "true"
)
def _should_execute_telemetry(self) -> bool:
return self.ready and not self._is_telemetry_disabled()
def _register_shutdown_handlers(self) -> None:
"""Register an atexit flush. Subclasses may extend with signal hooks."""
atexit.register(self._shutdown)
def _shutdown(self) -> None:
if not self.ready:
return
try:
self.provider.force_flush(timeout_millis=5000)
self.provider.shutdown()
self.ready = False
except Exception as e:
logger.debug("Telemetry shutdown failed: %s", e)
def set_tracer(self) -> None:
"""Install our TracerProvider as the global one (idempotent)."""
if self.ready and not self.trace_set:
try:
with suppress_warnings():
trace.set_tracer_provider(self.provider)
self.trace_set = True
except Exception as e:
logger.debug("Failed to set tracer provider: %s", e)
self.ready = False
self.trace_set = False
def _safe_telemetry_operation(
self, operation: Callable[[], Span | None]
) -> Span | None:
"""Run a span-returning telemetry operation, swallowing failures."""
if not self._should_execute_telemetry():
return None
try:
return operation()
except Exception as e:
logger.debug("Telemetry operation failed: %s", e)
return None
def _safe_telemetry_procedure(self, operation: Callable[[], None]) -> None:
"""Run a void telemetry procedure, swallowing failures."""
if not self._should_execute_telemetry():
return
try:
operation()
except Exception as e:
logger.debug("Telemetry operation failed: %s", e)
def _add_attribute(self, span: Span | None, key: str, value: Any) -> None:
if span is None:
return
def _operation() -> None:
span.set_attribute(key, value)
self._safe_telemetry_procedure(_operation)
# --- CLI-facing spans ---------------------------------------------------
def deploy_signup_error_span(self) -> None:
"""Records when an error occurs during the deployment signup process."""
def _operation() -> None:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Deploy Signup Error")
close_span(span)
self._safe_telemetry_procedure(_operation)
def start_deployment_span(self, uuid: str | None = None) -> None:
"""Records the start of a deployment process."""
def _operation() -> None:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Start Deployment")
if uuid:
self._add_attribute(span, "uuid", uuid)
close_span(span)
self._safe_telemetry_procedure(_operation)
def create_crew_deployment_span(self) -> None:
"""Records the creation of a new crew deployment."""
def _operation() -> None:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Create Crew Deployment")
close_span(span)
self._safe_telemetry_procedure(_operation)
def get_crew_logs_span(
self, uuid: str | None, log_type: str = "deployment"
) -> None:
"""Records the retrieval of crew logs."""
def _operation() -> None:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Get Crew Logs")
self._add_attribute(span, "log_type", log_type)
if uuid:
self._add_attribute(span, "uuid", uuid)
close_span(span)
self._safe_telemetry_procedure(_operation)
def remove_crew_span(self, uuid: str | None = None) -> None:
"""Records the removal of a crew."""
def _operation() -> None:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Remove Crew")
if uuid:
self._add_attribute(span, "uuid", uuid)
close_span(span)
self._safe_telemetry_procedure(_operation)
def flow_creation_span(self, flow_name: str) -> None:
"""Records the creation of a new flow."""
def _operation() -> None:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Flow Creation")
self._add_attribute(span, "flow_name", flow_name)
close_span(span)
self._safe_telemetry_procedure(_operation)
def template_installed_span(self, template_name: str) -> None:
"""Records when a template is downloaded and installed."""
from crewai_core.version import get_crewai_version
def _operation() -> None:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Template Installed")
self._add_attribute(span, "crewai_version", get_crewai_version())
self._add_attribute(span, "template_name", template_name)
close_span(span)
self._safe_telemetry_procedure(_operation)

View File

@@ -1,56 +0,0 @@
"""Tool-repository credential helpers shared by crewai and crewai-cli."""
from __future__ import annotations
import os
from typing import Any
from crewai_core.project import read_toml
from crewai_core.settings import Settings
def build_env_with_tool_repository_credentials(
repository_handle: str,
) -> dict[str, Any]:
"""Return a copy of ``os.environ`` augmented with UV_INDEX_* credentials
for ``repository_handle``.
The handle is normalized to upper-case with hyphens replaced by underscores
(matching ``uv``'s env-var convention).
"""
repository_handle = repository_handle.upper().replace("-", "_")
settings = Settings()
env = os.environ.copy()
env[f"UV_INDEX_{repository_handle}_USERNAME"] = str(
settings.tool_repository_username or ""
)
env[f"UV_INDEX_{repository_handle}_PASSWORD"] = str(
settings.tool_repository_password or ""
)
return env
def build_env_with_all_tool_credentials() -> dict[str, Any]:
"""Return ``os.environ`` augmented with UV_INDEX_* credentials for every
private index referenced under ``[tool.uv.sources]`` in ``pyproject.toml``.
Errors reading ``pyproject.toml`` are swallowed — the un-augmented
environment is returned in that case.
"""
env = os.environ.copy()
try:
pyproject_data = read_toml()
sources = pyproject_data.get("tool", {}).get("uv", {}).get("sources", {})
for source_config in sources.values():
if isinstance(source_config, dict):
index = source_config.get("index")
if index:
index_env = build_env_with_tool_repository_credentials(index)
env.update(index_env)
except Exception: # noqa: S110
pass
return env

View File

@@ -1,91 +0,0 @@
"""Persistent per-user data + tracing-consent helpers.
This is the single source of truth for the ``.crewai_user.json`` file used by
both crewai (to record trace consent) and crewai-cli (to read/write it via
``crewai traces enable/disable/status``).
"""
from __future__ import annotations
import json
import logging
import os
from pathlib import Path
from typing import Any, cast
from crewai_core.lock_store import lock as store_lock
from crewai_core.paths import db_storage_path
logger = logging.getLogger(__name__)
def _user_data_file() -> Path:
base = Path(db_storage_path())
base.mkdir(parents=True, exist_ok=True)
return base / ".crewai_user.json"
def _user_data_lock_name() -> str:
"""Return a stable lock name for the user data file."""
return f"file:{os.path.realpath(_user_data_file())}"
def _load_user_data() -> dict[str, Any]:
"""Read the user-data JSON file, returning ``{}`` on missing/corrupt."""
p = _user_data_file()
if p.exists():
try:
return cast(dict[str, Any], json.loads(p.read_text()))
except (json.JSONDecodeError, OSError, PermissionError) as e:
logger.warning("Failed to load user data: %s", e)
return {}
def _save_user_data(data: dict[str, Any]) -> None:
"""Write the full user-data dict, ignoring write errors with a warning."""
try:
p = _user_data_file()
p.write_text(json.dumps(data, indent=2))
except (OSError, PermissionError) as e:
logger.warning("Failed to save user data: %s", e)
def update_user_data(updates: dict[str, Any]) -> None:
"""Atomically read-modify-write the user data file under a file lock.
Args:
updates: Key-value pairs to merge into the existing user data.
"""
try:
with store_lock(_user_data_lock_name()):
data = _load_user_data()
data.update(updates)
_save_user_data(data)
except (OSError, PermissionError) as e:
logger.warning("Failed to update user data: %s", e)
def has_user_declined_tracing() -> bool:
"""Return True if the user has explicitly declined trace collection."""
data = _load_user_data()
if data.get("first_execution_done", False):
return data.get("trace_consent", False) is False
return False
def is_tracing_enabled() -> bool:
"""Return True if tracing should currently be active.
Mirrors the runtime gate (``crewai.events.listeners.tracing.utils.
should_enable_tracing``): ``CREWAI_TRACING_ENABLED=true`` always activates;
otherwise recorded consent activates; otherwise off. Used by
``crewai traces status`` so the displayed state matches what crews and
flows actually do.
"""
if os.getenv("CREWAI_TRACING_ENABLED", "false").lower() in ("true", "1"):
return True
if has_user_declined_tracing():
return False
data = _load_user_data()
return data.get("trace_consent", False) is not False

View File

@@ -1,96 +0,0 @@
"""Smoke tests for the crewai-core leaf modules."""
from __future__ import annotations
import os
from pathlib import Path
from crewai_core import (
constants,
lock_store,
paths,
printer,
user_data,
version,
)
import pytest
def test_version_returns_string() -> None:
v = version.get_crewai_version()
assert isinstance(v, str) and v
def test_paths_creates_storage_dir(
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
monkeypatch.setenv("CREWAI_STORAGE_DIR", str(tmp_path / "store"))
monkeypatch.setattr(
"crewai_core.paths.appdirs.user_data_dir",
lambda app, author: str(tmp_path / app),
)
out = paths.db_storage_path()
assert Path(out).exists()
def test_constants_exposes_env_keys() -> None:
assert constants.CREWAI_TRAINED_AGENTS_FILE_ENV == "CREWAI_TRAINED_AGENTS_FILE"
def test_printer_emits_when_not_suppressed(capsys: pytest.CaptureFixture[str]) -> None:
printer.PRINTER.print("hello", color="green")
out = capsys.readouterr().out
assert "hello" in out
def test_printer_respects_suppression(capsys: pytest.CaptureFixture[str]) -> None:
token = printer.set_suppress_console_output(True)
try:
printer.PRINTER.print("hidden")
finally:
printer._suppress_console_output.reset(token) # type: ignore[arg-type]
assert "hidden" not in capsys.readouterr().out
def test_lock_acquires_and_releases() -> None:
with lock_store.lock("crewai_core.tests.smoke", timeout=5):
pass
def test_user_data_round_trip(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("CREWAI_STORAGE_DIR", "crewai_core_test_user_data")
monkeypatch.setattr(
"crewai_core.paths.appdirs.user_data_dir",
lambda app, author: str(tmp_path / app),
)
user_data.update_user_data({"trace_consent": True, "first_execution_done": True})
data = user_data._load_user_data()
assert data == {"trace_consent": True, "first_execution_done": True}
assert user_data.has_user_declined_tracing() is False
monkeypatch.setenv("CREWAI_TRACING_ENABLED", "true")
assert user_data.is_tracing_enabled() is True
monkeypatch.delenv("CREWAI_TRACING_ENABLED", raising=False)
assert (
user_data.is_tracing_enabled() is True
) # consent alone enables (matches runtime)
def test_user_data_decline_blocks(
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
monkeypatch.setenv("CREWAI_STORAGE_DIR", "crewai_core_test_decline")
monkeypatch.setattr(
"crewai_core.paths.appdirs.user_data_dir",
lambda app, author: str(tmp_path / app),
)
user_data.update_user_data({"trace_consent": False, "first_execution_done": True})
assert user_data.has_user_declined_tracing() is True
monkeypatch.delenv("CREWAI_TRACING_ENABLED", raising=False)
assert user_data.is_tracing_enabled() is False
monkeypatch.setenv("CREWAI_TRACING_ENABLED", "true")
assert user_data.is_tracing_enabled() is True # env-var override (matches runtime)
def test_unused_var_warning_silenced() -> None:
# Touch os to keep the import (used by env-var fixtures above)
assert os.environ is not None

View File

@@ -152,4 +152,4 @@ __all__ = [
"wrap_file_source",
]
__version__ = "1.14.5a2"
__version__ = "1.14.4a1"

Some files were not shown because too many files have changed in this diff Show More