mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-06-12 19:58:09 +00:00
Compare commits
28 Commits
fix/interp
...
flow-defin
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fc4890be8b | ||
|
|
b0a71bb861 | ||
|
|
5a04033d4c | ||
|
|
887adafd2c | ||
|
|
d3fc0d31f8 | ||
|
|
373dca3d04 | ||
|
|
21fa8e32d9 | ||
|
|
f18c03cd8f | ||
|
|
50b9c02272 | ||
|
|
c55334be5f | ||
|
|
05a2ba9ca4 | ||
|
|
fbafe1f0d3 | ||
|
|
5267c059f5 | ||
|
|
243c9edc1c | ||
|
|
68910b70c0 | ||
|
|
299782765c | ||
|
|
a1f44eb272 | ||
|
|
036b032ab6 | ||
|
|
f88ae54f96 | ||
|
|
b6e5d632c1 | ||
|
|
0d971e5bc5 | ||
|
|
b3f175b56f | ||
|
|
f523a7d029 | ||
|
|
f214ff4b7b | ||
|
|
a9e7c3a44f | ||
|
|
da8fe8c715 | ||
|
|
ce42994ae3 | ||
|
|
820c3905e3 |
2
.github/workflows/vulnerability-scan.yml
vendored
2
.github/workflows/vulnerability-scan.yml
vendored
@@ -64,6 +64,7 @@ jobs:
|
||||
--ignore-vuln PYSEC-2025-197 \
|
||||
--ignore-vuln PYSEC-2025-210 \
|
||||
--ignore-vuln PYSEC-2026-139 \
|
||||
--ignore-vuln GHSA-rrmf-rvhw-rf47 \
|
||||
--ignore-vuln PYSEC-2025-211 \
|
||||
--ignore-vuln PYSEC-2025-212 \
|
||||
--ignore-vuln PYSEC-2025-213 \
|
||||
@@ -81,6 +82,7 @@ jobs:
|
||||
# PYSEC-2025-183 - pyjwt 2.12.1: disputed weak-encryption claim; key length is application-chosen
|
||||
# PYSEC-2025-189..197 - torch 2.11.0: memory-corruption/DoS in functions only reachable via untrusted models; no fix available
|
||||
# PYSEC-2025-210, PYSEC-2026-139 - torch 2.11.0: profiler/deserialization issues; no fix available
|
||||
# GHSA-rrmf-rvhw-rf47 - torch 2.11.0 (CVE-2025-3000, alias of PYSEC-2025-194): memory corruption in torch.jit.script, CVSS 1.9, local-only; affected <=2.12.0, no fix available. pip-audit reports it under the GHSA id so the PYSEC ignore above does not catch it.
|
||||
# PYSEC-2025-211..218 - transformers 5.5.4: deserialization/code injection via malicious model checkpoints; no fix available
|
||||
# GHSA-f4j7-r4q5-qw2c - chromadb 1.1.1 (CVE-2026-45829): pre-auth RCE via /api/v2/tenants/{tenant}/databases/{db}/collections when trust_remote_code=true.
|
||||
# Advisory: vulnerable >=1.0.0,<=1.5.9, firstPatchedVersion=none. We only use chromadb.PersistentClient (lib/crewai/src/crewai/rag/chromadb/factory.py)
|
||||
|
||||
@@ -47,6 +47,7 @@ repos:
|
||||
--ignore-vuln PYSEC-2025-197
|
||||
--ignore-vuln PYSEC-2025-210
|
||||
--ignore-vuln PYSEC-2026-139
|
||||
--ignore-vuln GHSA-rrmf-rvhw-rf47
|
||||
--ignore-vuln PYSEC-2025-211
|
||||
--ignore-vuln PYSEC-2025-212
|
||||
--ignore-vuln PYSEC-2025-213
|
||||
|
||||
@@ -4,6 +4,126 @@ description: "تحديثات المنتج والتحسينات وإصلاحات
|
||||
icon: "clock"
|
||||
mode: "wide"
|
||||
---
|
||||
<Update label="11 يونيو 2026">
|
||||
## v1.14.7
|
||||
|
||||
[عرض الإصدار على GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7)
|
||||
|
||||
## ما الذي تغير
|
||||
|
||||
### الميزات
|
||||
- إضافة واجهات خلفية افتراضية قابلة للتوصيل للذاكرة، والمعرفة، وrag، وflow.
|
||||
- عرض السبب الحقيقي للإنهاء، ومعلمات العينة، وresponse.id في أحداث LLM.
|
||||
- تصنيف مشغلات DSL كزخارف واعية للمسار.
|
||||
- إضافة واجهة برمجة تطبيقات الدردشة لتدفقات المحادثة.
|
||||
- جعل واجهة القفل قابلة للتجاوز.
|
||||
- بناء FlowDefinition من بيانات التعريف الخاصة بـ Flow DSL.
|
||||
- إضافة مزود LLM من Snowflake Cortex الأصلي.
|
||||
- إضافة دعم لملفات الوكلاء المدربين من crew.
|
||||
|
||||
### إصلاحات الأخطاء
|
||||
- إصلاح نقطة التحقق لإعادة بناء BaseLLM مخصص كـ LLM ملموس عند الاستعادة.
|
||||
- تقييد الاستعادة على علامة لمنع اللقطات الحية من إعادة التشغيل كاستئناف.
|
||||
- تحديد حالة وقت التشغيل لكل تشغيل للحد من النمو وعزل التشغيل المتزامن.
|
||||
- إصلاح إعدادات التتبع على crewai-login.
|
||||
- احترام suppress_flow_events لأحداث تنفيذ الطريقة.
|
||||
- استعادة [project.scripts] في حزمة crewai لتثبيت أداة uv.
|
||||
- حل مشكلات CVE الخاصة بـ pip-audit لـ aiohttp وdocling وdocling-core.
|
||||
- إصلاح إدخال الملفات الذي لا يعمل بشكل موثوق.
|
||||
- إصلاح تاريخ نتائج أدوات Snowflake Claude غير المكتملة.
|
||||
|
||||
### الوثائق
|
||||
- تحديث سجل التغييرات والإصدار لـ v1.14.7.
|
||||
- تحديث وثائق جامع OpenTelemetry.
|
||||
- تحديث دليل NVIDIA Nemotron LLM.
|
||||
- إضافة دليل تكامل Databricks.
|
||||
- إضافة دليل تكامل Snowflake.
|
||||
|
||||
### الأداء
|
||||
- تحسين سرعة استيراد crewai من خلال تحميل مستندات docling بشكل كسول.
|
||||
|
||||
### إعادة الهيكلة
|
||||
- تبسيط تقييم شروط التدفق ليكون بلا حالة لكل حدث.
|
||||
- فصل منطق المحادثة عن وقت التشغيل وإضافة تعريف المحادثة.
|
||||
- تقسيم `flow.py` إلى DSL، وتعريف، ووقت تشغيل.
|
||||
|
||||
## المساهمون
|
||||
|
||||
@Luzk, @alex-clawd, @devin-ai-integration[bot], @greysonlalonde, @gvieira, @jessemiller, @lorenzejay, @lucasgomide, @mattatcha, @vinibrsl
|
||||
|
||||
</Update>
|
||||
|
||||
<Update label="10 يونيو 2026">
|
||||
## v1.14.7rc2
|
||||
|
||||
[عرض الإصدار على GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7rc2)
|
||||
|
||||
## ما الذي تغير
|
||||
|
||||
### إصلاحات الأخطاء
|
||||
- استعادة البوابة على علامة لمنع اللقطات الحية من إعادة التشغيل كاستئناف
|
||||
|
||||
### الوثائق
|
||||
- تحديث سجل التغييرات والإصدار لـ v1.14.7rc1
|
||||
|
||||
## المساهمون
|
||||
|
||||
@greysonlalonde
|
||||
|
||||
</Update>
|
||||
|
||||
<Update label="10 يونيو 2026">
|
||||
## v1.14.7rc1
|
||||
|
||||
[عرض الإصدار على GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7rc1)
|
||||
|
||||
## ما الذي تغير
|
||||
|
||||
### الميزات
|
||||
- إضافة `reset_runtime_state` لإطلاق حالة الحافلة المتراكمة
|
||||
- التعامل مع دعم كل من الموجهات المخصصة
|
||||
- فصل منطق المحادثة عن وقت التشغيل وإضافة `conversational_definition`
|
||||
|
||||
### إصلاحات الأخطاء
|
||||
- إصلاح نطاق حالة وقت التشغيل لكل تشغيل للحد من النمو وعزل التشغيلات المتزامنة
|
||||
- إصلاح إعدادات القياس عن بُعد على `crewai-login`
|
||||
- إصلاح احترام `suppress_flow_events` لفعاليات تنفيذ الأساليب
|
||||
|
||||
### الوثائق
|
||||
- تحديث صور OpenTelemetry
|
||||
- تحديث الوثائق لتعكس الحالة الجديدة لجمع بيانات OpenTelemetry
|
||||
- تحديث سجل التغييرات والإصدار لـ v1.14.7a4
|
||||
|
||||
### إعادة الهيكلة
|
||||
- تبسيط تقييم شرط التدفق ليكون بلا حالة لكل حدث
|
||||
- تحسين دورة توجيه المحادثة مع تقليل مسار واحد
|
||||
|
||||
## المساهمون
|
||||
|
||||
@greysonlalonde, @lorenzejay, @lucasgomide, @vinibrsl
|
||||
|
||||
</Update>
|
||||
|
||||
<Update label="9 يونيو 2026">
|
||||
## v1.14.7a4
|
||||
|
||||
[عرض الإصدار على GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7a4)
|
||||
|
||||
## ما الذي تغير
|
||||
|
||||
### الميزات
|
||||
- نقل وقت التشغيل @listen/@router لقراءة من FlowDefinition
|
||||
- إضافة واجهات خلفية افتراضية قابلة للتوصيل للذاكرة، والمعرفة، وrag، وflow
|
||||
|
||||
### الوثائق
|
||||
- تحديث سجل التغييرات والإصدار لـ v1.14.7a3
|
||||
|
||||
## المساهمون
|
||||
|
||||
@greysonlalonde, @mattatcha, @vinibrsl
|
||||
|
||||
</Update>
|
||||
|
||||
<Update label="8 يونيو 2026">
|
||||
## v1.14.7a3
|
||||
|
||||
|
||||
@@ -226,6 +226,48 @@ counter=2 message='Hello from first_method - updated by second_method'
|
||||
من خلال ضمان إعادة مخرجات الدالة الأخيرة وتوفير الوصول إلى الحالة، تجعل تدفقات CrewAI من السهل دمج نتائج سير عمل الذكاء الاصطناعي في التطبيقات أو الأنظمة الأكبر،
|
||||
مع الحفاظ على الوصول إلى الحالة طوال تنفيذ التدفق.
|
||||
|
||||
## مقاييس استخدام التدفق
|
||||
|
||||
بعد اكتمال تنفيذ التدفق، يمكنك الوصول إلى الخاصية `usage_metrics` لعرض إجمالي استخدام التوكنات عبر **كل استدعاء لنموذج اللغة** يتم خلال التشغيل — بما في ذلك الاستدعاءات من كل فريق (Crew) ينظمه التدفق، والاستدعاءات داخل أدوات الـ Agents، والاستدعاءات المباشرة لـ `LLM.call(...)` من دوال التدفق. هذا هو المكافئ على جانب الـ SDK للإجماليات المعروضة في واجهة CrewAI Enterprise.
|
||||
|
||||
```python Code
|
||||
from crewai import LLM
|
||||
from crewai.flow.flow import Flow, listen, start
|
||||
|
||||
class UsageMetricsFlow(Flow):
|
||||
@start()
|
||||
def run_first_crew(self):
|
||||
self.state.first_result = FirstCrew().crew().kickoff()
|
||||
|
||||
@listen(run_first_crew)
|
||||
def call_llm_directly(self):
|
||||
# استدعاء مباشر لنموذج اللغة — يُحسب أيضًا ضمن flow.usage_metrics
|
||||
llm = LLM(model="openai/gpt-4o-mini")
|
||||
self.state.summary = llm.call("لخّص النقاط الرئيسية.")
|
||||
|
||||
@listen(call_llm_directly)
|
||||
def run_second_crew(self):
|
||||
self.state.second_result = SecondCrew().crew().kickoff()
|
||||
|
||||
flow = UsageMetricsFlow()
|
||||
flow.kickoff()
|
||||
|
||||
print(flow.usage_metrics)
|
||||
# UsageMetrics(total_tokens=8579, prompt_tokens=6210, completion_tokens=2369,
|
||||
# cached_prompt_tokens=0, reasoning_tokens=0,
|
||||
# cache_creation_tokens=0, successful_requests=5)
|
||||
```
|
||||
|
||||
<Note>
|
||||
`flow.usage_metrics` **ليست** نفس `flow.kickoff().token_usage`. هذه الأخيرة
|
||||
ترجع فقط `CrewOutput.token_usage` لـ **آخر** دالة `@listen` أعادت
|
||||
`CrewOutput`، مما يعني أنها تعكس فقط الفريق الأخير وتتجاهل الفرق السابقة
|
||||
وكذلك أي استدعاءات مباشرة لـ `LLM.call(...)`. استخدم `flow.usage_metrics`
|
||||
كلما احتجت إلى الإجمالي **الكامل** للتوكنات لتنفيذ التدفق.
|
||||
</Note>
|
||||
|
||||
كل حقل في [`UsageMetrics`](https://github.com/crewAIInc/crewAI/blob/main/lib/crewai/src/crewai/types/usage_metrics.py) المُعاد هو مجموع جميع استدعاءات نموذج اللغة التي حدثت خلال استدعاء واحد لـ `flow.kickoff()`. تتم إعادة تعيين العدادات عند الاستدعاء التالي لـ `kickoff()` (وفي كل تكرار من `kickoff_for_each`)، لذلك لن تتكرر العدّات عبر التشغيلات المتتالية. يمكن قراءة هذه الخاصية بأمان في أي وقت بعد اكتمال `kickoff()`؛ قراءتها أثناء التنفيذ تُرجع المجموع الجزئي المتراكم حتى تلك اللحظة.
|
||||
|
||||
## إدارة حالة التدفق
|
||||
|
||||
إدارة الحالة بفعالية أمر بالغ الأهمية لبناء سير عمل ذكاء اصطناعي موثوق وقابل للصيانة. توفر تدفقات CrewAI آليات قوية لإدارة الحالة غير المهيكلة والمهيكلة،
|
||||
|
||||
@@ -24,15 +24,39 @@ mode: "wide"
|
||||
|
||||
1. في CrewAI AMP، انتقل إلى **Settings** > **OpenTelemetry Collectors**.
|
||||
2. انقر على **Add Collector**.
|
||||
3. اختر نوع التكامل — **OpenTelemetry Traces** أو **OpenTelemetry Logs**.
|
||||
4. هيّئ الاتصال:
|
||||
- **Endpoint** — نقطة نهاية OTLP لمجمّعك (مثل `https://otel-collector.example.com:4317`).
|
||||
- **Service Name** — اسم لتعريف هذه الخدمة في منصة المراقبة.
|
||||
- **Custom Headers** *(اختياري)* — أضف رؤوس المصادقة أو التوجيه كأزواج مفتاح-قيمة.
|
||||
- **Certificate** *(اختياري)* — قدم شهادة TLS إذا كان مجمّعك يتطلبها.
|
||||
5. انقر على **Save**.
|
||||
3. اختر تكاملاً:
|
||||
- **OpenTelemetry Traces** و**OpenTelemetry Logs** — صدّر إلى أي مجمّع أو واجهة خلفية متوافقة مع OTLP.
|
||||
- **Datadog** — أرسل التتبعات مباشرة إلى استقبال OTLP الخاص بـ Datadog، دون الحاجة إلى مجمّع منفصل أو Datadog Agent.
|
||||
4. هيّئ الاتصال. تعتمد الحقول على التكامل الذي اخترته:
|
||||
|
||||
<Frame></Frame>
|
||||
<Tabs>
|
||||
<Tab title="OpenTelemetry Traces / Logs">
|
||||
إن **OpenTelemetry Traces** و**OpenTelemetry Logs** تكاملان منفصلان يتشاركان نفس الحقول — اختر التكامل المطابق للإشارة التي تريد تصديرها.
|
||||
|
||||
- **Endpoint** — نقطة نهاية OTLP لمجمّعك (مثل `https://otel-collector.example.com:4317`).
|
||||
- **Service Name** — اسم لتعريف هذه الخدمة في منصة المراقبة.
|
||||
- **Custom Headers** *(اختياري)* — أضف رؤوس المصادقة أو التوجيه كأزواج مفتاح-قيمة.
|
||||
- **Certificate** *(اختياري)* — قدم شهادة TLS إذا كان مجمّعك يتطلبها.
|
||||
|
||||
<Frame></Frame>
|
||||
</Tab>
|
||||
<Tab title="Datadog">
|
||||
- **Datadog Site Domain** — مضيف OTLP لموقع Datadog الخاص بك فقط، دون بروتوكول أو مسار. يقوم CrewAI ببناء نقطة نهاية HTTPS OTLP الكاملة نيابةً عنك. استخدم المضيف المطابق لـ [موقع Datadog](https://docs.datadoghq.com/getting_started/site/) الخاص بك:
|
||||
- `otlp.datadoghq.com` (US1)
|
||||
- `otlp.us3.datadoghq.com` (US3)
|
||||
- `otlp.us5.datadoghq.com` (US5)
|
||||
- `otlp.datadoghq.eu` (EU1)
|
||||
- `otlp.ap1.datadoghq.com` (AP1)
|
||||
- **API Key** — مفتاح واجهة برمجة تطبيقات Datadog الخاص بك. راجع [كيفية إنشاء واحد](https://docs.datadoghq.com/account_management/api-app-keys/#api-keys).
|
||||
|
||||
يصدّر تكامل Datadog **التتبعات**.
|
||||
|
||||
<Frame></Frame>
|
||||
</Tab>
|
||||
</Tabs>
|
||||
|
||||
5. *(اختياري)* انقر على **Test Connection** للتحقق من قدرة CrewAI على الوصول إلى نقطة النهاية باستخدام بيانات الاعتماد التي قدمتها.
|
||||
6. انقر على **Save**.
|
||||
|
||||
<Tip>
|
||||
يمكنك إضافة مجمّعات متعددة — على سبيل المثال، واحد للتتبعات وآخر للسجلات، أو الإرسال إلى واجهات خلفية مختلفة لأغراض مختلفة.
|
||||
|
||||
@@ -161,6 +161,18 @@ crew = Crew(
|
||||
)
|
||||
```
|
||||
|
||||
<Note>
|
||||
يُحتفظ بـ `agent.i18n` للتوافق مع الإصدارات السابقة فقط، وقد تم إهماله. لتخصيص المطالبات أثناء التشغيل، مرّر `prompt_file` إلى `Crew`. وللوصول البرمجي المباشر إلى شرائح المطالبات، استخدم أداة i18n مباشرة:
|
||||
</Note>
|
||||
|
||||
```python
|
||||
from crewai.utilities.i18n import get_i18n
|
||||
|
||||
i18n = get_i18n("custom_prompts.json")
|
||||
format_slice = i18n.slice("format")
|
||||
tool_prompt = i18n.tools("ask_question")
|
||||
```
|
||||
|
||||
#### الخيار 3: تعطيل مطالبات النظام لنماذج o1
|
||||
```python
|
||||
agent = Agent(
|
||||
@@ -208,6 +220,8 @@ agent = Agent(
|
||||
|
||||
يدمج CrewAI بعد ذلك تخصيصاتك مع الإعدادات الافتراضية، فلا تحتاج لإعادة تعريف كل مطالبة. إليك الطريقة:
|
||||
|
||||
بالنسبة للكود الذي يحتاج إلى قراءة شرائح المطالبات مباشرة، استخدم `crewai.utilities.i18n.get_i18n()` مع ملف المطالبات نفسه بدلًا من قراءة `agent.i18n`.
|
||||
|
||||
### مثال: تخصيص أساسي للمطالبات
|
||||
|
||||
أنشئ ملف `custom_prompts.json` بالمطالبات التي تريد تعديلها. تأكد من إدراج جميع المطالبات عالية المستوى التي يجب أن يحتويها، وليس فقط تغييراتك:
|
||||
|
||||
2137
docs/docs.json
2137
docs/docs.json
File diff suppressed because it is too large
Load Diff
@@ -4,6 +4,126 @@ description: "Product updates, improvements, and bug fixes for CrewAI"
|
||||
icon: "clock"
|
||||
mode: "wide"
|
||||
---
|
||||
<Update label="Jun 11, 2026">
|
||||
## v1.14.7
|
||||
|
||||
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7)
|
||||
|
||||
## What's Changed
|
||||
|
||||
### Features
|
||||
- Add pluggable default backends for memory, knowledge, rag, and flow.
|
||||
- Surface real finish_reason, sampling params, and response.id on LLM events.
|
||||
- Type DSL triggers as route-aware decorators.
|
||||
- Add chat API for conversational flows.
|
||||
- Make locking backend overridable.
|
||||
- Build FlowDefinition from Flow DSL metadata.
|
||||
- Add native Snowflake Cortex LLM provider.
|
||||
- Add crew trained agents file support.
|
||||
|
||||
### Bug Fixes
|
||||
- Fix checkpoint to rebuild custom BaseLLM as concrete LLM on restore.
|
||||
- Gate restore on a flag to prevent live snapshots from replaying as resume.
|
||||
- Scope runtime state per run to bound growth and isolate concurrent runs.
|
||||
- Fix telemetry setup on crewai-login.
|
||||
- Respect suppress_flow_events for method-execution events.
|
||||
- Restore [project.scripts] in crewai package for uv tool install.
|
||||
- Resolve pip-audit CVEs for aiohttp, docling, and docling-core.
|
||||
- Fix file input not working reliably.
|
||||
- Fix Snowflake Claude incomplete tool result histories.
|
||||
|
||||
### Documentation
|
||||
- Update changelog and version for v1.14.7.
|
||||
- Update OpenTelemetry collector documentation.
|
||||
- Update NVIDIA Nemotron LLM guide.
|
||||
- Add Databricks integration guide.
|
||||
- Add Snowflake integration guide.
|
||||
|
||||
### Performance
|
||||
- Improve crewai import speed by lazy-loading docling imports.
|
||||
|
||||
### Refactoring
|
||||
- Simplify flow condition evaluation to be stateless per event.
|
||||
- Decouple convo logic from runtime and add a conversational_definition.
|
||||
- Split `flow.py` into DSL, definition, and runtime.
|
||||
|
||||
## Contributors
|
||||
|
||||
@Luzk, @alex-clawd, @devin-ai-integration[bot], @greysonlalonde, @gvieira, @jessemiller, @lorenzejay, @lucasgomide, @mattatcha, @vinibrsl
|
||||
|
||||
</Update>
|
||||
|
||||
<Update label="Jun 10, 2026">
|
||||
## v1.14.7rc2
|
||||
|
||||
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7rc2)
|
||||
|
||||
## What's Changed
|
||||
|
||||
### Bug Fixes
|
||||
- Gate restore on a flag to prevent live snapshots from replaying as resume
|
||||
|
||||
### Documentation
|
||||
- Update changelog and version for v1.14.7rc1
|
||||
|
||||
## Contributors
|
||||
|
||||
@greysonlalonde
|
||||
|
||||
</Update>
|
||||
|
||||
<Update label="Jun 10, 2026">
|
||||
## v1.14.7rc1
|
||||
|
||||
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7rc1)
|
||||
|
||||
## What's Changed
|
||||
|
||||
### Features
|
||||
- Add `reset_runtime_state` to release accumulated bus state
|
||||
- Handle supporting both custom prompts
|
||||
- Decouple conversation logic from runtime and add a `conversational_definition`
|
||||
|
||||
### Bug Fixes
|
||||
- Fix scope of runtime state per run to bound growth and isolate concurrent runs
|
||||
- Fix telemetry setup on `crewai-login`
|
||||
- Fix respect for `suppress_flow_events` for method-execution events
|
||||
|
||||
### Documentation
|
||||
- Update OpenTelemetry images
|
||||
- Update documentation to reflect new state of OpenTelemetry collector
|
||||
- Update changelog and version for v1.14.7a4
|
||||
|
||||
### Refactoring
|
||||
- Simplify flow condition evaluation to be stateless per event
|
||||
- Improve conversation routing cycle with one less route
|
||||
|
||||
## Contributors
|
||||
|
||||
@greysonlalonde, @lorenzejay, @lucasgomide, @vinibrsl
|
||||
|
||||
</Update>
|
||||
|
||||
<Update label="Jun 09, 2026">
|
||||
## v1.14.7a4
|
||||
|
||||
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7a4)
|
||||
|
||||
## What's Changed
|
||||
|
||||
### Features
|
||||
- Migrate @listen/@router runtime to read from FlowDefinition
|
||||
- Add pluggable default backends for memory, knowledge, rag, and flow
|
||||
|
||||
### Documentation
|
||||
- Update changelog and version for v1.14.7a3
|
||||
|
||||
## Contributors
|
||||
|
||||
@greysonlalonde, @mattatcha, @vinibrsl
|
||||
|
||||
</Update>
|
||||
|
||||
<Update label="Jun 08, 2026">
|
||||
## v1.14.7a3
|
||||
|
||||
|
||||
@@ -226,6 +226,49 @@ After the Flow has run, you can access the final state to see the updates made b
|
||||
By ensuring that the final method's output is returned and providing access to the state, CrewAI Flows make it easy to integrate the results of your AI workflows into larger applications or systems,
|
||||
while also maintaining and accessing the state throughout the Flow's execution.
|
||||
|
||||
## Flow Usage Metrics
|
||||
|
||||
After a Flow execution completes, you can access the `usage_metrics` property to view aggregated token usage across **every LLM call** made during the run — including calls from every Crew the Flow orchestrated, calls inside Agent tools, and bare `LLM.call(...)` invocations from Flow methods. This is the SDK-side equivalent of the totals shown in the CrewAI Enterprise UI.
|
||||
|
||||
```python Code
|
||||
from crewai import LLM
|
||||
from crewai.flow.flow import Flow, listen, start
|
||||
|
||||
class UsageMetricsFlow(Flow):
|
||||
@start()
|
||||
def run_first_crew(self):
|
||||
self.state.first_result = FirstCrew().crew().kickoff()
|
||||
|
||||
@listen(run_first_crew)
|
||||
def call_llm_directly(self):
|
||||
# Bare LLM call — still counted by flow.usage_metrics
|
||||
llm = LLM(model="openai/gpt-4o-mini")
|
||||
self.state.summary = llm.call("Summarize the key takeaways.")
|
||||
|
||||
@listen(call_llm_directly)
|
||||
def run_second_crew(self):
|
||||
self.state.second_result = SecondCrew().crew().kickoff()
|
||||
|
||||
flow = UsageMetricsFlow()
|
||||
flow.kickoff()
|
||||
|
||||
print(flow.usage_metrics)
|
||||
# UsageMetrics(total_tokens=8579, prompt_tokens=6210, completion_tokens=2369,
|
||||
# cached_prompt_tokens=0, reasoning_tokens=0,
|
||||
# cache_creation_tokens=0, successful_requests=5)
|
||||
```
|
||||
|
||||
<Note>
|
||||
`flow.usage_metrics` is **not** the same as `flow.kickoff().token_usage`. The
|
||||
latter returns the `CrewOutput.token_usage` of the **last** `@listen` method
|
||||
that returned a `CrewOutput`, which means it only reflects the final Crew and
|
||||
ignores prior Crews and bare `LLM.call(...)` invocations entirely. Use
|
||||
`flow.usage_metrics` whenever you need the **full** token rollup for the Flow
|
||||
execution.
|
||||
</Note>
|
||||
|
||||
Each entry in the returned [`UsageMetrics`](https://github.com/crewAIInc/crewAI/blob/main/lib/crewai/src/crewai/types/usage_metrics.py) is the sum across all LLM calls made within a single `flow.kickoff()` invocation. Counters reset on the next `kickoff()` call (or on each iteration of `kickoff_for_each`), so successive runs don't double-count. The property is safe to read at any point after `kickoff()` completes; reading it during execution returns the partial total accumulated so far.
|
||||
|
||||
## Flow State Management
|
||||
|
||||
Managing state effectively is crucial for building reliable and maintainable AI workflows. CrewAI Flows provides robust mechanisms for both unstructured and structured state management,
|
||||
|
||||
@@ -24,15 +24,39 @@ Telemetry data follows the [OpenTelemetry GenAI semantic conventions](https://op
|
||||
|
||||
1. In CrewAI AMP, go to **Settings** > **OpenTelemetry Collectors**.
|
||||
2. Click **Add Collector**.
|
||||
3. Select an integration type — **OpenTelemetry Traces** or **OpenTelemetry Logs**.
|
||||
4. Configure the connection:
|
||||
- **Endpoint** — Your collector's OTLP endpoint (e.g., `https://otel-collector.example.com:4317`).
|
||||
- **Service Name** — A name to identify this service in your observability platform.
|
||||
- **Custom Headers** *(optional)* — Add authentication or routing headers as key-value pairs.
|
||||
- **Certificate** *(optional)* — Provide a TLS certificate if your collector requires one.
|
||||
5. Click **Save**.
|
||||
3. Select an integration:
|
||||
- **OpenTelemetry Traces** and **OpenTelemetry Logs** — export to any OTLP-compatible collector or backend.
|
||||
- **Datadog** — send traces straight to Datadog's OTLP intake, no separate collector or Datadog Agent required.
|
||||
4. Configure the connection. The fields depend on the integration you selected:
|
||||
|
||||
<Frame></Frame>
|
||||
<Tabs>
|
||||
<Tab title="OpenTelemetry Traces / Logs">
|
||||
**OpenTelemetry Traces** and **OpenTelemetry Logs** are separate integrations that share the same fields — pick the one matching the signal you want to export.
|
||||
|
||||
- **Endpoint** — Your collector's OTLP endpoint (e.g., `https://otel-collector.example.com:4317`).
|
||||
- **Service Name** — A name to identify this service in your observability platform.
|
||||
- **Custom Headers** *(optional)* — Add authentication or routing headers as key-value pairs.
|
||||
- **Certificate** *(optional)* — Provide a TLS certificate if your collector requires one.
|
||||
|
||||
<Frame></Frame>
|
||||
</Tab>
|
||||
<Tab title="Datadog">
|
||||
- **Datadog Site Domain** — Your Datadog site's OTLP host only, with no protocol or path. CrewAI builds the full HTTPS OTLP endpoint for you. Use the host that matches your [Datadog site](https://docs.datadoghq.com/getting_started/site/):
|
||||
- `otlp.datadoghq.com` (US1)
|
||||
- `otlp.us3.datadoghq.com` (US3)
|
||||
- `otlp.us5.datadoghq.com` (US5)
|
||||
- `otlp.datadoghq.eu` (EU1)
|
||||
- `otlp.ap1.datadoghq.com` (AP1)
|
||||
- **API Key** — Your Datadog API key. See [how to create one](https://docs.datadoghq.com/account_management/api-app-keys/#api-keys).
|
||||
|
||||
The Datadog integration exports **traces**.
|
||||
|
||||
<Frame></Frame>
|
||||
</Tab>
|
||||
</Tabs>
|
||||
|
||||
5. *(optional)* Click **Test Connection** to verify CrewAI can reach the endpoint with the credentials you provided.
|
||||
6. Click **Save**.
|
||||
|
||||
<Tip>
|
||||
You can add multiple collectors — for example, one for traces and another for logs, or send to different backends for different purposes.
|
||||
|
||||
@@ -161,6 +161,18 @@ crew = Crew(
|
||||
)
|
||||
```
|
||||
|
||||
<Note>
|
||||
`agent.i18n` is maintained only for backward compatibility and is deprecated. For runtime prompt customization, pass `prompt_file` to `Crew`. For programmatic access to prompt slices, use the i18n utility directly:
|
||||
</Note>
|
||||
|
||||
```python
|
||||
from crewai.utilities.i18n import get_i18n
|
||||
|
||||
i18n = get_i18n("custom_prompts.json")
|
||||
format_slice = i18n.slice("format")
|
||||
tool_prompt = i18n.tools("ask_question")
|
||||
```
|
||||
|
||||
#### Option 3: Disable System Prompts for o1 Models
|
||||
```python
|
||||
agent = Agent(
|
||||
@@ -208,6 +220,8 @@ One straightforward approach is to create a JSON file for the prompts you want t
|
||||
|
||||
CrewAI then merges your customizations with the defaults, so you don't have to redefine every prompt. Here's how:
|
||||
|
||||
For code that needs to read prompt slices directly, use `crewai.utilities.i18n.get_i18n()` with the same prompt file instead of reading `agent.i18n`.
|
||||
|
||||
### Example: Basic Prompt Customization
|
||||
|
||||
Create a `custom_prompts.json` file with the prompts you want to modify. Ensure you list all top-level prompts it should contain, not just your changes:
|
||||
|
||||
BIN
docs/images/crewai-otel-collector-datadog.png
Normal file
BIN
docs/images/crewai-otel-collector-datadog.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 455 KiB |
BIN
docs/images/crewai-otel-collector-opentelemetry.png
Normal file
BIN
docs/images/crewai-otel-collector-opentelemetry.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 420 KiB |
@@ -4,6 +4,126 @@ description: "CrewAI의 제품 업데이트, 개선 사항 및 버그 수정"
|
||||
icon: "clock"
|
||||
mode: "wide"
|
||||
---
|
||||
<Update label="2026년 6월 11일">
|
||||
## v1.14.7
|
||||
|
||||
[GitHub 릴리스 보기](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7)
|
||||
|
||||
## 변경 사항
|
||||
|
||||
### 기능
|
||||
- 메모리, 지식, RAG 및 흐름에 대한 플러그 가능한 기본 백엔드를 추가했습니다.
|
||||
- LLM 이벤트에서 실제 finish_reason, 샘플링 매개변수 및 response.id를 표시합니다.
|
||||
- 경로 인식 장식자로서의 타입 DSL 트리거를 설정합니다.
|
||||
- 대화 흐름을 위한 채팅 API를 추가했습니다.
|
||||
- 잠금 백엔드를 재정의 가능하도록 만듭니다.
|
||||
- Flow DSL 메타데이터에서 FlowDefinition을 빌드합니다.
|
||||
- 네이티브 Snowflake Cortex LLM 공급자를 추가했습니다.
|
||||
- 훈련된 에이전트 파일 지원을 추가했습니다.
|
||||
|
||||
### 버그 수정
|
||||
- 복원 시 사용자 정의 BaseLLM을 구체적인 LLM으로 재구성하도록 체크포인트를 수정했습니다.
|
||||
- 라이브 스냅샷이 재개로 재생되지 않도록 플래그를 사용하여 복원을 제한합니다.
|
||||
- 실행마다 런타임 상태의 범위를 설정하여 성장을 제한하고 동시 실행을 격리합니다.
|
||||
- crewai-login에서 텔레메트리 설정을 수정했습니다.
|
||||
- 메서드 실행 이벤트에 대해 suppress_flow_events를 존중합니다.
|
||||
- uv 도구 설치를 위해 crewai 패키지에서 [project.scripts]를 복원합니다.
|
||||
- aiohttp, docling 및 docling-core에 대한 pip-audit CVE를 해결합니다.
|
||||
- 파일 입력이 신뢰할 수 없게 작동하는 문제를 수정했습니다.
|
||||
- Snowflake Claude의 불완전한 도구 결과 기록을 수정했습니다.
|
||||
|
||||
### 문서
|
||||
- v1.14.7에 대한 변경 로그 및 버전을 업데이트했습니다.
|
||||
- OpenTelemetry 수집기 문서를 업데이트했습니다.
|
||||
- NVIDIA Nemotron LLM 가이드를 업데이트했습니다.
|
||||
- Databricks 통합 가이드를 추가했습니다.
|
||||
- Snowflake 통합 가이드를 추가했습니다.
|
||||
|
||||
### 성능
|
||||
- docling 가져오기를 지연 로딩하여 crewai 가져오기 속도를 개선했습니다.
|
||||
|
||||
### 리팩토링
|
||||
- 흐름 조건 평가를 이벤트별로 상태 비저장으로 단순화했습니다.
|
||||
- 대화 논리를 런타임에서 분리하고 conversational_definition을 추가했습니다.
|
||||
- `flow.py`를 DSL, 정의 및 런타임으로 분리했습니다.
|
||||
|
||||
## 기여자
|
||||
|
||||
@Luzk, @alex-clawd, @devin-ai-integration[bot], @greysonlalonde, @gvieira, @jessemiller, @lorenzejay, @lucasgomide, @mattatcha, @vinibrsl
|
||||
|
||||
</Update>
|
||||
|
||||
<Update label="2026년 6월 10일">
|
||||
## v1.14.7rc2
|
||||
|
||||
[GitHub 릴리스 보기](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7rc2)
|
||||
|
||||
## 변경 사항
|
||||
|
||||
### 버그 수정
|
||||
- 라이브 스냅샷이 재개로 재생되는 것을 방지하기 위한 플래그에서 게이트 복원
|
||||
|
||||
### 문서
|
||||
- v1.14.7rc1에 대한 변경 로그 및 버전 업데이트
|
||||
|
||||
## 기여자
|
||||
|
||||
@greysonlalonde
|
||||
|
||||
</Update>
|
||||
|
||||
<Update label="2026년 6월 10일">
|
||||
## v1.14.7rc1
|
||||
|
||||
[GitHub 릴리스 보기](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7rc1)
|
||||
|
||||
## 변경 사항
|
||||
|
||||
### 기능
|
||||
- 누적된 버스 상태를 해제하기 위해 `reset_runtime_state` 추가
|
||||
- 사용자 정의 프롬프트를 모두 지원하도록 처리
|
||||
- 대화 논리를 런타임과 분리하고 `conversational_definition` 추가
|
||||
|
||||
### 버그 수정
|
||||
- 실행당 런타임 상태의 범위를 수정하여 성장 제한 및 동시 실행 격리
|
||||
- `crewai-login`에서 원격 측정 설정 수정
|
||||
- 메서드 실행 이벤트에 대한 `suppress_flow_events` 존중 수정
|
||||
|
||||
### 문서
|
||||
- OpenTelemetry 이미지 업데이트
|
||||
- OpenTelemetry 수집기의 새로운 상태를 반영하도록 문서 업데이트
|
||||
- v1.14.7a4에 대한 변경 로그 및 버전 업데이트
|
||||
|
||||
### 리팩토링
|
||||
- 이벤트당 상태 비저장 방식으로 흐름 조건 평가 단순화
|
||||
- 경로를 하나 줄여 대화 라우팅 사이클 개선
|
||||
|
||||
## 기여자
|
||||
|
||||
@greysonlalonde, @lorenzejay, @lucasgomide, @vinibrsl
|
||||
|
||||
</Update>
|
||||
|
||||
<Update label="2026년 6월 9일">
|
||||
## v1.14.7a4
|
||||
|
||||
[GitHub 릴리스 보기](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7a4)
|
||||
|
||||
## 변경 사항
|
||||
|
||||
### 기능
|
||||
- @listen/@router 런타임을 FlowDefinition에서 읽도록 마이그레이션
|
||||
- 메모리, 지식, rag 및 flow에 대한 플러그형 기본 백엔드 추가
|
||||
|
||||
### 문서
|
||||
- v1.14.7a3에 대한 변경 로그 및 버전 업데이트
|
||||
|
||||
## 기여자
|
||||
|
||||
@greysonlalonde, @mattatcha, @vinibrsl
|
||||
|
||||
</Update>
|
||||
|
||||
<Update label="2026년 6월 8일">
|
||||
## v1.14.7a3
|
||||
|
||||
|
||||
@@ -221,6 +221,48 @@ Flow가 실행된 후, 이러한 메소드들에 의해 수행된 업데이트
|
||||
최종 메소드의 출력이 반환되고 상태에 접근할 수 있도록 함으로써, CrewAI Flow는 AI 워크플로우의 결과를 더 큰 애플리케이션이나 시스템에 쉽게 통합할 수 있게 하며,
|
||||
Flow 실행 과정 전반에 걸쳐 상태를 유지하고 접근하면서도 이를 용이하게 만듭니다.
|
||||
|
||||
## 플로우 사용 메트릭
|
||||
|
||||
Flow 실행이 완료된 후, `usage_metrics` 속성에 접근하여 실행 동안 발생한 **모든 LLM 호출**의 토큰 사용량 집계를 확인할 수 있습니다. 여기에는 Flow가 오케스트레이션한 모든 Crew의 호출, Agent의 도구 내부에서 발생한 호출, 그리고 Flow 메서드에서 직접 호출한 `LLM.call(...)`이 모두 포함됩니다. 이는 CrewAI Enterprise UI에 표시되는 총량과 동등한 SDK 측 값입니다.
|
||||
|
||||
```python Code
|
||||
from crewai import LLM
|
||||
from crewai.flow.flow import Flow, listen, start
|
||||
|
||||
class UsageMetricsFlow(Flow):
|
||||
@start()
|
||||
def run_first_crew(self):
|
||||
self.state.first_result = FirstCrew().crew().kickoff()
|
||||
|
||||
@listen(run_first_crew)
|
||||
def call_llm_directly(self):
|
||||
# 직접 LLM 호출 — flow.usage_metrics에서도 집계됩니다
|
||||
llm = LLM(model="openai/gpt-4o-mini")
|
||||
self.state.summary = llm.call("핵심 내용을 요약해 주세요.")
|
||||
|
||||
@listen(call_llm_directly)
|
||||
def run_second_crew(self):
|
||||
self.state.second_result = SecondCrew().crew().kickoff()
|
||||
|
||||
flow = UsageMetricsFlow()
|
||||
flow.kickoff()
|
||||
|
||||
print(flow.usage_metrics)
|
||||
# UsageMetrics(total_tokens=8579, prompt_tokens=6210, completion_tokens=2369,
|
||||
# cached_prompt_tokens=0, reasoning_tokens=0,
|
||||
# cache_creation_tokens=0, successful_requests=5)
|
||||
```
|
||||
|
||||
<Note>
|
||||
`flow.usage_metrics`는 `flow.kickoff().token_usage`와 **동일하지 않습니다**.
|
||||
후자는 `CrewOutput`을 반환한 **마지막** `@listen` 메서드의
|
||||
`CrewOutput.token_usage`만 반환하므로, 이전에 실행된 Crew들과 Flow 메서드에서
|
||||
직접 호출한 `LLM.call(...)`은 전혀 포함되지 않습니다. Flow 실행에 대한
|
||||
**전체** 토큰 집계가 필요할 때는 항상 `flow.usage_metrics`를 사용하십시오.
|
||||
</Note>
|
||||
|
||||
반환되는 [`UsageMetrics`](https://github.com/crewAIInc/crewAI/blob/main/lib/crewai/src/crewai/types/usage_metrics.py)의 각 항목은 단일 `flow.kickoff()` 실행 동안 발생한 모든 LLM 호출의 합계입니다. 다음 `kickoff()` 호출(및 `kickoff_for_each`의 각 반복)에서 카운터가 초기화되므로 연속 실행이 이중으로 집계되지 않습니다. 이 속성은 `kickoff()` 완료 후 언제든지 안전하게 읽을 수 있으며, 실행 중에 읽으면 그 시점까지 누적된 부분 합계를 반환합니다.
|
||||
|
||||
## 플로우 상태 관리
|
||||
|
||||
상태를 효과적으로 관리하는 것은 신뢰할 수 있고 유지 보수가 용이한 AI 워크플로를 구축하는 데 매우 중요합니다. CrewAI 플로우는 비정형 및 정형 상태 관리를 위한 강력한 메커니즘을 제공하여, 개발자가 자신의 애플리케이션에 가장 적합한 접근 방식을 선택할 수 있도록 합니다.
|
||||
|
||||
@@ -24,15 +24,39 @@ CrewAI AMP는 배포에서 OpenTelemetry **트레이스**와 **로그**를 자
|
||||
|
||||
1. CrewAI AMP에서 **Settings** > **OpenTelemetry Collectors**로 이동합니다.
|
||||
2. **Add Collector**를 클릭합니다.
|
||||
3. 통합 유형을 선택합니다 — **OpenTelemetry Traces** 또는 **OpenTelemetry Logs**.
|
||||
4. 연결을 구성합니다:
|
||||
- **Endpoint** — 수집기의 OTLP 엔드포인트 (예: `https://otel-collector.example.com:4317`).
|
||||
- **Service Name** — 관측 가능성 플랫폼에서 이 서비스를 식별하기 위한 이름.
|
||||
- **Custom Headers** *(선택 사항)* — 인증 또는 라우팅 헤더를 키-값 쌍으로 추가합니다.
|
||||
- **Certificate** *(선택 사항)* — 수집기에서 TLS 인증서가 필요한 경우 제공합니다.
|
||||
5. **Save**를 클릭합니다.
|
||||
3. 통합을 선택합니다:
|
||||
- **OpenTelemetry Traces** 및 **OpenTelemetry Logs** — OTLP 호환 수집기 또는 백엔드로 내보냅니다.
|
||||
- **Datadog** — 별도의 수집기나 Datadog Agent 없이 트레이스를 Datadog의 OTLP 인테이크로 직접 전송합니다.
|
||||
4. 연결을 구성합니다. 필드는 선택한 통합에 따라 달라집니다:
|
||||
|
||||
<Frame></Frame>
|
||||
<Tabs>
|
||||
<Tab title="OpenTelemetry Traces / Logs">
|
||||
**OpenTelemetry Traces**와 **OpenTelemetry Logs**는 동일한 필드를 공유하는 별개의 통합입니다 — 내보내려는 신호에 맞는 것을 선택하세요.
|
||||
|
||||
- **Endpoint** — 수집기의 OTLP 엔드포인트 (예: `https://otel-collector.example.com:4317`).
|
||||
- **Service Name** — 관측 가능성 플랫폼에서 이 서비스를 식별하기 위한 이름.
|
||||
- **Custom Headers** *(선택 사항)* — 인증 또는 라우팅 헤더를 키-값 쌍으로 추가합니다.
|
||||
- **Certificate** *(선택 사항)* — 수집기에서 TLS 인증서가 필요한 경우 제공합니다.
|
||||
|
||||
<Frame></Frame>
|
||||
</Tab>
|
||||
<Tab title="Datadog">
|
||||
- **Datadog Site Domain** — Datadog 사이트의 OTLP 호스트만 입력합니다 (프로토콜이나 경로 제외). CrewAI가 전체 HTTPS OTLP 엔드포인트를 자동으로 구성합니다. [Datadog 사이트](https://docs.datadoghq.com/getting_started/site/)에 맞는 호스트를 사용하세요:
|
||||
- `otlp.datadoghq.com` (US1)
|
||||
- `otlp.us3.datadoghq.com` (US3)
|
||||
- `otlp.us5.datadoghq.com` (US5)
|
||||
- `otlp.datadoghq.eu` (EU1)
|
||||
- `otlp.ap1.datadoghq.com` (AP1)
|
||||
- **API Key** — Datadog API 키입니다. [키 생성 방법](https://docs.datadoghq.com/account_management/api-app-keys/#api-keys)을 참고하세요.
|
||||
|
||||
Datadog 통합은 **트레이스**를 내보냅니다.
|
||||
|
||||
<Frame></Frame>
|
||||
</Tab>
|
||||
</Tabs>
|
||||
|
||||
5. *(선택 사항)* **Test Connection**을 클릭하여 제공한 자격 증명으로 CrewAI가 엔드포인트에 연결할 수 있는지 확인합니다.
|
||||
6. **Save**를 클릭합니다.
|
||||
|
||||
<Tip>
|
||||
여러 수집기를 추가할 수 있습니다 — 예를 들어, 트레이스용 하나와 로그용 하나를 추가하거나, 다른 목적을 위해 다른 백엔드로 전송할 수 있습니다.
|
||||
|
||||
@@ -161,6 +161,18 @@ crew = Crew(
|
||||
)
|
||||
```
|
||||
|
||||
<Note>
|
||||
`agent.i18n`은 이전 버전과의 호환성을 위해서만 유지되며 사용이 중단될 예정입니다. 런타임 프롬프트 커스터마이징에는 `Crew`에 `prompt_file`을 전달하세요. 프롬프트 슬라이스를 코드에서 직접 읽어야 한다면 i18n 유틸리티를 직접 사용하세요:
|
||||
</Note>
|
||||
|
||||
```python
|
||||
from crewai.utilities.i18n import get_i18n
|
||||
|
||||
i18n = get_i18n("custom_prompts.json")
|
||||
format_slice = i18n.slice("format")
|
||||
tool_prompt = i18n.tools("ask_question")
|
||||
```
|
||||
|
||||
#### 옵션 3: o1 모델에 대한 시스템 프롬프트 비활성화
|
||||
```python
|
||||
agent = Agent(
|
||||
@@ -208,6 +220,8 @@ agent = Agent(
|
||||
|
||||
그러면 CrewAI가 기본값과 사용자가 지정한 내용을 병합하므로, 모든 프롬프트를 다시 정의할 필요가 없습니다. 방법은 다음과 같습니다:
|
||||
|
||||
프롬프트 슬라이스를 코드에서 직접 읽어야 하는 경우에는 `agent.i18n`을 읽는 대신 동일한 프롬프트 파일로 `crewai.utilities.i18n.get_i18n()`을 사용하세요.
|
||||
|
||||
### 예시: 기본 프롬프트 커스터마이징
|
||||
|
||||
수정하고 싶은 프롬프트를 포함하는 `custom_prompts.json` 파일을 생성하세요. 변경 사항만이 아니라 포함해야 하는 모든 최상위 프롬프트를 반드시 나열해야 합니다:
|
||||
@@ -314,4 +328,4 @@ CrewAI에서의 저수준 prompt 커스터마이제이션은 매우 맞춤화되
|
||||
|
||||
<Check>
|
||||
이제 CrewAI에서 고급 prompt 커스터마이징을 위한 기초를 갖추었습니다. 모델별 구조나 도메인별 제약에 맞춰 적용하든, 이러한 저수준 접근 방식은 agent 상호작용을 매우 전문적으로 조정할 수 있게 해줍니다.
|
||||
</Check>
|
||||
</Check>
|
||||
|
||||
@@ -4,6 +4,126 @@ description: "Atualizações de produto, melhorias e correções do CrewAI"
|
||||
icon: "clock"
|
||||
mode: "wide"
|
||||
---
|
||||
<Update label="11 jun 2026">
|
||||
## v1.14.7
|
||||
|
||||
[Ver release no GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7)
|
||||
|
||||
## O que Mudou
|
||||
|
||||
### Recursos
|
||||
- Adicionar backends padrão plugáveis para memória, conhecimento, rag e fluxo.
|
||||
- Exibir o verdadeiro finish_reason, parâmetros de amostragem e response.id em eventos LLM.
|
||||
- Tipar os gatilhos DSL como decoradores cientes de rotas.
|
||||
- Adicionar API de chat para fluxos de conversa.
|
||||
- Tornar o backend de bloqueio substituível.
|
||||
- Construir FlowDefinition a partir de metadados Flow DSL.
|
||||
- Adicionar provedor nativo Snowflake Cortex LLM.
|
||||
- Adicionar suporte a arquivos de agentes treinados pela equipe.
|
||||
|
||||
### Correções de Bugs
|
||||
- Corrigir checkpoint para reconstruir BaseLLM personalizado como LLM concreto na restauração.
|
||||
- Controlar a restauração com uma flag para evitar que snapshots ao vivo sejam reproduzidos como retomar.
|
||||
- Escopar o estado de execução por execução para limitar o crescimento e isolar execuções concorrentes.
|
||||
- Corrigir configuração de telemetria no crewai-login.
|
||||
- Respeitar suppress_flow_events para eventos de execução de método.
|
||||
- Restaurar [project.scripts] no pacote crewai para instalação da ferramenta uv.
|
||||
- Resolver CVEs de pip-audit para aiohttp, docling e docling-core.
|
||||
- Corrigir entrada de arquivo que não estava funcionando de forma confiável.
|
||||
- Corrigir histórias de resultados de ferramentas incompletas do Snowflake Claude.
|
||||
|
||||
### Documentação
|
||||
- Atualizar changelog e versão para v1.14.7.
|
||||
- Atualizar documentação do coletor OpenTelemetry.
|
||||
- Atualizar guia do LLM NVIDIA Nemotron.
|
||||
- Adicionar guia de integração do Databricks.
|
||||
- Adicionar guia de integração do Snowflake.
|
||||
|
||||
### Desempenho
|
||||
- Melhorar a velocidade de importação do crewai através do carregamento preguiçoso de imports do docling.
|
||||
|
||||
### Refatoração
|
||||
- Simplificar a avaliação de condições de fluxo para ser sem estado por evento.
|
||||
- Desacoplar a lógica de conversa da execução e adicionar uma conversational_definition.
|
||||
- Dividir `flow.py` em DSL, definição e execução.
|
||||
|
||||
## Contribuidores
|
||||
|
||||
@Luzk, @alex-clawd, @devin-ai-integration[bot], @greysonlalonde, @gvieira, @jessemiller, @lorenzejay, @lucasgomide, @mattatcha, @vinibrsl
|
||||
|
||||
</Update>
|
||||
|
||||
<Update label="10 jun 2026">
|
||||
## v1.14.7rc2
|
||||
|
||||
[Ver release no GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7rc2)
|
||||
|
||||
## O que Mudou
|
||||
|
||||
### Correções de Bugs
|
||||
- Restauração de portão em uma flag para evitar que snapshots ao vivo sejam reproduzidos como retomar
|
||||
|
||||
### Documentação
|
||||
- Atualizar changelog e versão para v1.14.7rc1
|
||||
|
||||
## Contributors
|
||||
|
||||
@greysonlalonde
|
||||
|
||||
</Update>
|
||||
|
||||
<Update label="10 jun 2026">
|
||||
## v1.14.7rc1
|
||||
|
||||
[Ver release no GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7rc1)
|
||||
|
||||
## O que Mudou
|
||||
|
||||
### Recursos
|
||||
- Adicionar `reset_runtime_state` para liberar o estado acumulado do barramento
|
||||
- Lidar com suporte a ambos os prompts personalizados
|
||||
- Desacoplar a lógica de conversa do tempo de execução e adicionar uma `conversational_definition`
|
||||
|
||||
### Correções de Bugs
|
||||
- Corrigir o escopo do estado de tempo de execução por execução para limitar o crescimento e isolar execuções concorrentes
|
||||
- Corrigir a configuração de telemetria em `crewai-login`
|
||||
- Corrigir o respeito a `suppress_flow_events` para eventos de execução de método
|
||||
|
||||
### Documentação
|
||||
- Atualizar imagens do OpenTelemetry
|
||||
- Atualizar a documentação para refletir o novo estado do coletor OpenTelemetry
|
||||
- Atualizar o changelog e a versão para v1.14.7a4
|
||||
|
||||
### Refatoração
|
||||
- Simplificar a avaliação da condição de fluxo para ser sem estado por evento
|
||||
- Melhorar o ciclo de roteamento de conversas com uma rota a menos
|
||||
|
||||
## Contribuidores
|
||||
|
||||
@greysonlalonde, @lorenzejay, @lucasgomide, @vinibrsl
|
||||
|
||||
</Update>
|
||||
|
||||
<Update label="09 jun 2026">
|
||||
## v1.14.7a4
|
||||
|
||||
[Ver release no GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7a4)
|
||||
|
||||
## O Que Mudou
|
||||
|
||||
### Funcionalidades
|
||||
- Migrar a execução @listen/@router para ler a partir de FlowDefinition
|
||||
- Adicionar backends padrão plugáveis para memória, conhecimento, rag e flow
|
||||
|
||||
### Documentação
|
||||
- Atualizar changelog e versão para v1.14.7a3
|
||||
|
||||
## Contributors
|
||||
|
||||
@greysonlalonde, @mattatcha, @vinibrsl
|
||||
|
||||
</Update>
|
||||
|
||||
<Update label="08 jun 2026">
|
||||
## v1.14.7a3
|
||||
|
||||
|
||||
@@ -219,6 +219,49 @@ Após o término da execução, é possível acessar o estado final e observar a
|
||||
Ao garantir que a saída do método final seja retornada e oferecer acesso ao estado, o CrewAI Flows facilita a integração dos resultados dos seus workflows de IA em aplicações maiores,
|
||||
além de permitir o gerenciamento e o acesso ao estado durante toda a execução do Flow.
|
||||
|
||||
## Métricas de Uso do Flow
|
||||
|
||||
Após a execução de um Flow, você pode acessar a propriedade `usage_metrics` para visualizar o consumo agregado de tokens em **todas as chamadas de LLM** realizadas durante a execução — incluindo chamadas das Crews orquestradas pelo Flow, chamadas dentro de tools de Agents, e invocações diretas de `LLM.call(...)` feitas a partir de métodos do Flow. Esse é o equivalente, do lado do SDK, ao total exibido na interface do CrewAI Enterprise.
|
||||
|
||||
```python Code
|
||||
from crewai import LLM
|
||||
from crewai.flow.flow import Flow, listen, start
|
||||
|
||||
class UsageMetricsFlow(Flow):
|
||||
@start()
|
||||
def run_first_crew(self):
|
||||
self.state.first_result = FirstCrew().crew().kickoff()
|
||||
|
||||
@listen(run_first_crew)
|
||||
def call_llm_directly(self):
|
||||
# Chamada direta de LLM — também contabilizada por flow.usage_metrics
|
||||
llm = LLM(model="openai/gpt-4o-mini")
|
||||
self.state.summary = llm.call("Resuma os principais pontos.")
|
||||
|
||||
@listen(call_llm_directly)
|
||||
def run_second_crew(self):
|
||||
self.state.second_result = SecondCrew().crew().kickoff()
|
||||
|
||||
flow = UsageMetricsFlow()
|
||||
flow.kickoff()
|
||||
|
||||
print(flow.usage_metrics)
|
||||
# UsageMetrics(total_tokens=8579, prompt_tokens=6210, completion_tokens=2369,
|
||||
# cached_prompt_tokens=0, reasoning_tokens=0,
|
||||
# cache_creation_tokens=0, successful_requests=5)
|
||||
```
|
||||
|
||||
<Note>
|
||||
`flow.usage_metrics` **não** é o mesmo que `flow.kickoff().token_usage`. Este
|
||||
último retorna apenas o `CrewOutput.token_usage` do **último** método
|
||||
`@listen` que retornou um `CrewOutput`, ou seja, reflete somente a Crew
|
||||
final e ignora completamente as Crews anteriores e quaisquer chamadas
|
||||
diretas de `LLM.call(...)`. Use `flow.usage_metrics` sempre que precisar do
|
||||
rollup **completo** de tokens da execução do Flow.
|
||||
</Note>
|
||||
|
||||
Cada campo do [`UsageMetrics`](https://github.com/crewAIInc/crewAI/blob/main/lib/crewai/src/crewai/types/usage_metrics.py) retornado representa a soma de todas as chamadas de LLM feitas em uma única invocação de `flow.kickoff()`. Os contadores são resetados a cada novo `kickoff()` (e em cada iteração de `kickoff_for_each`), de modo que execuções sucessivas não duplicam o total. A propriedade é segura para ser lida em qualquer momento após o `kickoff()`; lê-la durante a execução retorna o total parcial acumulado até aquele instante.
|
||||
|
||||
## Gerenciamento de Estado em Flows
|
||||
|
||||
Gerenciar o estado de forma eficaz é fundamental para construir fluxos de trabalho de IA confiáveis e de fácil manutenção. O CrewAI Flows oferece mecanismos robustos para o gerenciamento de estado tanto não estruturado quanto estruturado,
|
||||
|
||||
@@ -24,15 +24,39 @@ Os dados de telemetria seguem as [convenções semânticas GenAI do OpenTelemetr
|
||||
|
||||
1. No CrewAI AMP, vá para **Settings** > **OpenTelemetry Collectors**.
|
||||
2. Clique em **Add Collector**.
|
||||
3. Selecione um tipo de integração — **OpenTelemetry Traces** ou **OpenTelemetry Logs**.
|
||||
4. Configure a conexão:
|
||||
- **Endpoint** — O endpoint OTLP do seu coletor (por exemplo, `https://otel-collector.example.com:4317`).
|
||||
- **Service Name** — Um nome para identificar este serviço na sua plataforma de observabilidade.
|
||||
- **Custom Headers** *(opcional)* — Adicione headers de autenticação ou roteamento como pares chave-valor.
|
||||
- **Certificate** *(opcional)* — Forneça um certificado TLS se o seu coletor exigir um.
|
||||
5. Clique em **Save**.
|
||||
3. Selecione uma integração:
|
||||
- **OpenTelemetry Traces** e **OpenTelemetry Logs** — exporte para qualquer coletor ou backend compatível com OTLP.
|
||||
- **Datadog** — envie traces diretamente para a ingestão OTLP do Datadog, sem precisar de um coletor separado ou do Datadog Agent.
|
||||
4. Configure a conexão. Os campos dependem da integração selecionada:
|
||||
|
||||
<Frame></Frame>
|
||||
<Tabs>
|
||||
<Tab title="OpenTelemetry Traces / Logs">
|
||||
**OpenTelemetry Traces** e **OpenTelemetry Logs** são integrações separadas que compartilham os mesmos campos — escolha a que corresponde ao sinal que você quer exportar.
|
||||
|
||||
- **Endpoint** — O endpoint OTLP do seu coletor (por exemplo, `https://otel-collector.example.com:4317`).
|
||||
- **Service Name** — Um nome para identificar este serviço na sua plataforma de observabilidade.
|
||||
- **Custom Headers** *(opcional)* — Adicione headers de autenticação ou roteamento como pares chave-valor.
|
||||
- **Certificate** *(opcional)* — Forneça um certificado TLS se o seu coletor exigir um.
|
||||
|
||||
<Frame></Frame>
|
||||
</Tab>
|
||||
<Tab title="Datadog">
|
||||
- **Datadog Site Domain** — Apenas o host OTLP do seu site Datadog, sem protocolo ou caminho. O CrewAI monta o endpoint HTTPS OTLP completo para você. Use o host correspondente ao seu [site Datadog](https://docs.datadoghq.com/getting_started/site/):
|
||||
- `otlp.datadoghq.com` (US1)
|
||||
- `otlp.us3.datadoghq.com` (US3)
|
||||
- `otlp.us5.datadoghq.com` (US5)
|
||||
- `otlp.datadoghq.eu` (EU1)
|
||||
- `otlp.ap1.datadoghq.com` (AP1)
|
||||
- **API Key** — Sua chave de API do Datadog. Veja [como criar uma](https://docs.datadoghq.com/account_management/api-app-keys/#api-keys).
|
||||
|
||||
A integração com o Datadog exporta **traces**.
|
||||
|
||||
<Frame></Frame>
|
||||
</Tab>
|
||||
</Tabs>
|
||||
|
||||
5. *(opcional)* Clique em **Test Connection** para verificar se o CrewAI consegue acessar o endpoint com as credenciais fornecidas.
|
||||
6. Clique em **Save**.
|
||||
|
||||
<Tip>
|
||||
Você pode adicionar múltiplos coletores — por exemplo, um para traces e outro para logs, ou enviar para diferentes backends para diferentes propósitos.
|
||||
|
||||
@@ -161,6 +161,18 @@ crew = Crew(
|
||||
)
|
||||
```
|
||||
|
||||
<Note>
|
||||
`agent.i18n` é mantido apenas para compatibilidade retroativa e está obsoleto. Para customização de prompts em tempo de execução, passe `prompt_file` para `Crew`. Para acesso programático aos slices de prompt, use diretamente o utilitário de i18n:
|
||||
</Note>
|
||||
|
||||
```python
|
||||
from crewai.utilities.i18n import get_i18n
|
||||
|
||||
i18n = get_i18n("custom_prompts.json")
|
||||
format_slice = i18n.slice("format")
|
||||
tool_prompt = i18n.tools("ask_question")
|
||||
```
|
||||
|
||||
#### Opção 3: Desativar Prompts de Sistema para Modelos o1
|
||||
```python
|
||||
agent = Agent(
|
||||
@@ -208,6 +220,8 @@ Uma abordagem direta é criar um arquivo JSON para os prompts que deseja sobresc
|
||||
|
||||
O CrewAI então mescla suas customizações com os padrões, assim você não precisa redefinir todos os prompts. Veja como:
|
||||
|
||||
Para código que precisa ler slices de prompt diretamente, use `crewai.utilities.i18n.get_i18n()` com o mesmo arquivo de prompts em vez de ler `agent.i18n`.
|
||||
|
||||
### Exemplo: Customização Básica de Prompt
|
||||
|
||||
Crie um arquivo `custom_prompts.json` com os prompts que deseja modificar. Certifique-se de listar todos os prompts de nível superior que ele deve conter, não apenas suas alterações:
|
||||
|
||||
@@ -8,7 +8,7 @@ authors = [
|
||||
]
|
||||
requires-python = ">=3.10, <3.14"
|
||||
dependencies = [
|
||||
"crewai-core==1.14.7a3",
|
||||
"crewai-core==1.14.7",
|
||||
"click>=8.1.7,<9",
|
||||
"pydantic>=2.11.9,<2.13",
|
||||
"pydantic-settings~=2.10.1",
|
||||
|
||||
@@ -1 +1 @@
|
||||
__version__ = "1.14.7a3"
|
||||
__version__ = "1.14.7"
|
||||
|
||||
@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
|
||||
authors = [{ name = "Your Name", email = "you@example.com" }]
|
||||
requires-python = ">=3.10,<3.14"
|
||||
dependencies = [
|
||||
"crewai[tools]==1.14.7a3"
|
||||
"crewai[tools]==1.14.7"
|
||||
]
|
||||
|
||||
[project.scripts]
|
||||
|
||||
@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
|
||||
authors = [{ name = "Your Name", email = "you@example.com" }]
|
||||
requires-python = ">=3.10,<3.14"
|
||||
dependencies = [
|
||||
"crewai[tools]==1.14.7a3"
|
||||
"crewai[tools]==1.14.7"
|
||||
]
|
||||
|
||||
[project.scripts]
|
||||
|
||||
@@ -5,7 +5,7 @@ description = "Power up your crews with {{folder_name}}"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.10,<3.14"
|
||||
dependencies = [
|
||||
"crewai[tools]==1.14.7a3"
|
||||
"crewai[tools]==1.14.7"
|
||||
]
|
||||
|
||||
[tool.crewai]
|
||||
|
||||
@@ -1 +1 @@
|
||||
__version__ = "1.14.7a3"
|
||||
__version__ = "1.14.7"
|
||||
|
||||
@@ -17,7 +17,7 @@ import contextlib
|
||||
import logging
|
||||
import os
|
||||
import threading
|
||||
from typing import Any, Final
|
||||
from typing import Any, ClassVar, Final
|
||||
|
||||
from opentelemetry import trace
|
||||
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
|
||||
@@ -27,7 +27,7 @@ from opentelemetry.sdk.trace.export import (
|
||||
BatchSpanProcessor,
|
||||
SpanExportResult,
|
||||
)
|
||||
from opentelemetry.trace import Span, Status, StatusCode
|
||||
from opentelemetry.trace import ProxyTracerProvider, Span, Status, StatusCode
|
||||
from typing_extensions import Self
|
||||
|
||||
|
||||
@@ -72,8 +72,8 @@ class Telemetry:
|
||||
and event-bus signal handlers (see ``crewai.telemetry.telemetry``).
|
||||
"""
|
||||
|
||||
_instance = None
|
||||
_lock = threading.Lock()
|
||||
_instance: ClassVar[Self | None] = None
|
||||
_lock: ClassVar[threading.Lock] = threading.Lock()
|
||||
|
||||
def __new__(cls) -> Self:
|
||||
if cls._instance is None:
|
||||
@@ -149,6 +149,10 @@ class Telemetry:
|
||||
if self.ready and not self.trace_set:
|
||||
try:
|
||||
with suppress_warnings():
|
||||
existing_provider = trace.get_tracer_provider()
|
||||
if not isinstance(existing_provider, ProxyTracerProvider):
|
||||
self.trace_set = True
|
||||
return
|
||||
trace.set_tracer_provider(self.provider)
|
||||
self.trace_set = True
|
||||
except Exception as e:
|
||||
|
||||
@@ -14,6 +14,7 @@ from crewai_core import (
|
||||
version,
|
||||
)
|
||||
import pytest
|
||||
from opentelemetry.sdk.trace import TracerProvider
|
||||
|
||||
|
||||
def test_version_returns_string() -> None:
|
||||
@@ -94,3 +95,36 @@ def test_user_data_decline_blocks(
|
||||
def test_unused_var_warning_silenced() -> None:
|
||||
# Touch os to keep the import (used by env-var fixtures above)
|
||||
assert os.environ is not None
|
||||
|
||||
|
||||
def test_core_telemetry_skips_duplicate_tracer_provider(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
from crewai_core.telemetry import Telemetry
|
||||
|
||||
Telemetry._instance = None
|
||||
monkeypatch.delenv("OTEL_SDK_DISABLED", raising=False)
|
||||
monkeypatch.delenv("CREWAI_DISABLE_TELEMETRY", raising=False)
|
||||
monkeypatch.delenv("CREWAI_DISABLE_TRACKING", raising=False)
|
||||
|
||||
monkeypatch.setattr(
|
||||
"crewai_core.telemetry.trace.get_tracer_provider",
|
||||
lambda: TracerProvider(),
|
||||
)
|
||||
|
||||
called = False
|
||||
|
||||
def fail_if_called(provider: object) -> None:
|
||||
nonlocal called
|
||||
called = True
|
||||
|
||||
monkeypatch.setattr(
|
||||
"crewai_core.telemetry.trace.set_tracer_provider",
|
||||
fail_if_called,
|
||||
)
|
||||
|
||||
telemetry = Telemetry()
|
||||
telemetry.set_tracer()
|
||||
|
||||
assert called is False
|
||||
assert telemetry.trace_set is True
|
||||
|
||||
@@ -152,4 +152,4 @@ __all__ = [
|
||||
"wrap_file_source",
|
||||
]
|
||||
|
||||
__version__ = "1.14.7a3"
|
||||
__version__ = "1.14.7"
|
||||
|
||||
@@ -10,7 +10,7 @@ requires-python = ">=3.10, <3.14"
|
||||
dependencies = [
|
||||
"pytube~=15.0.0",
|
||||
"requests>=2.33.0,<3",
|
||||
"crewai==1.14.7a3",
|
||||
"crewai==1.14.7",
|
||||
"tiktoken>=0.8.0,<0.13",
|
||||
"beautifulsoup4~=4.13.4",
|
||||
"python-docx~=1.2.0",
|
||||
|
||||
@@ -330,4 +330,4 @@ __all__ = [
|
||||
"ZapierActionTools",
|
||||
]
|
||||
|
||||
__version__ = "1.14.7a3"
|
||||
__version__ = "1.14.7"
|
||||
|
||||
@@ -22,6 +22,31 @@ logger = logging.getLogger(__name__)
|
||||
_UNSAFE_PATHS_ENV = "CREWAI_TOOLS_ALLOW_UNSAFE_PATHS"
|
||||
|
||||
|
||||
def format_path_for_display(path: str, base_dir: str | None = None) -> str:
|
||||
"""Return a path label that does not expose absolute directory prefixes."""
|
||||
if base_dir is None:
|
||||
base_dir = os.getcwd()
|
||||
|
||||
try:
|
||||
resolved_base = os.path.realpath(base_dir)
|
||||
resolved_path = os.path.realpath(
|
||||
os.path.join(resolved_base, path) if not os.path.isabs(path) else path
|
||||
)
|
||||
if os.path.commonpath([resolved_base, resolved_path]) == resolved_base:
|
||||
return os.path.relpath(resolved_path, resolved_base)
|
||||
except (OSError, ValueError) as exc:
|
||||
logger.debug("Falling back to basename for display path formatting: %s", exc)
|
||||
|
||||
return os.path.basename(os.path.realpath(path)) or "[redacted path]"
|
||||
|
||||
|
||||
def format_error_for_display(error: Exception) -> str:
|
||||
"""Return exception details without OS-added absolute path context."""
|
||||
if isinstance(error, OSError):
|
||||
return error.strerror or error.__class__.__name__
|
||||
return str(error)
|
||||
|
||||
|
||||
def _is_escape_hatch_enabled() -> bool:
|
||||
"""Check if the unsafe paths escape hatch is enabled."""
|
||||
return os.environ.get(_UNSAFE_PATHS_ENV, "").lower() in ("true", "1", "yes")
|
||||
@@ -66,8 +91,8 @@ def validate_file_path(path: str, base_dir: str | None = None) -> str:
|
||||
prefix = resolved_base if resolved_base.endswith(os.sep) else resolved_base + os.sep
|
||||
if not resolved_path.startswith(prefix) and resolved_path != resolved_base:
|
||||
raise ValueError(
|
||||
f"Path '{path}' resolves to '{resolved_path}' which is outside "
|
||||
f"the allowed directory '{resolved_base}'. "
|
||||
f"Path '{format_path_for_display(resolved_path, resolved_base)}' is "
|
||||
f"outside the allowed directory. "
|
||||
f"Set {_UNSAFE_PATHS_ENV}=true to bypass this check."
|
||||
)
|
||||
|
||||
|
||||
@@ -3,7 +3,11 @@ from typing import Any
|
||||
from crewai.tools import BaseTool
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from crewai_tools.security.safe_path import validate_file_path
|
||||
from crewai_tools.security.safe_path import (
|
||||
format_error_for_display,
|
||||
format_path_for_display,
|
||||
validate_file_path,
|
||||
)
|
||||
|
||||
|
||||
class FileReadToolSchema(BaseModel):
|
||||
@@ -58,8 +62,9 @@ class FileReadTool(BaseTool):
|
||||
**kwargs: Additional keyword arguments passed to BaseTool.
|
||||
"""
|
||||
if file_path is not None:
|
||||
display_path = format_path_for_display(file_path)
|
||||
kwargs["description"] = (
|
||||
f"A tool that reads file content. The default file is {file_path}, but you can provide a different 'file_path' parameter to read another file. You can also specify 'start_line' and 'line_count' to read specific parts of the file."
|
||||
f"A tool that reads file content. The default file is {display_path}, but you can provide a different 'file_path' parameter to read another file. You can also specify 'start_line' and 'line_count' to read specific parts of the file."
|
||||
)
|
||||
|
||||
super().__init__(**kwargs)
|
||||
@@ -78,7 +83,12 @@ class FileReadTool(BaseTool):
|
||||
if file_path is None:
|
||||
return "Error: No file path provided. Please provide a file path either in the constructor or as an argument."
|
||||
|
||||
file_path = validate_file_path(file_path)
|
||||
try:
|
||||
file_path = validate_file_path(file_path)
|
||||
except ValueError as e:
|
||||
return f"Error: Invalid file path: {e!s}"
|
||||
|
||||
display_path = format_path_for_display(file_path)
|
||||
try:
|
||||
with open(file_path, "r") as file:
|
||||
if start_line == 1 and line_count is None:
|
||||
@@ -98,8 +108,11 @@ class FileReadTool(BaseTool):
|
||||
|
||||
return "".join(selected_lines)
|
||||
except FileNotFoundError:
|
||||
return f"Error: File not found at path: {file_path}"
|
||||
return f"Error: File not found at path: {display_path}"
|
||||
except PermissionError:
|
||||
return f"Error: Permission denied when trying to read file: {file_path}"
|
||||
return f"Error: Permission denied when trying to read file: {display_path}"
|
||||
except Exception as e:
|
||||
return f"Error: Failed to read file {file_path}. {e!s}"
|
||||
return (
|
||||
f"Error: Failed to read file {display_path}. "
|
||||
f"{format_error_for_display(e)}"
|
||||
)
|
||||
|
||||
@@ -5,6 +5,11 @@ from typing import Any
|
||||
from crewai.tools import BaseTool
|
||||
from pydantic import BaseModel
|
||||
|
||||
from crewai_tools.security.safe_path import (
|
||||
format_error_for_display,
|
||||
format_path_for_display,
|
||||
)
|
||||
|
||||
|
||||
def strtobool(val: str | bool) -> bool:
|
||||
if isinstance(val, bool):
|
||||
@@ -44,6 +49,9 @@ class FileWriterTool(BaseTool):
|
||||
# itself, since that is not a valid file target.
|
||||
real_directory = Path(directory).resolve()
|
||||
real_filepath = Path(filepath).resolve()
|
||||
display_filepath = format_path_for_display(
|
||||
str(real_filepath), str(real_directory)
|
||||
)
|
||||
if (
|
||||
not real_filepath.is_relative_to(real_directory)
|
||||
or real_filepath == real_directory
|
||||
@@ -56,15 +64,18 @@ class FileWriterTool(BaseTool):
|
||||
kwargs["overwrite"] = strtobool(kwargs["overwrite"])
|
||||
|
||||
if os.path.exists(real_filepath) and not kwargs["overwrite"]:
|
||||
return f"File {real_filepath} already exists and overwrite option was not passed."
|
||||
return f"File {display_filepath} already exists and overwrite option was not passed."
|
||||
|
||||
mode = "w" if kwargs["overwrite"] else "x"
|
||||
with open(real_filepath, mode) as file:
|
||||
file.write(kwargs["content"])
|
||||
return f"Content successfully written to {real_filepath}"
|
||||
return f"Content successfully written to {display_filepath}"
|
||||
except FileExistsError:
|
||||
return f"File {real_filepath} already exists and overwrite option was not passed."
|
||||
return f"File {display_filepath} already exists and overwrite option was not passed."
|
||||
except KeyError as e:
|
||||
return f"An error occurred while accessing key: {e!s}"
|
||||
except Exception as e:
|
||||
return f"An error occurred while writing to the file: {e!s}"
|
||||
return (
|
||||
"An error occurred while writing to the file: "
|
||||
f"{format_error_for_display(e)}"
|
||||
)
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
import os
|
||||
from unittest.mock import mock_open, patch
|
||||
|
||||
from crewai_tools import FileReadTool
|
||||
@@ -6,21 +5,16 @@ from crewai_tools import FileReadTool
|
||||
|
||||
def test_file_read_tool_constructor():
|
||||
"""Test FileReadTool initialization with file_path."""
|
||||
test_file = "/tmp/test_file.txt"
|
||||
test_content = "Hello, World!"
|
||||
with open(test_file, "w") as f:
|
||||
f.write(test_content)
|
||||
test_file = "test_file.txt"
|
||||
|
||||
tool = FileReadTool(file_path=test_file)
|
||||
assert tool.file_path == test_file
|
||||
assert "test_file.txt" in tool.description
|
||||
|
||||
os.remove(test_file)
|
||||
|
||||
|
||||
def test_file_read_tool_run():
|
||||
"""Test FileReadTool _run method with file_path at runtime."""
|
||||
test_file = "/tmp/test_file.txt"
|
||||
test_file = "test_file.txt"
|
||||
test_content = "Hello, World!"
|
||||
|
||||
# Use mock_open to mock file operations
|
||||
@@ -36,18 +30,18 @@ def test_file_read_tool_error_handling():
|
||||
result = tool._run()
|
||||
assert "Error: No file path provided" in result
|
||||
|
||||
result = tool._run(file_path="/nonexistent/file.txt")
|
||||
result = tool._run(file_path="nonexistent/file.txt")
|
||||
assert "Error: File not found at path:" in result
|
||||
|
||||
with patch("builtins.open", side_effect=PermissionError()):
|
||||
result = tool._run(file_path="/tmp/no_permission.txt")
|
||||
result = tool._run(file_path="no_permission.txt")
|
||||
assert "Error: Permission denied" in result
|
||||
|
||||
|
||||
def test_file_read_tool_constructor_and_run():
|
||||
"""Test FileReadTool using both constructor and runtime file paths."""
|
||||
test_file1 = "/tmp/test1.txt"
|
||||
test_file2 = "/tmp/test2.txt"
|
||||
test_file1 = "test1.txt"
|
||||
test_file2 = "test2.txt"
|
||||
content1 = "File 1 content"
|
||||
content2 = "File 2 content"
|
||||
|
||||
@@ -64,7 +58,7 @@ def test_file_read_tool_constructor_and_run():
|
||||
|
||||
def test_file_read_tool_chunk_reading():
|
||||
"""Test FileReadTool reading specific chunks of a file."""
|
||||
test_file = "/tmp/multiline_test.txt"
|
||||
test_file = "multiline_test.txt"
|
||||
lines = [
|
||||
"Line 1\n",
|
||||
"Line 2\n",
|
||||
@@ -104,7 +98,7 @@ def test_file_read_tool_chunk_reading():
|
||||
|
||||
def test_file_read_tool_chunk_error_handling():
|
||||
"""Test error handling for chunk reading."""
|
||||
test_file = "/tmp/short_test.txt"
|
||||
test_file = "short_test.txt"
|
||||
lines = ["Line 1\n", "Line 2\n", "Line 3\n"]
|
||||
file_content = "".join(lines)
|
||||
|
||||
@@ -122,7 +116,7 @@ def test_file_read_tool_chunk_error_handling():
|
||||
|
||||
def test_file_read_tool_zero_or_negative_start_line():
|
||||
"""Test that start_line values of 0 or negative read from the start of the file."""
|
||||
test_file = "/tmp/negative_test.txt"
|
||||
test_file = "negative_test.txt"
|
||||
lines = ["Line 1\n", "Line 2\n", "Line 3\n", "Line 4\n", "Line 5\n"]
|
||||
file_content = "".join(lines)
|
||||
|
||||
@@ -150,3 +144,45 @@ def test_file_read_tool_zero_or_negative_start_line():
|
||||
result = tool._run(file_path=test_file, start_line=-10, line_count=2)
|
||||
expected = "".join(lines[0:2]) # Should read first 2 lines
|
||||
assert result == expected
|
||||
|
||||
|
||||
def test_file_read_tool_error_messages_do_not_disclose_absolute_paths(
|
||||
tmp_path, monkeypatch
|
||||
):
|
||||
"""FileReadTool should redact absolute prefixes from user-visible errors."""
|
||||
monkeypatch.chdir(tmp_path)
|
||||
tool = FileReadTool()
|
||||
target = tmp_path / "secret.txt"
|
||||
|
||||
result = tool._run(file_path=str(target))
|
||||
assert "secret.txt" in result
|
||||
assert str(tmp_path) not in result
|
||||
|
||||
target.touch()
|
||||
with patch("builtins.open", side_effect=PermissionError()):
|
||||
result = tool._run(file_path=str(target))
|
||||
assert "secret.txt" in result
|
||||
assert str(tmp_path) not in result
|
||||
|
||||
with patch(
|
||||
"builtins.open",
|
||||
side_effect=OSError(5, "Input/output error", str(target)),
|
||||
):
|
||||
result = tool._run(file_path=str(target))
|
||||
assert "secret.txt" in result
|
||||
assert str(tmp_path) not in result
|
||||
|
||||
|
||||
def test_file_read_tool_invalid_path_error_does_not_disclose_workspace(
|
||||
tmp_path, monkeypatch
|
||||
):
|
||||
"""Validation errors should not echo the resolved workspace path."""
|
||||
monkeypatch.chdir(tmp_path)
|
||||
outside = tmp_path.parent / "outside.txt"
|
||||
|
||||
result = FileReadTool()._run(file_path=str(outside))
|
||||
|
||||
assert "Invalid file path" in result
|
||||
assert "outside.txt" in result
|
||||
assert str(tmp_path) not in result
|
||||
assert str(tmp_path.parent) not in result
|
||||
|
||||
@@ -47,6 +47,8 @@ def test_basic_file_write(tool, temp_env):
|
||||
assert os.path.exists(path)
|
||||
assert read_file(path) == temp_env["test_content"]
|
||||
assert "successfully written" in result
|
||||
assert temp_env["test_file"] in result
|
||||
assert temp_env["temp_dir"] not in result
|
||||
|
||||
|
||||
def test_directory_creation(tool, temp_env):
|
||||
@@ -62,6 +64,8 @@ def test_directory_creation(tool, temp_env):
|
||||
assert os.path.exists(new_dir)
|
||||
assert os.path.exists(path)
|
||||
assert "successfully written" in result
|
||||
assert temp_env["test_file"] in result
|
||||
assert new_dir not in result
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
@@ -134,6 +138,8 @@ def test_file_exists_error_handling(tool, temp_env, overwrite):
|
||||
)
|
||||
|
||||
assert "already exists and overwrite option was not passed" in result
|
||||
assert temp_env["test_file"] in result
|
||||
assert temp_env["temp_dir"] not in result
|
||||
assert read_file(path) == "Pre-existing content"
|
||||
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@ import os
|
||||
import pytest
|
||||
|
||||
from crewai_tools.security.safe_path import (
|
||||
format_path_for_display,
|
||||
validate_directory_path,
|
||||
validate_file_path,
|
||||
validate_url,
|
||||
@@ -66,6 +67,37 @@ class TestValidateFilePath:
|
||||
result = validate_file_path("/etc/passwd", str(tmp_path))
|
||||
assert result == os.path.realpath("/etc/passwd")
|
||||
|
||||
def test_rejection_message_redacts_absolute_prefixes(self, tmp_path):
|
||||
outside = tmp_path.parent / "outside.txt"
|
||||
|
||||
with pytest.raises(ValueError) as exc_info:
|
||||
validate_file_path(str(outside), str(tmp_path))
|
||||
|
||||
message = str(exc_info.value)
|
||||
assert "outside.txt" in message
|
||||
assert str(tmp_path) not in message
|
||||
assert str(tmp_path.parent) not in message
|
||||
|
||||
|
||||
class TestFormatPathForDisplay:
|
||||
"""Tests for user-visible path labels."""
|
||||
|
||||
def test_returns_relative_path_inside_base(self, tmp_path):
|
||||
nested_file = tmp_path / "nested" / "file.txt"
|
||||
nested_file.parent.mkdir()
|
||||
nested_file.touch()
|
||||
|
||||
result = format_path_for_display(str(nested_file), str(tmp_path))
|
||||
|
||||
assert result == os.path.join("nested", "file.txt")
|
||||
|
||||
def test_redacts_absolute_prefix_outside_base(self, tmp_path):
|
||||
outside_file = tmp_path.parent / "outside.txt"
|
||||
|
||||
result = format_path_for_display(str(outside_file), str(tmp_path))
|
||||
|
||||
assert result == "outside.txt"
|
||||
|
||||
|
||||
class TestValidateDirectoryPath:
|
||||
"""Tests for validate_directory_path."""
|
||||
|
||||
@@ -8,8 +8,8 @@ authors = [
|
||||
]
|
||||
requires-python = ">=3.10, <3.14"
|
||||
dependencies = [
|
||||
"crewai-core==1.14.7a3",
|
||||
"crewai-cli==1.14.7a3",
|
||||
"crewai-core==1.14.7",
|
||||
"crewai-cli==1.14.7",
|
||||
# Core Dependencies
|
||||
"pydantic>=2.11.9,<2.13",
|
||||
"openai>=2.30.0,<3",
|
||||
@@ -54,7 +54,7 @@ Repository = "https://github.com/crewAIInc/crewAI"
|
||||
|
||||
[project.optional-dependencies]
|
||||
tools = [
|
||||
"crewai-tools==1.14.7a3",
|
||||
"crewai-tools==1.14.7",
|
||||
]
|
||||
embeddings = [
|
||||
"tiktoken>=0.8.0,<0.13"
|
||||
|
||||
@@ -48,7 +48,7 @@ def _suppress_pydantic_deprecation_warnings() -> None:
|
||||
|
||||
_suppress_pydantic_deprecation_warnings()
|
||||
|
||||
__version__ = "1.14.7a3"
|
||||
__version__ = "1.14.7"
|
||||
|
||||
_LAZY_IMPORTS: dict[str, tuple[str, str]] = {
|
||||
"Memory": ("crewai.memory.unified_memory", "Memory"),
|
||||
|
||||
@@ -46,6 +46,7 @@ from crewai.state.checkpoint_config import CheckpointConfig, _coerce_checkpoint
|
||||
from crewai.tools.base_tool import BaseTool, Tool
|
||||
from crewai.types.callback import SerializableCallable
|
||||
from crewai.utilities.config import process_config
|
||||
from crewai.utilities.i18n import I18N, get_i18n
|
||||
from crewai.utilities.logger import Logger
|
||||
from crewai.utilities.rpm_controller import RPMController
|
||||
from crewai.utilities.string_utils import interpolate_only
|
||||
@@ -81,6 +82,7 @@ _LLM_TYPE_REGISTRY: dict[str, str] = {
|
||||
def _validate_llm_ref(value: Any) -> Any:
|
||||
if isinstance(value, dict):
|
||||
import importlib
|
||||
import inspect
|
||||
|
||||
llm_type = value.get("llm_type")
|
||||
if not llm_type or llm_type not in _LLM_TYPE_REGISTRY:
|
||||
@@ -91,6 +93,12 @@ def _validate_llm_ref(value: Any) -> Any:
|
||||
dotted = _LLM_TYPE_REGISTRY[llm_type]
|
||||
mod_path, cls_name = dotted.rsplit(".", 1)
|
||||
cls = getattr(importlib.import_module(mod_path), cls_name)
|
||||
if inspect.isabstract(cls):
|
||||
from crewai.llm import LLM
|
||||
|
||||
return LLM(
|
||||
**{k: v for k, v in value.items() if v is not None and k != "llm_type"}
|
||||
)
|
||||
return cls(**value)
|
||||
return value
|
||||
|
||||
@@ -186,6 +194,7 @@ class BaseAgent(BaseModel, ABC, metaclass=AgentMeta):
|
||||
tools (list[Any] | None): Tools at the agent's disposal.
|
||||
max_iter (int): Maximum iterations for an agent to execute a task.
|
||||
agent_executor: An instance of the CrewAgentExecutor class.
|
||||
i18n (I18N): Internationalization settings.
|
||||
llm (Any): Language model that will run the agent.
|
||||
crew (Any): Crew to which the agent belongs.
|
||||
|
||||
@@ -265,6 +274,14 @@ class BaseAgent(BaseModel, ABC, metaclass=AgentMeta):
|
||||
_serialize_executor_ref, return_type=dict | None, when_used="json"
|
||||
),
|
||||
] = Field(default=None, description="An instance of the CrewAgentExecutor class.")
|
||||
i18n: I18N = Field(
|
||||
default_factory=get_i18n,
|
||||
description="Internationalization settings.",
|
||||
deprecated=(
|
||||
"Agent.i18n is deprecated and will be removed in a future release. "
|
||||
"Use crewai.utilities.i18n.get_i18n() or Crew(prompt_file=...) instead."
|
||||
),
|
||||
)
|
||||
|
||||
llm: Annotated[
|
||||
str | BaseLLM | None,
|
||||
|
||||
@@ -117,8 +117,10 @@ def capture_execution_context(
|
||||
)
|
||||
|
||||
|
||||
def apply_execution_context(ctx: ExecutionContext) -> None:
|
||||
def apply_execution_context(ctx: ExecutionContext | dict[str, Any]) -> None:
|
||||
"""Write an ExecutionContext back into the ContextVars."""
|
||||
if isinstance(ctx, dict):
|
||||
ctx = ExecutionContext.model_validate(ctx)
|
||||
_current_task_id.set(ctx.current_task_id)
|
||||
current_flow_request_id.set(ctx.flow_request_id)
|
||||
current_flow_id.set(ctx.flow_id)
|
||||
|
||||
@@ -1013,6 +1013,7 @@ class Crew(FlowTrackable, BaseModel):
|
||||
)
|
||||
token = attach(baggage_ctx)
|
||||
|
||||
runtime_scope = crewai_event_bus._enter_runtime_scope()
|
||||
try:
|
||||
inputs = prepare_kickoff(self, inputs, input_files)
|
||||
|
||||
@@ -1048,6 +1049,7 @@ class Crew(FlowTrackable, BaseModel):
|
||||
self._memory.drain_writes()
|
||||
clear_files(self.id)
|
||||
detach(token)
|
||||
crewai_event_bus._exit_runtime_scope(runtime_scope)
|
||||
|
||||
def _post_kickoff(self, result: CrewOutput) -> CrewOutput:
|
||||
return result
|
||||
@@ -1223,6 +1225,7 @@ class Crew(FlowTrackable, BaseModel):
|
||||
)
|
||||
token = attach(baggage_ctx)
|
||||
|
||||
runtime_scope = crewai_event_bus._enter_runtime_scope()
|
||||
try:
|
||||
inputs = prepare_kickoff(self, inputs, input_files)
|
||||
|
||||
@@ -1256,6 +1259,7 @@ class Crew(FlowTrackable, BaseModel):
|
||||
finally:
|
||||
clear_files(self.id)
|
||||
detach(token)
|
||||
crewai_event_bus._exit_runtime_scope(runtime_scope)
|
||||
|
||||
async def akickoff_for_each(
|
||||
self,
|
||||
|
||||
@@ -80,6 +80,17 @@ def is_replaying() -> bool:
|
||||
return _replaying.get()
|
||||
|
||||
|
||||
_runtime_state_var: contextvars.ContextVar[RuntimeState | None] = (
|
||||
contextvars.ContextVar("crewai_runtime_state", default=None)
|
||||
)
|
||||
_registered_entity_ids_var: contextvars.ContextVar[set[int] | None] = (
|
||||
contextvars.ContextVar("crewai_registered_entity_ids", default=None)
|
||||
)
|
||||
_runtime_scope_depth: contextvars.ContextVar[int] = contextvars.ContextVar(
|
||||
"crewai_runtime_scope_depth", default=0
|
||||
)
|
||||
|
||||
|
||||
class CrewAIEventsBus:
|
||||
"""Singleton event bus for handling events in CrewAI.
|
||||
|
||||
@@ -116,7 +127,6 @@ class CrewAIEventsBus:
|
||||
_futures_lock: threading.Lock
|
||||
_executor_initialized: bool
|
||||
_has_pending_events: bool
|
||||
_runtime_state: RuntimeState | None
|
||||
|
||||
def __new__(cls) -> Self:
|
||||
"""Create or return the singleton instance.
|
||||
@@ -151,8 +161,6 @@ class CrewAIEventsBus:
|
||||
self._console = ConsoleFormatter()
|
||||
self._executor_initialized = False
|
||||
self._has_pending_events = False
|
||||
self._runtime_state: RuntimeState | None = None
|
||||
self._registered_entity_ids: set[int] = set()
|
||||
|
||||
def _ensure_executor_initialized(self) -> None:
|
||||
"""Lazily initialize the thread pool executor and event loop.
|
||||
@@ -281,6 +289,51 @@ class CrewAIEventsBus:
|
||||
"""The RuntimeState currently attached to the bus, if any."""
|
||||
return self._runtime_state
|
||||
|
||||
@property
|
||||
def _runtime_state(self) -> RuntimeState | None:
|
||||
return _runtime_state_var.get()
|
||||
|
||||
@_runtime_state.setter
|
||||
def _runtime_state(self, value: RuntimeState | None) -> None:
|
||||
_runtime_state_var.set(value)
|
||||
|
||||
@property
|
||||
def _registered_entity_ids(self) -> set[int]:
|
||||
ids = _registered_entity_ids_var.get()
|
||||
if ids is None:
|
||||
ids = set()
|
||||
_registered_entity_ids_var.set(ids)
|
||||
return ids
|
||||
|
||||
@_registered_entity_ids.setter
|
||||
def _registered_entity_ids(self, value: set[int]) -> None:
|
||||
_registered_entity_ids_var.set(value)
|
||||
|
||||
def reset_runtime_state(self) -> None:
|
||||
"""Detach the RuntimeState and clear the entity registry."""
|
||||
self._runtime_state = None
|
||||
self._registered_entity_ids = set()
|
||||
|
||||
def _enter_runtime_scope(self) -> bool:
|
||||
depth = _runtime_scope_depth.get()
|
||||
_runtime_scope_depth.set(depth + 1)
|
||||
if depth != 0:
|
||||
return False
|
||||
if _runtime_state_var.get() is None:
|
||||
from crewai import RuntimeState
|
||||
|
||||
if RuntimeState is not None:
|
||||
_runtime_state_var.set(RuntimeState(root=[]))
|
||||
_registered_entity_ids_var.set(set())
|
||||
return True
|
||||
|
||||
def _exit_runtime_scope(self, outermost: bool) -> None:
|
||||
depth = _runtime_scope_depth.get()
|
||||
_runtime_scope_depth.set(depth - 1 if depth > 0 else 0)
|
||||
if outermost:
|
||||
_runtime_state_var.set(None)
|
||||
_registered_entity_ids_var.set(None)
|
||||
|
||||
def register_entity(self, entity: Any) -> None:
|
||||
"""Add an entity to the RuntimeState, creating it if needed.
|
||||
|
||||
@@ -349,6 +402,7 @@ class CrewAIEventsBus:
|
||||
source: Any,
|
||||
event: BaseEvent,
|
||||
handlers: SyncHandlerSet,
|
||||
state: RuntimeState | None,
|
||||
) -> None:
|
||||
"""Call provided synchronous handlers.
|
||||
|
||||
@@ -356,8 +410,8 @@ class CrewAIEventsBus:
|
||||
source: The emitting object
|
||||
event: The event instance
|
||||
handlers: Frozenset of sync handlers to call
|
||||
state: The RuntimeState captured on the emitting context
|
||||
"""
|
||||
state = self._runtime_state
|
||||
errors: list[tuple[SyncHandler, Exception]] = [
|
||||
(handler, error)
|
||||
for handler in handlers
|
||||
@@ -376,6 +430,7 @@ class CrewAIEventsBus:
|
||||
source: Any,
|
||||
event: BaseEvent,
|
||||
handlers: AsyncHandlerSet,
|
||||
state: RuntimeState | None,
|
||||
) -> None:
|
||||
"""Asynchronously call provided async handlers.
|
||||
|
||||
@@ -383,8 +438,8 @@ class CrewAIEventsBus:
|
||||
source: The object that emitted the event
|
||||
event: The event instance
|
||||
handlers: Frozenset of async handlers to call
|
||||
state: The RuntimeState captured on the emitting context
|
||||
"""
|
||||
state = self._runtime_state
|
||||
|
||||
async def _call(handler: AsyncHandler) -> Any:
|
||||
if _get_param_count(handler) >= 3:
|
||||
@@ -399,7 +454,9 @@ class CrewAIEventsBus:
|
||||
f"[CrewAIEventsBus] Async handler error in {getattr(handler, '__name__', handler)}: {result}"
|
||||
)
|
||||
|
||||
async def _emit_with_dependencies(self, source: Any, event: BaseEvent) -> None:
|
||||
async def _emit_with_dependencies(
|
||||
self, source: Any, event: BaseEvent, state: RuntimeState | None
|
||||
) -> None:
|
||||
"""Emit an event with dependency-aware handler execution.
|
||||
|
||||
Handlers are grouped into execution levels based on their dependencies.
|
||||
@@ -450,18 +507,18 @@ class CrewAIEventsBus:
|
||||
|
||||
if level_sync:
|
||||
if event_type is LLMStreamChunkEvent:
|
||||
self._call_handlers(source, event, level_sync)
|
||||
self._call_handlers(source, event, level_sync, state)
|
||||
else:
|
||||
ctx = contextvars.copy_context()
|
||||
future = self._sync_executor.submit(
|
||||
ctx.run, self._call_handlers, source, event, level_sync
|
||||
ctx.run, self._call_handlers, source, event, level_sync, state
|
||||
)
|
||||
await asyncio.get_running_loop().run_in_executor(
|
||||
None, future.result
|
||||
)
|
||||
|
||||
if level_async:
|
||||
await self._acall_handlers(source, event, level_async)
|
||||
await self._acall_handlers(source, event, level_async, state)
|
||||
|
||||
def _register_source(self, source: Any) -> None:
|
||||
"""Register the source entity in RuntimeState if applicable."""
|
||||
@@ -556,21 +613,23 @@ class CrewAIEventsBus:
|
||||
self._ensure_executor_initialized()
|
||||
self._has_pending_events = True
|
||||
|
||||
state = self._runtime_state
|
||||
|
||||
if has_dependencies:
|
||||
return self._track_future(
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
self._emit_with_dependencies(source, event),
|
||||
self._emit_with_dependencies(source, event, state),
|
||||
self._loop,
|
||||
)
|
||||
)
|
||||
|
||||
if sync_handlers:
|
||||
if event_type is LLMStreamChunkEvent:
|
||||
self._call_handlers(source, event, sync_handlers)
|
||||
self._call_handlers(source, event, sync_handlers, state)
|
||||
else:
|
||||
ctx = contextvars.copy_context()
|
||||
sync_future = self._sync_executor.submit(
|
||||
ctx.run, self._call_handlers, source, event, sync_handlers
|
||||
ctx.run, self._call_handlers, source, event, sync_handlers, state
|
||||
)
|
||||
if not async_handlers:
|
||||
return self._track_future(sync_future)
|
||||
@@ -578,7 +637,7 @@ class CrewAIEventsBus:
|
||||
if async_handlers:
|
||||
return self._track_future(
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
self._acall_handlers(source, event, async_handlers),
|
||||
self._acall_handlers(source, event, async_handlers, state),
|
||||
self._loop,
|
||||
)
|
||||
)
|
||||
@@ -590,21 +649,22 @@ class CrewAIEventsBus:
|
||||
source: Any,
|
||||
event: BaseEvent,
|
||||
handlers: AsyncHandlerSet,
|
||||
state: RuntimeState | None,
|
||||
) -> None:
|
||||
"""Call async handlers with the replaying flag set on the loop thread."""
|
||||
token = _replaying.set(True)
|
||||
try:
|
||||
await self._acall_handlers(source, event, handlers)
|
||||
await self._acall_handlers(source, event, handlers, state)
|
||||
finally:
|
||||
_replaying.reset(token)
|
||||
|
||||
async def _emit_with_dependencies_replaying(
|
||||
self, source: Any, event: BaseEvent
|
||||
self, source: Any, event: BaseEvent, state: RuntimeState | None
|
||||
) -> None:
|
||||
"""Dependency-aware dispatch with the replaying flag set."""
|
||||
token = _replaying.set(True)
|
||||
try:
|
||||
await self._emit_with_dependencies(source, event)
|
||||
await self._emit_with_dependencies(source, event, state)
|
||||
finally:
|
||||
_replaying.reset(token)
|
||||
|
||||
@@ -638,12 +698,13 @@ class CrewAIEventsBus:
|
||||
self._ensure_executor_initialized()
|
||||
self._has_pending_events = True
|
||||
|
||||
state = self._runtime_state
|
||||
token = _replaying.set(True)
|
||||
try:
|
||||
if has_dependencies:
|
||||
return self._track_future(
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
self._emit_with_dependencies_replaying(source, event),
|
||||
self._emit_with_dependencies_replaying(source, event, state),
|
||||
self._loop,
|
||||
)
|
||||
)
|
||||
@@ -651,7 +712,7 @@ class CrewAIEventsBus:
|
||||
if sync_handlers:
|
||||
ctx = contextvars.copy_context()
|
||||
sync_future = self._sync_executor.submit(
|
||||
ctx.run, self._call_handlers, source, event, sync_handlers
|
||||
ctx.run, self._call_handlers, source, event, sync_handlers, state
|
||||
)
|
||||
self._track_future(sync_future)
|
||||
if not async_handlers:
|
||||
@@ -659,7 +720,9 @@ class CrewAIEventsBus:
|
||||
|
||||
return self._track_future(
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
self._acall_handlers_replaying(source, event, async_handlers),
|
||||
self._acall_handlers_replaying(
|
||||
source, event, async_handlers, state
|
||||
),
|
||||
self._loop,
|
||||
)
|
||||
)
|
||||
@@ -727,7 +790,9 @@ class CrewAIEventsBus:
|
||||
async_handlers = self._async_handlers.get(event_type, frozenset())
|
||||
|
||||
if async_handlers:
|
||||
await self._acall_handlers(source, event, async_handlers)
|
||||
await self._acall_handlers(
|
||||
source, event, async_handlers, self._runtime_state
|
||||
)
|
||||
|
||||
def register_handler(
|
||||
self,
|
||||
|
||||
@@ -292,7 +292,7 @@ class TraceCollectionListener(BaseEventListener):
|
||||
@event_bus.on(CrewKickoffCompletedEvent)
|
||||
def on_crew_completed(source: Any, event: CrewKickoffCompletedEvent) -> None:
|
||||
self._handle_trace_event("crew_kickoff_completed", source, event)
|
||||
if self.batch_manager.defer_session_finalization:
|
||||
if self._should_defer_session_finalization():
|
||||
return
|
||||
if self._nested_in_flow_execution():
|
||||
return
|
||||
@@ -306,7 +306,7 @@ class TraceCollectionListener(BaseEventListener):
|
||||
@event_bus.on(CrewKickoffFailedEvent)
|
||||
def on_crew_failed(source: Any, event: CrewKickoffFailedEvent) -> None:
|
||||
self._handle_trace_event("crew_kickoff_failed", source, event)
|
||||
if self.batch_manager.defer_session_finalization:
|
||||
if self._should_defer_session_finalization():
|
||||
return
|
||||
if self._nested_in_flow_execution():
|
||||
return
|
||||
@@ -734,7 +734,7 @@ class TraceCollectionListener(BaseEventListener):
|
||||
if not self.batch_manager.is_batch_initialized():
|
||||
return
|
||||
# Multi-turn flows defer batch finalization to finalize_session_traces().
|
||||
if self.batch_manager.defer_session_finalization:
|
||||
if self._should_defer_session_finalization():
|
||||
return
|
||||
self.batch_manager.finalize_batch()
|
||||
|
||||
@@ -745,6 +745,15 @@ class TraceCollectionListener(BaseEventListener):
|
||||
|
||||
return current_flow_id.get() is not None
|
||||
|
||||
def _should_defer_session_finalization(self) -> bool:
|
||||
"""True when the active trace belongs to a deferred flow session."""
|
||||
from crewai.flow.flow_context import current_flow_defer_trace_finalization
|
||||
|
||||
return (
|
||||
self.batch_manager.defer_session_finalization
|
||||
or current_flow_defer_trace_finalization.get()
|
||||
)
|
||||
|
||||
def _flow_owns_trace_batch(self) -> bool:
|
||||
"""True when an in-flight conversational flow already owns the trace batch."""
|
||||
if self.batch_manager.batch_owner_type == "flow":
|
||||
@@ -780,12 +789,17 @@ class TraceCollectionListener(BaseEventListener):
|
||||
def _try_initialize_flow_batch_from_context(self, event: Any) -> bool:
|
||||
"""Claim a flow trace batch when an action event fires inside kickoff.
|
||||
|
||||
When ``suppress_flow_events=True``, console panels are hidden but
|
||||
``FlowStartedEvent`` and method lifecycle events still emit; if no
|
||||
batch exists yet, LLM/tool events must not fall back to implicit crew
|
||||
batches.
|
||||
When ``suppress_flow_events=True`` (infrastructure flows such as
|
||||
``AgentExecutor`` and the memory flows), flow and method lifecycle
|
||||
events are not emitted, so the batch is claimed from the flow context
|
||||
(``current_flow_id``) to keep LLM/tool events from falling back to an
|
||||
implicit crew batch.
|
||||
"""
|
||||
from crewai.flow.flow_context import current_flow_id, current_flow_name
|
||||
from crewai.flow.flow_context import (
|
||||
current_flow_defer_trace_finalization,
|
||||
current_flow_id,
|
||||
current_flow_name,
|
||||
)
|
||||
|
||||
flow_id = current_flow_id.get()
|
||||
if flow_id is None:
|
||||
@@ -801,6 +815,8 @@ class TraceCollectionListener(BaseEventListener):
|
||||
}
|
||||
self.batch_manager.batch_owner_type = "flow"
|
||||
self.batch_manager.batch_owner_id = flow_id
|
||||
if current_flow_defer_trace_finalization.get():
|
||||
self.batch_manager.defer_session_finalization = True
|
||||
self._initialize_batch(user_context, execution_metadata)
|
||||
return True
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
from typing import Any, Literal
|
||||
|
||||
from pydantic import BaseModel, ConfigDict
|
||||
from pydantic import BaseModel, ConfigDict, field_serializer
|
||||
|
||||
from crewai.events.base_events import BaseEvent
|
||||
|
||||
@@ -57,6 +57,10 @@ class MethodExecutionFailedEvent(FlowEvent):
|
||||
|
||||
model_config = ConfigDict(arbitrary_types_allowed=True)
|
||||
|
||||
@field_serializer("error")
|
||||
def _serialize_error(self, error: Exception) -> str:
|
||||
return str(error)
|
||||
|
||||
|
||||
class MethodExecutionPausedEvent(FlowEvent):
|
||||
"""Event emitted when a flow method is paused waiting for human feedback.
|
||||
|
||||
@@ -1,15 +1,17 @@
|
||||
"""Conversational graph + helpers as a mixin for ``Flow`` (experimental).
|
||||
"""Conversational graph + helpers as an experimental Flow extension.
|
||||
|
||||
The experimental conversational chat surface lives here as a mixin so that
|
||||
``crewai.flow.runtime`` stays focused on the execution engine. ``Flow``
|
||||
inherits from ``_ConversationalMixin``; the methods only register on
|
||||
subclasses that opt in via ``conversational = True`` (enforced by the
|
||||
``_conversational_only`` marker + ``FlowMeta`` gating in
|
||||
``crewai.flow.runtime``).
|
||||
The conversational chat surface remains experimental and may change before the
|
||||
v2 graduation path. It lives here so ``crewai.flow.runtime`` can stay focused
|
||||
on the execution engine. ``crewai.flow.flow`` composes this mixin onto the
|
||||
public ``Flow`` class for backwards compatibility.
|
||||
|
||||
The built-in conversational graph only registers for subclasses that opt in
|
||||
with ``conversational = True``. Static conversational metadata is projected
|
||||
into ``FlowDefinition.conversational`` via the Python DSL builder.
|
||||
|
||||
Import surface:
|
||||
- :class:`_ConversationalMixin` — internal; ``Flow`` mixes it in. Users
|
||||
don't import it directly.
|
||||
- :class:`_ConversationalMixin` — internal; the public ``Flow`` class
|
||||
composes it in. Users don't import it directly.
|
||||
- The data types this mixin uses live in
|
||||
:mod:`crewai.experimental.conversational`.
|
||||
"""
|
||||
@@ -20,7 +22,7 @@ from collections.abc import Callable, Mapping, Sequence
|
||||
from enum import Enum
|
||||
import json
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Any, ClassVar, Literal, cast
|
||||
from typing import TYPE_CHECKING, Any, ClassVar, Literal, TypeVar, cast
|
||||
|
||||
from pydantic import BaseModel, Field, create_model
|
||||
|
||||
@@ -44,26 +46,69 @@ from crewai.flow.conversation import (
|
||||
get_conversation_messages,
|
||||
receive_user_message as _receive_user_message,
|
||||
)
|
||||
from crewai.flow.dsl import listen, router, start
|
||||
from crewai.flow.dsl import listen, start
|
||||
from crewai.flow.dsl._utils import _method_action, _set_flow_method_definition
|
||||
from crewai.flow.flow_definition import FlowMethodDefinition
|
||||
from crewai.utilities.types import LLMMessage
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from crewai.flow.runtime import Flow
|
||||
from crewai.llms.base_llm import BaseLLM
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class _ConversationalMixin:
|
||||
"""Built-in conversational graph for ``Flow`` (gated on ``conversational``).
|
||||
def _iter_condition_labels(condition: Any) -> set[str]:
|
||||
if isinstance(condition, str):
|
||||
return {condition}
|
||||
if isinstance(condition, dict):
|
||||
labels: set[str] = set()
|
||||
for value in condition.values():
|
||||
if isinstance(value, list):
|
||||
for item in value:
|
||||
labels.update(_iter_condition_labels(item))
|
||||
else:
|
||||
labels.update(_iter_condition_labels(value))
|
||||
return labels
|
||||
return set()
|
||||
|
||||
Mixed into ``Flow`` so its execution engine (``runtime.py``) stays focused
|
||||
on running graphs. The methods here only register on subclasses that set
|
||||
``conversational = True``; non-chat flows see them as inert attributes.
|
||||
|
||||
def _conversation_start_router(func: Callable[..., Any]) -> Any:
|
||||
wrapper = start()(func)
|
||||
_set_flow_method_definition(
|
||||
cast(Any, wrapper),
|
||||
FlowMethodDefinition(do=_method_action(func), start=True, router=True),
|
||||
)
|
||||
return wrapper
|
||||
|
||||
|
||||
class _ConversationalMixin:
|
||||
"""Experimental conversational graph for ``Flow``.
|
||||
|
||||
This mixin owns chat behavior and runtime hooks. Non-chat flows see these
|
||||
methods as inert attributes unless they opt in with ``conversational = True``.
|
||||
"""
|
||||
|
||||
# === EXPERIMENTAL: conversational mode ===
|
||||
# When ``conversational = True`` on a Flow subclass, this mixin's built-in
|
||||
# graph registers and ``handle_turn`` / ``chat`` become chat entry points.
|
||||
conversational: ClassVar[bool] = False
|
||||
conversational_config: ClassVar[ConversationConfig | None] = None
|
||||
builtin_routes: ClassVar[tuple[str, ...]] = ("converse", "end")
|
||||
internal_routes: ClassVar[tuple[str, ...]] = ("answer_from_history",)
|
||||
builtin_route_descriptions: ClassVar[dict[str, str]] = {
|
||||
"converse": (
|
||||
"Ordinary chat, follow-ups, summaries, clarifications, and "
|
||||
"questions answerable from prior conversation history."
|
||||
),
|
||||
"end": ("User signals the conversation is finished (goodbye, exit, done)."),
|
||||
"answer_from_history": (
|
||||
"Answer directly from prior conversation history without invoking "
|
||||
"tools, agents, or custom routes."
|
||||
),
|
||||
}
|
||||
|
||||
# The metaclass + state attributes referenced below live on ``Flow`` —
|
||||
# this mixin is never instantiated standalone. These type-only
|
||||
# declarations exist so static analyzers don't flag attribute access.
|
||||
@@ -71,22 +116,15 @@ class _ConversationalMixin:
|
||||
# (otherwise mypy flags "Cannot override instance variable with class
|
||||
# variable" when Flow declares them as ``ClassVar``).
|
||||
if TYPE_CHECKING:
|
||||
conversational: ClassVar[bool]
|
||||
conversational_config: ClassVar[ConversationConfig | None]
|
||||
builtin_routes: ClassVar[tuple[str, ...]]
|
||||
internal_routes: ClassVar[tuple[str, ...]]
|
||||
builtin_route_descriptions: ClassVar[dict[str, str]]
|
||||
# Registry ClassVars populated by ``FlowMeta`` at class creation.
|
||||
_listeners: ClassVar[dict[Any, Any]]
|
||||
|
||||
# Instance attrs from ``Flow``.
|
||||
state: Any
|
||||
name: str | None
|
||||
_completed_methods: set[Any]
|
||||
_method_outputs: list[Any]
|
||||
_pending_and_listeners: dict[Any, Any]
|
||||
_pending_events: dict[Any, Any]
|
||||
_method_call_counts: dict[Any, int]
|
||||
_is_execution_resuming: bool
|
||||
_conversation_messages: list[LLMMessage]
|
||||
_pending_user_message: str | dict[str, Any] | None
|
||||
_pending_intents: Sequence[str] | None
|
||||
_pending_intent_llm: str | BaseLLM | None
|
||||
@@ -97,8 +135,8 @@ class _ConversationalMixin:
|
||||
def _collapse_to_outcome(
|
||||
self,
|
||||
feedback: str,
|
||||
outcomes: tuple[str, ...],
|
||||
llm: str | BaseLLM | Any,
|
||||
outcomes: Sequence[str],
|
||||
llm: str | BaseLLM,
|
||||
) -> str:
|
||||
pass
|
||||
|
||||
@@ -108,23 +146,24 @@ class _ConversationalMixin:
|
||||
def kickoff(self, *args: Any, **kwargs: Any) -> Any:
|
||||
pass
|
||||
|
||||
@start()
|
||||
@_conversational_only
|
||||
def conversation_start(self) -> str | None:
|
||||
"""Internal Flow entrypoint that hands the user message to the router.
|
||||
"""Return the current user message for conversational route selection.
|
||||
|
||||
In conversational mode, ``Flow.kickoff_async`` runs all ``@start``
|
||||
methods sequentially and this one is registered last, so any user
|
||||
``@start`` methods (e.g. permission loading) have already finished
|
||||
before the returned value triggers ``route_conversation``.
|
||||
This remains as a plain overridable helper for compatibility. It is not
|
||||
registered as a Flow method; ``route_conversation`` is the synthetic
|
||||
built-in start/router that begins a conversational turn.
|
||||
"""
|
||||
state = cast(ConversationState, self.state)
|
||||
return state.current_user_message
|
||||
|
||||
@router(conversation_start)
|
||||
@_conversation_start_router
|
||||
@_conversational_only
|
||||
def route_conversation(self) -> str:
|
||||
"""Route the current turn to a listener label."""
|
||||
if "conversation_start" not in {
|
||||
str(method_name) for method_name in self._completed_methods
|
||||
}:
|
||||
self.conversation_start()
|
||||
state = cast(ConversationState, self.state)
|
||||
context = self.build_router_context()
|
||||
previous_intent = state.last_intent
|
||||
@@ -238,8 +277,8 @@ class _ConversationalMixin:
|
||||
state = cast(ConversationState, self.state)
|
||||
sid = session_id or state.id
|
||||
|
||||
# Stash the pending turn so ``_apply_pending_conversational_turn``
|
||||
# picks it up AFTER persist restore.
|
||||
# Stash the pending turn so the kickoff extension hook picks it up
|
||||
# after persist restore.
|
||||
self._pending_user_message = message
|
||||
self._pending_intents = list(intents) if intents else None
|
||||
self._pending_intent_llm = intent_llm
|
||||
@@ -286,7 +325,7 @@ class _ConversationalMixin:
|
||||
callers can customize prompts or exercise the loop without patching
|
||||
builtins.
|
||||
"""
|
||||
if not getattr(type(self), "conversational", False):
|
||||
if not self._is_conversational_enabled():
|
||||
raise ValueError("Flow.chat() is only available on conversational flows")
|
||||
|
||||
exit_set = {command.lower() for command in exit_commands}
|
||||
@@ -491,14 +530,14 @@ class _ConversationalMixin:
|
||||
**extra: Any,
|
||||
) -> None:
|
||||
"""Append a message to conversation history (legacy ChatState path)."""
|
||||
_append_conversation_message(cast("Flow[Any]", self), role, content, **extra)
|
||||
_append_conversation_message(cast(Any, self), role, content, **extra)
|
||||
|
||||
@property
|
||||
def conversation_messages(self) -> list[LLMMessage]:
|
||||
"""Message history from state, coerced to LLM-shaped dicts."""
|
||||
return [
|
||||
message_to_llm_dict(message)
|
||||
for message in get_conversation_messages(cast("Flow[Any]", self))
|
||||
for message in get_conversation_messages(cast(Any, self))
|
||||
]
|
||||
|
||||
def receive_user_message(
|
||||
@@ -514,7 +553,7 @@ class _ConversationalMixin:
|
||||
``state.messages`` and preserve ``last_intent`` across turns.
|
||||
Non-conversational flows fall through to the legacy helper.
|
||||
"""
|
||||
if self.conversational:
|
||||
if self._is_conversational_enabled():
|
||||
state = cast(ConversationState, self.state)
|
||||
state.messages.append(ConversationMessage(role="user", content=text))
|
||||
self._emit_conversation_message_added(
|
||||
@@ -535,9 +574,7 @@ class _ConversationalMixin:
|
||||
return intent
|
||||
return text
|
||||
|
||||
return _receive_user_message(
|
||||
cast("Flow[Any]", self), text, outcomes=outcomes, llm=llm
|
||||
)
|
||||
return _receive_user_message(cast(Any, self), text, outcomes=outcomes, llm=llm)
|
||||
|
||||
def classify_intent(
|
||||
self,
|
||||
@@ -561,27 +598,104 @@ class _ConversationalMixin:
|
||||
def _conversation_config(self) -> ConversationConfig | None:
|
||||
return getattr(type(self), "conversational_config", None)
|
||||
|
||||
@property
|
||||
def _conversation_definition(self) -> Any | None:
|
||||
return self._conversation_flow_definition().conversational
|
||||
|
||||
def _conversation_flow_definition(self) -> Any:
|
||||
flow_definition = getattr(type(self), "flow_definition", None)
|
||||
if not callable(flow_definition):
|
||||
raise AttributeError(
|
||||
f"{type(self).__name__} does not expose flow_definition()"
|
||||
)
|
||||
return flow_definition()
|
||||
|
||||
@classmethod
|
||||
def _conversational_definition(cls) -> Any | None:
|
||||
flow_definition = getattr(cls, "flow_definition", None)
|
||||
if not callable(flow_definition):
|
||||
return None
|
||||
return flow_definition().conversational
|
||||
|
||||
@classmethod
|
||||
def _is_conversational(cls) -> bool:
|
||||
definition = cls._conversational_definition()
|
||||
return bool(definition and definition.enabled)
|
||||
|
||||
def _is_conversational_enabled(self) -> bool:
|
||||
definition = self._conversation_definition
|
||||
return bool(definition and definition.enabled)
|
||||
|
||||
def _initialize_runtime_extension_attrs(self) -> None:
|
||||
if not isinstance(getattr(self, "_conversation_messages", None), list):
|
||||
object.__setattr__(self, "_conversation_messages", [])
|
||||
if not hasattr(self, "_pending_user_message"):
|
||||
object.__setattr__(self, "_pending_user_message", None)
|
||||
if not hasattr(self, "_pending_intents"):
|
||||
object.__setattr__(self, "_pending_intents", None)
|
||||
if not hasattr(self, "_pending_intent_llm"):
|
||||
object.__setattr__(self, "_pending_intent_llm", None)
|
||||
|
||||
def _create_default_extension_state(self) -> ConversationState | None:
|
||||
initial_state_t = getattr(self, "_initial_state_t", None)
|
||||
if type(self)._is_conversational() and (
|
||||
not hasattr(self, "_initial_state_t")
|
||||
or isinstance(initial_state_t, TypeVar)
|
||||
):
|
||||
return ConversationState()
|
||||
return None
|
||||
|
||||
def _should_apply_pending_kickoff_context(self) -> bool:
|
||||
return (
|
||||
type(self)._is_conversational() and self._pending_user_message is not None
|
||||
)
|
||||
|
||||
def _apply_pending_kickoff_context(self) -> None:
|
||||
self._apply_pending_conversational_turn()
|
||||
|
||||
def _order_start_methods_for_kickoff(
|
||||
self,
|
||||
start_methods: list[Any],
|
||||
) -> tuple[list[Any], bool]:
|
||||
if not type(self)._is_conversational():
|
||||
return start_methods, False
|
||||
|
||||
route_conversation = "route_conversation"
|
||||
if route_conversation not in {str(method) for method in start_methods}:
|
||||
return start_methods, False
|
||||
|
||||
ordered_starts = [
|
||||
method for method in start_methods if str(method) != route_conversation
|
||||
]
|
||||
ordered_starts.append(
|
||||
next(
|
||||
method for method in start_methods if str(method) == route_conversation
|
||||
)
|
||||
)
|
||||
return ordered_starts, True
|
||||
|
||||
def _should_defer_trace_finalization(self) -> bool:
|
||||
"""Whether per-turn ``FlowFinished`` + ``finalize_batch`` should be skipped.
|
||||
|
||||
True when either:
|
||||
- ``flow.defer_trace_finalization`` is set on the instance, OR
|
||||
- the class-level ``ConversationConfig.defer_trace_finalization``
|
||||
on a conversational subclass is True.
|
||||
- the static conversational definition enables deferred finalization.
|
||||
|
||||
Either source enables the deferred-session pattern. The caller
|
||||
eventually invokes ``finalize_session_traces()`` to close the batch.
|
||||
"""
|
||||
if getattr(self, "defer_trace_finalization", False):
|
||||
return True
|
||||
config = self._conversation_config
|
||||
return bool(config and config.defer_trace_finalization)
|
||||
definition = self._conversation_definition
|
||||
return bool(
|
||||
definition and definition.enabled and definition.defer_trace_finalization
|
||||
)
|
||||
|
||||
def _reset_turn_execution_state(self) -> None:
|
||||
"""Clear per-execution tracking so the next turn re-runs the graph."""
|
||||
self._completed_methods.clear()
|
||||
self._method_outputs.clear()
|
||||
self._pending_and_listeners.clear()
|
||||
self._pending_events.clear()
|
||||
self._method_call_counts.clear()
|
||||
self._clear_or_listeners()
|
||||
self._is_execution_resuming = False
|
||||
@@ -733,11 +847,12 @@ class _ConversationalMixin:
|
||||
router_config: RouterConfig | None,
|
||||
) -> dict[str, str]:
|
||||
label_to_method: dict[str, str] = {}
|
||||
for listener_name, condition in self._listeners.items():
|
||||
if isinstance(condition, tuple):
|
||||
_, trigger_labels = condition
|
||||
for trigger_label in trigger_labels:
|
||||
label_to_method.setdefault(str(trigger_label), str(listener_name))
|
||||
flow_definition = self._conversation_flow_definition()
|
||||
for listener_name, method_definition in flow_definition.methods.items():
|
||||
if method_definition.listen is None or method_definition.router:
|
||||
continue
|
||||
for trigger_label in _iter_condition_labels(method_definition.listen):
|
||||
label_to_method.setdefault(trigger_label, listener_name)
|
||||
|
||||
routes = self._effective_routes(router_config)
|
||||
overrides = (
|
||||
@@ -788,21 +903,31 @@ class _ConversationalMixin:
|
||||
|
||||
def _valid_route_labels(self) -> set[str]:
|
||||
labels: set[str] = set()
|
||||
for condition in self._listeners.values():
|
||||
if isinstance(condition, tuple):
|
||||
_, methods = condition
|
||||
labels.update(str(method) for method in methods)
|
||||
flow_definition = self._conversation_flow_definition()
|
||||
for method_definition in flow_definition.methods.values():
|
||||
if method_definition.listen is None or method_definition.router:
|
||||
continue
|
||||
labels.update(_iter_condition_labels(method_definition.listen))
|
||||
return labels
|
||||
|
||||
def _effective_routes(self, router_config: RouterConfig | None = None) -> set[str]:
|
||||
custom_routes = set(router_config.routes or ()) if router_config else set()
|
||||
definition = self._conversation_definition
|
||||
builtin_routes = (
|
||||
tuple(definition.builtin_routes)
|
||||
if definition is not None
|
||||
else self.builtin_routes
|
||||
)
|
||||
internal_routes = (
|
||||
tuple(definition.internal_routes)
|
||||
if definition is not None
|
||||
else self.internal_routes
|
||||
)
|
||||
if not custom_routes:
|
||||
custom_routes = (
|
||||
self._valid_route_labels()
|
||||
- set(self.builtin_routes)
|
||||
- set(self.internal_routes)
|
||||
self._valid_route_labels() - set(builtin_routes) - set(internal_routes)
|
||||
)
|
||||
return custom_routes | set(self.builtin_routes)
|
||||
return custom_routes | set(builtin_routes)
|
||||
|
||||
def _default_conversation_llm(self) -> Any | None:
|
||||
config = self._conversation_config
|
||||
@@ -931,12 +1056,15 @@ class _ConversationalMixin:
|
||||
|
||||
trace_listener = TraceCollectionListener()
|
||||
batch_manager = trace_listener.batch_manager
|
||||
if batch_manager.batch_owner_type == "flow":
|
||||
if trace_listener.first_time_handler.is_first_time:
|
||||
trace_listener.first_time_handler.mark_events_collected()
|
||||
trace_listener.first_time_handler.handle_execution_completion()
|
||||
else:
|
||||
batch_manager.finalize_batch()
|
||||
try:
|
||||
if batch_manager.batch_owner_type == "flow":
|
||||
if trace_listener.first_time_handler.is_first_time:
|
||||
trace_listener.first_time_handler.mark_events_collected()
|
||||
trace_listener.first_time_handler.handle_execution_completion()
|
||||
else:
|
||||
batch_manager.finalize_batch()
|
||||
finally:
|
||||
batch_manager.defer_session_finalization = False
|
||||
|
||||
|
||||
__all__ = ["_ConversationalMixin"]
|
||||
|
||||
48
lib/crewai/src/crewai/flow/conversational_definition.py
Normal file
48
lib/crewai/src/crewai/flow/conversational_definition.py
Normal file
@@ -0,0 +1,48 @@
|
||||
"""Static conversational Flow definition models.
|
||||
|
||||
This module is part of the serializable Flow Definition contract. It should
|
||||
only contain static data shapes. Experimental conversational runtime behavior
|
||||
continues to live in ``crewai.experimental.conversational_mixin``.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any, Literal
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
class FlowConversationalRouterDefinition(BaseModel):
|
||||
"""Static conversational router configuration."""
|
||||
|
||||
prompt: str | None = None
|
||||
response_format: Any = None
|
||||
llm: Any = None
|
||||
routes: list[str] | None = None
|
||||
route_descriptions: dict[str, str] | None = None
|
||||
default_intent: str | None = "converse"
|
||||
fallback_intent: str | None = "converse"
|
||||
intent_field: str = "intent"
|
||||
|
||||
|
||||
class FlowConversationalDefinition(BaseModel):
|
||||
"""Static conversational Flow configuration."""
|
||||
|
||||
enabled: bool = False
|
||||
system_prompt: str | None = None
|
||||
llm: Any = None
|
||||
router: FlowConversationalRouterDefinition | None = None
|
||||
answer_from_history_prompt: str | None = None
|
||||
default_intents: list[str] | None = None
|
||||
intent_llm: Any = None
|
||||
answer_from_history_llm: Any = None
|
||||
visible_agent_outputs: list[str] | Literal["all"] | None = None
|
||||
defer_trace_finalization: bool = True
|
||||
builtin_routes: list[str] = Field(default_factory=lambda: ["converse", "end"])
|
||||
internal_routes: list[str] = Field(default_factory=lambda: ["answer_from_history"])
|
||||
|
||||
|
||||
__all__ = [
|
||||
"FlowConversationalDefinition",
|
||||
"FlowConversationalRouterDefinition",
|
||||
]
|
||||
@@ -3,11 +3,10 @@ from __future__ import annotations
|
||||
from collections.abc import Callable, Sequence
|
||||
from typing import TYPE_CHECKING, Any, TypeVar
|
||||
|
||||
from crewai.flow.flow_definition import FlowMethodDefinition
|
||||
from crewai.flow.human_feedback import (
|
||||
HumanFeedbackConfig,
|
||||
HumanFeedbackResult,
|
||||
_build_human_feedback_runtime_decorator,
|
||||
_validate_human_feedback_options,
|
||||
)
|
||||
|
||||
|
||||
@@ -21,32 +20,6 @@ F = TypeVar("F", bound=Callable[..., Any])
|
||||
__all__ = ["HumanFeedbackResult", "human_feedback"]
|
||||
|
||||
|
||||
def _stamp_human_feedback_metadata(
|
||||
wrapper: Any,
|
||||
func: Callable[..., Any],
|
||||
config: HumanFeedbackConfig,
|
||||
) -> None:
|
||||
for attr in [
|
||||
"__is_flow_method__",
|
||||
"__flow_persistence_config__",
|
||||
"__flow_method_definition__",
|
||||
]:
|
||||
if hasattr(func, attr):
|
||||
setattr(wrapper, attr, getattr(func, attr))
|
||||
|
||||
wrapper.__human_feedback_config__ = config
|
||||
wrapper.__is_flow_method__ = True
|
||||
|
||||
if config.emit:
|
||||
fragment = getattr(wrapper, "__flow_method_definition__", None)
|
||||
if isinstance(fragment, FlowMethodDefinition):
|
||||
wrapper.__flow_method_definition__ = fragment.model_copy(
|
||||
update={"router": True, "emit": list(config.emit)}
|
||||
)
|
||||
|
||||
wrapper._human_feedback_llm = config.llm
|
||||
|
||||
|
||||
def human_feedback(
|
||||
message: str,
|
||||
emit: Sequence[str] | None = None,
|
||||
@@ -58,21 +31,18 @@ def human_feedback(
|
||||
learn_source: str = "hitl",
|
||||
learn_strict: bool = False,
|
||||
) -> Callable[[F], F]:
|
||||
"""Decorator for Flow methods that require human feedback."""
|
||||
runtime_decorator = _build_human_feedback_runtime_decorator(
|
||||
message=message,
|
||||
emit=emit,
|
||||
llm=llm,
|
||||
default_outcome=default_outcome,
|
||||
metadata=metadata,
|
||||
provider=provider,
|
||||
learn=learn,
|
||||
learn_source=learn_source,
|
||||
learn_strict=learn_strict,
|
||||
"""Decorator for Flow methods that require human feedback.
|
||||
|
||||
The decorator is a pure metadata stamper: it records the feedback
|
||||
configuration on the method, and the Flow engine collects and routes
|
||||
feedback after the method completes, driven by the flow's definition.
|
||||
"""
|
||||
_validate_human_feedback_options(
|
||||
emit=emit, llm=llm, default_outcome=default_outcome
|
||||
)
|
||||
config = HumanFeedbackConfig(
|
||||
message=message,
|
||||
emit=emit,
|
||||
emit=list(emit) if emit is not None else None,
|
||||
llm=llm,
|
||||
default_outcome=default_outcome,
|
||||
metadata=metadata,
|
||||
@@ -83,8 +53,7 @@ def human_feedback(
|
||||
)
|
||||
|
||||
def decorator(func: F) -> F:
|
||||
wrapper = runtime_decorator(func)
|
||||
_stamp_human_feedback_metadata(wrapper, func, config)
|
||||
return wrapper
|
||||
func.__human_feedback_config__ = config # type: ignore[attr-defined]
|
||||
return func
|
||||
|
||||
return decorator
|
||||
|
||||
@@ -8,6 +8,7 @@ from crewai.flow.dsl._types import FlowMethodDecorator, FlowTrigger
|
||||
from crewai.flow.dsl._utils import (
|
||||
P,
|
||||
R,
|
||||
_method_action,
|
||||
_set_flow_method_definition,
|
||||
)
|
||||
from crewai.flow.flow_definition import FlowMethodDefinition
|
||||
@@ -45,7 +46,11 @@ def listen(condition: FlowTrigger) -> FlowMethodDecorator:
|
||||
wrapper = ListenMethod(func)
|
||||
|
||||
_set_flow_method_definition(
|
||||
wrapper, FlowMethodDefinition(listen=_to_definition_condition(condition))
|
||||
wrapper,
|
||||
FlowMethodDefinition(
|
||||
do=_method_action(func),
|
||||
listen=_to_definition_condition(condition),
|
||||
),
|
||||
)
|
||||
return wrapper
|
||||
|
||||
|
||||
@@ -19,6 +19,7 @@ from crewai.flow.dsl._types import FlowMethodDecorator, FlowTrigger
|
||||
from crewai.flow.dsl._utils import (
|
||||
P,
|
||||
R,
|
||||
_method_action,
|
||||
_set_flow_method_definition,
|
||||
)
|
||||
from crewai.flow.flow_definition import FlowMethodDefinition
|
||||
@@ -148,6 +149,7 @@ def router(
|
||||
_set_flow_method_definition(
|
||||
wrapper,
|
||||
FlowMethodDefinition(
|
||||
do=_method_action(func),
|
||||
listen=_to_definition_condition(condition),
|
||||
router=True,
|
||||
emit=router_events or None,
|
||||
|
||||
@@ -8,6 +8,7 @@ from crewai.flow.dsl._types import FlowMethodDecorator, FlowTrigger
|
||||
from crewai.flow.dsl._utils import (
|
||||
P,
|
||||
R,
|
||||
_method_action,
|
||||
_set_flow_method_definition,
|
||||
)
|
||||
from crewai.flow.flow_definition import FlowMethodDefinition
|
||||
@@ -53,13 +54,17 @@ def start(
|
||||
def decorator(func: Callable[P, R]) -> StartMethod[P, R]:
|
||||
wrapper = StartMethod(func)
|
||||
|
||||
if condition is not None:
|
||||
_set_flow_method_definition(
|
||||
wrapper,
|
||||
FlowMethodDefinition(start=_to_definition_condition(condition)),
|
||||
)
|
||||
else:
|
||||
_set_flow_method_definition(wrapper, FlowMethodDefinition(start=True))
|
||||
_set_flow_method_definition(
|
||||
wrapper,
|
||||
FlowMethodDefinition(
|
||||
do=_method_action(func),
|
||||
start=(
|
||||
_to_definition_condition(condition)
|
||||
if condition is not None
|
||||
else True
|
||||
),
|
||||
),
|
||||
)
|
||||
return wrapper
|
||||
|
||||
return cast(FlowMethodDecorator, decorator)
|
||||
|
||||
@@ -8,13 +8,17 @@ from pydantic import BaseModel
|
||||
from typing_extensions import TypeIs
|
||||
|
||||
from crewai.flow.flow_definition import (
|
||||
FlowActionDefinition,
|
||||
FlowConfigDefinition,
|
||||
FlowConversationalDefinition,
|
||||
FlowConversationalRouterDefinition,
|
||||
FlowDefinition,
|
||||
FlowDefinitionDiagnostic,
|
||||
FlowHumanFeedbackDefinition,
|
||||
FlowMethodDefinition,
|
||||
FlowPersistenceDefinition,
|
||||
FlowStateDefinition,
|
||||
_object_ref,
|
||||
)
|
||||
from crewai.flow.flow_wrappers import (
|
||||
FlowMethod,
|
||||
@@ -27,13 +31,17 @@ R = TypeVar("R")
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_FLOW_METHOD_DEFINITION_ATTR = "__flow_method_definition__"
|
||||
_FLOW_METHOD_METADATA_ATTRS = [
|
||||
"__conversational_only__",
|
||||
"__flow_method_definition__",
|
||||
"__flow_persistence_config__",
|
||||
"__human_feedback_config__",
|
||||
]
|
||||
|
||||
|
||||
def is_flow_method(obj: Any) -> TypeIs[FlowMethod[Any, Any]]:
|
||||
"""Check if the object carries Flow method wrapper metadata."""
|
||||
return hasattr(obj, "__is_flow_method__") or hasattr(
|
||||
obj, _FLOW_METHOD_DEFINITION_ATTR
|
||||
)
|
||||
return hasattr(obj, _FLOW_METHOD_DEFINITION_ATTR)
|
||||
|
||||
|
||||
def _should_include_flow_method(flow_class: type, method: Any) -> bool:
|
||||
@@ -42,6 +50,42 @@ def _should_include_flow_method(flow_class: type, method: Any) -> bool:
|
||||
return True
|
||||
|
||||
|
||||
def _is_conversational_flow(flow_class: type) -> bool:
|
||||
return bool(getattr(flow_class, "conversational", False))
|
||||
|
||||
|
||||
def _get_inherited_conversational_method(
|
||||
flow_class: type,
|
||||
attr_name: str,
|
||||
) -> Any | None:
|
||||
if not _is_conversational_flow(flow_class):
|
||||
return None
|
||||
|
||||
for base in flow_class.__mro__[1:]:
|
||||
inherited = base.__dict__.get(attr_name)
|
||||
if inherited is None:
|
||||
continue
|
||||
if getattr(inherited, "__conversational_only__", False) and is_flow_method(
|
||||
inherited
|
||||
):
|
||||
return inherited
|
||||
return None
|
||||
|
||||
|
||||
def _stamp_inherited_conversational_metadata(
|
||||
method: Any,
|
||||
inherited: Any,
|
||||
) -> Any:
|
||||
for attr in _FLOW_METHOD_METADATA_ATTRS:
|
||||
if hasattr(inherited, attr):
|
||||
setattr(method, attr, getattr(inherited, attr))
|
||||
return method
|
||||
|
||||
|
||||
def _method_action(method: Any) -> FlowActionDefinition:
|
||||
return FlowActionDefinition(ref=f"{method.__module__}:{method.__qualname__}")
|
||||
|
||||
|
||||
def _set_flow_method_definition(
|
||||
wrapper: FlowMethod[P, R],
|
||||
definition: FlowMethodDefinition,
|
||||
@@ -58,13 +102,6 @@ def _get_flow_method_definition(method: Any) -> FlowMethodDefinition | None:
|
||||
return None
|
||||
|
||||
|
||||
def _object_ref(value: Any) -> str:
|
||||
target = value if isinstance(value, type) else type(value)
|
||||
module = getattr(target, "__module__", "")
|
||||
qualname = getattr(target, "__qualname__", getattr(target, "__name__", ""))
|
||||
return f"{module}:{qualname}" if module and qualname else repr(value)
|
||||
|
||||
|
||||
def _is_json_serializable(value: Any) -> bool:
|
||||
try:
|
||||
json.dumps(value)
|
||||
@@ -135,6 +172,8 @@ def _build_state_definition(
|
||||
from pydantic import BaseModel as PydanticBaseModel
|
||||
|
||||
state_value = getattr(flow_class, "_initial_state_t", None)
|
||||
if isinstance(state_value, TypeVar):
|
||||
state_value = None
|
||||
initial_state = getattr(flow_class, "initial_state", None)
|
||||
if initial_state is not None:
|
||||
state_value = initial_state
|
||||
@@ -170,16 +209,22 @@ def _build_config_definition(
|
||||
) -> FlowConfigDefinition:
|
||||
config_field_names = set(FlowConfigDefinition.model_fields)
|
||||
field_defaults = {
|
||||
name: field.default
|
||||
name: field.get_default(call_default_factory=True)
|
||||
for name, field in getattr(flow_class, "model_fields", {}).items()
|
||||
if name in config_field_names
|
||||
}
|
||||
values: dict[str, Any] = {}
|
||||
for field_name, default in field_defaults.items():
|
||||
value = getattr(flow_class, field_name, default)
|
||||
values[field_name] = _serialize_static_value(
|
||||
value, diagnostics, f"config.{field_name}"
|
||||
)
|
||||
if field_name == "input_provider":
|
||||
# A string value is already a ref; only live objects degrade.
|
||||
values[field_name] = (
|
||||
value if value is None or isinstance(value, str) else _object_ref(value)
|
||||
)
|
||||
else:
|
||||
values[field_name] = _serialize_static_value(
|
||||
value, diagnostics, f"config.{field_name}"
|
||||
)
|
||||
return FlowConfigDefinition(**values)
|
||||
|
||||
|
||||
@@ -195,38 +240,123 @@ def _build_human_feedback_definition(
|
||||
return FlowHumanFeedbackDefinition(
|
||||
message=str(config.message),
|
||||
emit=[str(value) for value in emit] if emit is not None else None,
|
||||
llm=_serialize_static_value(
|
||||
getattr(config, "llm", None), diagnostics, f"{path}.llm"
|
||||
),
|
||||
# llm and provider stay live: the engine consumes them in-process and
|
||||
# the contract degrades them to serializable forms at JSON dump time.
|
||||
llm=getattr(config, "llm", None),
|
||||
default_outcome=getattr(config, "default_outcome", None),
|
||||
metadata=_serialize_static_value(
|
||||
getattr(config, "metadata", None), diagnostics, f"{path}.metadata"
|
||||
),
|
||||
provider=_serialize_static_value(
|
||||
getattr(config, "provider", None), diagnostics, f"{path}.provider"
|
||||
),
|
||||
provider=getattr(config, "provider", None),
|
||||
learn=bool(getattr(config, "learn", False)),
|
||||
learn_source=str(getattr(config, "learn_source", "hitl")),
|
||||
learn_strict=bool(getattr(config, "learn_strict", False)),
|
||||
)
|
||||
|
||||
|
||||
def _build_persistence_definition(
|
||||
value: Any,
|
||||
diagnostics: list[FlowDefinitionDiagnostic],
|
||||
path: str,
|
||||
) -> FlowPersistenceDefinition | None:
|
||||
def _build_persistence_definition(value: Any) -> FlowPersistenceDefinition | None:
|
||||
config = getattr(value, "__flow_persistence_config__", None)
|
||||
if config is None:
|
||||
return None
|
||||
persistence = getattr(config, "persistence", None)
|
||||
verbose = bool(getattr(config, "verbose", False))
|
||||
return FlowPersistenceDefinition(
|
||||
enabled=True,
|
||||
verbose=verbose,
|
||||
persistence=_serialize_static_value(
|
||||
persistence, diagnostics, f"{path}.persistence"
|
||||
verbose=bool(getattr(config, "verbose", False)),
|
||||
# The backend stays live: the engine persists through the exact
|
||||
# instance the user configured; the contract degrades it to a
|
||||
# serialized config at JSON dump time.
|
||||
persistence=getattr(config, "persistence", None),
|
||||
)
|
||||
|
||||
|
||||
def _build_conversational_router_definition(
|
||||
router_config: Any,
|
||||
diagnostics: list[FlowDefinitionDiagnostic],
|
||||
path: str,
|
||||
) -> FlowConversationalRouterDefinition | None:
|
||||
if router_config is None:
|
||||
return None
|
||||
|
||||
routes = getattr(router_config, "routes", None)
|
||||
return FlowConversationalRouterDefinition(
|
||||
prompt=getattr(router_config, "prompt", None),
|
||||
response_format=_serialize_static_value(
|
||||
getattr(router_config, "response_format", None),
|
||||
diagnostics,
|
||||
f"{path}.response_format",
|
||||
),
|
||||
llm=_serialize_static_value(
|
||||
getattr(router_config, "llm", None), diagnostics, f"{path}.llm"
|
||||
),
|
||||
routes=[str(route) for route in routes] if routes is not None else None,
|
||||
route_descriptions=getattr(router_config, "route_descriptions", None),
|
||||
default_intent=getattr(router_config, "default_intent", "converse"),
|
||||
fallback_intent=getattr(router_config, "fallback_intent", "converse"),
|
||||
intent_field=str(getattr(router_config, "intent_field", "intent")),
|
||||
)
|
||||
|
||||
|
||||
def _build_conversational_definition(
|
||||
flow_class: type,
|
||||
diagnostics: list[FlowDefinitionDiagnostic],
|
||||
) -> FlowConversationalDefinition | None:
|
||||
if not _is_conversational_flow(flow_class):
|
||||
return None
|
||||
|
||||
config = getattr(flow_class, "conversational_config", None)
|
||||
builtin_routes = getattr(flow_class, "builtin_routes", ("converse", "end"))
|
||||
internal_routes = getattr(
|
||||
flow_class,
|
||||
"internal_routes",
|
||||
("answer_from_history",),
|
||||
)
|
||||
if config is None:
|
||||
return FlowConversationalDefinition(
|
||||
enabled=True,
|
||||
builtin_routes=[str(route) for route in builtin_routes],
|
||||
internal_routes=[str(route) for route in internal_routes],
|
||||
)
|
||||
|
||||
default_intents = getattr(config, "default_intents", None)
|
||||
visible_agent_outputs = getattr(config, "visible_agent_outputs", None)
|
||||
return FlowConversationalDefinition(
|
||||
enabled=True,
|
||||
system_prompt=getattr(config, "system_prompt", None),
|
||||
llm=_serialize_static_value(
|
||||
getattr(config, "llm", None), diagnostics, "conversational.llm"
|
||||
),
|
||||
router=_build_conversational_router_definition(
|
||||
getattr(config, "router", None),
|
||||
diagnostics,
|
||||
"conversational.router",
|
||||
),
|
||||
answer_from_history_prompt=getattr(config, "answer_from_history_prompt", None),
|
||||
default_intents=(
|
||||
[str(intent) for intent in default_intents]
|
||||
if default_intents is not None
|
||||
else None
|
||||
),
|
||||
intent_llm=_serialize_static_value(
|
||||
getattr(config, "intent_llm", None),
|
||||
diagnostics,
|
||||
"conversational.intent_llm",
|
||||
),
|
||||
answer_from_history_llm=_serialize_static_value(
|
||||
getattr(config, "answer_from_history_llm", None),
|
||||
diagnostics,
|
||||
"conversational.answer_from_history_llm",
|
||||
),
|
||||
visible_agent_outputs=(
|
||||
"all"
|
||||
if visible_agent_outputs == "all"
|
||||
else [str(output) for output in visible_agent_outputs]
|
||||
if visible_agent_outputs is not None
|
||||
else None
|
||||
),
|
||||
defer_trace_finalization=bool(
|
||||
getattr(config, "defer_trace_finalization", True)
|
||||
),
|
||||
builtin_routes=[str(route) for route in builtin_routes],
|
||||
internal_routes=[str(route) for route in internal_routes],
|
||||
)
|
||||
|
||||
|
||||
@@ -237,9 +367,11 @@ def _build_method_definition(
|
||||
) -> FlowMethodDefinition:
|
||||
fragment = _get_flow_method_definition(method)
|
||||
if fragment is None:
|
||||
method_definition = FlowMethodDefinition()
|
||||
method_definition = FlowMethodDefinition(do=_method_action(method))
|
||||
else:
|
||||
method_definition = fragment.model_copy(deep=True)
|
||||
method_definition = fragment.model_copy(
|
||||
deep=True, update={"do": _method_action(method)}
|
||||
)
|
||||
|
||||
human_feedback = _build_human_feedback_definition(
|
||||
method, diagnostics, f"{path}.human_feedback"
|
||||
@@ -250,9 +382,7 @@ def _build_method_definition(
|
||||
method_definition.router = True
|
||||
method_definition.emit = None
|
||||
|
||||
method_definition.persist = _build_persistence_definition(
|
||||
method, diagnostics, f"{path}.persist"
|
||||
)
|
||||
method_definition.persist = _build_persistence_definition(method)
|
||||
|
||||
return method_definition
|
||||
|
||||
@@ -270,6 +400,29 @@ def _iter_flow_methods(flow_class: type) -> dict[str, Any]:
|
||||
flow_class, attr_value
|
||||
):
|
||||
methods[attr_name] = attr_value
|
||||
continue
|
||||
|
||||
inherited = _get_inherited_conversational_method(flow_class, attr_name)
|
||||
if inherited is not None and callable(attr_value):
|
||||
methods[attr_name] = _stamp_inherited_conversational_metadata(
|
||||
attr_value, inherited
|
||||
)
|
||||
|
||||
if _is_conversational_flow(flow_class):
|
||||
for base in reversed(flow_class.__mro__[1:]):
|
||||
for attr_name, raw_value in base.__dict__.items():
|
||||
if attr_name.startswith("_") or attr_name in methods:
|
||||
continue
|
||||
if not getattr(raw_value, "__conversational_only__", False):
|
||||
continue
|
||||
try:
|
||||
attr_value = getattr(flow_class, attr_name)
|
||||
except AttributeError:
|
||||
continue
|
||||
if is_flow_method(attr_value) and _should_include_flow_method(
|
||||
flow_class, attr_value
|
||||
):
|
||||
methods[attr_name] = attr_value
|
||||
|
||||
# A wrapped method whose name collides with a base Flow model field
|
||||
# (e.g. ``checkpoint``) is absorbed by Pydantic as a field; the underlying
|
||||
@@ -313,7 +466,8 @@ def _build_flow_definition_from_class(
|
||||
description=description,
|
||||
state=_build_state_definition(flow_class, diagnostics),
|
||||
config=_build_config_definition(flow_class, diagnostics),
|
||||
persist=_build_persistence_definition(flow_class, diagnostics, "persist"),
|
||||
persist=_build_persistence_definition(flow_class),
|
||||
conversational=_build_conversational_definition(flow_class, diagnostics),
|
||||
methods=methods,
|
||||
diagnostics=diagnostics,
|
||||
)
|
||||
|
||||
@@ -6,15 +6,22 @@ The implementation now lives in three modules, split by concern:
|
||||
``@router``, ``or_`` / ``and_``) and Python Flow class projection
|
||||
- ``crewai.flow.flow_definition`` -- the serializable Flow Definition contract
|
||||
- ``crewai.flow.runtime`` -- the Flow execution engine and state
|
||||
- ``crewai.experimental.conversational_mixin`` -- experimental conversational
|
||||
runtime extension composed onto the public ``Flow`` class
|
||||
|
||||
Prefer importing from those modules in new code; this module preserves the
|
||||
historical ``crewai.flow.flow`` import path.
|
||||
"""
|
||||
|
||||
from typing import Any, TypeVar
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from crewai.experimental.conversational_mixin import _ConversationalMixin
|
||||
from crewai.flow.dsl import and_, listen, or_, router, start
|
||||
from crewai.flow.runtime import (
|
||||
_INITIAL_STATE_CLASS_MARKER,
|
||||
Flow,
|
||||
Flow as RuntimeFlow,
|
||||
FlowMeta,
|
||||
FlowState,
|
||||
LockedDictProxy,
|
||||
@@ -23,6 +30,13 @@ from crewai.flow.runtime import (
|
||||
)
|
||||
|
||||
|
||||
T = TypeVar("T", bound=dict[str, Any] | BaseModel)
|
||||
|
||||
|
||||
class Flow(_ConversationalMixin, RuntimeFlow[T]):
|
||||
"""Public Flow class with experimental conversational extension behavior."""
|
||||
|
||||
|
||||
__all__ = [
|
||||
"_INITIAL_STATE_CLASS_MARKER",
|
||||
"Flow",
|
||||
|
||||
@@ -15,6 +15,10 @@ current_flow_id: contextvars.ContextVar[str | None] = contextvars.ContextVar(
|
||||
"flow_id", default=None
|
||||
)
|
||||
|
||||
current_flow_defer_trace_finalization: contextvars.ContextVar[bool] = (
|
||||
contextvars.ContextVar("flow_defer_trace_finalization", default=False)
|
||||
)
|
||||
|
||||
current_flow_method_name: contextvars.ContextVar[str] = contextvars.ContextVar(
|
||||
"flow_method_name", default="unknown"
|
||||
)
|
||||
|
||||
@@ -13,16 +13,24 @@ import json
|
||||
import logging
|
||||
from typing import Any, Literal as TypingLiteral
|
||||
|
||||
from pydantic import BaseModel, ConfigDict, Field
|
||||
from pydantic import BaseModel, ConfigDict, Field, field_serializer, model_validator
|
||||
import yaml
|
||||
|
||||
from crewai.flow.conversational_definition import (
|
||||
FlowConversationalDefinition,
|
||||
FlowConversationalRouterDefinition,
|
||||
)
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
FlowDefinitionCondition = str | dict[str, Any]
|
||||
|
||||
__all__ = [
|
||||
"FlowActionDefinition",
|
||||
"FlowConfigDefinition",
|
||||
"FlowConversationalDefinition",
|
||||
"FlowConversationalRouterDefinition",
|
||||
"FlowDefinition",
|
||||
"FlowDefinitionCondition",
|
||||
"FlowDefinitionDiagnostic",
|
||||
@@ -33,6 +41,14 @@ __all__ = [
|
||||
]
|
||||
|
||||
|
||||
def _object_ref(value: Any) -> str:
|
||||
"""Format a class or instance as the canonical ``module:qualname`` ref."""
|
||||
target = value if isinstance(value, type) else type(value)
|
||||
module = getattr(target, "__module__", "")
|
||||
qualname = getattr(target, "__qualname__", getattr(target, "__name__", ""))
|
||||
return f"{module}:{qualname}" if module and qualname else repr(value)
|
||||
|
||||
|
||||
class FlowDefinitionDiagnostic(BaseModel):
|
||||
"""A non-fatal Flow Definition build or validation diagnostic."""
|
||||
|
||||
@@ -45,9 +61,10 @@ class FlowDefinitionDiagnostic(BaseModel):
|
||||
class FlowStateDefinition(BaseModel):
|
||||
"""Static description of a Flow state contract."""
|
||||
|
||||
type: TypingLiteral["dict", "pydantic", "unknown"] = "dict"
|
||||
type: TypingLiteral["dict", "pydantic", "json_schema", "unknown"] = "dict"
|
||||
ref: str | None = None
|
||||
default: Any = None
|
||||
json_schema: dict[str, Any] | None = None
|
||||
default: dict[str, Any] | None = None
|
||||
|
||||
|
||||
class FlowConfigDefinition(BaseModel):
|
||||
@@ -55,22 +72,50 @@ class FlowConfigDefinition(BaseModel):
|
||||
|
||||
tracing: bool | None = None
|
||||
stream: bool = False
|
||||
memory: Any = None
|
||||
input_provider: Any = None
|
||||
memory: dict[str, Any] | None = None
|
||||
input_provider: str | None = None
|
||||
suppress_flow_events: bool = False
|
||||
max_method_calls: int = 100
|
||||
defer_trace_finalization: bool = False
|
||||
checkpoint: bool | dict[str, Any] | None = None
|
||||
|
||||
|
||||
class FlowPersistenceDefinition(BaseModel):
|
||||
"""Static persistence configuration."""
|
||||
"""Static persistence configuration.
|
||||
|
||||
``persistence`` may hold a live backend when the definition is built from
|
||||
a decorated class — the engine then persists through the exact instance
|
||||
the user configured; the JSON/YAML projection degrades it to its
|
||||
serialized config.
|
||||
"""
|
||||
|
||||
enabled: bool = False
|
||||
verbose: bool = False
|
||||
persistence: Any = None
|
||||
|
||||
@field_serializer("persistence", when_used="json")
|
||||
def _serialize_persistence(self, value: Any) -> Any:
|
||||
if value is None or isinstance(value, dict):
|
||||
return value
|
||||
if isinstance(value, BaseModel):
|
||||
try:
|
||||
return value.model_dump(mode="json")
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"Persistence backend %s is not fully serializable; "
|
||||
"preserved import reference only.",
|
||||
_object_ref(value),
|
||||
)
|
||||
return {"ref": _object_ref(value)}
|
||||
|
||||
|
||||
class FlowHumanFeedbackDefinition(BaseModel):
|
||||
"""Static human feedback configuration."""
|
||||
"""Static human feedback configuration.
|
||||
|
||||
``llm`` and ``provider`` may hold live Python objects when the definition
|
||||
is built from a decorated class; the JSON/YAML projection degrades them to
|
||||
a serialized config (``llm``) or a ``module:qualname`` ref (``provider``).
|
||||
"""
|
||||
|
||||
message: str
|
||||
emit: list[str] | None = None
|
||||
@@ -82,10 +127,32 @@ class FlowHumanFeedbackDefinition(BaseModel):
|
||||
learn_source: str = "hitl"
|
||||
learn_strict: bool = False
|
||||
|
||||
@field_serializer("llm", when_used="json")
|
||||
def _serialize_llm(self, value: Any) -> dict[str, Any] | str | None:
|
||||
if value is None or isinstance(value, (str, dict)):
|
||||
return value
|
||||
from crewai.flow.human_feedback import _serialize_llm_for_context
|
||||
|
||||
return _serialize_llm_for_context(value)
|
||||
|
||||
@field_serializer("provider", when_used="json")
|
||||
def _serialize_provider(self, value: Any) -> str | None:
|
||||
if value is None or isinstance(value, str):
|
||||
return value
|
||||
return _object_ref(value)
|
||||
|
||||
|
||||
class FlowActionDefinition(BaseModel):
|
||||
"""What a Flow method node executes, independent of when it fires."""
|
||||
|
||||
call: TypingLiteral["code"] = "code"
|
||||
ref: str
|
||||
|
||||
|
||||
class FlowMethodDefinition(BaseModel):
|
||||
"""Static definition of one Flow method and its execution roles."""
|
||||
|
||||
do: FlowActionDefinition
|
||||
start: bool | FlowDefinitionCondition | None = None
|
||||
listen: FlowDefinitionCondition | None = None
|
||||
router: bool = False
|
||||
@@ -93,6 +160,16 @@ class FlowMethodDefinition(BaseModel):
|
||||
human_feedback: FlowHumanFeedbackDefinition | None = None
|
||||
persist: FlowPersistenceDefinition | None = None
|
||||
|
||||
@model_validator(mode="after")
|
||||
def _canonicalize_human_feedback_routing(self) -> FlowMethodDefinition:
|
||||
# Canonical shape: a method whose human_feedback declares emit
|
||||
# outcomes routes like a router, regardless of how the definition
|
||||
# was authored.
|
||||
if self.human_feedback is not None and self.human_feedback.emit:
|
||||
self.router = True
|
||||
self.emit = None
|
||||
return self
|
||||
|
||||
@property
|
||||
def is_start(self) -> bool:
|
||||
"""Whether this method is a start method.
|
||||
@@ -109,12 +186,15 @@ class FlowDefinition(BaseModel):
|
||||
|
||||
model_config = ConfigDict(populate_by_name=True, arbitrary_types_allowed=True)
|
||||
|
||||
schema_: str = Field(default="crewai.flow/v1", alias="schema")
|
||||
schema_: TypingLiteral["crewai.flow/v1"] = Field(
|
||||
default="crewai.flow/v1", alias="schema"
|
||||
)
|
||||
name: str
|
||||
description: str | None = None
|
||||
state: FlowStateDefinition | None = None
|
||||
config: FlowConfigDefinition = Field(default_factory=FlowConfigDefinition)
|
||||
persist: FlowPersistenceDefinition | None = None
|
||||
conversational: FlowConversationalDefinition | None = None
|
||||
methods: dict[str, FlowMethodDefinition] = Field(default_factory=dict)
|
||||
diagnostics: list[FlowDefinitionDiagnostic] = Field(default_factory=list)
|
||||
|
||||
|
||||
@@ -83,7 +83,6 @@ class FlowMethod(Generic[P, R]):
|
||||
"__conversational_only__", # gates registration on Flow.conversational
|
||||
"__flow_persistence_config__",
|
||||
"__flow_method_definition__",
|
||||
"_human_feedback_llm", # Live LLM object for HITL resume
|
||||
]:
|
||||
if hasattr(meth, attr):
|
||||
setattr(self, attr, getattr(meth, attr))
|
||||
|
||||
@@ -1,8 +1,11 @@
|
||||
"""Human feedback decorator for Flow methods.
|
||||
"""Human feedback support for Flow methods.
|
||||
|
||||
This module provides the @human_feedback decorator that enables human-in-the-loop
|
||||
workflows within CrewAI Flows. It allows collecting human feedback on method outputs
|
||||
and optionally routing to different listeners based on the feedback.
|
||||
This module backs the @human_feedback decorator that enables human-in-the-loop
|
||||
workflows within CrewAI Flows. The decorator is a pure metadata stamper: it
|
||||
records a :class:`HumanFeedbackConfig` on the method, the Flow definition
|
||||
builder lifts it into ``FlowHumanFeedbackDefinition``, and the Flow engine
|
||||
collects feedback after each decorated method completes, driven by the flow's
|
||||
definition.
|
||||
|
||||
Supports both synchronous (blocking) and asynchronous (non-blocking) feedback
|
||||
collection through the provider parameter.
|
||||
@@ -55,22 +58,18 @@ Example (asynchronous with custom provider):
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from collections.abc import Callable, Sequence
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime
|
||||
from functools import wraps
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Any, TypeVar
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from crewai.flow.flow_wrappers import FlowMethod
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from crewai.flow.async_feedback.types import HumanFeedbackProvider
|
||||
from crewai.flow.flow import Flow
|
||||
from crewai.flow.runtime import Flow
|
||||
from crewai.llms.base_llm import BaseLLM
|
||||
|
||||
|
||||
@@ -160,8 +159,8 @@ class HumanFeedbackResult:
|
||||
class HumanFeedbackConfig:
|
||||
"""Configuration for the @human_feedback decorator.
|
||||
|
||||
Stores the parameters passed to the decorator for later use during
|
||||
method execution and for introspection by visualization tools.
|
||||
Stores the parameters passed to the decorator for later use by the
|
||||
Flow definition builder and for introspection by visualization tools.
|
||||
|
||||
Attributes:
|
||||
message: The message shown to the human when requesting feedback.
|
||||
@@ -183,19 +182,6 @@ class HumanFeedbackConfig:
|
||||
learn_strict: bool = False
|
||||
|
||||
|
||||
class HumanFeedbackMethod(FlowMethod[Any, Any]):
|
||||
"""Wrapper for methods decorated with @human_feedback.
|
||||
|
||||
This wrapper extends FlowMethod to add human feedback specific attributes
|
||||
used by the FlowDefinition builder and runtime feedback handling.
|
||||
|
||||
Attributes:
|
||||
__human_feedback_config__: The HumanFeedbackConfig for this method.
|
||||
"""
|
||||
|
||||
__human_feedback_config__: HumanFeedbackConfig | None = None
|
||||
|
||||
|
||||
class PreReviewResult(BaseModel):
|
||||
"""Structured output from the HITL pre-review LLM call."""
|
||||
|
||||
@@ -217,17 +203,11 @@ class DistilledLessons(BaseModel):
|
||||
)
|
||||
|
||||
|
||||
def _build_human_feedback_runtime_decorator(
|
||||
message: str,
|
||||
emit: Sequence[str] | None = None,
|
||||
llm: str | BaseLLM | None = "gpt-4o-mini",
|
||||
default_outcome: str | None = None,
|
||||
metadata: dict[str, Any] | None = None,
|
||||
provider: HumanFeedbackProvider | None = None,
|
||||
learn: bool = False,
|
||||
learn_source: str = "hitl",
|
||||
learn_strict: bool = False,
|
||||
) -> Callable[[F], F]:
|
||||
def _validate_human_feedback_options(
|
||||
emit: Sequence[str] | None,
|
||||
llm: Any,
|
||||
default_outcome: str | None,
|
||||
) -> None:
|
||||
if emit is not None:
|
||||
if not llm:
|
||||
raise ValueError(
|
||||
@@ -244,295 +224,139 @@ def _build_human_feedback_runtime_decorator(
|
||||
elif default_outcome is not None:
|
||||
raise ValueError("default_outcome requires emit to be specified.")
|
||||
|
||||
def decorator(func: F) -> F:
|
||||
def _get_hitl_prompt(key: str) -> str:
|
||||
from crewai.utilities.i18n import I18N_DEFAULT
|
||||
|
||||
return I18N_DEFAULT.slice(key)
|
||||
def _get_hitl_prompt(key: str) -> str:
|
||||
from crewai.utilities.i18n import I18N_DEFAULT
|
||||
|
||||
def _resolve_llm_instance() -> Any:
|
||||
if llm is None:
|
||||
from crewai.llm import LLM
|
||||
return I18N_DEFAULT.slice(key)
|
||||
|
||||
return LLM(model="gpt-4o-mini")
|
||||
if isinstance(llm, str):
|
||||
from crewai.llm import LLM
|
||||
|
||||
return LLM(model=llm)
|
||||
return llm # already a BaseLLM instance
|
||||
def _resolve_llm_instance(llm: Any) -> Any:
|
||||
from crewai.llm import LLM
|
||||
|
||||
def _pre_review_with_lessons(
|
||||
flow_instance: Flow[Any], method_output: Any
|
||||
) -> Any:
|
||||
try:
|
||||
mem = flow_instance.memory
|
||||
if mem is None:
|
||||
return method_output
|
||||
query = f"human feedback lessons for {func.__name__}: {method_output!s}"
|
||||
matches = mem.recall(query, source=learn_source)
|
||||
if not matches:
|
||||
return method_output
|
||||
if llm is None:
|
||||
return LLM(model="gpt-4o-mini")
|
||||
if isinstance(llm, str):
|
||||
return LLM(model=llm)
|
||||
if isinstance(llm, dict):
|
||||
deserialized = _deserialize_llm_from_context(llm)
|
||||
return deserialized if deserialized is not None else LLM(model="gpt-4o-mini")
|
||||
return llm # already a BaseLLM instance
|
||||
|
||||
lessons = "\n".join(f"- {m.record.content}" for m in matches)
|
||||
llm_inst = _resolve_llm_instance()
|
||||
prompt = _get_hitl_prompt("hitl_pre_review_user").format(
|
||||
output=str(method_output),
|
||||
lessons=lessons,
|
||||
)
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": _get_hitl_prompt("hitl_pre_review_system"),
|
||||
},
|
||||
{"role": "user", "content": prompt},
|
||||
]
|
||||
if getattr(llm_inst, "supports_function_calling", lambda: False)():
|
||||
response = llm_inst.call(messages, response_model=PreReviewResult)
|
||||
if isinstance(response, PreReviewResult):
|
||||
return response.improved_output
|
||||
return PreReviewResult.model_validate(response).improved_output
|
||||
reviewed = llm_inst.call(messages)
|
||||
return reviewed if isinstance(reviewed, str) else str(reviewed)
|
||||
except Exception:
|
||||
if learn_strict:
|
||||
logger.warning(
|
||||
"HITL pre-review failed for %s; re-raising (learn_strict=True)",
|
||||
func.__name__,
|
||||
exc_info=True,
|
||||
)
|
||||
raise
|
||||
logger.warning(
|
||||
"HITL pre-review failed for %s; falling back to raw output",
|
||||
func.__name__,
|
||||
exc_info=True,
|
||||
)
|
||||
return method_output
|
||||
|
||||
def _distill_and_store_lessons(
|
||||
flow_instance: Flow[Any], method_output: Any, raw_feedback: str
|
||||
) -> None:
|
||||
try:
|
||||
mem = flow_instance.memory
|
||||
if mem is None:
|
||||
return
|
||||
llm_inst = _resolve_llm_instance()
|
||||
prompt = _get_hitl_prompt("hitl_distill_user").format(
|
||||
method_name=func.__name__,
|
||||
output=str(method_output),
|
||||
feedback=raw_feedback,
|
||||
)
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": _get_hitl_prompt("hitl_distill_system"),
|
||||
},
|
||||
{"role": "user", "content": prompt},
|
||||
]
|
||||
def _pre_review_with_lessons(
|
||||
flow_instance: Flow[Any],
|
||||
method_name: str,
|
||||
method_output: Any,
|
||||
*,
|
||||
llm: Any,
|
||||
learn_source: str,
|
||||
learn_strict: bool,
|
||||
) -> Any:
|
||||
try:
|
||||
mem = flow_instance.memory
|
||||
if mem is None:
|
||||
return method_output
|
||||
query = f"human feedback lessons for {method_name}: {method_output!s}"
|
||||
matches = mem.recall(query, source=learn_source)
|
||||
if not matches:
|
||||
return method_output
|
||||
|
||||
lessons: list[str] = []
|
||||
if getattr(llm_inst, "supports_function_calling", lambda: False)():
|
||||
response = llm_inst.call(messages, response_model=DistilledLessons)
|
||||
if isinstance(response, DistilledLessons):
|
||||
lessons = response.lessons
|
||||
else:
|
||||
lessons = DistilledLessons.model_validate(response).lessons
|
||||
else:
|
||||
response = llm_inst.call(messages)
|
||||
if isinstance(response, str):
|
||||
lessons = [
|
||||
line.strip("- ").strip()
|
||||
for line in response.strip().split("\n")
|
||||
if line.strip() and line.strip() != "NONE"
|
||||
]
|
||||
|
||||
if lessons:
|
||||
mem.remember_many(lessons, source=learn_source) # type: ignore[union-attr]
|
||||
except Exception:
|
||||
if learn_strict:
|
||||
logger.warning(
|
||||
"HITL lesson distillation failed for %s; re-raising (learn_strict=True)",
|
||||
func.__name__,
|
||||
exc_info=True,
|
||||
)
|
||||
raise
|
||||
logger.warning(
|
||||
"HITL lesson distillation failed for %s; no lessons stored",
|
||||
func.__name__,
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
def _build_feedback_context(
|
||||
flow_instance: Flow[Any], method_output: Any
|
||||
) -> tuple[Any, Any]:
|
||||
from crewai.flow.async_feedback.types import PendingFeedbackContext
|
||||
|
||||
context = PendingFeedbackContext(
|
||||
flow_id=flow_instance.flow_id or "unknown",
|
||||
flow_class=f"{flow_instance.__class__.__module__}.{flow_instance.__class__.__name__}",
|
||||
method_name=func.__name__,
|
||||
method_output=method_output,
|
||||
message=message,
|
||||
emit=list(emit) if emit else None,
|
||||
default_outcome=default_outcome,
|
||||
metadata=metadata or {},
|
||||
llm=llm if isinstance(llm, str) else _serialize_llm_for_context(llm),
|
||||
lessons = "\n".join(f"- {m.record.content}" for m in matches)
|
||||
llm_inst = _resolve_llm_instance(llm)
|
||||
prompt = _get_hitl_prompt("hitl_pre_review_user").format(
|
||||
output=str(method_output),
|
||||
lessons=lessons,
|
||||
)
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": _get_hitl_prompt("hitl_pre_review_system"),
|
||||
},
|
||||
{"role": "user", "content": prompt},
|
||||
]
|
||||
if getattr(llm_inst, "supports_function_calling", lambda: False)():
|
||||
response = llm_inst.call(messages, response_model=PreReviewResult)
|
||||
if isinstance(response, PreReviewResult):
|
||||
return response.improved_output
|
||||
return PreReviewResult.model_validate(response).improved_output
|
||||
reviewed = llm_inst.call(messages)
|
||||
return reviewed if isinstance(reviewed, str) else str(reviewed)
|
||||
except Exception:
|
||||
if learn_strict:
|
||||
logger.warning(
|
||||
"HITL pre-review failed for %s; re-raising (learn_strict=True)",
|
||||
method_name,
|
||||
exc_info=True,
|
||||
)
|
||||
raise
|
||||
logger.warning(
|
||||
"HITL pre-review failed for %s; falling back to raw output",
|
||||
method_name,
|
||||
exc_info=True,
|
||||
)
|
||||
return method_output
|
||||
|
||||
effective_provider = provider
|
||||
if effective_provider is None:
|
||||
from crewai.flow.flow_config import flow_config
|
||||
|
||||
effective_provider = flow_config.hitl_provider
|
||||
def _distill_and_store_lessons(
|
||||
flow_instance: Flow[Any],
|
||||
method_name: str,
|
||||
method_output: Any,
|
||||
raw_feedback: str,
|
||||
*,
|
||||
llm: Any,
|
||||
learn_source: str,
|
||||
learn_strict: bool,
|
||||
) -> None:
|
||||
try:
|
||||
mem = flow_instance.memory
|
||||
if mem is None:
|
||||
return
|
||||
llm_inst = _resolve_llm_instance(llm)
|
||||
prompt = _get_hitl_prompt("hitl_distill_user").format(
|
||||
method_name=method_name,
|
||||
output=str(method_output),
|
||||
feedback=raw_feedback,
|
||||
)
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": _get_hitl_prompt("hitl_distill_system"),
|
||||
},
|
||||
{"role": "user", "content": prompt},
|
||||
]
|
||||
|
||||
return context, effective_provider
|
||||
|
||||
def _request_feedback(flow_instance: Flow[Any], method_output: Any) -> str:
|
||||
context, effective_provider = _build_feedback_context(
|
||||
flow_instance, method_output
|
||||
)
|
||||
|
||||
if effective_provider is not None:
|
||||
feedback_result = effective_provider.request_feedback(
|
||||
context, flow_instance
|
||||
)
|
||||
if asyncio.iscoroutine(feedback_result):
|
||||
raise TypeError(
|
||||
f"Provider {type(effective_provider).__name__}.request_feedback() "
|
||||
"returned a coroutine in a sync flow method. Use an async flow "
|
||||
"method or a synchronous provider."
|
||||
)
|
||||
return str(feedback_result)
|
||||
return flow_instance._request_human_feedback(
|
||||
message=message,
|
||||
output=method_output,
|
||||
metadata=metadata,
|
||||
emit=emit,
|
||||
)
|
||||
|
||||
async def _request_feedback_async(
|
||||
flow_instance: Flow[Any], method_output: Any
|
||||
) -> str:
|
||||
context, effective_provider = _build_feedback_context(
|
||||
flow_instance, method_output
|
||||
)
|
||||
|
||||
if effective_provider is not None:
|
||||
feedback_result = effective_provider.request_feedback(
|
||||
context, flow_instance
|
||||
)
|
||||
if asyncio.iscoroutine(feedback_result):
|
||||
return str(await feedback_result)
|
||||
return str(feedback_result)
|
||||
return flow_instance._request_human_feedback(
|
||||
message=message,
|
||||
output=method_output,
|
||||
metadata=metadata,
|
||||
emit=emit,
|
||||
)
|
||||
|
||||
def _process_feedback(
|
||||
flow_instance: Flow[Any],
|
||||
method_output: Any,
|
||||
raw_feedback: str,
|
||||
) -> HumanFeedbackResult | str:
|
||||
collapsed_outcome: str | None = None
|
||||
|
||||
if not raw_feedback.strip():
|
||||
if default_outcome:
|
||||
collapsed_outcome = default_outcome
|
||||
elif emit:
|
||||
collapsed_outcome = emit[0]
|
||||
elif emit:
|
||||
if llm is not None:
|
||||
collapsed_outcome = flow_instance._collapse_to_outcome(
|
||||
feedback=raw_feedback,
|
||||
outcomes=emit,
|
||||
llm=llm,
|
||||
)
|
||||
else:
|
||||
collapsed_outcome = emit[0]
|
||||
|
||||
result = HumanFeedbackResult(
|
||||
output=method_output,
|
||||
feedback=raw_feedback,
|
||||
outcome=collapsed_outcome,
|
||||
timestamp=datetime.now(),
|
||||
method_name=func.__name__,
|
||||
metadata=metadata or {},
|
||||
)
|
||||
|
||||
flow_instance.human_feedback_history.append(result)
|
||||
flow_instance.last_human_feedback = result
|
||||
|
||||
if emit:
|
||||
if collapsed_outcome is None:
|
||||
collapsed_outcome = default_outcome or emit[0]
|
||||
result.outcome = collapsed_outcome
|
||||
return collapsed_outcome
|
||||
return result
|
||||
|
||||
if asyncio.iscoroutinefunction(func):
|
||||
|
||||
@wraps(func)
|
||||
async def async_wrapper(self: Flow[Any], *args: Any, **kwargs: Any) -> Any:
|
||||
method_output = await func(self, *args, **kwargs)
|
||||
|
||||
if learn and getattr(self, "memory", None) is not None:
|
||||
method_output = _pre_review_with_lessons(self, method_output)
|
||||
|
||||
raw_feedback = await _request_feedback_async(self, method_output)
|
||||
result = _process_feedback(self, method_output, raw_feedback)
|
||||
|
||||
if (
|
||||
learn
|
||||
and getattr(self, "memory", None) is not None
|
||||
and raw_feedback.strip()
|
||||
):
|
||||
_distill_and_store_lessons(self, method_output, raw_feedback)
|
||||
|
||||
# Stash the real method output for final flow result when emit is set:
|
||||
# result is the collapsed outcome string for routing, but we preserve the
|
||||
# actual method output as the flow's final result. Uses per-method dict for
|
||||
# concurrency safety and to handle None returns.
|
||||
if emit:
|
||||
self._human_feedback_method_outputs[func.__name__] = method_output
|
||||
|
||||
return result
|
||||
|
||||
wrapper: Any = async_wrapper
|
||||
lessons: list[str] = []
|
||||
if getattr(llm_inst, "supports_function_calling", lambda: False)():
|
||||
response = llm_inst.call(messages, response_model=DistilledLessons)
|
||||
if isinstance(response, DistilledLessons):
|
||||
lessons = response.lessons
|
||||
else:
|
||||
lessons = DistilledLessons.model_validate(response).lessons
|
||||
else:
|
||||
response = llm_inst.call(messages)
|
||||
if isinstance(response, str):
|
||||
lessons = [
|
||||
line.strip("- ").strip()
|
||||
for line in response.strip().split("\n")
|
||||
if line.strip() and line.strip() != "NONE"
|
||||
]
|
||||
|
||||
@wraps(func)
|
||||
def sync_wrapper(self: Flow[Any], *args: Any, **kwargs: Any) -> Any:
|
||||
method_output = func(self, *args, **kwargs)
|
||||
|
||||
if learn and getattr(self, "memory", None) is not None:
|
||||
method_output = _pre_review_with_lessons(self, method_output)
|
||||
|
||||
raw_feedback = _request_feedback(self, method_output)
|
||||
result = _process_feedback(self, method_output, raw_feedback)
|
||||
|
||||
if (
|
||||
learn
|
||||
and getattr(self, "memory", None) is not None
|
||||
and raw_feedback.strip()
|
||||
):
|
||||
_distill_and_store_lessons(self, method_output, raw_feedback)
|
||||
|
||||
# Stash the real method output for final flow result when emit is set:
|
||||
# result is the collapsed outcome string for routing, but we preserve the
|
||||
# actual method output as the flow's final result. Uses per-method dict for
|
||||
# concurrency safety and to handle None returns.
|
||||
if emit:
|
||||
self._human_feedback_method_outputs[func.__name__] = method_output
|
||||
|
||||
return result
|
||||
|
||||
wrapper = sync_wrapper
|
||||
|
||||
return wrapper # type: ignore[no-any-return]
|
||||
|
||||
return decorator
|
||||
if lessons:
|
||||
mem.remember_many(lessons, source=learn_source) # type: ignore[union-attr]
|
||||
except Exception:
|
||||
if learn_strict:
|
||||
logger.warning(
|
||||
"HITL lesson distillation failed for %s; re-raising (learn_strict=True)",
|
||||
method_name,
|
||||
exc_info=True,
|
||||
)
|
||||
raise
|
||||
logger.warning(
|
||||
"HITL lesson distillation failed for %s; no lessons stored",
|
||||
method_name,
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
|
||||
def human_feedback(
|
||||
|
||||
@@ -24,12 +24,10 @@ Example:
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from collections.abc import Callable
|
||||
import functools
|
||||
import logging
|
||||
from types import SimpleNamespace
|
||||
from typing import TYPE_CHECKING, Any, Final, TypeVar, cast
|
||||
from typing import TYPE_CHECKING, Any, Final, TypeVar
|
||||
|
||||
from crewai_core.printer import PRINTER
|
||||
from pydantic import BaseModel
|
||||
@@ -39,7 +37,7 @@ from crewai.flow.persistence.factory import default_flow_persistence
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from crewai.flow.flow import Flow
|
||||
from crewai.flow.runtime import Flow
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -66,14 +64,6 @@ def _stamp_persistence_metadata(
|
||||
)
|
||||
|
||||
|
||||
_PRESERVED_FLOW_ATTRS: Final[tuple[str, ...]] = (
|
||||
"__human_feedback_config__",
|
||||
"__flow_persistence_config__",
|
||||
"__flow_method_definition__",
|
||||
"_human_feedback_llm",
|
||||
)
|
||||
|
||||
|
||||
class PersistenceDecorator:
|
||||
"""Class to handle flow state persistence with consistent logging."""
|
||||
|
||||
@@ -164,6 +154,10 @@ def persist(
|
||||
states. When applied at the method level, it persists only that method's
|
||||
state.
|
||||
|
||||
The decorator is a pure metadata stamper: it records the persistence
|
||||
configuration on the class or method, and the Flow engine saves state
|
||||
after each persisted method completes, driven by the flow's definition.
|
||||
|
||||
Args:
|
||||
persistence: Optional FlowPersistence implementation to use.
|
||||
If not provided, uses ``default_flow_persistence()`` (the
|
||||
@@ -191,122 +185,7 @@ def persist(
|
||||
persistence if persistence is not None else default_flow_persistence()
|
||||
)
|
||||
|
||||
if isinstance(target, type):
|
||||
_stamp_persistence_metadata(target, actual_persistence, verbose)
|
||||
original_init = target.__init__ # type: ignore[misc]
|
||||
|
||||
@functools.wraps(original_init)
|
||||
def new_init(self: Any, *args: Any, **kwargs: Any) -> None:
|
||||
if "persistence" not in kwargs:
|
||||
kwargs["persistence"] = actual_persistence
|
||||
original_init(self, *args, **kwargs)
|
||||
|
||||
target.__init__ = new_init # type: ignore[misc]
|
||||
|
||||
# Preserve original methods' decorators
|
||||
original_methods = {
|
||||
name: method
|
||||
for name, method in target.__dict__.items()
|
||||
if callable(method)
|
||||
and (
|
||||
hasattr(method, "__is_flow_method__")
|
||||
or hasattr(method, "__flow_method_definition__")
|
||||
)
|
||||
}
|
||||
|
||||
for name, method in original_methods.items():
|
||||
if asyncio.iscoroutinefunction(method):
|
||||
# Closure captures the current name and method
|
||||
def create_async_wrapper(
|
||||
method_name: str, original_method: Callable[..., Any]
|
||||
) -> Callable[..., Any]:
|
||||
@functools.wraps(original_method)
|
||||
async def method_wrapper(
|
||||
self: Any, *args: Any, **kwargs: Any
|
||||
) -> Any:
|
||||
result = await original_method(self, *args, **kwargs)
|
||||
PersistenceDecorator.persist_state(
|
||||
self, method_name, actual_persistence, verbose
|
||||
)
|
||||
return result
|
||||
|
||||
return method_wrapper
|
||||
|
||||
wrapped = create_async_wrapper(name, method)
|
||||
|
||||
for attr in _PRESERVED_FLOW_ATTRS:
|
||||
if hasattr(method, attr):
|
||||
setattr(wrapped, attr, getattr(method, attr))
|
||||
wrapped.__is_flow_method__ = True # type: ignore[attr-defined]
|
||||
|
||||
setattr(target, name, wrapped)
|
||||
else:
|
||||
|
||||
def create_sync_wrapper(
|
||||
method_name: str, original_method: Callable[..., Any]
|
||||
) -> Callable[..., Any]:
|
||||
@functools.wraps(original_method)
|
||||
def method_wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
|
||||
result = original_method(self, *args, **kwargs)
|
||||
PersistenceDecorator.persist_state(
|
||||
self, method_name, actual_persistence, verbose
|
||||
)
|
||||
return result
|
||||
|
||||
return method_wrapper
|
||||
|
||||
wrapped = create_sync_wrapper(name, method)
|
||||
|
||||
for attr in _PRESERVED_FLOW_ATTRS:
|
||||
if hasattr(method, attr):
|
||||
setattr(wrapped, attr, getattr(method, attr))
|
||||
wrapped.__is_flow_method__ = True # type: ignore[attr-defined]
|
||||
|
||||
setattr(target, name, wrapped)
|
||||
|
||||
return target
|
||||
method = target
|
||||
method.__is_flow_method__ = True # type: ignore[attr-defined]
|
||||
_stamp_persistence_metadata(method, actual_persistence, verbose)
|
||||
|
||||
if asyncio.iscoroutinefunction(method):
|
||||
|
||||
@functools.wraps(method)
|
||||
async def method_async_wrapper(
|
||||
flow_instance: Any, *args: Any, **kwargs: Any
|
||||
) -> T:
|
||||
method_coro = method(flow_instance, *args, **kwargs)
|
||||
if asyncio.iscoroutine(method_coro):
|
||||
result = await method_coro
|
||||
else:
|
||||
result = method_coro
|
||||
PersistenceDecorator.persist_state(
|
||||
flow_instance, method.__name__, actual_persistence, verbose
|
||||
)
|
||||
return cast(T, result)
|
||||
|
||||
for attr in _PRESERVED_FLOW_ATTRS:
|
||||
if hasattr(method, attr):
|
||||
setattr(method_async_wrapper, attr, getattr(method, attr))
|
||||
method_async_wrapper.__is_flow_method__ = True # type: ignore[attr-defined]
|
||||
_stamp_persistence_metadata(
|
||||
method_async_wrapper, actual_persistence, verbose
|
||||
)
|
||||
return cast(Callable[..., T], method_async_wrapper)
|
||||
|
||||
@functools.wraps(method)
|
||||
def method_sync_wrapper(flow_instance: Any, *args: Any, **kwargs: Any) -> T:
|
||||
result = method(flow_instance, *args, **kwargs)
|
||||
PersistenceDecorator.persist_state(
|
||||
flow_instance, method.__name__, actual_persistence, verbose
|
||||
)
|
||||
return result
|
||||
|
||||
for attr in _PRESERVED_FLOW_ATTRS:
|
||||
if hasattr(method, attr):
|
||||
setattr(method_sync_wrapper, attr, getattr(method, attr))
|
||||
method_sync_wrapper.__is_flow_method__ = True # type: ignore[attr-defined]
|
||||
_stamp_persistence_metadata(method_sync_wrapper, actual_persistence, verbose)
|
||||
return cast(Callable[..., T], method_sync_wrapper)
|
||||
_stamp_persistence_metadata(target, actual_persistence, verbose)
|
||||
return target
|
||||
|
||||
return decorator
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
70
lib/crewai/src/crewai/flow/runtime/_resolvers.py
Normal file
70
lib/crewai/src/crewai/flow/runtime/_resolvers.py
Normal file
@@ -0,0 +1,70 @@
|
||||
"""Resolution of FlowDefinition refs (``module:qualname``) into live objects.
|
||||
|
||||
Every ref-shaped value in a definition — ``do`` actions, ``state.ref``,
|
||||
``config.input_provider``, ``human_feedback.provider`` — resolves through
|
||||
:func:`resolve_ref`. Failures are loud and name the field and the ref.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Callable
|
||||
import importlib
|
||||
import inspect
|
||||
from operator import attrgetter
|
||||
from typing import TYPE_CHECKING, Any, cast
|
||||
|
||||
from crewai.flow.flow_definition import FlowActionDefinition
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from crewai.flow.runtime import Flow
|
||||
|
||||
|
||||
class InvalidRefError(ValueError):
|
||||
"""A definition ref that cannot be resolved to a live object."""
|
||||
|
||||
|
||||
def resolve_ref(ref: str, *, field: str) -> Any:
|
||||
"""Import the object a definition's `module:qualname` ref points to."""
|
||||
module_name, _, qualname = ref.partition(":")
|
||||
if "<" in ref or not module_name or not qualname:
|
||||
raise InvalidRefError(
|
||||
f"invalid {field} ref {ref!r}; expected 'module:qualname'"
|
||||
)
|
||||
try:
|
||||
return attrgetter(qualname)(importlib.import_module(module_name))
|
||||
except (ImportError, AttributeError) as e:
|
||||
raise InvalidRefError(f"unresolvable {field} ref {ref!r}") from e
|
||||
|
||||
|
||||
def resolve_instance_ref(ref: str, *, field: str) -> Any:
|
||||
"""Resolve a ref, auto-instantiating a no-arg class into an instance."""
|
||||
target = resolve_ref(ref, field=field)
|
||||
if not inspect.isclass(target):
|
||||
return target
|
||||
try:
|
||||
return target()
|
||||
except Exception as e:
|
||||
raise InvalidRefError(
|
||||
f"cannot instantiate {field} ref {ref!r} without arguments: {e}"
|
||||
) from e
|
||||
|
||||
|
||||
def _resolve_code_action(
|
||||
flow: Flow[Any], action: FlowActionDefinition
|
||||
) -> Callable[..., Any]:
|
||||
ref = action.ref
|
||||
target = resolve_ref(ref, field="do")
|
||||
if not callable(target):
|
||||
raise InvalidRefError(f"invalid do ref {ref!r}; object is not callable")
|
||||
handler = cast(Callable[..., Any], target)
|
||||
if getattr(handler, "__self__", None) is None:
|
||||
handler = handler.__get__(flow, type(flow))
|
||||
return handler
|
||||
|
||||
|
||||
def resolve_action(flow: Flow[Any], action: FlowActionDefinition) -> Callable[..., Any]:
|
||||
"""Turn one `do:` action into the callable the flow runs for that node."""
|
||||
if action.call == "code":
|
||||
return _resolve_code_action(flow, action)
|
||||
raise ValueError(f"unknown call type {action.call!r}")
|
||||
@@ -16,7 +16,7 @@ R = TypeVar("R", covariant=True)
|
||||
FlowMethodName = NewType("FlowMethodName", str)
|
||||
PendingListenerKey = NewType(
|
||||
"PendingListenerKey",
|
||||
Annotated[str, "nested flow conditions use 'listener_name:object_id'"],
|
||||
Annotated[str, "listener method name, or 'start:<method>' for conditional starts"],
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -890,41 +890,17 @@ class BaseLLM(BaseModel, ABC):
|
||||
Args:
|
||||
usage_data: Token usage data from the API response
|
||||
"""
|
||||
prompt_tokens = (
|
||||
usage_data.get("prompt_tokens")
|
||||
or usage_data.get("prompt_token_count")
|
||||
or usage_data.get("input_tokens")
|
||||
or 0
|
||||
)
|
||||
metrics = UsageMetrics.from_provider_dict(usage_data)
|
||||
if metrics is None:
|
||||
return
|
||||
|
||||
completion_tokens = (
|
||||
usage_data.get("completion_tokens")
|
||||
or usage_data.get("candidates_token_count")
|
||||
or usage_data.get("output_tokens")
|
||||
or 0
|
||||
)
|
||||
|
||||
cached_tokens = (
|
||||
usage_data.get("cached_tokens")
|
||||
or usage_data.get("cached_prompt_tokens")
|
||||
or usage_data.get("cache_read_input_tokens")
|
||||
or 0
|
||||
)
|
||||
if not cached_tokens:
|
||||
prompt_details = usage_data.get("prompt_tokens_details")
|
||||
if isinstance(prompt_details, dict):
|
||||
cached_tokens = prompt_details.get("cached_tokens", 0) or 0
|
||||
|
||||
reasoning_tokens = usage_data.get("reasoning_tokens", 0) or 0
|
||||
cache_creation_tokens = usage_data.get("cache_creation_tokens", 0) or 0
|
||||
|
||||
self._token_usage["prompt_tokens"] += prompt_tokens
|
||||
self._token_usage["completion_tokens"] += completion_tokens
|
||||
self._token_usage["total_tokens"] += prompt_tokens + completion_tokens
|
||||
self._token_usage["successful_requests"] += 1
|
||||
self._token_usage["cached_prompt_tokens"] += cached_tokens
|
||||
self._token_usage["reasoning_tokens"] += reasoning_tokens
|
||||
self._token_usage["cache_creation_tokens"] += cache_creation_tokens
|
||||
self._token_usage["prompt_tokens"] += metrics.prompt_tokens
|
||||
self._token_usage["completion_tokens"] += metrics.completion_tokens
|
||||
self._token_usage["total_tokens"] += metrics.total_tokens
|
||||
self._token_usage["successful_requests"] += metrics.successful_requests
|
||||
self._token_usage["cached_prompt_tokens"] += metrics.cached_prompt_tokens
|
||||
self._token_usage["reasoning_tokens"] += metrics.reasoning_tokens
|
||||
self._token_usage["cache_creation_tokens"] += metrics.cache_creation_tokens
|
||||
|
||||
def get_token_usage_summary(self) -> UsageMetrics:
|
||||
"""Get summary of token usage for this LLM instance.
|
||||
|
||||
@@ -259,8 +259,9 @@ class RecallFlow(Flow[RecallState]):
|
||||
candidates = []
|
||||
if not candidates:
|
||||
candidates = [scope_prefix]
|
||||
self.state.candidate_scopes = candidates[:20]
|
||||
return self.state.candidate_scopes
|
||||
selected_scopes = candidates[:20]
|
||||
self.state.candidate_scopes = selected_scopes
|
||||
return selected_scopes
|
||||
|
||||
@listen(filter_and_chunk)
|
||||
def search_chunks(self) -> list[Any]:
|
||||
@@ -368,9 +369,10 @@ class RecallFlow(Flow[RecallState]):
|
||||
)
|
||||
)
|
||||
matches.sort(key=lambda m: m.score, reverse=True)
|
||||
self.state.final_results = matches[: self.state.limit]
|
||||
final_results = matches[: self.state.limit]
|
||||
self.state.final_results = final_results
|
||||
|
||||
if self.state.evidence_gaps and self.state.final_results:
|
||||
self.state.final_results[0].evidence_gaps = list(self.state.evidence_gaps)
|
||||
|
||||
return self.state.final_results
|
||||
return final_results
|
||||
|
||||
@@ -30,7 +30,7 @@ from opentelemetry.sdk.trace.export import (
|
||||
BatchSpanProcessor,
|
||||
SpanExportResult,
|
||||
)
|
||||
from opentelemetry.trace import Span
|
||||
from opentelemetry.trace import ProxyTracerProvider, Span
|
||||
from typing_extensions import Self
|
||||
|
||||
from crewai.events.event_bus import crewai_event_bus
|
||||
@@ -162,6 +162,10 @@ class Telemetry:
|
||||
if self.ready and not self.trace_set:
|
||||
try:
|
||||
with suppress_warnings():
|
||||
existing_provider = trace.get_tracer_provider()
|
||||
if not isinstance(existing_provider, ProxyTracerProvider):
|
||||
self.trace_set = True
|
||||
return
|
||||
trace.set_tracer_provider(self.provider)
|
||||
self.trace_set = True
|
||||
except Exception as e:
|
||||
|
||||
@@ -4,10 +4,31 @@ This module provides models for tracking token usage and request metrics
|
||||
during crew and agent execution.
|
||||
"""
|
||||
|
||||
from typing import Any
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
from typing_extensions import Self
|
||||
|
||||
|
||||
def _coerce_int(value: Any) -> int:
|
||||
if value is None:
|
||||
return 0
|
||||
try:
|
||||
return int(value)
|
||||
except (TypeError, ValueError):
|
||||
return 0
|
||||
|
||||
|
||||
def _first_int(usage_data: dict[str, Any], *keys: str) -> int:
|
||||
"""Return the first integer-coercible value from ``usage_data`` under any
|
||||
of ``keys``. Falls back to ``0`` when nothing matches."""
|
||||
for key in keys:
|
||||
coerced = _coerce_int(usage_data.get(key))
|
||||
if coerced:
|
||||
return coerced
|
||||
return 0
|
||||
|
||||
|
||||
class UsageMetrics(BaseModel):
|
||||
"""Track usage metrics for crew execution.
|
||||
|
||||
@@ -54,3 +75,50 @@ class UsageMetrics(BaseModel):
|
||||
self.reasoning_tokens += usage_metrics.reasoning_tokens
|
||||
self.cache_creation_tokens += usage_metrics.cache_creation_tokens
|
||||
self.successful_requests += usage_metrics.successful_requests
|
||||
|
||||
@classmethod
|
||||
def from_provider_dict(cls, usage_data: dict[str, Any] | None) -> Self | None:
|
||||
"""Normalize a provider's raw usage dict into a ``UsageMetrics``.
|
||||
|
||||
Accepts the full set of key aliases CrewAI providers emit:
|
||||
``prompt_tokens`` / ``prompt_token_count`` (Gemini) / ``input_tokens``
|
||||
(Anthropic), and the equivalent completion / cached-prompt aliases.
|
||||
Mirrors ``BaseLLM._track_token_usage_internal`` so per-LLM totals,
|
||||
flow-level aggregation, and OTel spans agree on every provider.
|
||||
|
||||
Returns ``None`` for missing/empty input so callers can decide
|
||||
whether to skip the event entirely or treat it as a zero-token
|
||||
successful request.
|
||||
"""
|
||||
if not usage_data:
|
||||
return None
|
||||
|
||||
prompt_tokens = _first_int(
|
||||
usage_data, "prompt_tokens", "prompt_token_count", "input_tokens"
|
||||
)
|
||||
completion_tokens = _first_int(
|
||||
usage_data,
|
||||
"completion_tokens",
|
||||
"candidates_token_count",
|
||||
"output_tokens",
|
||||
)
|
||||
cached_prompt_tokens = _first_int(
|
||||
usage_data,
|
||||
"cached_tokens",
|
||||
"cached_prompt_tokens",
|
||||
"cache_read_input_tokens",
|
||||
)
|
||||
if not cached_prompt_tokens:
|
||||
details = usage_data.get("prompt_tokens_details")
|
||||
if isinstance(details, dict):
|
||||
cached_prompt_tokens = _coerce_int(details.get("cached_tokens"))
|
||||
|
||||
return cls(
|
||||
total_tokens=prompt_tokens + completion_tokens,
|
||||
prompt_tokens=prompt_tokens,
|
||||
completion_tokens=completion_tokens,
|
||||
cached_prompt_tokens=cached_prompt_tokens,
|
||||
reasoning_tokens=_coerce_int(usage_data.get("reasoning_tokens")),
|
||||
cache_creation_tokens=_coerce_int(usage_data.get("cache_creation_tokens")),
|
||||
successful_requests=1,
|
||||
)
|
||||
|
||||
@@ -999,7 +999,11 @@ def _json_schema_to_pydantic_field(
|
||||
if examples:
|
||||
schema_extra["examples"] = examples
|
||||
|
||||
default = ... if is_required else None
|
||||
default = (
|
||||
json_schema["default"]
|
||||
if "default" in json_schema
|
||||
else (... if is_required else None)
|
||||
)
|
||||
|
||||
if isinstance(type_, type) and issubclass(type_, (int, float)):
|
||||
if "minimum" in json_schema:
|
||||
|
||||
@@ -4,6 +4,7 @@ import os
|
||||
import threading
|
||||
from unittest import mock
|
||||
from unittest.mock import MagicMock, patch
|
||||
import warnings
|
||||
|
||||
from crewai.agents.crew_agent_executor import AgentFinish, CrewAgentExecutor
|
||||
from crewai.constants import DEFAULT_LLM_MODEL
|
||||
@@ -77,6 +78,51 @@ def test_agent_creation():
|
||||
assert agent.backstory == "test backstory"
|
||||
|
||||
|
||||
def test_agent_exposes_i18n_for_backward_compatibility():
|
||||
from crewai.utilities.i18n import I18N_DEFAULT
|
||||
|
||||
agent = Agent(role="test role", goal="test goal", backstory="test backstory")
|
||||
|
||||
with pytest.warns(DeprecationWarning, match="Agent.i18n is deprecated"):
|
||||
i18n = agent.i18n
|
||||
|
||||
assert i18n is I18N_DEFAULT
|
||||
assert isinstance(i18n.slice("role_playing"), str)
|
||||
|
||||
|
||||
def test_agent_accepts_custom_i18n():
|
||||
from crewai.utilities.i18n import I18N
|
||||
|
||||
prompt_file = os.path.join(
|
||||
os.path.dirname(__file__), "..", "utilities", "prompts.json"
|
||||
)
|
||||
i18n = I18N(prompt_file=prompt_file)
|
||||
agent = Agent(
|
||||
role="test role",
|
||||
goal="test goal",
|
||||
backstory="test backstory",
|
||||
i18n=i18n,
|
||||
)
|
||||
|
||||
with pytest.warns(DeprecationWarning, match="Agent.i18n is deprecated"):
|
||||
agent_i18n = agent.i18n
|
||||
|
||||
assert agent_i18n is i18n
|
||||
assert agent_i18n.slice("role_playing") == "Lorem ipsum dolor sit amet"
|
||||
|
||||
|
||||
def test_agent_copy_does_not_emit_i18n_deprecation_warning():
|
||||
agent = Agent(role="test role", goal="test goal", backstory="test backstory")
|
||||
|
||||
with warnings.catch_warnings(record=True) as caught_warnings:
|
||||
warnings.simplefilter("always", DeprecationWarning)
|
||||
agent.copy()
|
||||
|
||||
assert not any(
|
||||
"Agent.i18n is deprecated" in str(w.message) for w in caught_warnings
|
||||
)
|
||||
|
||||
|
||||
def test_agent_with_only_system_template():
|
||||
"""Test that an agent with only system_template works without errors."""
|
||||
agent = Agent(
|
||||
|
||||
@@ -32,7 +32,7 @@ def _build_executor(**kwargs: Any) -> AgentExecutor:
|
||||
executor._method_outputs = []
|
||||
executor._completed_methods = set()
|
||||
executor._fired_or_listeners = set()
|
||||
executor._pending_and_listeners = {}
|
||||
executor._pending_events = {}
|
||||
executor._method_execution_counts = {}
|
||||
executor._method_call_counts = {}
|
||||
executor._event_futures = []
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import threading
|
||||
from typing import Any
|
||||
from unittest.mock import patch
|
||||
|
||||
@@ -109,10 +110,79 @@ class TestCheckpointListenerOptsOut:
|
||||
assert do_cp.call_count == 0
|
||||
|
||||
|
||||
class TestFlowResumeReplaysEvents:
|
||||
"""End-to-end: a resumed flow emits MethodExecution* events for completed methods."""
|
||||
class TestCheckpointResumeReplaysEvents:
|
||||
"""A flow resumed from a checkpoint replays MethodExecution* events for
|
||||
completed methods and executes the pending ones. The checkpoint persists
|
||||
the event record, which is reloaded into the per-run runtime state.
|
||||
|
||||
def test_resume_dispatches_completed_method_events(self, tmp_path) -> None:
|
||||
``step_c`` is gated on a threading.Event so the flow is frozen with exactly
|
||||
``step_a`` and ``step_b`` completed when the checkpoint is written — the
|
||||
mid-run snapshot is deterministic rather than dependent on write timing.
|
||||
"""
|
||||
|
||||
def test_resume_replays_completed_and_executes_pending(self, tmp_path) -> None:
|
||||
from crewai.flow.flow import Flow, listen, start
|
||||
from crewai.state.checkpoint_config import CheckpointConfig
|
||||
|
||||
at_step_c = threading.Event()
|
||||
release = threading.Event()
|
||||
captured: list[Any] = []
|
||||
|
||||
class ThreeStepFlow(Flow[dict]):
|
||||
@start()
|
||||
def step_a(self) -> str:
|
||||
return "a"
|
||||
|
||||
@listen(step_a)
|
||||
def step_b(self) -> str:
|
||||
return "b"
|
||||
|
||||
@listen(step_b)
|
||||
def step_c(self) -> str:
|
||||
captured.append(crewai_event_bus.runtime_state)
|
||||
at_step_c.set()
|
||||
release.wait(timeout=10)
|
||||
return "c"
|
||||
|
||||
runner = threading.Thread(target=ThreeStepFlow().kickoff)
|
||||
runner.start()
|
||||
try:
|
||||
assert at_step_c.wait(timeout=10)
|
||||
location = captured[0].checkpoint(str(tmp_path / "cp"))
|
||||
finally:
|
||||
release.set()
|
||||
runner.join(timeout=10)
|
||||
|
||||
captured_started: list[str] = []
|
||||
captured_finished: list[str] = []
|
||||
|
||||
with crewai_event_bus.scoped_handlers():
|
||||
|
||||
@crewai_event_bus.on(MethodExecutionStartedEvent)
|
||||
def _cs(_: Any, event: MethodExecutionStartedEvent) -> None:
|
||||
captured_started.append(event.method_name)
|
||||
|
||||
@crewai_event_bus.on(MethodExecutionFinishedEvent)
|
||||
def _cf(_: Any, event: MethodExecutionFinishedEvent) -> None:
|
||||
captured_finished.append(event.method_name)
|
||||
|
||||
ThreeStepFlow().kickoff(
|
||||
from_checkpoint=CheckpointConfig(restore_from=location)
|
||||
)
|
||||
|
||||
assert captured_started == ["step_a", "step_b", "step_c"]
|
||||
assert captured_finished == ["step_a", "step_b", "step_c"]
|
||||
|
||||
|
||||
class TestPersistResumeDoesNotReplayCompletedEvents:
|
||||
"""A @persist resume continues from pending methods only.
|
||||
|
||||
@persist stores flow state, not the event record, so completed-method
|
||||
events have no persisted source to replay from. Runtime state is scoped
|
||||
per run, so flow1's events are not visible to flow2.
|
||||
"""
|
||||
|
||||
def test_persist_resume_executes_only_pending_methods(self, tmp_path) -> None:
|
||||
from crewai.flow.flow import Flow, listen, start
|
||||
from crewai.flow.persistence.sqlite import SQLiteFlowPersistence
|
||||
|
||||
@@ -132,9 +202,6 @@ class TestFlowResumeReplaysEvents:
|
||||
def step_c(self) -> str:
|
||||
return "c"
|
||||
|
||||
if crewai_event_bus.runtime_state is not None:
|
||||
crewai_event_bus.runtime_state.event_record.clear()
|
||||
|
||||
flow1 = ThreeStepFlow(persistence=persistence)
|
||||
flow1.kickoff()
|
||||
flow_id = flow1.state["id"]
|
||||
@@ -157,9 +224,5 @@ class TestFlowResumeReplaysEvents:
|
||||
|
||||
flow2.kickoff(inputs={"id": flow_id})
|
||||
|
||||
assert captured_started.count("step_a") == 1
|
||||
assert captured_started.count("step_b") == 1
|
||||
assert captured_started.count("step_c") == 1
|
||||
assert captured_finished.count("step_a") == 1
|
||||
assert captured_finished.count("step_b") == 1
|
||||
assert captured_finished.count("step_c") == 1
|
||||
assert captured_started == ["step_c"]
|
||||
assert captured_finished == ["step_c"]
|
||||
|
||||
@@ -6,6 +6,7 @@ import pytest
|
||||
from crewai import Agent, Crew, Task
|
||||
from crewai.telemetry import Telemetry
|
||||
from opentelemetry import trace
|
||||
from opentelemetry.sdk.trace import TracerProvider
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
@@ -53,6 +54,23 @@ def test_telemetry_enabled_by_default():
|
||||
assert telemetry.ready is True
|
||||
|
||||
|
||||
def test_set_tracer_skips_when_provider_already_configured():
|
||||
"""A second telemetry instance must not re-install the global provider."""
|
||||
with (
|
||||
patch.dict(os.environ, {}, clear=True),
|
||||
patch(
|
||||
"crewai.telemetry.telemetry.trace.get_tracer_provider",
|
||||
return_value=TracerProvider(),
|
||||
),
|
||||
patch("crewai.telemetry.telemetry.trace.set_tracer_provider") as mock_set,
|
||||
):
|
||||
telemetry = Telemetry()
|
||||
telemetry.set_tracer()
|
||||
|
||||
mock_set.assert_not_called()
|
||||
assert telemetry.trace_set is True
|
||||
|
||||
|
||||
@patch("crewai.telemetry.telemetry.logger.error")
|
||||
@patch(
|
||||
"opentelemetry.exporter.otlp.proto.http.trace_exporter.OTLPSpanExporter.export",
|
||||
|
||||
@@ -1168,132 +1168,13 @@ class TestAsyncHumanFeedbackEdgeCases:
|
||||
|
||||
|
||||
|
||||
class TestLiveLLMPreservationOnResume:
|
||||
"""Tests for preserving the full LLM config across HITL resume."""
|
||||
|
||||
def test_human_feedback_llm_attribute_set_on_wrapper_with_basellm(self) -> None:
|
||||
"""Test that _human_feedback_llm is set on the wrapper when llm is a BaseLLM instance."""
|
||||
from crewai.llms.base_llm import BaseLLM
|
||||
|
||||
mock_llm = MagicMock(spec=BaseLLM)
|
||||
mock_llm.model = "gemini/gemini-3-flash"
|
||||
|
||||
class TestFlow(Flow):
|
||||
@start()
|
||||
@human_feedback(
|
||||
message="Review:",
|
||||
emit=["approved", "rejected"],
|
||||
llm=mock_llm,
|
||||
)
|
||||
def review(self):
|
||||
return "content"
|
||||
|
||||
flow = TestFlow()
|
||||
method = flow._methods.get("review")
|
||||
assert method is not None
|
||||
assert hasattr(method, "_human_feedback_llm")
|
||||
assert method._human_feedback_llm is mock_llm
|
||||
|
||||
def test_human_feedback_llm_attribute_set_on_wrapper_with_string(self) -> None:
|
||||
"""Test that _human_feedback_llm is set on the wrapper even when llm is a string."""
|
||||
|
||||
class TestFlow(Flow):
|
||||
@start()
|
||||
@human_feedback(
|
||||
message="Review:",
|
||||
emit=["approved", "rejected"],
|
||||
llm="gpt-4o-mini",
|
||||
)
|
||||
def review(self):
|
||||
return "content"
|
||||
|
||||
flow = TestFlow()
|
||||
method = flow._methods.get("review")
|
||||
assert method is not None
|
||||
assert hasattr(method, "_human_feedback_llm")
|
||||
assert method._human_feedback_llm == "gpt-4o-mini"
|
||||
class TestResumeLLMFromSerializedContext:
|
||||
"""Resume rebuilds the collapse LLM from the serialized context alone."""
|
||||
|
||||
@patch("crewai.flow.runtime.crewai_event_bus.emit")
|
||||
def test_resume_async_uses_live_basellm_over_serialized_string(
|
||||
def test_resume_builds_llm_from_serialized_context(
|
||||
self, mock_emit: MagicMock
|
||||
) -> None:
|
||||
"""Test that resume_async uses the live BaseLLM from decorator instead of serialized string.
|
||||
|
||||
This is the main bug fix: when a flow resumes, it should use the fully-configured
|
||||
LLM from the re-imported decorator (with credentials, project, etc.) instead of
|
||||
creating a new LLM from just the model string.
|
||||
"""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
db_path = os.path.join(tmpdir, "test_flows.db")
|
||||
persistence = SQLiteFlowPersistence(db_path)
|
||||
|
||||
from crewai.llms.base_llm import BaseLLM
|
||||
|
||||
# Create a mock BaseLLM with full config (simulating Gemini with service account)
|
||||
live_llm = MagicMock(spec=BaseLLM)
|
||||
live_llm.model = "gemini/gemini-3-flash"
|
||||
|
||||
class TestFlow(Flow):
|
||||
result_path: str = ""
|
||||
|
||||
@start()
|
||||
@human_feedback(
|
||||
message="Approve?",
|
||||
emit=["approved", "rejected"],
|
||||
llm=live_llm,
|
||||
)
|
||||
def review(self):
|
||||
return "content"
|
||||
|
||||
@listen("approved")
|
||||
def handle_approved(self):
|
||||
self.result_path = "approved"
|
||||
return "Approved!"
|
||||
|
||||
context = PendingFeedbackContext(
|
||||
flow_id="live-llm-test",
|
||||
flow_class="TestFlow",
|
||||
method_name="review",
|
||||
method_output="content",
|
||||
message="Approve?",
|
||||
emit=["approved", "rejected"],
|
||||
llm="gemini/gemini-3-flash", # Serialized string, NOT the live object
|
||||
)
|
||||
persistence.save_pending_feedback(
|
||||
flow_uuid="live-llm-test",
|
||||
context=context,
|
||||
state_data={"id": "live-llm-test"},
|
||||
)
|
||||
|
||||
flow = TestFlow.from_pending("live-llm-test", persistence)
|
||||
|
||||
captured_llm = []
|
||||
|
||||
def capture_llm(feedback, outcomes, llm):
|
||||
captured_llm.append(llm)
|
||||
return "approved"
|
||||
|
||||
with patch.object(flow, "_collapse_to_outcome", side_effect=capture_llm):
|
||||
flow.resume("looks good!")
|
||||
|
||||
# NOT the serialized string. The live_llm was captured at class definition
|
||||
# time and stored on the method wrapper as _human_feedback_llm.
|
||||
assert len(captured_llm) == 1
|
||||
# (which is stored on the method's _human_feedback_llm attribute)
|
||||
method = flow._methods.get("review")
|
||||
assert method is not None
|
||||
assert captured_llm[0] is method._human_feedback_llm
|
||||
# And verify it's a BaseLLM instance, not a string
|
||||
assert isinstance(captured_llm[0], BaseLLM)
|
||||
|
||||
@patch("crewai.flow.runtime.crewai_event_bus.emit")
|
||||
def test_resume_async_falls_back_to_serialized_string_when_no_human_feedback_llm(
|
||||
self, mock_emit: MagicMock
|
||||
) -> None:
|
||||
"""Test that resume_async falls back to context.llm when _human_feedback_llm is not available.
|
||||
|
||||
This ensures backward compatibility with flows that were paused before this fix.
|
||||
"""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
db_path = os.path.join(tmpdir, "test_flows.db")
|
||||
persistence = SQLiteFlowPersistence(db_path)
|
||||
@@ -1325,11 +1206,6 @@ class TestLiveLLMPreservationOnResume:
|
||||
|
||||
flow = TestFlow.from_pending("fallback-test", persistence)
|
||||
|
||||
# Remove _human_feedback_llm to simulate old decorator without this attribute
|
||||
method = flow._methods.get("review")
|
||||
if hasattr(method, "_human_feedback_llm"):
|
||||
delattr(method, "_human_feedback_llm")
|
||||
|
||||
captured_llm = []
|
||||
|
||||
def capture_llm(feedback, outcomes, llm):
|
||||
@@ -1343,85 +1219,3 @@ class TestLiveLLMPreservationOnResume:
|
||||
from crewai.llms.base_llm import BaseLLM as BaseLLMClass
|
||||
assert isinstance(captured_llm[0], BaseLLMClass)
|
||||
assert captured_llm[0].model == "gpt-4o-mini"
|
||||
|
||||
@patch("crewai.flow.runtime.crewai_event_bus.emit")
|
||||
def test_resume_async_uses_string_from_context_when_human_feedback_llm_is_string(
|
||||
self, mock_emit: MagicMock
|
||||
) -> None:
|
||||
"""Test that when _human_feedback_llm is a string (not BaseLLM), we still use context.llm.
|
||||
|
||||
String LLM values offer no benefit over the serialized context.llm,
|
||||
so we don't prefer them.
|
||||
"""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
db_path = os.path.join(tmpdir, "test_flows.db")
|
||||
persistence = SQLiteFlowPersistence(db_path)
|
||||
|
||||
class TestFlow(Flow):
|
||||
@start()
|
||||
@human_feedback(
|
||||
message="Approve?",
|
||||
emit=["approved", "rejected"],
|
||||
llm="gpt-4o-mini",
|
||||
)
|
||||
def review(self):
|
||||
return "content"
|
||||
|
||||
context = PendingFeedbackContext(
|
||||
flow_id="string-llm-test",
|
||||
flow_class="TestFlow",
|
||||
method_name="review",
|
||||
method_output="content",
|
||||
message="Approve?",
|
||||
emit=["approved", "rejected"],
|
||||
llm="gpt-4o-mini",
|
||||
)
|
||||
persistence.save_pending_feedback(
|
||||
flow_uuid="string-llm-test",
|
||||
context=context,
|
||||
state_data={"id": "string-llm-test"},
|
||||
)
|
||||
|
||||
flow = TestFlow.from_pending("string-llm-test", persistence)
|
||||
|
||||
method = flow._methods.get("review")
|
||||
assert method._human_feedback_llm == "gpt-4o-mini"
|
||||
|
||||
captured_llm = []
|
||||
|
||||
def capture_llm(feedback, outcomes, llm):
|
||||
captured_llm.append(llm)
|
||||
return "approved"
|
||||
|
||||
with patch.object(flow, "_collapse_to_outcome", side_effect=capture_llm):
|
||||
flow.resume("looks good!")
|
||||
|
||||
# _human_feedback_llm is a string, so resume deserializes context.llm into an LLM instance
|
||||
assert len(captured_llm) == 1
|
||||
from crewai.llms.base_llm import BaseLLM as BaseLLMClass
|
||||
assert isinstance(captured_llm[0], BaseLLMClass)
|
||||
assert captured_llm[0].model == "gpt-4o-mini"
|
||||
|
||||
def test_human_feedback_llm_set_for_async_wrapper(self) -> None:
|
||||
"""Test that _human_feedback_llm is set on async wrapper functions."""
|
||||
import asyncio
|
||||
from crewai.llms.base_llm import BaseLLM
|
||||
|
||||
mock_llm = MagicMock(spec=BaseLLM)
|
||||
mock_llm.model = "gemini/gemini-3-flash"
|
||||
|
||||
class TestFlow(Flow):
|
||||
@start()
|
||||
@human_feedback(
|
||||
message="Review:",
|
||||
emit=["approved", "rejected"],
|
||||
llm=mock_llm,
|
||||
)
|
||||
async def async_review(self):
|
||||
return "content"
|
||||
|
||||
flow = TestFlow()
|
||||
method = flow._methods.get("async_review")
|
||||
assert method is not None
|
||||
assert hasattr(method, "_human_feedback_llm")
|
||||
assert method._human_feedback_llm is mock_llm
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import inspect
|
||||
import json
|
||||
import os
|
||||
import sqlite3
|
||||
@@ -16,6 +17,7 @@ from pydantic import BaseModel
|
||||
from crewai.agent.core import Agent
|
||||
from crewai.agents.agent_builder.base_agent import BaseAgent
|
||||
from crewai.crew import Crew
|
||||
from crewai.llms.base_llm import BaseLLM
|
||||
from crewai.flow.flow import _INITIAL_STATE_CLASS_MARKER, Flow, start
|
||||
from crewai.state.checkpoint_config import CheckpointConfig
|
||||
from crewai.state.checkpoint_listener import (
|
||||
@@ -682,3 +684,85 @@ class TestAgentCheckpoint:
|
||||
cfg = CheckpointConfig(restore_from=loc)
|
||||
restored = Agent.from_checkpoint(cfg)
|
||||
assert restored._kickoff_event_id == "evt-456"
|
||||
|
||||
|
||||
class _FinalAnswerLLM(BaseLLM):
|
||||
"""Stub LLM that always returns a final answer without any API calls."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__(model="stub")
|
||||
|
||||
def call(
|
||||
self,
|
||||
messages,
|
||||
tools=None,
|
||||
callbacks=None,
|
||||
available_functions=None,
|
||||
from_task=None,
|
||||
from_agent=None,
|
||||
response_model=None,
|
||||
):
|
||||
return "Final Answer: done."
|
||||
|
||||
def supports_function_calling(self) -> bool:
|
||||
return False
|
||||
|
||||
def supports_stop_words(self) -> bool:
|
||||
return False
|
||||
|
||||
def get_context_window_size(self) -> int:
|
||||
return 4096
|
||||
|
||||
async def acall(self, *args, **kwargs):
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class TestCheckpointReusedExecutor:
|
||||
"""Checkpoint serialization stamps every live Flow's completed methods.
|
||||
|
||||
The agent executor is a Flow reused across a crew's tasks, so the stamp
|
||||
must not be read back as a restore signal on the next task — otherwise the
|
||||
second task replays as a resume and never reaches a final answer.
|
||||
"""
|
||||
|
||||
def test_second_task_runs_with_checkpointing_enabled(self) -> None:
|
||||
agent = Agent(role="r", goal="g", backstory="b", llm=_FinalAnswerLLM())
|
||||
task1 = Task(description="first", expected_output="x", agent=agent)
|
||||
task2 = Task(description="second", expected_output="y", agent=agent)
|
||||
with tempfile.TemporaryDirectory() as d:
|
||||
crew = Crew(
|
||||
agents=[agent],
|
||||
tasks=[task1, task2],
|
||||
verbose=False,
|
||||
checkpoint=CheckpointConfig(
|
||||
provider=JsonProvider(location=d),
|
||||
on_events=["task_started", "task_completed"],
|
||||
),
|
||||
)
|
||||
result = crew.kickoff()
|
||||
|
||||
assert len(result.tasks_output) == 2
|
||||
assert result.tasks_output[1].raw
|
||||
|
||||
|
||||
class TestCustomLLMCheckpointRestore:
|
||||
"""A custom BaseLLM subclass serializes with the inherited llm_type "base".
|
||||
|
||||
Restoring it must not try to instantiate the abstract BaseLLM; it is rebuilt
|
||||
as a concrete LLM from the saved config instead.
|
||||
"""
|
||||
|
||||
def test_restore_does_not_instantiate_abstract_base_llm(self) -> None:
|
||||
agent = Agent(role="r", goal="g", backstory="b", llm=_FinalAnswerLLM())
|
||||
task = Task(description="d", expected_output="e", agent=agent)
|
||||
crew = Crew(agents=[agent], tasks=[task], verbose=False)
|
||||
|
||||
raw = RuntimeState(root=[crew]).model_dump_json()
|
||||
restored = RuntimeState.model_validate_json(
|
||||
raw, context={"from_checkpoint": True}
|
||||
)
|
||||
|
||||
llm = restored.root[0].agents[0].llm
|
||||
assert isinstance(llm, BaseLLM)
|
||||
assert not inspect.isabstract(type(llm))
|
||||
assert llm.model == "stub"
|
||||
|
||||
@@ -409,4 +409,31 @@ class TestRuntimeStateIntegration:
|
||||
old_json, context={"from_checkpoint": True}
|
||||
)
|
||||
assert len(restored.root) == 1
|
||||
assert len(restored.event_record) == 0
|
||||
assert len(restored.event_record) == 0
|
||||
|
||||
def test_reset_runtime_state_clears_state_and_registry(self):
|
||||
from crewai import Agent, Crew, RuntimeState
|
||||
from crewai.events.event_bus import crewai_event_bus
|
||||
|
||||
if RuntimeState is None:
|
||||
pytest.skip("RuntimeState unavailable (model_rebuild failed)")
|
||||
|
||||
agent = Agent(role="test", goal="test", backstory="test", llm="gpt-4o-mini")
|
||||
crew = Crew(agents=[agent], tasks=[], verbose=False)
|
||||
|
||||
previous_state = crewai_event_bus._runtime_state
|
||||
previous_ids = crewai_event_bus._registered_entity_ids
|
||||
crewai_event_bus._runtime_state = None
|
||||
crewai_event_bus._registered_entity_ids = set()
|
||||
try:
|
||||
crewai_event_bus.register_entity(crew)
|
||||
assert crewai_event_bus.runtime_state is not None
|
||||
assert crewai_event_bus._registered_entity_ids
|
||||
|
||||
crewai_event_bus.reset_runtime_state()
|
||||
|
||||
assert crewai_event_bus.runtime_state is None
|
||||
assert crewai_event_bus._registered_entity_ids == set()
|
||||
finally:
|
||||
crewai_event_bus._runtime_state = previous_state
|
||||
crewai_event_bus._registered_entity_ids = previous_ids
|
||||
@@ -1040,7 +1040,7 @@ def test_flow_plotting():
|
||||
received_events.append(event)
|
||||
event_received.set()
|
||||
|
||||
flow.plot("test_flow")
|
||||
flow.plot("test_flow", show=False)
|
||||
|
||||
assert event_received.wait(timeout=5), "Timeout waiting for plot event"
|
||||
assert len(received_events) == 1
|
||||
@@ -1157,6 +1157,26 @@ def test_flow_name():
|
||||
assert flow.name == "MyFlow"
|
||||
|
||||
|
||||
def test_flow_custom_name_overrides_class_name_in_events():
|
||||
class InternalFlowClass(Flow):
|
||||
name = "PublicName"
|
||||
|
||||
@start()
|
||||
def begin(self):
|
||||
return "done"
|
||||
|
||||
received = []
|
||||
|
||||
with crewai_event_bus.scoped_handlers():
|
||||
@crewai_event_bus.on(FlowStartedEvent)
|
||||
def handle(source, event):
|
||||
received.append(event)
|
||||
|
||||
InternalFlowClass().kickoff()
|
||||
|
||||
assert received[0].flow_name == "PublicName"
|
||||
|
||||
|
||||
def test_nested_and_or_conditions():
|
||||
"""Test nested conditions like or_(and_(A, B), and_(C, D)).
|
||||
|
||||
@@ -1542,40 +1562,63 @@ def test_deeply_nested_conditions():
|
||||
|
||||
|
||||
def test_or_branch_does_not_leave_stale_and_state():
|
||||
"""or_() over nested and_() branches must not leave stale pending AND state.
|
||||
|
||||
Regression: evaluating an or_() condition stopped at the first branch that was
|
||||
satisfied, so a later and_() branch that the *same* trigger would have completed
|
||||
never cleared its pending state. On the next cycle that trigger alone then
|
||||
spuriously re-satisfied the whole condition. Both branches share the final
|
||||
event ``x`` here, so the shared trigger that completes branch ``(a AND x)`` also
|
||||
completes branch ``(c AND x)`` and both must be cleared together.
|
||||
"""
|
||||
fired = []
|
||||
|
||||
class StaleStateFlow(Flow):
|
||||
@start()
|
||||
def begin(self):
|
||||
pass
|
||||
|
||||
@listen(or_(and_("a", "x"), and_("c", "x")))
|
||||
def joined(self):
|
||||
@listen(begin)
|
||||
def a(self):
|
||||
pass
|
||||
|
||||
flow = StaleStateFlow()
|
||||
condition = type(flow)._listen_condition("joined")
|
||||
@listen(begin)
|
||||
def c(self):
|
||||
pass
|
||||
|
||||
def fires(trigger):
|
||||
return flow._evaluate_condition(condition, trigger, "joined")
|
||||
@listen(and_(a, c))
|
||||
def x(self):
|
||||
pass
|
||||
|
||||
# First cycle: "a" then "c" arrive, then the shared "x" completes (a AND x).
|
||||
assert fires("a") is False
|
||||
assert fires("c") is False
|
||||
assert fires("x") is True
|
||||
@listen(or_(and_("a", "x"), and_("c", "y")))
|
||||
def joined(self):
|
||||
fired.append("joined")
|
||||
|
||||
# Next cycle: "x" alone must NOT re-satisfy the condition. The "c" from the
|
||||
# previous cycle was consumed when "joined" fired, so neither branch is half
|
||||
# complete and "x" by itself is insufficient.
|
||||
assert fires("x") is False
|
||||
@router(joined)
|
||||
def emit_y(self):
|
||||
return "y"
|
||||
|
||||
StaleStateFlow().kickoff()
|
||||
|
||||
assert fired == ["joined"]
|
||||
|
||||
|
||||
def test_and_branch_inside_or_does_not_race():
|
||||
execution_order = []
|
||||
|
||||
class DiamondWithFallbackFlow(Flow):
|
||||
@start()
|
||||
def go(self):
|
||||
execution_order.append("go")
|
||||
|
||||
@listen(go)
|
||||
def a(self):
|
||||
execution_order.append("a")
|
||||
|
||||
@listen(go)
|
||||
def b(self):
|
||||
execution_order.append("b")
|
||||
|
||||
@listen(or_(and_(a, b), "fallback"))
|
||||
def done(self):
|
||||
execution_order.append("done")
|
||||
|
||||
DiamondWithFallbackFlow().kickoff()
|
||||
|
||||
assert "done" in execution_order
|
||||
assert execution_order.index("done") > execution_order.index("a")
|
||||
assert execution_order.index("done") > execution_order.index("b")
|
||||
|
||||
|
||||
def test_mixed_sync_async_execution_order():
|
||||
|
||||
@@ -26,7 +26,11 @@ from crewai.experimental import (
|
||||
RouterConfig,
|
||||
)
|
||||
from crewai.flow import Flow, ChatState, listen, start
|
||||
from crewai.flow.flow_context import current_flow_id, current_flow_name
|
||||
from crewai.flow.flow_context import (
|
||||
current_flow_defer_trace_finalization,
|
||||
current_flow_id,
|
||||
current_flow_name,
|
||||
)
|
||||
from crewai.flow.conversation import (
|
||||
append_message,
|
||||
get_conversation_messages,
|
||||
@@ -169,9 +173,6 @@ class TestConversationalFlow:
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.skip(
|
||||
reason="Experimental conversational registry behavior is out of scope for the definition-first start migration."
|
||||
)
|
||||
def test_handle_turn_routes_to_listener_and_records_public_result(self) -> None:
|
||||
@ConversationConfig(default_intents=["research"], intent_llm="gpt-4o-mini")
|
||||
class ResearchFlow(ConversationalFlow):
|
||||
@@ -595,18 +596,15 @@ class TestConversationalFlow:
|
||||
assert result == "legacy-searched"
|
||||
assert flow.state.last_intent == "search"
|
||||
|
||||
@pytest.mark.skip(
|
||||
reason="Experimental conversational sequential-start behavior is out of scope for the definition-first start migration."
|
||||
)
|
||||
def test_user_start_methods_run_sequentially_before_router_in_conversational_mode(
|
||||
self,
|
||||
) -> None:
|
||||
"""Conversational flows: user ``@start`` methods finish before router fires.
|
||||
|
||||
Non-chat flows run ``@start`` methods in parallel via ``asyncio.gather``,
|
||||
which would race with ``conversation_start`` and let the router fire
|
||||
which would race with ``route_conversation`` and let the router fire
|
||||
before user setup finished. In conversational mode the framework runs
|
||||
them sequentially, with ``conversation_start`` last.
|
||||
them sequentially, with ``route_conversation`` last.
|
||||
"""
|
||||
order: list[str] = []
|
||||
|
||||
@@ -649,18 +647,10 @@ class TestConversationalFlow:
|
||||
assert "attach_bus" in order # still fires every turn
|
||||
assert "route_turn" in order
|
||||
|
||||
@pytest.mark.skip(
|
||||
reason="Experimental inherited conversational start registration is out of scope for the definition-first start migration."
|
||||
)
|
||||
def test_subclass_can_override_conversation_start_without_redecorating(
|
||||
def test_subclass_can_override_conversation_start_helper(
|
||||
self,
|
||||
) -> None:
|
||||
"""Overriding an inherited ``@start`` method must not unregister it.
|
||||
|
||||
Before the metaclass fix, subclasses had to re-apply ``@start()`` on
|
||||
every override or the parent's ``conversation_start`` would silently
|
||||
drop out of the start registry — leaving the flow with nothing to fire.
|
||||
"""
|
||||
"""The compatibility helper remains overridable without adding a Flow node."""
|
||||
|
||||
bootstrap_calls: list[str] = []
|
||||
|
||||
@@ -681,6 +671,38 @@ class TestConversationalFlow:
|
||||
flow = BootstrapFlow()
|
||||
flow.handle_turn("hi")
|
||||
|
||||
assert bootstrap_calls == ["ran"]
|
||||
assert "conversation_start" not in BootstrapFlow.flow_definition().methods
|
||||
route_definition = BootstrapFlow.flow_definition().methods["route_conversation"]
|
||||
assert route_definition.start is True
|
||||
assert route_definition.router is True
|
||||
assert flow.state.messages[-1].content == "worked"
|
||||
|
||||
def test_legacy_decorated_conversation_start_runs_once_per_turn(
|
||||
self,
|
||||
) -> None:
|
||||
"""Legacy ``@start`` overrides are not invoked again by the router."""
|
||||
|
||||
bootstrap_calls: list[str] = []
|
||||
|
||||
@ConversationConfig()
|
||||
class BootstrapFlow(ConversationalFlow):
|
||||
@start()
|
||||
def conversation_start(self) -> str | None:
|
||||
bootstrap_calls.append("ran")
|
||||
return super().conversation_start()
|
||||
|
||||
def route_turn(self, context: dict[str, Any]) -> str | None:
|
||||
return "work"
|
||||
|
||||
@listen("work")
|
||||
def do_work(self) -> str:
|
||||
self.append_assistant_message("worked")
|
||||
return "worked"
|
||||
|
||||
flow = BootstrapFlow()
|
||||
flow.handle_turn("hi")
|
||||
|
||||
assert bootstrap_calls == ["ran"]
|
||||
assert flow.state.messages[-1].content == "worked"
|
||||
|
||||
@@ -1179,6 +1201,40 @@ class TestConversationalFlow:
|
||||
"finalize_session_traces must finalize the trace batch once"
|
||||
)
|
||||
|
||||
def test_deferred_resume_skips_per_resume_flow_finished_event(self) -> None:
|
||||
"""Deferred sessions do not emit terminal events while resuming."""
|
||||
from crewai.events.types.flow_events import FlowFinishedEvent
|
||||
from crewai.flow.async_feedback.types import PendingFeedbackContext
|
||||
|
||||
class DeferredResumeFlow(Flow[ChatState]):
|
||||
defer_trace_finalization = True
|
||||
|
||||
@start()
|
||||
def begin(self) -> str:
|
||||
return "started"
|
||||
|
||||
flow = DeferredResumeFlow()
|
||||
flow._pending_feedback_context = PendingFeedbackContext(
|
||||
flow_id=flow.flow_id,
|
||||
flow_class="DeferredResumeFlow",
|
||||
method_name="begin",
|
||||
method_output="started",
|
||||
message="Review",
|
||||
)
|
||||
|
||||
finished_events: list[FlowFinishedEvent] = []
|
||||
|
||||
with crewai_event_bus.scoped_handlers():
|
||||
|
||||
@crewai_event_bus.on(FlowFinishedEvent)
|
||||
def capture(_: Any, event: FlowFinishedEvent) -> None:
|
||||
finished_events.append(event)
|
||||
|
||||
flow.resume("approved")
|
||||
crewai_event_bus.flush()
|
||||
|
||||
assert finished_events == []
|
||||
|
||||
def test_finalize_session_traces_restores_event_scope(self, capsys) -> None:
|
||||
"""No ``empty scope stack`` warning when deferred ``flow_finished`` fires.
|
||||
|
||||
@@ -1281,7 +1337,11 @@ class TestFlowTracingWhenSuppressed:
|
||||
|
||||
assert started == ["QuietFlow"]
|
||||
|
||||
def test_method_execution_emitted_when_panel_events_suppressed(self) -> None:
|
||||
def test_method_execution_suppressed_when_flow_events_suppressed(self) -> None:
|
||||
"""``suppress_flow_events=True`` silences MethodExecution events so
|
||||
infrastructure flows (AgentExecutor, memory) don't emit one trace span
|
||||
per internal control-flow method."""
|
||||
|
||||
class QuietFlow(Flow[ChatState]):
|
||||
suppress_flow_events = True
|
||||
|
||||
@@ -1303,8 +1363,8 @@ class TestFlowTracingWhenSuppressed:
|
||||
with patch.object(crewai_event_bus, "emit", side_effect=track_emit):
|
||||
QuietFlow().kickoff()
|
||||
|
||||
assert started == ["begin"]
|
||||
assert finished == ["begin"]
|
||||
assert started == []
|
||||
assert finished == []
|
||||
|
||||
def test_llm_action_inside_flow_claims_flow_trace_batch(self) -> None:
|
||||
listener = TraceCollectionListener()
|
||||
@@ -1338,6 +1398,12 @@ class TestFlowTracingWhenSuppressed:
|
||||
|
||||
|
||||
class TestDeferTraceFinalization:
|
||||
def test_bare_conversational_flow_defers_by_default(self) -> None:
|
||||
class BareChat(ConversationalFlow):
|
||||
pass
|
||||
|
||||
assert BareChat()._should_defer_trace_finalization() is True
|
||||
|
||||
def test_conversation_config_drives_defer_flag(self) -> None:
|
||||
"""``ConversationConfig(defer_trace_finalization=...)`` controls whether
|
||||
a conversational subclass defers per-turn trace finalization."""
|
||||
@@ -1470,6 +1536,44 @@ class TestDeferredFlowLifecycleEvents:
|
||||
listener.batch_manager.finalize_batch()
|
||||
mock_finalize.assert_not_called()
|
||||
|
||||
def test_deferred_flow_kickoff_marks_trace_manager_session_deferred(
|
||||
self,
|
||||
) -> None:
|
||||
class DeferredTraceFlow(Flow[ChatState]):
|
||||
@start()
|
||||
def begin(self) -> str:
|
||||
return "done"
|
||||
|
||||
listener = TraceCollectionListener()
|
||||
listener.batch_manager.defer_session_finalization = False
|
||||
|
||||
flow = DeferredTraceFlow()
|
||||
flow.defer_trace_finalization = True
|
||||
|
||||
with patch.object(listener.batch_manager, "finalize_batch"):
|
||||
flow.kickoff()
|
||||
|
||||
assert listener.batch_manager.defer_session_finalization is True
|
||||
|
||||
flow.finalize_session_traces()
|
||||
|
||||
assert listener.batch_manager.defer_session_finalization is False
|
||||
|
||||
def test_non_deferred_flow_kickoff_clears_stale_trace_manager_flag(
|
||||
self,
|
||||
) -> None:
|
||||
class PlainTraceFlow(Flow[ChatState]):
|
||||
@start()
|
||||
def begin(self) -> str:
|
||||
return "done"
|
||||
|
||||
listener = TraceCollectionListener()
|
||||
listener.batch_manager.defer_session_finalization = True
|
||||
|
||||
PlainTraceFlow().kickoff()
|
||||
|
||||
assert listener.batch_manager.defer_session_finalization is False
|
||||
|
||||
|
||||
class TestNestedCrewTracing:
|
||||
def test_is_inside_active_flow_context_when_kickoff_running(self) -> None:
|
||||
@@ -1523,3 +1627,130 @@ class TestNestedCrewTracing:
|
||||
elif listener.batch_manager.batch_owner_type == "crew":
|
||||
listener.batch_manager.finalize_batch()
|
||||
mock_finalize.assert_not_called()
|
||||
|
||||
def test_lazy_flow_batch_from_context_preserves_deferred_parent(self) -> None:
|
||||
from crewai.events.listeners.tracing.trace_listener import (
|
||||
TraceCollectionListener,
|
||||
)
|
||||
|
||||
listener = TraceCollectionListener()
|
||||
listener.batch_manager.current_batch = None
|
||||
listener.batch_manager.batch_owner_type = None
|
||||
listener.batch_manager.batch_owner_id = None
|
||||
listener.batch_manager.defer_session_finalization = False
|
||||
listener.batch_manager.event_buffer.clear()
|
||||
|
||||
flow_id_token = current_flow_id.set("parent-flow-id")
|
||||
flow_name_token = current_flow_name.set("ParentChatFlow")
|
||||
defer_token = current_flow_defer_trace_finalization.set(True)
|
||||
try:
|
||||
initialized = listener._try_initialize_flow_batch_from_context(
|
||||
type("Event", (), {"timestamp": None})()
|
||||
)
|
||||
|
||||
assert initialized is True
|
||||
assert listener.batch_manager.batch_owner_type == "flow"
|
||||
assert listener.batch_manager.batch_owner_id == "parent-flow-id"
|
||||
assert listener.batch_manager.defer_session_finalization is True
|
||||
assert listener.batch_manager.current_batch is not None
|
||||
assert (
|
||||
listener.batch_manager.current_batch.execution_metadata[
|
||||
"execution_type"
|
||||
]
|
||||
== "flow"
|
||||
)
|
||||
assert (
|
||||
listener.batch_manager.current_batch.execution_metadata["flow_name"]
|
||||
== "ParentChatFlow"
|
||||
)
|
||||
finally:
|
||||
current_flow_defer_trace_finalization.reset(defer_token)
|
||||
current_flow_name.reset(flow_name_token)
|
||||
current_flow_id.reset(flow_id_token)
|
||||
listener.batch_manager.current_batch = None
|
||||
listener.batch_manager.batch_owner_type = None
|
||||
listener.batch_manager.batch_owner_id = None
|
||||
listener.batch_manager.trace_batch_id = None
|
||||
listener.batch_manager.defer_session_finalization = False
|
||||
listener.batch_manager.event_buffer.clear()
|
||||
|
||||
def test_nested_agent_executor_flow_does_not_finalize_parent_batch(
|
||||
self,
|
||||
) -> None:
|
||||
from crewai import Agent, Crew, Task
|
||||
from crewai.llms.base_llm import BaseLLM
|
||||
|
||||
class StaticLLM(BaseLLM):
|
||||
def __init__(self) -> None:
|
||||
super().__init__(model="debug-static-llm", provider="debug")
|
||||
|
||||
def call(
|
||||
self,
|
||||
messages: Any,
|
||||
tools: Any = None,
|
||||
callbacks: Any = None,
|
||||
available_functions: Any = None,
|
||||
from_task: Any = None,
|
||||
from_agent: Any = None,
|
||||
response_model: Any = None,
|
||||
) -> str:
|
||||
return (
|
||||
"Thought: I can answer directly.\n"
|
||||
"Final Answer: nested crew result"
|
||||
)
|
||||
|
||||
class NestedCrewFlow(Flow[ChatState]):
|
||||
defer_trace_finalization = True
|
||||
tracing = True
|
||||
|
||||
@start()
|
||||
def begin(self) -> str:
|
||||
return "run_nested_crew"
|
||||
|
||||
@listen(begin)
|
||||
def run_nested_crew(self, _: str) -> str:
|
||||
agent = Agent(
|
||||
role="Debug Agent",
|
||||
goal="Return a short deterministic result",
|
||||
backstory="Used only for trace finalization debugging.",
|
||||
llm=StaticLLM(),
|
||||
verbose=False,
|
||||
)
|
||||
task = Task(
|
||||
description="Return the deterministic nested crew result.",
|
||||
expected_output="nested crew result",
|
||||
agent=agent,
|
||||
)
|
||||
return Crew(agents=[agent], tasks=[task], verbose=False).kickoff().raw
|
||||
|
||||
listener = TraceCollectionListener()
|
||||
listener.batch_manager.current_batch = None
|
||||
listener.batch_manager.batch_owner_type = None
|
||||
listener.batch_manager.batch_owner_id = None
|
||||
listener.batch_manager.trace_batch_id = None
|
||||
listener.batch_manager.defer_session_finalization = False
|
||||
listener.batch_manager.event_buffer.clear()
|
||||
listener.first_time_handler.is_first_time = False
|
||||
|
||||
def initialize_backend_batch(*_: Any, **__: Any) -> None:
|
||||
listener.batch_manager.trace_batch_id = "debug-trace-batch"
|
||||
|
||||
flow = NestedCrewFlow()
|
||||
|
||||
with (
|
||||
patch.object(
|
||||
listener.batch_manager,
|
||||
"_initialize_backend_batch",
|
||||
side_effect=initialize_backend_batch,
|
||||
),
|
||||
patch.object(listener.batch_manager, "finalize_batch") as mock_finalize,
|
||||
):
|
||||
flow.kickoff()
|
||||
crewai_event_bus.flush()
|
||||
flow.kickoff()
|
||||
crewai_event_bus.flush()
|
||||
|
||||
assert mock_finalize.call_count == 0, (
|
||||
"nested AgentExecutor flows inside a deferred parent Flow must "
|
||||
"not finalize the parent trace batch"
|
||||
)
|
||||
|
||||
@@ -13,6 +13,7 @@ from pydantic import BaseModel
|
||||
import crewai.flow.dsl as flow_dsl
|
||||
import crewai.flow.flow_definition as flow_definition
|
||||
import crewai.flow.visualization.builder as visualization_builder
|
||||
from crewai.experimental import ConversationConfig, RouterConfig
|
||||
from crewai.flow import Flow, and_, human_feedback, listen, or_, persist, router, start
|
||||
|
||||
|
||||
@@ -35,7 +36,10 @@ def test_flow_public_exports_are_explicit():
|
||||
"start",
|
||||
}
|
||||
assert set(flow_definition.__all__) == {
|
||||
"FlowActionDefinition",
|
||||
"FlowConfigDefinition",
|
||||
"FlowConversationalDefinition",
|
||||
"FlowConversationalRouterDefinition",
|
||||
"FlowDefinition",
|
||||
"FlowDefinitionCondition",
|
||||
"FlowDefinitionDiagnostic",
|
||||
@@ -169,6 +173,7 @@ def test_flow_definition_maps_dsl_to_static_contract():
|
||||
assert definition.state.ref and "ContractState" in definition.state.ref
|
||||
assert definition.config.stream is True
|
||||
assert definition.config.max_method_calls == 7
|
||||
assert definition.conversational is None
|
||||
|
||||
assert definition.methods["begin"].start is True
|
||||
assert definition.methods["process"].listen == "begin"
|
||||
@@ -201,25 +206,75 @@ def test_flow_definition_excludes_conversational_builtins_for_regular_flows():
|
||||
|
||||
methods = RegularFlow.flow_definition().methods
|
||||
|
||||
assert RegularFlow.flow_definition().conversational is None
|
||||
assert set(methods) == {"begin"}
|
||||
assert "conversation_start" not in methods
|
||||
assert "route_conversation" not in methods
|
||||
assert "converse_turn" not in methods
|
||||
|
||||
|
||||
@pytest.mark.skip(
|
||||
reason="Experimental conversational inherited built-ins are out of scope for the definition-first start migration."
|
||||
)
|
||||
def test_flow_definition_includes_conversational_builtins_when_enabled():
|
||||
class ChatFlow(Flow):
|
||||
conversational = True
|
||||
|
||||
methods = ChatFlow.flow_definition().methods
|
||||
definition = ChatFlow.flow_definition()
|
||||
methods = definition.methods
|
||||
|
||||
assert "conversation_start" in methods
|
||||
assert definition.conversational is not None
|
||||
assert definition.conversational.enabled is True
|
||||
assert definition.conversational.defer_trace_finalization is True
|
||||
assert definition.conversational.builtin_routes == ["converse", "end"]
|
||||
assert "conversation_start" not in methods
|
||||
assert "route_conversation" in methods
|
||||
assert "converse_turn" in methods
|
||||
assert methods["conversation_start"].start is True
|
||||
assert methods["route_conversation"].start is True
|
||||
assert methods["route_conversation"].router is True
|
||||
|
||||
|
||||
def test_flow_definition_serializes_conversational_config():
|
||||
@ConversationConfig(
|
||||
system_prompt="Be concise.",
|
||||
llm="gpt-4o-mini",
|
||||
router=RouterConfig(
|
||||
prompt="Pick a route.",
|
||||
routes=["research"],
|
||||
default_intent="converse",
|
||||
fallback_intent="end",
|
||||
),
|
||||
default_intents=["research"],
|
||||
visible_agent_outputs=["researcher"],
|
||||
defer_trace_finalization=False,
|
||||
)
|
||||
class ChatFlow(Flow):
|
||||
conversational = True
|
||||
|
||||
conversational = ChatFlow.flow_definition().conversational
|
||||
|
||||
assert conversational is not None
|
||||
assert conversational.system_prompt == "Be concise."
|
||||
assert conversational.llm == "gpt-4o-mini"
|
||||
assert conversational.default_intents == ["research"]
|
||||
assert conversational.visible_agent_outputs == ["researcher"]
|
||||
assert conversational.defer_trace_finalization is False
|
||||
assert conversational.router is not None
|
||||
assert conversational.router.prompt == "Pick a route."
|
||||
assert conversational.router.routes == ["research"]
|
||||
assert conversational.router.fallback_intent == "end"
|
||||
|
||||
|
||||
def test_flow_definition_uses_collapsed_conversational_router_start():
|
||||
class ChatFlow(Flow):
|
||||
conversational = True
|
||||
|
||||
def conversation_start(self) -> str | None:
|
||||
return "custom"
|
||||
|
||||
methods = ChatFlow.flow_definition().methods
|
||||
|
||||
assert "conversation_start" not in methods
|
||||
assert "route_conversation" in methods
|
||||
assert methods["route_conversation"].start is True
|
||||
assert methods["route_conversation"].router is True
|
||||
|
||||
|
||||
def test_flow_definition_serializes_human_feedback_metadata():
|
||||
@@ -575,6 +630,7 @@ def test_flow_definition_preserves_diagnostics_loaded_from_contract():
|
||||
"name": "LoadedDiagnosticsFlow",
|
||||
"methods": {
|
||||
"decision": {
|
||||
"do": {"ref": "loaded_flows:LoadedDiagnosticsFlow.decision"},
|
||||
"router": True,
|
||||
"emit": ["continue"],
|
||||
}
|
||||
@@ -608,6 +664,7 @@ def test_router_start_false_without_listen_reports_missing_trigger():
|
||||
"name": "LoadedFlow",
|
||||
"methods": {
|
||||
"decision": {
|
||||
"do": {"ref": "loaded_flows:LoadedFlow.decision"},
|
||||
"router": True,
|
||||
"start": False,
|
||||
"emit": ["continue"],
|
||||
@@ -686,8 +743,14 @@ def test_static_string_listener_is_allowed_by_contract():
|
||||
"schema": "crewai.flow/v1",
|
||||
"name": "TypoFlow",
|
||||
"methods": {
|
||||
"begin": {"start": True},
|
||||
"handle": {"listen": "begni"},
|
||||
"begin": {
|
||||
"do": {"ref": "loaded_flows:TypoFlow.begin"},
|
||||
"start": True,
|
||||
},
|
||||
"handle": {
|
||||
"do": {"ref": "loaded_flows:TypoFlow.handle"},
|
||||
"listen": "begni",
|
||||
},
|
||||
},
|
||||
}
|
||||
)
|
||||
@@ -700,8 +763,15 @@ def test_start_false_not_classified_as_start_method():
|
||||
"schema": "crewai.flow/v1",
|
||||
"name": "ExplicitNonStartFlow",
|
||||
"methods": {
|
||||
"begin": {"start": True},
|
||||
"handle": {"start": False, "listen": "begin"},
|
||||
"begin": {
|
||||
"do": {"ref": "loaded_flows:ExplicitNonStartFlow.begin"},
|
||||
"start": True,
|
||||
},
|
||||
"handle": {
|
||||
"do": {"ref": "loaded_flows:ExplicitNonStartFlow.handle"},
|
||||
"start": False,
|
||||
"listen": "begin",
|
||||
},
|
||||
},
|
||||
}
|
||||
)
|
||||
@@ -758,6 +828,7 @@ def test_flow_definition_logs_diagnostics_when_loaded_from_contract(caplog):
|
||||
"name": "LoadedFlow",
|
||||
"methods": {
|
||||
"decision": {
|
||||
"do": {"ref": "loaded_flows:LoadedFlow.decision"},
|
||||
"router": True,
|
||||
"emit": ["continue"],
|
||||
}
|
||||
|
||||
1422
lib/crewai/tests/test_flow_from_definition.py
Normal file
1422
lib/crewai/tests/test_flow_from_definition.py
Normal file
File diff suppressed because it is too large
Load Diff
511
lib/crewai/tests/test_flow_usage_metrics.py
Normal file
511
lib/crewai/tests/test_flow_usage_metrics.py
Normal file
@@ -0,0 +1,511 @@
|
||||
"""Tests for flow-level token usage aggregation
|
||||
|
||||
``flow.usage_metrics`` listens to ``LLMCallCompletedEvent`` for the duration
|
||||
of ``kickoff_async`` so it covers every LLM call inside the flow — crew-led,
|
||||
tool-led, AND bare ``LLM.call(...)`` from a flow method. We exercise the
|
||||
aggregator end-to-end through the real event bus with fabricated events and
|
||||
explicit contextvar control; no live LLM provider is required.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import contextvars
|
||||
import os
|
||||
import tempfile
|
||||
from typing import Any, Callable
|
||||
from uuid import uuid4
|
||||
|
||||
import pytest
|
||||
|
||||
from crewai.events.event_bus import crewai_event_bus
|
||||
from crewai.events.types.llm_events import LLMCallCompletedEvent, LLMCallType
|
||||
from crewai.flow.async_feedback.types import PendingFeedbackContext
|
||||
from crewai.flow.flow import Flow, listen, start
|
||||
from crewai.flow.flow_context import current_flow_id
|
||||
from crewai.flow.persistence.sqlite import SQLiteFlowPersistence
|
||||
from crewai.flow.runtime import _usage_dict_to_metrics
|
||||
from crewai.types.usage_metrics import UsageMetrics
|
||||
|
||||
|
||||
def _emit_llm_call(
|
||||
*,
|
||||
flow_id: str | None,
|
||||
prompt_tokens: int = 0,
|
||||
completion_tokens: int = 0,
|
||||
cached_prompt_tokens: int = 0,
|
||||
reasoning_tokens: int = 0,
|
||||
cache_creation_tokens: int = 0,
|
||||
) -> None:
|
||||
"""Emit one fake ``LLMCallCompletedEvent`` with ``current_flow_id`` pinned
|
||||
to ``flow_id``.
|
||||
|
||||
Runs in a freshly-copied context so the value the bus snapshots at emit
|
||||
time is exactly ``flow_id`` — independent of the calling thread's outer
|
||||
context. Mirrors how the real ``LLM.call`` emits events at runtime.
|
||||
"""
|
||||
usage: dict[str, Any] = {
|
||||
"prompt_tokens": prompt_tokens,
|
||||
"completion_tokens": completion_tokens,
|
||||
"total_tokens": prompt_tokens + completion_tokens,
|
||||
}
|
||||
for key, value in (
|
||||
("cached_prompt_tokens", cached_prompt_tokens),
|
||||
("reasoning_tokens", reasoning_tokens),
|
||||
("cache_creation_tokens", cache_creation_tokens),
|
||||
):
|
||||
if value:
|
||||
usage[key] = value
|
||||
event = LLMCallCompletedEvent(
|
||||
call_id=str(uuid4()),
|
||||
model="gpt-4o-mini",
|
||||
response="ok",
|
||||
call_type=LLMCallType.LLM_CALL,
|
||||
usage=usage,
|
||||
)
|
||||
|
||||
ctx = contextvars.copy_context()
|
||||
|
||||
def _emit() -> None:
|
||||
current_flow_id.set(flow_id)
|
||||
future = crewai_event_bus.emit(object(), event)
|
||||
if future is not None:
|
||||
future.result(timeout=5.0)
|
||||
|
||||
ctx.run(_emit)
|
||||
|
||||
|
||||
class _ScriptedFlow(Flow):
|
||||
"""A Flow whose ``@start`` delegates to a per-instance ``_script`` closure.
|
||||
|
||||
Each test attaches a script with ``flow._script = lambda f: ...`` so we
|
||||
don't redefine a Flow subclass for every scenario.
|
||||
"""
|
||||
|
||||
@start()
|
||||
def run(self) -> None:
|
||||
script: Callable[[Flow], None] = getattr(self, "_script", lambda _f: None)
|
||||
script(self)
|
||||
|
||||
|
||||
def _run(script: Callable[[Flow], None] = lambda _f: None) -> Flow:
|
||||
"""Build a ``_ScriptedFlow``, attach ``script``, kickoff. Returns the flow."""
|
||||
flow = _ScriptedFlow()
|
||||
flow._script = script
|
||||
flow.kickoff()
|
||||
return flow
|
||||
|
||||
|
||||
class TestUsageDictToMetrics:
|
||||
"""Unit tests for the dict-to-UsageMetrics normalizer."""
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"usage, expected",
|
||||
[
|
||||
(None, None),
|
||||
({}, None),
|
||||
(
|
||||
{"prompt_tokens": 10, "completion_tokens": 20, "total_tokens": 30},
|
||||
UsageMetrics(
|
||||
prompt_tokens=10,
|
||||
completion_tokens=20,
|
||||
total_tokens=30,
|
||||
successful_requests=1,
|
||||
),
|
||||
),
|
||||
# total_tokens missing → derived from prompt + completion
|
||||
(
|
||||
{"prompt_tokens": 4, "completion_tokens": 6},
|
||||
UsageMetrics(
|
||||
prompt_tokens=4,
|
||||
completion_tokens=6,
|
||||
total_tokens=10,
|
||||
successful_requests=1,
|
||||
),
|
||||
),
|
||||
# Extended provider-specific keys flow through normalization
|
||||
(
|
||||
{
|
||||
"prompt_tokens": 100,
|
||||
"completion_tokens": 80,
|
||||
"total_tokens": 180,
|
||||
"cached_prompt_tokens": 40,
|
||||
"reasoning_tokens": 25,
|
||||
"cache_creation_tokens": 10,
|
||||
},
|
||||
UsageMetrics(
|
||||
prompt_tokens=100,
|
||||
completion_tokens=80,
|
||||
total_tokens=180,
|
||||
cached_prompt_tokens=40,
|
||||
reasoning_tokens=25,
|
||||
cache_creation_tokens=10,
|
||||
successful_requests=1,
|
||||
),
|
||||
),
|
||||
# Garbage / non-int values coerce to 0 instead of crashing
|
||||
(
|
||||
{"prompt_tokens": "n/a", "completion_tokens": None, "total_tokens": 7},
|
||||
UsageMetrics(
|
||||
prompt_tokens=0,
|
||||
completion_tokens=0,
|
||||
total_tokens=0,
|
||||
successful_requests=1,
|
||||
),
|
||||
),
|
||||
# Native Anthropic provider emits input_tokens/output_tokens
|
||||
(
|
||||
{"input_tokens": 12, "output_tokens": 8},
|
||||
UsageMetrics(
|
||||
prompt_tokens=12,
|
||||
completion_tokens=8,
|
||||
total_tokens=20,
|
||||
successful_requests=1,
|
||||
),
|
||||
),
|
||||
# Native Gemini provider emits prompt_token_count/candidates_token_count
|
||||
(
|
||||
{
|
||||
"prompt_token_count": 30,
|
||||
"candidates_token_count": 20,
|
||||
"reasoning_tokens": 5,
|
||||
},
|
||||
UsageMetrics(
|
||||
prompt_tokens=30,
|
||||
completion_tokens=20,
|
||||
total_tokens=50,
|
||||
reasoning_tokens=5,
|
||||
successful_requests=1,
|
||||
),
|
||||
),
|
||||
# OpenAI nests cached_tokens under prompt_tokens_details
|
||||
(
|
||||
{
|
||||
"prompt_tokens": 100,
|
||||
"completion_tokens": 50,
|
||||
"prompt_tokens_details": {"cached_tokens": 30},
|
||||
},
|
||||
UsageMetrics(
|
||||
prompt_tokens=100,
|
||||
completion_tokens=50,
|
||||
total_tokens=150,
|
||||
cached_prompt_tokens=30,
|
||||
successful_requests=1,
|
||||
),
|
||||
),
|
||||
],
|
||||
ids=[
|
||||
"none",
|
||||
"empty",
|
||||
"all_keys",
|
||||
"no_total",
|
||||
"extended_keys",
|
||||
"garbage",
|
||||
"anthropic_aliases",
|
||||
"gemini_aliases",
|
||||
"openai_nested_cached",
|
||||
],
|
||||
)
|
||||
def test_normalization(
|
||||
self, usage: dict[str, Any] | None, expected: UsageMetrics | None
|
||||
) -> None:
|
||||
assert _usage_dict_to_metrics(usage) == expected
|
||||
|
||||
|
||||
class TestFlowUsageAggregation:
|
||||
"""End-to-end tests driving the listener through the real event bus."""
|
||||
|
||||
def test_sums_every_llm_call_in_the_flow(self) -> None:
|
||||
"""Multiple LLM calls — including bare ``LLM.call(...)`` made outside
|
||||
any crew — accumulate; ``successful_requests`` tracks the call count."""
|
||||
|
||||
def script(flow: Flow) -> None:
|
||||
_emit_llm_call(flow_id=flow._flow_match_id, prompt_tokens=300, completion_tokens=300)
|
||||
_emit_llm_call(flow_id=flow._flow_match_id, prompt_tokens=200, completion_tokens=100)
|
||||
_emit_llm_call(flow_id=flow._flow_match_id, prompt_tokens=20, completion_tokens=20)
|
||||
|
||||
flow = _run(script)
|
||||
|
||||
assert flow.usage_metrics.total_tokens == 940
|
||||
assert flow.usage_metrics.prompt_tokens == 520
|
||||
assert flow.usage_metrics.completion_tokens == 420
|
||||
assert flow.usage_metrics.successful_requests == 3
|
||||
|
||||
def test_returns_zero_when_no_calls_happen(self) -> None:
|
||||
flow = _run()
|
||||
assert flow.usage_metrics == UsageMetrics()
|
||||
|
||||
def test_ignores_events_from_other_flows(self) -> None:
|
||||
"""Concurrent flow runs share the singleton bus, so the listener must
|
||||
scope itself to its own flow via the contextvar match."""
|
||||
|
||||
def script(flow: Flow) -> None:
|
||||
_emit_llm_call(flow_id=flow._flow_match_id, prompt_tokens=50, completion_tokens=50)
|
||||
_emit_llm_call(flow_id="some-other-flow", prompt_tokens=49_000, completion_tokens=50_999)
|
||||
|
||||
flow = _run(script)
|
||||
|
||||
assert flow.usage_metrics.total_tokens == 100
|
||||
assert flow.usage_metrics.successful_requests == 1
|
||||
|
||||
def test_resets_between_kickoffs(self) -> None:
|
||||
flow = _ScriptedFlow()
|
||||
flow._script = lambda f: _emit_llm_call(
|
||||
flow_id=f._flow_match_id, prompt_tokens=250, completion_tokens=250
|
||||
)
|
||||
|
||||
flow.kickoff()
|
||||
flow.kickoff()
|
||||
|
||||
assert flow.usage_metrics.total_tokens == 500
|
||||
assert flow.usage_metrics.successful_requests == 1
|
||||
|
||||
def test_usage_metrics_returns_independent_copy(self) -> None:
|
||||
"""``usage_metrics`` must return a copy, not the internal instance —
|
||||
otherwise callers can clobber the in-flight accumulator."""
|
||||
|
||||
flow = _run(
|
||||
lambda f: _emit_llm_call(
|
||||
flow_id=f._flow_match_id, prompt_tokens=50, completion_tokens=50
|
||||
)
|
||||
)
|
||||
|
||||
snapshot = flow.usage_metrics
|
||||
snapshot.total_tokens = 999_999
|
||||
|
||||
assert flow.usage_metrics.total_tokens == 100
|
||||
|
||||
def test_handler_is_unregistered_after_kickoff(self) -> None:
|
||||
"""Long-lived workers (Celery, devkit) must not leak one handler per
|
||||
kickoff on the singleton bus, on either the success or failure path."""
|
||||
|
||||
def handler_count() -> int:
|
||||
return len(
|
||||
crewai_event_bus._sync_handlers.get(LLMCallCompletedEvent, frozenset())
|
||||
)
|
||||
|
||||
before = handler_count()
|
||||
|
||||
flow = _ScriptedFlow()
|
||||
flow._script = lambda f: _emit_llm_call(
|
||||
flow_id=f._flow_match_id, prompt_tokens=5, completion_tokens=5
|
||||
)
|
||||
for _ in range(3):
|
||||
flow.kickoff()
|
||||
|
||||
assert handler_count() == before
|
||||
|
||||
def boom(_f: Flow) -> None:
|
||||
raise RuntimeError("boom")
|
||||
|
||||
failing = _ScriptedFlow()
|
||||
failing._script = boom
|
||||
|
||||
with pytest.raises(RuntimeError, match="boom"):
|
||||
failing.kickoff()
|
||||
|
||||
assert handler_count() == before
|
||||
|
||||
def test_kickoff_flushes_event_bus_before_returning(
|
||||
self, monkeypatch: pytest.MonkeyPatch
|
||||
) -> None:
|
||||
"""`kickoff_async` must drain pending LLMCallCompletedEvent handlers
|
||||
before detaching the listener — otherwise late handlers landing on
|
||||
the threadpool would be lost on short flows. Mirrors the flush
|
||||
``Crew.kickoff()`` performs before reporting ``token_usage``."""
|
||||
|
||||
flush_calls: list[None] = []
|
||||
original_flush = crewai_event_bus.flush
|
||||
|
||||
def tracked_flush(*args: Any, **kwargs: Any) -> bool:
|
||||
flush_calls.append(None)
|
||||
return original_flush(*args, **kwargs)
|
||||
|
||||
monkeypatch.setattr(crewai_event_bus, "flush", tracked_flush)
|
||||
|
||||
flow = _ScriptedFlow()
|
||||
flow._script = lambda f: _emit_llm_call(
|
||||
flow_id=f._flow_match_id, prompt_tokens=3, completion_tokens=4
|
||||
)
|
||||
flow.kickoff()
|
||||
|
||||
assert flush_calls, "kickoff did not flush the event bus before returning"
|
||||
assert flow.usage_metrics.total_tokens == 7
|
||||
|
||||
def test_stale_handler_from_prior_kickoff_does_not_contaminate(self) -> None:
|
||||
"""A handler still queued from a prior kickoff must not write into
|
||||
a later kickoff's accumulator. The handler's closure captures its
|
||||
own accumulator object, so any late writes land on an orphaned
|
||||
instance and the live ``usage_metrics`` is unaffected."""
|
||||
|
||||
captured: dict[str, Any] = {}
|
||||
|
||||
def script(flow: Flow) -> None:
|
||||
_emit_llm_call(flow_id=flow._flow_match_id, prompt_tokens=10, completion_tokens=10)
|
||||
captured["handler"] = flow._usage_aggregation_handler
|
||||
captured["match_id"] = flow._flow_match_id
|
||||
|
||||
flow = _run(script)
|
||||
assert flow.usage_metrics.total_tokens == 20
|
||||
|
||||
flow._script = lambda f: None
|
||||
flow.kickoff()
|
||||
assert flow.usage_metrics.total_tokens == 0
|
||||
|
||||
stale_handler = captured["handler"]
|
||||
assert stale_handler is not None
|
||||
|
||||
stale_event = LLMCallCompletedEvent(
|
||||
call_id=str(uuid4()),
|
||||
model="gpt-4o-mini",
|
||||
response="ok",
|
||||
call_type=LLMCallType.LLM_CALL,
|
||||
usage={"prompt_tokens": 999, "completion_tokens": 999, "total_tokens": 1998},
|
||||
)
|
||||
ctx = contextvars.copy_context()
|
||||
ctx.run(lambda: (current_flow_id.set(captured["match_id"]), stale_handler(object(), stale_event)))
|
||||
|
||||
assert flow.usage_metrics.total_tokens == 0
|
||||
|
||||
def test_pause_detaches_listener_and_does_not_leak(self) -> None:
|
||||
"""When ``kickoff_async`` pauses for human feedback, the listener
|
||||
must be detached from the singleton bus to avoid leaking handlers
|
||||
across abandoned paused instances. Pre-pause LLM events still
|
||||
count because the bus snapshots handlers at emit time. Late
|
||||
events emitted after the pause returns do not count for this
|
||||
instance — resume paths re-attach a fresh listener."""
|
||||
|
||||
from crewai.flow.async_feedback.types import HumanFeedbackPending
|
||||
|
||||
captured: dict[str, Any] = {}
|
||||
|
||||
class _PausingFlow(Flow):
|
||||
@start()
|
||||
def begin(self) -> None:
|
||||
_emit_llm_call(
|
||||
flow_id=self._flow_match_id,
|
||||
prompt_tokens=10,
|
||||
completion_tokens=20,
|
||||
)
|
||||
captured["pre_pause_total"] = self.usage_metrics.total_tokens
|
||||
raise HumanFeedbackPending(
|
||||
context=PendingFeedbackContext(
|
||||
flow_id=self.flow_id,
|
||||
flow_class="_PausingFlow",
|
||||
method_name="begin",
|
||||
method_output="content",
|
||||
message="Review:",
|
||||
)
|
||||
)
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
persistence = SQLiteFlowPersistence(os.path.join(tmpdir, "f.db"))
|
||||
flow = _PausingFlow(persistence=persistence)
|
||||
result = flow.kickoff()
|
||||
|
||||
assert isinstance(result, HumanFeedbackPending)
|
||||
assert captured["pre_pause_total"] == 30
|
||||
assert flow._usage_aggregation_handler is None
|
||||
|
||||
# A late event emitted after the pause does not reach the
|
||||
# detached listener, so the running total is unchanged.
|
||||
_emit_llm_call(
|
||||
flow_id=flow._flow_match_id,
|
||||
prompt_tokens=2,
|
||||
completion_tokens=3,
|
||||
)
|
||||
assert flow.usage_metrics.total_tokens == 30
|
||||
|
||||
def test_aggregates_resume_after_from_pending(self) -> None:
|
||||
"""A flow restored via ``from_pending`` is a fresh instance with no
|
||||
``_flow_match_id``; without seeding it, the listener attached in
|
||||
``resume_async`` either ignores its own LLM calls or absorbs unrelated
|
||||
ones. ``from_pending`` must seed the match id so the resume-phase
|
||||
aggregator counts our own calls and only our own calls."""
|
||||
|
||||
class _ResumeFlow(Flow):
|
||||
@start()
|
||||
def begin(self) -> str:
|
||||
return "content"
|
||||
|
||||
@listen(begin)
|
||||
def on_begin(self, _feedback: Any) -> str:
|
||||
_emit_llm_call(
|
||||
flow_id=self._flow_match_id,
|
||||
prompt_tokens=100,
|
||||
completion_tokens=50,
|
||||
)
|
||||
_emit_llm_call(
|
||||
flow_id="some-other-flow",
|
||||
prompt_tokens=9_999,
|
||||
completion_tokens=9_999,
|
||||
)
|
||||
return "done"
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
persistence = SQLiteFlowPersistence(os.path.join(tmpdir, "f.db"))
|
||||
flow_id = "usage-resume-test"
|
||||
persistence.save_pending_feedback(
|
||||
flow_uuid=flow_id,
|
||||
context=PendingFeedbackContext(
|
||||
flow_id=flow_id,
|
||||
flow_class="_ResumeFlow",
|
||||
method_name="begin",
|
||||
method_output="content",
|
||||
message="Review:",
|
||||
),
|
||||
state_data={"id": flow_id},
|
||||
)
|
||||
|
||||
flow = _ResumeFlow.from_pending(flow_id, persistence)
|
||||
assert flow._flow_match_id == flow.flow_id
|
||||
|
||||
flow.resume("ok")
|
||||
|
||||
assert flow.usage_metrics.total_tokens == 150
|
||||
assert flow.usage_metrics.prompt_tokens == 100
|
||||
assert flow.usage_metrics.completion_tokens == 50
|
||||
assert flow.usage_metrics.successful_requests == 1
|
||||
|
||||
def test_resume_aggregates_under_foreign_flow_context(self) -> None:
|
||||
"""Resume must override an already-set ``current_flow_id`` so its
|
||||
own LLM events match the listener's filter even when invoked from
|
||||
inside another flow's active context."""
|
||||
|
||||
class _ResumeFlow(Flow):
|
||||
@start()
|
||||
def begin(self) -> str:
|
||||
return "content"
|
||||
|
||||
@listen(begin)
|
||||
def on_begin(self, _feedback: Any) -> str:
|
||||
_emit_llm_call(
|
||||
flow_id=self._flow_match_id,
|
||||
prompt_tokens=42,
|
||||
completion_tokens=8,
|
||||
)
|
||||
return "done"
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
persistence = SQLiteFlowPersistence(os.path.join(tmpdir, "f.db"))
|
||||
flow_id = "resume-foreign-context"
|
||||
persistence.save_pending_feedback(
|
||||
flow_uuid=flow_id,
|
||||
context=PendingFeedbackContext(
|
||||
flow_id=flow_id,
|
||||
flow_class="_ResumeFlow",
|
||||
method_name="begin",
|
||||
method_output="content",
|
||||
message="Review:",
|
||||
),
|
||||
state_data={"id": flow_id},
|
||||
)
|
||||
|
||||
foreign_token = current_flow_id.set("some-parent-flow")
|
||||
try:
|
||||
flow = _ResumeFlow.from_pending(flow_id, persistence)
|
||||
flow.resume("ok")
|
||||
finally:
|
||||
current_flow_id.reset(foreign_token)
|
||||
|
||||
assert flow.usage_metrics.total_tokens == 50
|
||||
assert flow.usage_metrics.successful_requests == 1
|
||||
@@ -77,12 +77,22 @@ class ComplexFlow(Flow):
|
||||
return "complete"
|
||||
|
||||
|
||||
def _attach_flow_definition(flow_class: type[Flow], methods: dict[str, object]) -> None:
|
||||
def _attach_flow_definition(
|
||||
flow_class: type[Flow], methods: dict[str, dict[str, object]]
|
||||
) -> None:
|
||||
flow_class._flow_definition = FlowDefinition.from_dict(
|
||||
{
|
||||
"schema": "crewai.flow/v1",
|
||||
"name": flow_class.__name__,
|
||||
"methods": methods,
|
||||
"methods": {
|
||||
name: {
|
||||
"do": {
|
||||
"ref": f"{flow_class.__module__}:{flow_class.__name__}.{name}"
|
||||
},
|
||||
**spec,
|
||||
}
|
||||
for name, spec in methods.items()
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
@@ -125,13 +135,20 @@ def test_build_flow_structure_from_flow_definition():
|
||||
"schema": "crewai.flow/v1",
|
||||
"name": "DefinedFlow",
|
||||
"methods": {
|
||||
"begin": {"start": True},
|
||||
"begin": {
|
||||
"do": {"ref": "defined_flows:DefinedFlow.begin"},
|
||||
"start": True,
|
||||
},
|
||||
"decide": {
|
||||
"do": {"ref": "defined_flows:DefinedFlow.decide"},
|
||||
"listen": "begin",
|
||||
"router": True,
|
||||
"emit": ["done"],
|
||||
},
|
||||
"finish": {"listen": "done"},
|
||||
"finish": {
|
||||
"do": {"ref": "defined_flows:DefinedFlow.finish"},
|
||||
"listen": "done",
|
||||
},
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
@@ -92,8 +92,8 @@ class TestHumanFeedbackValidation:
|
||||
assert hasattr(test_method, "__human_feedback_config__")
|
||||
assert not hasattr(test_method, "__is_router__")
|
||||
|
||||
def test_persist_preserves_human_feedback_llm_attribute(self):
|
||||
"""Test @persist preserves the live LLM stashed by @human_feedback."""
|
||||
def test_persist_preserves_human_feedback_config(self):
|
||||
"""Test @persist preserves the config stamped by @human_feedback."""
|
||||
llm = object()
|
||||
|
||||
@persist()
|
||||
@@ -105,8 +105,8 @@ class TestHumanFeedbackValidation:
|
||||
def test_method(self):
|
||||
return "output"
|
||||
|
||||
assert hasattr(test_method, "_human_feedback_llm")
|
||||
assert test_method._human_feedback_llm is llm
|
||||
assert hasattr(test_method, "__human_feedback_config__")
|
||||
assert test_method.__human_feedback_config__.llm is llm
|
||||
|
||||
|
||||
class TestHumanFeedbackConfig:
|
||||
@@ -481,7 +481,7 @@ class TestHumanFeedbackLearn:
|
||||
with patch.object(
|
||||
flow, "_request_human_feedback", return_value="looks good"
|
||||
):
|
||||
flow.produce()
|
||||
flow.kickoff()
|
||||
|
||||
# memory.recall and memory.remember_many should NOT be called
|
||||
flow.memory.recall.assert_not_called()
|
||||
@@ -516,7 +516,7 @@ class TestHumanFeedbackLearn:
|
||||
)
|
||||
MockLLM.return_value = mock_llm
|
||||
|
||||
flow.produce()
|
||||
flow.kickoff()
|
||||
|
||||
# remember_many should be called with the distilled lesson
|
||||
flow.memory.remember_many.assert_called_once()
|
||||
@@ -570,7 +570,7 @@ class TestHumanFeedbackLearn:
|
||||
]
|
||||
MockLLM.return_value = mock_llm
|
||||
|
||||
flow.produce()
|
||||
flow.kickoff()
|
||||
|
||||
assert captured_output["shown_to_human"] == "draft with citations added"
|
||||
# recall was called to find past lessons
|
||||
@@ -592,7 +592,7 @@ class TestHumanFeedbackLearn:
|
||||
with patch.object(
|
||||
flow, "_request_human_feedback", return_value=""
|
||||
):
|
||||
flow.produce()
|
||||
flow.kickoff()
|
||||
|
||||
flow.memory.remember_many.assert_not_called()
|
||||
|
||||
@@ -645,7 +645,7 @@ class TestHumanFeedbackLearn:
|
||||
mock_llm.call.side_effect = RuntimeError("simulated pre-review failure")
|
||||
MockLLM.return_value = mock_llm
|
||||
|
||||
flow.produce()
|
||||
flow.kickoff()
|
||||
|
||||
assert captured["shown_to_human"] == "raw draft"
|
||||
assert any(
|
||||
@@ -690,7 +690,7 @@ class TestHumanFeedbackLearn:
|
||||
MockLLM.return_value = mock_llm
|
||||
|
||||
with pytest.raises(RuntimeError, match="simulated pre-review failure"):
|
||||
flow.produce()
|
||||
flow.kickoff()
|
||||
|
||||
def test_distillation_failure_logs_and_does_not_block_flow(self, caplog):
|
||||
"""Distillation LLM failure logs a warning but does not break the flow."""
|
||||
@@ -717,7 +717,7 @@ class TestHumanFeedbackLearn:
|
||||
mock_llm.call.side_effect = RuntimeError("simulated distill failure")
|
||||
MockLLM.return_value = mock_llm
|
||||
|
||||
flow.produce() # must not raise
|
||||
flow.kickoff() # must not raise
|
||||
|
||||
flow.memory.remember_many.assert_not_called()
|
||||
assert any(
|
||||
|
||||
@@ -778,77 +778,11 @@ class TestEdgeCases:
|
||||
class TestLLMConfigPreservation:
|
||||
"""Tests that LLM config is preserved through @human_feedback serialization.
|
||||
|
||||
PR #4970 introduced _human_feedback_llm stashing so the live LLM object survives
|
||||
decorator wrapping for same-process resume. The serialization path
|
||||
(_serialize_llm_for_context / _deserialize_llm_from_context) preserves
|
||||
config for cross-process resume.
|
||||
The flow definition keeps the live LLM object for same-process execution.
|
||||
The serialization path (_serialize_llm_for_context /
|
||||
_deserialize_llm_from_context) preserves config for cross-process resume.
|
||||
"""
|
||||
|
||||
def test_human_feedback_llm_stashed_on_wrapper_with_llm_instance(self):
|
||||
"""Test that passing an LLM instance stashes it on the wrapper as _human_feedback_llm."""
|
||||
from crewai.llm import LLM
|
||||
|
||||
llm_instance = LLM(model="gpt-4o-mini", temperature=0.42)
|
||||
|
||||
class ConfigFlow(Flow):
|
||||
@start()
|
||||
@human_feedback(
|
||||
message="Review:",
|
||||
emit=["approved", "rejected"],
|
||||
llm=llm_instance,
|
||||
)
|
||||
def review(self):
|
||||
return "content"
|
||||
|
||||
method = ConfigFlow.review
|
||||
assert hasattr(method, "_human_feedback_llm"), "_human_feedback_llm not found on wrapper"
|
||||
assert method._human_feedback_llm is llm_instance, "_human_feedback_llm is not the same object"
|
||||
|
||||
def test_human_feedback_llm_preserved_on_listen_method(self):
|
||||
"""Test that _human_feedback_llm is preserved when @human_feedback is on a @listen method."""
|
||||
from crewai.llm import LLM
|
||||
|
||||
llm_instance = LLM(model="gpt-4o-mini", temperature=0.7)
|
||||
|
||||
class ListenConfigFlow(Flow):
|
||||
@start()
|
||||
def generate(self):
|
||||
return "draft"
|
||||
|
||||
@listen("generate")
|
||||
@human_feedback(
|
||||
message="Review:",
|
||||
emit=["approved", "rejected"],
|
||||
llm=llm_instance,
|
||||
)
|
||||
def review(self):
|
||||
return "content"
|
||||
|
||||
method = ListenConfigFlow.review
|
||||
assert hasattr(method, "_human_feedback_llm")
|
||||
assert method._human_feedback_llm is llm_instance
|
||||
|
||||
def test_human_feedback_llm_accessible_on_instance(self):
|
||||
"""Test that _human_feedback_llm survives Flow instantiation (bound method access)."""
|
||||
from crewai.llm import LLM
|
||||
|
||||
llm_instance = LLM(model="gpt-4o-mini", temperature=0.42)
|
||||
|
||||
class InstanceFlow(Flow):
|
||||
@start()
|
||||
@human_feedback(
|
||||
message="Review:",
|
||||
emit=["approved", "rejected"],
|
||||
llm=llm_instance,
|
||||
)
|
||||
def review(self):
|
||||
return "content"
|
||||
|
||||
flow = InstanceFlow()
|
||||
instance_method = flow.review
|
||||
assert hasattr(instance_method, "_human_feedback_llm")
|
||||
assert instance_method._human_feedback_llm is llm_instance
|
||||
|
||||
def test_serialize_llm_preserves_config_fields(self):
|
||||
"""Test that _serialize_llm_for_context captures temperature, base_url, etc."""
|
||||
from crewai.flow.human_feedback import _serialize_llm_for_context
|
||||
|
||||
@@ -838,6 +838,74 @@ def test_flow_method_execution_finished_includes_serialized_state():
|
||||
assert final_output == "final_result"
|
||||
|
||||
|
||||
def test_suppress_flow_events_silences_method_lifecycle_events():
|
||||
"""``suppress_flow_events=True`` emits no MethodExecution* events on the
|
||||
bus (used by infrastructure flows like AgentExecutor so their control-flow
|
||||
methods don't pollute traces), while default flows still emit them."""
|
||||
captured: list[tuple[str, str]] = []
|
||||
|
||||
class SuppressedFlow(Flow):
|
||||
suppress_flow_events: bool = True
|
||||
|
||||
@start()
|
||||
def begin(self):
|
||||
return "started"
|
||||
|
||||
@listen("begin")
|
||||
def process(self):
|
||||
return "done"
|
||||
|
||||
class ControlFlow(Flow):
|
||||
@start()
|
||||
def begin(self):
|
||||
return "started"
|
||||
|
||||
@listen("begin")
|
||||
def process(self):
|
||||
return "done"
|
||||
|
||||
with crewai_event_bus.scoped_handlers():
|
||||
|
||||
@crewai_event_bus.on(MethodExecutionStartedEvent)
|
||||
def _on_started(source, event):
|
||||
captured.append(("started", type(source).__name__))
|
||||
|
||||
@crewai_event_bus.on(MethodExecutionFinishedEvent)
|
||||
def _on_finished(source, event):
|
||||
captured.append(("finished", type(source).__name__))
|
||||
|
||||
SuppressedFlow().kickoff()
|
||||
wait_for_event_handlers()
|
||||
assert [e for e in captured if e[1] == "SuppressedFlow"] == [], (
|
||||
"suppress_flow_events=True must emit no MethodExecution* events"
|
||||
)
|
||||
|
||||
captured.clear()
|
||||
ControlFlow().kickoff()
|
||||
wait_for_event_handlers()
|
||||
control = [e for e in captured if e[1] == "ControlFlow"]
|
||||
assert ("started", "ControlFlow") in control
|
||||
assert ("finished", "ControlFlow") in control
|
||||
|
||||
|
||||
def test_infrastructure_flows_suppress_flow_events_by_default():
|
||||
"""Pin the infra flows that must stay silent in traces.
|
||||
|
||||
The gating in ``_execute_method`` only helps if these flows actually set
|
||||
``suppress_flow_events=True``; without this guard, removing the flag from
|
||||
AgentExecutor would silently bring back the verbose per-method trace spans.
|
||||
"""
|
||||
from crewai.experimental.agent_executor import AgentExecutor
|
||||
from crewai.memory.encoding_flow import EncodingFlow
|
||||
from crewai.memory.recall_flow import RecallFlow
|
||||
|
||||
assert AgentExecutor.model_fields["suppress_flow_events"].default is True
|
||||
|
||||
for flow_cls in (EncodingFlow, RecallFlow):
|
||||
flow = flow_cls(storage=None, llm=None, embedder=None)
|
||||
assert flow.suppress_flow_events is True
|
||||
|
||||
|
||||
@pytest.mark.vcr()
|
||||
def test_llm_emits_call_started_event():
|
||||
started_events: list[LLMCallStartedEvent] = []
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
"""CrewAI development tools."""
|
||||
|
||||
__version__ = "1.14.7a3"
|
||||
__version__ = "1.14.7"
|
||||
|
||||
Reference in New Issue
Block a user