Compare commits

...

26 Commits

Author SHA1 Message Date
Vinicius Brasil
e22732814e Change lib/crewai/tests/test_flow_from_definition.py 2026-06-11 11:34:52 -07:00
Vinicius Brasil
ab57853fc6 Add event scoping to flow test 2026-06-11 11:30:33 -07:00
Vinicius Brasil
39962a9c16 Fix conversational start router missing required do field
FlowMethodDefinition.do became required when the handler string was
replaced with FlowActionDefinition, but _conversation_start_router still
built its fragment without it, breaking crewai import entirely.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-11 10:25:17 -07:00
Vinicius Brasil
837ae2bf7f Replace handler string with structured FlowActionDefinition
`handler: str | None` was optional and opaque — missing handlers only
surfaced at kickoff time. `do: FlowActionDefinition` is required, so
Pydantic rejects invalid definitions at parse time.

The `call: "code"` discriminator prepares the schema for future
non-Python action types (e.g. MCP tool, crew) without touching
`FlowMethodDefinition`. Resolution logic is extracted to
`runtime/_action_resolvers.py` to keep the dispatch point isolated.
2026-06-11 10:18:46 -07:00
Vinicius Brasil
d029a5cd92 Build flow state from FlowDefinition
Definition-driven flows previously always started with a bare dict
state.
2026-06-11 10:18:46 -07:00
Vinicius Brasil
2652caea2d Add Flow.from_definition to run flows without a subclass
A FlowDefinition (e.g. loaded from YAML) was only usable for dispatch on
decorator-authored subclasses. Now each method definition records an
importable `module:qualname` handler ref, and `Flow.from_definition`
resolves and binds those handlers to build a runnable flow directly.
2026-06-11 10:18:46 -07:00
Vinicius Brasil
5c18f3335b Read flow dispatch from FlowDefinition
Store the definition in a `_definition` PrivateAttr at post-init and
convert the dispatch helpers (`_start_method_names`, `_listener_methods`,
`_start_condition`, `_listen_condition`, `_is_router`) from classmethods
to instance methods that read it. Event names now fall back to
`self._definition.name` instead of `self.__class__.__name__`.

Behavior is identical for decorator subclasses, but the engine no longer
assumes the definition comes from the class. This is the seam for
`Flow.from_definition`, where an instance runs a definition that was
loaded rather than built from a Python subclass.
2026-06-11 10:18:46 -07:00
Greyson LaLonde
21fa8e32d9 docs: update changelog and version for v1.14.7
Some checks are pending
CodeQL Advanced / Analyze (actions) (push) Waiting to run
CodeQL Advanced / Analyze (python) (push) Waiting to run
Check Documentation Broken Links / Check broken links (push) Waiting to run
Vulnerability Scan / pip-audit (push) Waiting to run
2026-06-11 10:13:40 -07:00
Greyson LaLonde
f18c03cd8f feat: bump versions to 1.14.7 2026-06-11 10:06:07 -07:00
Greyson LaLonde
50b9c02272 fix(checkpoint): rebuild custom BaseLLM as concrete LLM on restore
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Check Documentation Broken Links / Check broken links (push) Has been cancelled
Vulnerability Scan / pip-audit (push) Has been cancelled
A custom BaseLLM subclass serializes with the inherited llm_type "base",
which the registry maps to the abstract BaseLLM. Restore then crashed on
cls(**value). Rebuild a concrete LLM from the saved config when the
resolved class is abstract.
2026-06-10 22:21:35 -07:00
Greyson LaLonde
c55334be5f docs: update changelog and version for v1.14.7rc2 2026-06-10 20:52:56 -07:00
Greyson LaLonde
05a2ba9ca4 feat: bump versions to 1.14.7rc2 2026-06-10 20:45:29 -07:00
Greyson LaLonde
fbafe1f0d3 fix(flow): gate restore on a flag so live snapshots don't replay as resume
Checkpoint serialization stamps checkpoint_completed_methods onto every live
Flow in RuntimeState.root, including the agent executor reused across a crew's
tasks. kickoff_async read that stamp as a restore signal, so the second task
replayed the first task's completed methods and never reached a final answer.

Gate is_restoring on _restored_from_checkpoint, set only by
_restore_from_checkpoint, and consume it single-shot.
2026-06-10 20:40:08 -07:00
Greyson LaLonde
5267c059f5 test(flow): pass show=False in test_flow_plotting to not open a browser
flow.plot defaults to show=True, which calls webbrowser.open on every run.
The test only asserts FlowPlotEvent is emitted, so disable the browser open.
2026-06-10 20:36:14 -07:00
Greyson LaLonde
243c9edc1c docs: update changelog and version for v1.14.7rc1
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Check Documentation Broken Links / Check broken links (push) Has been cancelled
Vulnerability Scan / pip-audit (push) Has been cancelled
2026-06-10 18:56:52 -07:00
Greyson LaLonde
68910b70c0 feat: bump versions to 1.14.7rc1 2026-06-10 18:50:54 -07:00
Greyson LaLonde
299782765c ci: ignore GHSA-rrmf-rvhw-rf47 (torch alias of PYSEC-2025-194)
* ci: ignore GHSA-rrmf-rvhw-rf47 (torch alias of PYSEC-2025-194)

pip-audit reports CVE-2025-3000 under its GHSA id, which the existing
PYSEC-2025-194 ignore does not match. Same advisory: memory corruption
in torch.jit.script, CVSS 1.9, local-only, no fix for torch 2.11.0.

* ci: sync GHSA-rrmf-rvhw-rf47 ignore into pre-commit pip-audit
2026-06-10 18:45:42 -07:00
Greyson LaLonde
a1f44eb272 fix(events): scope runtime state per run to bound growth and isolate concurrent runs 2026-06-10 18:39:05 -07:00
Lorenze Jay
036b032ab6 handle supporting both custom prompts (#6108)
* handle supporting both custom prompts

* handle translations

* handle deprecation warnings better
2026-06-10 17:52:53 -07:00
Lorenze Jay
f88ae54f96 fix telemetry setup on crewai-login (#6106)
* fix telemetry setup on crewai-login

* type check fix
2026-06-10 17:03:25 -07:00
Lorenze Jay
b6e5d632c1 improve convo routing cycle with one less route (#6102)
* improve one less route

* flows in flows, new agent executor causing early trace batch finalization

* addressing comments

* addressing comments pt2

* lint and typecheck fix
2026-06-10 16:49:16 -07:00
Greyson LaLonde
0d971e5bc5 feat(events): add reset_runtime_state to release accumulated bus state 2026-06-10 16:12:28 -07:00
Lucas Gomide
b3f175b56f docs: update otel images (#6103)
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Check Documentation Broken Links / Check broken links (push) Has been cancelled
Vulnerability Scan / pip-audit (push) Has been cancelled
2026-06-10 14:34:30 -04:00
Lucas Gomide
f523a7d029 docs: udpate docs to reflect new state of OpenTelemetry collector (#6100)
* docs: udpate docs to reflect new state of OpenTelemetry collector

* docs: add OTel collector and Datadog screenshots

These images are referenced by the capture_telemetry_logs guides but were
missing from the tree, which broke the link checker across all locales.

* docs: address PR review on OTel collector guide

- Clarify that OpenTelemetry Traces and Logs are separate integrations
  sharing the same fields (resolves Traces/Logs wording inconsistency)
- List regional Datadog OTLP hosts (US1/US3/US5/EU1/AP1) so users outside
  US5 can copy the right domain
2026-06-10 14:26:35 -04:00
Lorenze Jay
f214ff4b7b decouple convo logic from runtime and added a conversational_definition (#6091)
* decouple convo logic from runtime and added a conversational_definition

* type check fix

* always defer traces for convo and so fix tests to reflect that
2026-06-10 10:49:39 -07:00
Vini Brasil
a9e7c3a44f Simplify flow condition evaluation to be stateless per event (#6097)
Re-evaluate the whole `@listen`/`@router` condition tree against the set
of events seen so far, instead of tracking which AND sub-branches remain
pending.

Net effect:
* Fixes a regression where `or_()` short-circuited at the first
  satisfied branch, leaving a sibling `and_()` half-complete so a later
  trigger could spuriously re-fire the listener
* Removes the fragile per-branch pending state and `id()`-based keys
* Shrinks the evaluator to one readable predicate
2026-06-10 10:35:25 -07:00
63 changed files with 4966 additions and 510 deletions

View File

@@ -64,6 +64,7 @@ jobs:
--ignore-vuln PYSEC-2025-197 \
--ignore-vuln PYSEC-2025-210 \
--ignore-vuln PYSEC-2026-139 \
--ignore-vuln GHSA-rrmf-rvhw-rf47 \
--ignore-vuln PYSEC-2025-211 \
--ignore-vuln PYSEC-2025-212 \
--ignore-vuln PYSEC-2025-213 \
@@ -81,6 +82,7 @@ jobs:
# PYSEC-2025-183 - pyjwt 2.12.1: disputed weak-encryption claim; key length is application-chosen
# PYSEC-2025-189..197 - torch 2.11.0: memory-corruption/DoS in functions only reachable via untrusted models; no fix available
# PYSEC-2025-210, PYSEC-2026-139 - torch 2.11.0: profiler/deserialization issues; no fix available
# GHSA-rrmf-rvhw-rf47 - torch 2.11.0 (CVE-2025-3000, alias of PYSEC-2025-194): memory corruption in torch.jit.script, CVSS 1.9, local-only; affected <=2.12.0, no fix available. pip-audit reports it under the GHSA id so the PYSEC ignore above does not catch it.
# PYSEC-2025-211..218 - transformers 5.5.4: deserialization/code injection via malicious model checkpoints; no fix available
# GHSA-f4j7-r4q5-qw2c - chromadb 1.1.1 (CVE-2026-45829): pre-auth RCE via /api/v2/tenants/{tenant}/databases/{db}/collections when trust_remote_code=true.
# Advisory: vulnerable >=1.0.0,<=1.5.9, firstPatchedVersion=none. We only use chromadb.PersistentClient (lib/crewai/src/crewai/rag/chromadb/factory.py)

View File

@@ -47,6 +47,7 @@ repos:
--ignore-vuln PYSEC-2025-197
--ignore-vuln PYSEC-2025-210
--ignore-vuln PYSEC-2026-139
--ignore-vuln GHSA-rrmf-rvhw-rf47
--ignore-vuln PYSEC-2025-211
--ignore-vuln PYSEC-2025-212
--ignore-vuln PYSEC-2025-213

View File

@@ -4,6 +4,106 @@ description: "تحديثات المنتج والتحسينات وإصلاحات
icon: "clock"
mode: "wide"
---
<Update label="11 يونيو 2026">
## v1.14.7
[عرض الإصدار على GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7)
## ما الذي تغير
### الميزات
- إضافة واجهات خلفية افتراضية قابلة للتوصيل للذاكرة، والمعرفة، وrag، وflow.
- عرض السبب الحقيقي للإنهاء، ومعلمات العينة، وresponse.id في أحداث LLM.
- تصنيف مشغلات DSL كزخارف واعية للمسار.
- إضافة واجهة برمجة تطبيقات الدردشة لتدفقات المحادثة.
- جعل واجهة القفل قابلة للتجاوز.
- بناء FlowDefinition من بيانات التعريف الخاصة بـ Flow DSL.
- إضافة مزود LLM من Snowflake Cortex الأصلي.
- إضافة دعم لملفات الوكلاء المدربين من crew.
### إصلاحات الأخطاء
- إصلاح نقطة التحقق لإعادة بناء BaseLLM مخصص كـ LLM ملموس عند الاستعادة.
- تقييد الاستعادة على علامة لمنع اللقطات الحية من إعادة التشغيل كاستئناف.
- تحديد حالة وقت التشغيل لكل تشغيل للحد من النمو وعزل التشغيل المتزامن.
- إصلاح إعدادات التتبع على crewai-login.
- احترام suppress_flow_events لأحداث تنفيذ الطريقة.
- استعادة [project.scripts] في حزمة crewai لتثبيت أداة uv.
- حل مشكلات CVE الخاصة بـ pip-audit لـ aiohttp وdocling وdocling-core.
- إصلاح إدخال الملفات الذي لا يعمل بشكل موثوق.
- إصلاح تاريخ نتائج أدوات Snowflake Claude غير المكتملة.
### الوثائق
- تحديث سجل التغييرات والإصدار لـ v1.14.7.
- تحديث وثائق جامع OpenTelemetry.
- تحديث دليل NVIDIA Nemotron LLM.
- إضافة دليل تكامل Databricks.
- إضافة دليل تكامل Snowflake.
### الأداء
- تحسين سرعة استيراد crewai من خلال تحميل مستندات docling بشكل كسول.
### إعادة الهيكلة
- تبسيط تقييم شروط التدفق ليكون بلا حالة لكل حدث.
- فصل منطق المحادثة عن وقت التشغيل وإضافة تعريف المحادثة.
- تقسيم `flow.py` إلى DSL، وتعريف، ووقت تشغيل.
## المساهمون
@Luzk, @alex-clawd, @devin-ai-integration[bot], @greysonlalonde, @gvieira, @jessemiller, @lorenzejay, @lucasgomide, @mattatcha, @vinibrsl
</Update>
<Update label="10 يونيو 2026">
## v1.14.7rc2
[عرض الإصدار على GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7rc2)
## ما الذي تغير
### إصلاحات الأخطاء
- استعادة البوابة على علامة لمنع اللقطات الحية من إعادة التشغيل كاستئناف
### الوثائق
- تحديث سجل التغييرات والإصدار لـ v1.14.7rc1
## المساهمون
@greysonlalonde
</Update>
<Update label="10 يونيو 2026">
## v1.14.7rc1
[عرض الإصدار على GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7rc1)
## ما الذي تغير
### الميزات
- إضافة `reset_runtime_state` لإطلاق حالة الحافلة المتراكمة
- التعامل مع دعم كل من الموجهات المخصصة
- فصل منطق المحادثة عن وقت التشغيل وإضافة `conversational_definition`
### إصلاحات الأخطاء
- إصلاح نطاق حالة وقت التشغيل لكل تشغيل للحد من النمو وعزل التشغيلات المتزامنة
- إصلاح إعدادات القياس عن بُعد على `crewai-login`
- إصلاح احترام `suppress_flow_events` لفعاليات تنفيذ الأساليب
### الوثائق
- تحديث صور OpenTelemetry
- تحديث الوثائق لتعكس الحالة الجديدة لجمع بيانات OpenTelemetry
- تحديث سجل التغييرات والإصدار لـ v1.14.7a4
### إعادة الهيكلة
- تبسيط تقييم شرط التدفق ليكون بلا حالة لكل حدث
- تحسين دورة توجيه المحادثة مع تقليل مسار واحد
## المساهمون
@greysonlalonde, @lorenzejay, @lucasgomide, @vinibrsl
</Update>
<Update label="9 يونيو 2026">
## v1.14.7a4

View File

@@ -24,15 +24,39 @@ mode: "wide"
1. في CrewAI AMP، انتقل إلى **Settings** > **OpenTelemetry Collectors**.
2. انقر على **Add Collector**.
3. اختر نوع التكامل — **OpenTelemetry Traces** أو **OpenTelemetry Logs**.
4. هيّئ الاتصال:
- **Endpoint** — نقطة نهاية OTLP لمجمّعك (مثل `https://otel-collector.example.com:4317`).
- **Service Name** — اسم لتعريف هذه الخدمة في منصة المراقبة.
- **Custom Headers** *(اختياري)* — أضف رؤوس المصادقة أو التوجيه كأزواج مفتاح-قيمة.
- **Certificate** *(اختياري)* — قدم شهادة TLS إذا كان مجمّعك يتطلبها.
5. انقر على **Save**.
3. اختر تكاملاً:
- **OpenTelemetry Traces** و**OpenTelemetry Logs** — صدّر إلى أي مجمّع أو واجهة خلفية متوافقة مع OTLP.
- **Datadog** — أرسل التتبعات مباشرة إلى استقبال OTLP الخاص بـ Datadog، دون الحاجة إلى مجمّع منفصل أو Datadog Agent.
4. هيّئ الاتصال. تعتمد الحقول على التكامل الذي اخترته:
<Frame>![تهيئة مجمّع OpenTelemetry](/images/crewai-otel-collector-config.png)</Frame>
<Tabs>
<Tab title="OpenTelemetry Traces / Logs">
إن **OpenTelemetry Traces** و**OpenTelemetry Logs** تكاملان منفصلان يتشاركان نفس الحقول — اختر التكامل المطابق للإشارة التي تريد تصديرها.
- **Endpoint** — نقطة نهاية OTLP لمجمّعك (مثل `https://otel-collector.example.com:4317`).
- **Service Name** — اسم لتعريف هذه الخدمة في منصة المراقبة.
- **Custom Headers** *(اختياري)* — أضف رؤوس المصادقة أو التوجيه كأزواج مفتاح-قيمة.
- **Certificate** *(اختياري)* — قدم شهادة TLS إذا كان مجمّعك يتطلبها.
<Frame>![تهيئة مجمّع OpenTelemetry](/images/crewai-otel-collector-opentelemetry.png)</Frame>
</Tab>
<Tab title="Datadog">
- **Datadog Site Domain** — مضيف OTLP لموقع Datadog الخاص بك فقط، دون بروتوكول أو مسار. يقوم CrewAI ببناء نقطة نهاية HTTPS OTLP الكاملة نيابةً عنك. استخدم المضيف المطابق لـ [موقع Datadog](https://docs.datadoghq.com/getting_started/site/) الخاص بك:
- `otlp.datadoghq.com` (US1)
- `otlp.us3.datadoghq.com` (US3)
- `otlp.us5.datadoghq.com` (US5)
- `otlp.datadoghq.eu` (EU1)
- `otlp.ap1.datadoghq.com` (AP1)
- **API Key** — مفتاح واجهة برمجة تطبيقات Datadog الخاص بك. راجع [كيفية إنشاء واحد](https://docs.datadoghq.com/account_management/api-app-keys/#api-keys).
يصدّر تكامل Datadog **التتبعات**.
<Frame>![تهيئة مجمّع Datadog](/images/crewai-otel-collector-datadog.png)</Frame>
</Tab>
</Tabs>
5. *(اختياري)* انقر على **Test Connection** للتحقق من قدرة CrewAI على الوصول إلى نقطة النهاية باستخدام بيانات الاعتماد التي قدمتها.
6. انقر على **Save**.
<Tip>
يمكنك إضافة مجمّعات متعددة — على سبيل المثال، واحد للتتبعات وآخر للسجلات، أو الإرسال إلى واجهات خلفية مختلفة لأغراض مختلفة.

View File

@@ -161,6 +161,18 @@ crew = Crew(
)
```
<Note>
يُحتفظ بـ `agent.i18n` للتوافق مع الإصدارات السابقة فقط، وقد تم إهماله. لتخصيص المطالبات أثناء التشغيل، مرّر `prompt_file` إلى `Crew`. وللوصول البرمجي المباشر إلى شرائح المطالبات، استخدم أداة i18n مباشرة:
</Note>
```python
from crewai.utilities.i18n import get_i18n
i18n = get_i18n("custom_prompts.json")
format_slice = i18n.slice("format")
tool_prompt = i18n.tools("ask_question")
```
#### الخيار 3: تعطيل مطالبات النظام لنماذج o1
```python
agent = Agent(
@@ -208,6 +220,8 @@ agent = Agent(
يدمج CrewAI بعد ذلك تخصيصاتك مع الإعدادات الافتراضية، فلا تحتاج لإعادة تعريف كل مطالبة. إليك الطريقة:
بالنسبة للكود الذي يحتاج إلى قراءة شرائح المطالبات مباشرة، استخدم `crewai.utilities.i18n.get_i18n()` مع ملف المطالبات نفسه بدلًا من قراءة `agent.i18n`.
### مثال: تخصيص أساسي للمطالبات
أنشئ ملف `custom_prompts.json` بالمطالبات التي تريد تعديلها. تأكد من إدراج جميع المطالبات عالية المستوى التي يجب أن يحتويها، وليس فقط تغييراتك:

File diff suppressed because it is too large Load Diff

View File

@@ -4,6 +4,106 @@ description: "Product updates, improvements, and bug fixes for CrewAI"
icon: "clock"
mode: "wide"
---
<Update label="Jun 11, 2026">
## v1.14.7
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7)
## What's Changed
### Features
- Add pluggable default backends for memory, knowledge, rag, and flow.
- Surface real finish_reason, sampling params, and response.id on LLM events.
- Type DSL triggers as route-aware decorators.
- Add chat API for conversational flows.
- Make locking backend overridable.
- Build FlowDefinition from Flow DSL metadata.
- Add native Snowflake Cortex LLM provider.
- Add crew trained agents file support.
### Bug Fixes
- Fix checkpoint to rebuild custom BaseLLM as concrete LLM on restore.
- Gate restore on a flag to prevent live snapshots from replaying as resume.
- Scope runtime state per run to bound growth and isolate concurrent runs.
- Fix telemetry setup on crewai-login.
- Respect suppress_flow_events for method-execution events.
- Restore [project.scripts] in crewai package for uv tool install.
- Resolve pip-audit CVEs for aiohttp, docling, and docling-core.
- Fix file input not working reliably.
- Fix Snowflake Claude incomplete tool result histories.
### Documentation
- Update changelog and version for v1.14.7.
- Update OpenTelemetry collector documentation.
- Update NVIDIA Nemotron LLM guide.
- Add Databricks integration guide.
- Add Snowflake integration guide.
### Performance
- Improve crewai import speed by lazy-loading docling imports.
### Refactoring
- Simplify flow condition evaluation to be stateless per event.
- Decouple convo logic from runtime and add a conversational_definition.
- Split `flow.py` into DSL, definition, and runtime.
## Contributors
@Luzk, @alex-clawd, @devin-ai-integration[bot], @greysonlalonde, @gvieira, @jessemiller, @lorenzejay, @lucasgomide, @mattatcha, @vinibrsl
</Update>
<Update label="Jun 10, 2026">
## v1.14.7rc2
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7rc2)
## What's Changed
### Bug Fixes
- Gate restore on a flag to prevent live snapshots from replaying as resume
### Documentation
- Update changelog and version for v1.14.7rc1
## Contributors
@greysonlalonde
</Update>
<Update label="Jun 10, 2026">
## v1.14.7rc1
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7rc1)
## What's Changed
### Features
- Add `reset_runtime_state` to release accumulated bus state
- Handle supporting both custom prompts
- Decouple conversation logic from runtime and add a `conversational_definition`
### Bug Fixes
- Fix scope of runtime state per run to bound growth and isolate concurrent runs
- Fix telemetry setup on `crewai-login`
- Fix respect for `suppress_flow_events` for method-execution events
### Documentation
- Update OpenTelemetry images
- Update documentation to reflect new state of OpenTelemetry collector
- Update changelog and version for v1.14.7a4
### Refactoring
- Simplify flow condition evaluation to be stateless per event
- Improve conversation routing cycle with one less route
## Contributors
@greysonlalonde, @lorenzejay, @lucasgomide, @vinibrsl
</Update>
<Update label="Jun 09, 2026">
## v1.14.7a4

View File

@@ -24,15 +24,39 @@ Telemetry data follows the [OpenTelemetry GenAI semantic conventions](https://op
1. In CrewAI AMP, go to **Settings** > **OpenTelemetry Collectors**.
2. Click **Add Collector**.
3. Select an integration type — **OpenTelemetry Traces** or **OpenTelemetry Logs**.
4. Configure the connection:
- **Endpoint** — Your collector's OTLP endpoint (e.g., `https://otel-collector.example.com:4317`).
- **Service Name** — A name to identify this service in your observability platform.
- **Custom Headers** *(optional)* — Add authentication or routing headers as key-value pairs.
- **Certificate** *(optional)* — Provide a TLS certificate if your collector requires one.
5. Click **Save**.
3. Select an integration:
- **OpenTelemetry Traces** and **OpenTelemetry Logs** — export to any OTLP-compatible collector or backend.
- **Datadog** — send traces straight to Datadog's OTLP intake, no separate collector or Datadog Agent required.
4. Configure the connection. The fields depend on the integration you selected:
<Frame>![OpenTelemetry Collector Configuration](/images/crewai-otel-collector-config.png)</Frame>
<Tabs>
<Tab title="OpenTelemetry Traces / Logs">
**OpenTelemetry Traces** and **OpenTelemetry Logs** are separate integrations that share the same fields — pick the one matching the signal you want to export.
- **Endpoint** — Your collector's OTLP endpoint (e.g., `https://otel-collector.example.com:4317`).
- **Service Name** — A name to identify this service in your observability platform.
- **Custom Headers** *(optional)* — Add authentication or routing headers as key-value pairs.
- **Certificate** *(optional)* — Provide a TLS certificate if your collector requires one.
<Frame>![OpenTelemetry collector configuration](/images/crewai-otel-collector-opentelemetry.png)</Frame>
</Tab>
<Tab title="Datadog">
- **Datadog Site Domain** — Your Datadog site's OTLP host only, with no protocol or path. CrewAI builds the full HTTPS OTLP endpoint for you. Use the host that matches your [Datadog site](https://docs.datadoghq.com/getting_started/site/):
- `otlp.datadoghq.com` (US1)
- `otlp.us3.datadoghq.com` (US3)
- `otlp.us5.datadoghq.com` (US5)
- `otlp.datadoghq.eu` (EU1)
- `otlp.ap1.datadoghq.com` (AP1)
- **API Key** — Your Datadog API key. See [how to create one](https://docs.datadoghq.com/account_management/api-app-keys/#api-keys).
The Datadog integration exports **traces**.
<Frame>![Datadog collector configuration](/images/crewai-otel-collector-datadog.png)</Frame>
</Tab>
</Tabs>
5. *(optional)* Click **Test Connection** to verify CrewAI can reach the endpoint with the credentials you provided.
6. Click **Save**.
<Tip>
You can add multiple collectors — for example, one for traces and another for logs, or send to different backends for different purposes.

View File

@@ -161,6 +161,18 @@ crew = Crew(
)
```
<Note>
`agent.i18n` is maintained only for backward compatibility and is deprecated. For runtime prompt customization, pass `prompt_file` to `Crew`. For programmatic access to prompt slices, use the i18n utility directly:
</Note>
```python
from crewai.utilities.i18n import get_i18n
i18n = get_i18n("custom_prompts.json")
format_slice = i18n.slice("format")
tool_prompt = i18n.tools("ask_question")
```
#### Option 3: Disable System Prompts for o1 Models
```python
agent = Agent(
@@ -208,6 +220,8 @@ One straightforward approach is to create a JSON file for the prompts you want t
CrewAI then merges your customizations with the defaults, so you don't have to redefine every prompt. Here's how:
For code that needs to read prompt slices directly, use `crewai.utilities.i18n.get_i18n()` with the same prompt file instead of reading `agent.i18n`.
### Example: Basic Prompt Customization
Create a `custom_prompts.json` file with the prompts you want to modify. Ensure you list all top-level prompts it should contain, not just your changes:

Binary file not shown.

After

Width:  |  Height:  |  Size: 455 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 420 KiB

View File

@@ -4,6 +4,106 @@ description: "CrewAI의 제품 업데이트, 개선 사항 및 버그 수정"
icon: "clock"
mode: "wide"
---
<Update label="2026년 6월 11일">
## v1.14.7
[GitHub 릴리스 보기](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7)
## 변경 사항
### 기능
- 메모리, 지식, RAG 및 흐름에 대한 플러그 가능한 기본 백엔드를 추가했습니다.
- LLM 이벤트에서 실제 finish_reason, 샘플링 매개변수 및 response.id를 표시합니다.
- 경로 인식 장식자로서의 타입 DSL 트리거를 설정합니다.
- 대화 흐름을 위한 채팅 API를 추가했습니다.
- 잠금 백엔드를 재정의 가능하도록 만듭니다.
- Flow DSL 메타데이터에서 FlowDefinition을 빌드합니다.
- 네이티브 Snowflake Cortex LLM 공급자를 추가했습니다.
- 훈련된 에이전트 파일 지원을 추가했습니다.
### 버그 수정
- 복원 시 사용자 정의 BaseLLM을 구체적인 LLM으로 재구성하도록 체크포인트를 수정했습니다.
- 라이브 스냅샷이 재개로 재생되지 않도록 플래그를 사용하여 복원을 제한합니다.
- 실행마다 런타임 상태의 범위를 설정하여 성장을 제한하고 동시 실행을 격리합니다.
- crewai-login에서 텔레메트리 설정을 수정했습니다.
- 메서드 실행 이벤트에 대해 suppress_flow_events를 존중합니다.
- uv 도구 설치를 위해 crewai 패키지에서 [project.scripts]를 복원합니다.
- aiohttp, docling 및 docling-core에 대한 pip-audit CVE를 해결합니다.
- 파일 입력이 신뢰할 수 없게 작동하는 문제를 수정했습니다.
- Snowflake Claude의 불완전한 도구 결과 기록을 수정했습니다.
### 문서
- v1.14.7에 대한 변경 로그 및 버전을 업데이트했습니다.
- OpenTelemetry 수집기 문서를 업데이트했습니다.
- NVIDIA Nemotron LLM 가이드를 업데이트했습니다.
- Databricks 통합 가이드를 추가했습니다.
- Snowflake 통합 가이드를 추가했습니다.
### 성능
- docling 가져오기를 지연 로딩하여 crewai 가져오기 속도를 개선했습니다.
### 리팩토링
- 흐름 조건 평가를 이벤트별로 상태 비저장으로 단순화했습니다.
- 대화 논리를 런타임에서 분리하고 conversational_definition을 추가했습니다.
- `flow.py`를 DSL, 정의 및 런타임으로 분리했습니다.
## 기여자
@Luzk, @alex-clawd, @devin-ai-integration[bot], @greysonlalonde, @gvieira, @jessemiller, @lorenzejay, @lucasgomide, @mattatcha, @vinibrsl
</Update>
<Update label="2026년 6월 10일">
## v1.14.7rc2
[GitHub 릴리스 보기](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7rc2)
## 변경 사항
### 버그 수정
- 라이브 스냅샷이 재개로 재생되는 것을 방지하기 위한 플래그에서 게이트 복원
### 문서
- v1.14.7rc1에 대한 변경 로그 및 버전 업데이트
## 기여자
@greysonlalonde
</Update>
<Update label="2026년 6월 10일">
## v1.14.7rc1
[GitHub 릴리스 보기](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7rc1)
## 변경 사항
### 기능
- 누적된 버스 상태를 해제하기 위해 `reset_runtime_state` 추가
- 사용자 정의 프롬프트를 모두 지원하도록 처리
- 대화 논리를 런타임과 분리하고 `conversational_definition` 추가
### 버그 수정
- 실행당 런타임 상태의 범위를 수정하여 성장 제한 및 동시 실행 격리
- `crewai-login`에서 원격 측정 설정 수정
- 메서드 실행 이벤트에 대한 `suppress_flow_events` 존중 수정
### 문서
- OpenTelemetry 이미지 업데이트
- OpenTelemetry 수집기의 새로운 상태를 반영하도록 문서 업데이트
- v1.14.7a4에 대한 변경 로그 및 버전 업데이트
### 리팩토링
- 이벤트당 상태 비저장 방식으로 흐름 조건 평가 단순화
- 경로를 하나 줄여 대화 라우팅 사이클 개선
## 기여자
@greysonlalonde, @lorenzejay, @lucasgomide, @vinibrsl
</Update>
<Update label="2026년 6월 9일">
## v1.14.7a4

View File

@@ -24,15 +24,39 @@ CrewAI AMP는 배포에서 OpenTelemetry **트레이스**와 **로그**를 자
1. CrewAI AMP에서 **Settings** > **OpenTelemetry Collectors**로 이동합니다.
2. **Add Collector**를 클릭합니다.
3. 통합 유형을 선택합니다 — **OpenTelemetry Traces** 또는 **OpenTelemetry Logs**.
4. 연결을 구성합니다:
- **Endpoint** — 수집기의 OTLP 엔드포인트 (예: `https://otel-collector.example.com:4317`).
- **Service Name** — 관측 가능성 플랫폼에서 이 서비스를 식별하기 위한 이름.
- **Custom Headers** *(선택 사항)* — 인증 또는 라우팅 헤더를 키-값 쌍으로 추가합니다.
- **Certificate** *(선택 사항)* — 수집기에서 TLS 인증서가 필요한 경우 제공합니다.
5. **Save**를 클릭합니다.
3. 통합을 선택합니다:
- **OpenTelemetry Traces** 및 **OpenTelemetry Logs** — OTLP 호환 수집기 또는 백엔드로 내보냅니다.
- **Datadog** — 별도의 수집기나 Datadog Agent 없이 트레이스를 Datadog의 OTLP 인테이크로 직접 전송합니다.
4. 연결을 구성합니다. 필드는 선택한 통합에 따라 달라집니다:
<Frame>![OpenTelemetry 수집기 구성](/images/crewai-otel-collector-config.png)</Frame>
<Tabs>
<Tab title="OpenTelemetry Traces / Logs">
**OpenTelemetry Traces**와 **OpenTelemetry Logs**는 동일한 필드를 공유하는 별개의 통합입니다 — 내보내려는 신호에 맞는 것을 선택하세요.
- **Endpoint** — 수집기의 OTLP 엔드포인트 (예: `https://otel-collector.example.com:4317`).
- **Service Name** — 관측 가능성 플랫폼에서 이 서비스를 식별하기 위한 이름.
- **Custom Headers** *(선택 사항)* — 인증 또는 라우팅 헤더를 키-값 쌍으로 추가합니다.
- **Certificate** *(선택 사항)* — 수집기에서 TLS 인증서가 필요한 경우 제공합니다.
<Frame>![OpenTelemetry 수집기 구성](/images/crewai-otel-collector-opentelemetry.png)</Frame>
</Tab>
<Tab title="Datadog">
- **Datadog Site Domain** — Datadog 사이트의 OTLP 호스트만 입력합니다 (프로토콜이나 경로 제외). CrewAI가 전체 HTTPS OTLP 엔드포인트를 자동으로 구성합니다. [Datadog 사이트](https://docs.datadoghq.com/getting_started/site/)에 맞는 호스트를 사용하세요:
- `otlp.datadoghq.com` (US1)
- `otlp.us3.datadoghq.com` (US3)
- `otlp.us5.datadoghq.com` (US5)
- `otlp.datadoghq.eu` (EU1)
- `otlp.ap1.datadoghq.com` (AP1)
- **API Key** — Datadog API 키입니다. [키 생성 방법](https://docs.datadoghq.com/account_management/api-app-keys/#api-keys)을 참고하세요.
Datadog 통합은 **트레이스**를 내보냅니다.
<Frame>![Datadog 수집기 구성](/images/crewai-otel-collector-datadog.png)</Frame>
</Tab>
</Tabs>
5. *(선택 사항)* **Test Connection**을 클릭하여 제공한 자격 증명으로 CrewAI가 엔드포인트에 연결할 수 있는지 확인합니다.
6. **Save**를 클릭합니다.
<Tip>
여러 수집기를 추가할 수 있습니다 — 예를 들어, 트레이스용 하나와 로그용 하나를 추가하거나, 다른 목적을 위해 다른 백엔드로 전송할 수 있습니다.

View File

@@ -161,6 +161,18 @@ crew = Crew(
)
```
<Note>
`agent.i18n`은 이전 버전과의 호환성을 위해서만 유지되며 사용이 중단될 예정입니다. 런타임 프롬프트 커스터마이징에는 `Crew`에 `prompt_file`을 전달하세요. 프롬프트 슬라이스를 코드에서 직접 읽어야 한다면 i18n 유틸리티를 직접 사용하세요:
</Note>
```python
from crewai.utilities.i18n import get_i18n
i18n = get_i18n("custom_prompts.json")
format_slice = i18n.slice("format")
tool_prompt = i18n.tools("ask_question")
```
#### 옵션 3: o1 모델에 대한 시스템 프롬프트 비활성화
```python
agent = Agent(
@@ -208,6 +220,8 @@ agent = Agent(
그러면 CrewAI가 기본값과 사용자가 지정한 내용을 병합하므로, 모든 프롬프트를 다시 정의할 필요가 없습니다. 방법은 다음과 같습니다:
프롬프트 슬라이스를 코드에서 직접 읽어야 하는 경우에는 `agent.i18n`을 읽는 대신 동일한 프롬프트 파일로 `crewai.utilities.i18n.get_i18n()`을 사용하세요.
### 예시: 기본 프롬프트 커스터마이징
수정하고 싶은 프롬프트를 포함하는 `custom_prompts.json` 파일을 생성하세요. 변경 사항만이 아니라 포함해야 하는 모든 최상위 프롬프트를 반드시 나열해야 합니다:
@@ -314,4 +328,4 @@ CrewAI에서의 저수준 prompt 커스터마이제이션은 매우 맞춤화되
<Check>
이제 CrewAI에서 고급 prompt 커스터마이징을 위한 기초를 갖추었습니다. 모델별 구조나 도메인별 제약에 맞춰 적용하든, 이러한 저수준 접근 방식은 agent 상호작용을 매우 전문적으로 조정할 수 있게 해줍니다.
</Check>
</Check>

View File

@@ -4,6 +4,106 @@ description: "Atualizações de produto, melhorias e correções do CrewAI"
icon: "clock"
mode: "wide"
---
<Update label="11 jun 2026">
## v1.14.7
[Ver release no GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7)
## O que Mudou
### Recursos
- Adicionar backends padrão plugáveis para memória, conhecimento, rag e fluxo.
- Exibir o verdadeiro finish_reason, parâmetros de amostragem e response.id em eventos LLM.
- Tipar os gatilhos DSL como decoradores cientes de rotas.
- Adicionar API de chat para fluxos de conversa.
- Tornar o backend de bloqueio substituível.
- Construir FlowDefinition a partir de metadados Flow DSL.
- Adicionar provedor nativo Snowflake Cortex LLM.
- Adicionar suporte a arquivos de agentes treinados pela equipe.
### Correções de Bugs
- Corrigir checkpoint para reconstruir BaseLLM personalizado como LLM concreto na restauração.
- Controlar a restauração com uma flag para evitar que snapshots ao vivo sejam reproduzidos como retomar.
- Escopar o estado de execução por execução para limitar o crescimento e isolar execuções concorrentes.
- Corrigir configuração de telemetria no crewai-login.
- Respeitar suppress_flow_events para eventos de execução de método.
- Restaurar [project.scripts] no pacote crewai para instalação da ferramenta uv.
- Resolver CVEs de pip-audit para aiohttp, docling e docling-core.
- Corrigir entrada de arquivo que não estava funcionando de forma confiável.
- Corrigir histórias de resultados de ferramentas incompletas do Snowflake Claude.
### Documentação
- Atualizar changelog e versão para v1.14.7.
- Atualizar documentação do coletor OpenTelemetry.
- Atualizar guia do LLM NVIDIA Nemotron.
- Adicionar guia de integração do Databricks.
- Adicionar guia de integração do Snowflake.
### Desempenho
- Melhorar a velocidade de importação do crewai através do carregamento preguiçoso de imports do docling.
### Refatoração
- Simplificar a avaliação de condições de fluxo para ser sem estado por evento.
- Desacoplar a lógica de conversa da execução e adicionar uma conversational_definition.
- Dividir `flow.py` em DSL, definição e execução.
## Contribuidores
@Luzk, @alex-clawd, @devin-ai-integration[bot], @greysonlalonde, @gvieira, @jessemiller, @lorenzejay, @lucasgomide, @mattatcha, @vinibrsl
</Update>
<Update label="10 jun 2026">
## v1.14.7rc2
[Ver release no GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7rc2)
## O que Mudou
### Correções de Bugs
- Restauração de portão em uma flag para evitar que snapshots ao vivo sejam reproduzidos como retomar
### Documentação
- Atualizar changelog e versão para v1.14.7rc1
## Contributors
@greysonlalonde
</Update>
<Update label="10 jun 2026">
## v1.14.7rc1
[Ver release no GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7rc1)
## O que Mudou
### Recursos
- Adicionar `reset_runtime_state` para liberar o estado acumulado do barramento
- Lidar com suporte a ambos os prompts personalizados
- Desacoplar a lógica de conversa do tempo de execução e adicionar uma `conversational_definition`
### Correções de Bugs
- Corrigir o escopo do estado de tempo de execução por execução para limitar o crescimento e isolar execuções concorrentes
- Corrigir a configuração de telemetria em `crewai-login`
- Corrigir o respeito a `suppress_flow_events` para eventos de execução de método
### Documentação
- Atualizar imagens do OpenTelemetry
- Atualizar a documentação para refletir o novo estado do coletor OpenTelemetry
- Atualizar o changelog e a versão para v1.14.7a4
### Refatoração
- Simplificar a avaliação da condição de fluxo para ser sem estado por evento
- Melhorar o ciclo de roteamento de conversas com uma rota a menos
## Contribuidores
@greysonlalonde, @lorenzejay, @lucasgomide, @vinibrsl
</Update>
<Update label="09 jun 2026">
## v1.14.7a4

View File

@@ -24,15 +24,39 @@ Os dados de telemetria seguem as [convenções semânticas GenAI do OpenTelemetr
1. No CrewAI AMP, vá para **Settings** > **OpenTelemetry Collectors**.
2. Clique em **Add Collector**.
3. Selecione um tipo de integração — **OpenTelemetry Traces** ou **OpenTelemetry Logs**.
4. Configure a conexão:
- **Endpoint** — O endpoint OTLP do seu coletor (por exemplo, `https://otel-collector.example.com:4317`).
- **Service Name** — Um nome para identificar este serviço na sua plataforma de observabilidade.
- **Custom Headers** *(opcional)* — Adicione headers de autenticação ou roteamento como pares chave-valor.
- **Certificate** *(opcional)* — Forneça um certificado TLS se o seu coletor exigir um.
5. Clique em **Save**.
3. Selecione uma integração:
- **OpenTelemetry Traces** e **OpenTelemetry Logs** — exporte para qualquer coletor ou backend compatível com OTLP.
- **Datadog** — envie traces diretamente para a ingestão OTLP do Datadog, sem precisar de um coletor separado ou do Datadog Agent.
4. Configure a conexão. Os campos dependem da integração selecionada:
<Frame>![Configuração do Coletor OpenTelemetry](/images/crewai-otel-collector-config.png)</Frame>
<Tabs>
<Tab title="OpenTelemetry Traces / Logs">
**OpenTelemetry Traces** e **OpenTelemetry Logs** são integrações separadas que compartilham os mesmos campos — escolha a que corresponde ao sinal que você quer exportar.
- **Endpoint** — O endpoint OTLP do seu coletor (por exemplo, `https://otel-collector.example.com:4317`).
- **Service Name** — Um nome para identificar este serviço na sua plataforma de observabilidade.
- **Custom Headers** *(opcional)* — Adicione headers de autenticação ou roteamento como pares chave-valor.
- **Certificate** *(opcional)* — Forneça um certificado TLS se o seu coletor exigir um.
<Frame>![Configuração do coletor OpenTelemetry](/images/crewai-otel-collector-opentelemetry.png)</Frame>
</Tab>
<Tab title="Datadog">
- **Datadog Site Domain** — Apenas o host OTLP do seu site Datadog, sem protocolo ou caminho. O CrewAI monta o endpoint HTTPS OTLP completo para você. Use o host correspondente ao seu [site Datadog](https://docs.datadoghq.com/getting_started/site/):
- `otlp.datadoghq.com` (US1)
- `otlp.us3.datadoghq.com` (US3)
- `otlp.us5.datadoghq.com` (US5)
- `otlp.datadoghq.eu` (EU1)
- `otlp.ap1.datadoghq.com` (AP1)
- **API Key** — Sua chave de API do Datadog. Veja [como criar uma](https://docs.datadoghq.com/account_management/api-app-keys/#api-keys).
A integração com o Datadog exporta **traces**.
<Frame>![Configuração do coletor Datadog](/images/crewai-otel-collector-datadog.png)</Frame>
</Tab>
</Tabs>
5. *(opcional)* Clique em **Test Connection** para verificar se o CrewAI consegue acessar o endpoint com as credenciais fornecidas.
6. Clique em **Save**.
<Tip>
Você pode adicionar múltiplos coletores — por exemplo, um para traces e outro para logs, ou enviar para diferentes backends para diferentes propósitos.

View File

@@ -161,6 +161,18 @@ crew = Crew(
)
```
<Note>
`agent.i18n` é mantido apenas para compatibilidade retroativa e está obsoleto. Para customização de prompts em tempo de execução, passe `prompt_file` para `Crew`. Para acesso programático aos slices de prompt, use diretamente o utilitário de i18n:
</Note>
```python
from crewai.utilities.i18n import get_i18n
i18n = get_i18n("custom_prompts.json")
format_slice = i18n.slice("format")
tool_prompt = i18n.tools("ask_question")
```
#### Opção 3: Desativar Prompts de Sistema para Modelos o1
```python
agent = Agent(
@@ -208,6 +220,8 @@ Uma abordagem direta é criar um arquivo JSON para os prompts que deseja sobresc
O CrewAI então mescla suas customizações com os padrões, assim você não precisa redefinir todos os prompts. Veja como:
Para código que precisa ler slices de prompt diretamente, use `crewai.utilities.i18n.get_i18n()` com o mesmo arquivo de prompts em vez de ler `agent.i18n`.
### Exemplo: Customização Básica de Prompt
Crie um arquivo `custom_prompts.json` com os prompts que deseja modificar. Certifique-se de listar todos os prompts de nível superior que ele deve conter, não apenas suas alterações:

View File

@@ -8,7 +8,7 @@ authors = [
]
requires-python = ">=3.10, <3.14"
dependencies = [
"crewai-core==1.14.7a4",
"crewai-core==1.14.7",
"click>=8.1.7,<9",
"pydantic>=2.11.9,<2.13",
"pydantic-settings~=2.10.1",

View File

@@ -1 +1 @@
__version__ = "1.14.7a4"
__version__ = "1.14.7"

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.14.7a4"
"crewai[tools]==1.14.7"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.14.7a4"
"crewai[tools]==1.14.7"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "Power up your crews with {{folder_name}}"
readme = "README.md"
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.14.7a4"
"crewai[tools]==1.14.7"
]
[tool.crewai]

View File

@@ -1 +1 @@
__version__ = "1.14.7a4"
__version__ = "1.14.7"

View File

@@ -17,7 +17,7 @@ import contextlib
import logging
import os
import threading
from typing import Any, Final
from typing import Any, ClassVar, Final
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
@@ -27,7 +27,7 @@ from opentelemetry.sdk.trace.export import (
BatchSpanProcessor,
SpanExportResult,
)
from opentelemetry.trace import Span, Status, StatusCode
from opentelemetry.trace import ProxyTracerProvider, Span, Status, StatusCode
from typing_extensions import Self
@@ -72,8 +72,8 @@ class Telemetry:
and event-bus signal handlers (see ``crewai.telemetry.telemetry``).
"""
_instance = None
_lock = threading.Lock()
_instance: ClassVar[Self | None] = None
_lock: ClassVar[threading.Lock] = threading.Lock()
def __new__(cls) -> Self:
if cls._instance is None:
@@ -149,6 +149,10 @@ class Telemetry:
if self.ready and not self.trace_set:
try:
with suppress_warnings():
existing_provider = trace.get_tracer_provider()
if not isinstance(existing_provider, ProxyTracerProvider):
self.trace_set = True
return
trace.set_tracer_provider(self.provider)
self.trace_set = True
except Exception as e:

View File

@@ -14,6 +14,7 @@ from crewai_core import (
version,
)
import pytest
from opentelemetry.sdk.trace import TracerProvider
def test_version_returns_string() -> None:
@@ -94,3 +95,36 @@ def test_user_data_decline_blocks(
def test_unused_var_warning_silenced() -> None:
# Touch os to keep the import (used by env-var fixtures above)
assert os.environ is not None
def test_core_telemetry_skips_duplicate_tracer_provider(
monkeypatch: pytest.MonkeyPatch,
) -> None:
from crewai_core.telemetry import Telemetry
Telemetry._instance = None
monkeypatch.delenv("OTEL_SDK_DISABLED", raising=False)
monkeypatch.delenv("CREWAI_DISABLE_TELEMETRY", raising=False)
monkeypatch.delenv("CREWAI_DISABLE_TRACKING", raising=False)
monkeypatch.setattr(
"crewai_core.telemetry.trace.get_tracer_provider",
lambda: TracerProvider(),
)
called = False
def fail_if_called(provider: object) -> None:
nonlocal called
called = True
monkeypatch.setattr(
"crewai_core.telemetry.trace.set_tracer_provider",
fail_if_called,
)
telemetry = Telemetry()
telemetry.set_tracer()
assert called is False
assert telemetry.trace_set is True

View File

@@ -152,4 +152,4 @@ __all__ = [
"wrap_file_source",
]
__version__ = "1.14.7a4"
__version__ = "1.14.7"

View File

@@ -10,7 +10,7 @@ requires-python = ">=3.10, <3.14"
dependencies = [
"pytube~=15.0.0",
"requests>=2.33.0,<3",
"crewai==1.14.7a4",
"crewai==1.14.7",
"tiktoken>=0.8.0,<0.13",
"beautifulsoup4~=4.13.4",
"python-docx~=1.2.0",

View File

@@ -330,4 +330,4 @@ __all__ = [
"ZapierActionTools",
]
__version__ = "1.14.7a4"
__version__ = "1.14.7"

View File

@@ -8,8 +8,8 @@ authors = [
]
requires-python = ">=3.10, <3.14"
dependencies = [
"crewai-core==1.14.7a4",
"crewai-cli==1.14.7a4",
"crewai-core==1.14.7",
"crewai-cli==1.14.7",
# Core Dependencies
"pydantic>=2.11.9,<2.13",
"openai>=2.30.0,<3",
@@ -54,7 +54,7 @@ Repository = "https://github.com/crewAIInc/crewAI"
[project.optional-dependencies]
tools = [
"crewai-tools==1.14.7a4",
"crewai-tools==1.14.7",
]
embeddings = [
"tiktoken>=0.8.0,<0.13"

View File

@@ -48,7 +48,7 @@ def _suppress_pydantic_deprecation_warnings() -> None:
_suppress_pydantic_deprecation_warnings()
__version__ = "1.14.7a4"
__version__ = "1.14.7"
_LAZY_IMPORTS: dict[str, tuple[str, str]] = {
"Memory": ("crewai.memory.unified_memory", "Memory"),

View File

@@ -46,6 +46,7 @@ from crewai.state.checkpoint_config import CheckpointConfig, _coerce_checkpoint
from crewai.tools.base_tool import BaseTool, Tool
from crewai.types.callback import SerializableCallable
from crewai.utilities.config import process_config
from crewai.utilities.i18n import I18N, get_i18n
from crewai.utilities.logger import Logger
from crewai.utilities.rpm_controller import RPMController
from crewai.utilities.string_utils import interpolate_only
@@ -81,6 +82,7 @@ _LLM_TYPE_REGISTRY: dict[str, str] = {
def _validate_llm_ref(value: Any) -> Any:
if isinstance(value, dict):
import importlib
import inspect
llm_type = value.get("llm_type")
if not llm_type or llm_type not in _LLM_TYPE_REGISTRY:
@@ -91,6 +93,12 @@ def _validate_llm_ref(value: Any) -> Any:
dotted = _LLM_TYPE_REGISTRY[llm_type]
mod_path, cls_name = dotted.rsplit(".", 1)
cls = getattr(importlib.import_module(mod_path), cls_name)
if inspect.isabstract(cls):
from crewai.llm import LLM
return LLM(
**{k: v for k, v in value.items() if v is not None and k != "llm_type"}
)
return cls(**value)
return value
@@ -186,6 +194,7 @@ class BaseAgent(BaseModel, ABC, metaclass=AgentMeta):
tools (list[Any] | None): Tools at the agent's disposal.
max_iter (int): Maximum iterations for an agent to execute a task.
agent_executor: An instance of the CrewAgentExecutor class.
i18n (I18N): Internationalization settings.
llm (Any): Language model that will run the agent.
crew (Any): Crew to which the agent belongs.
@@ -265,6 +274,14 @@ class BaseAgent(BaseModel, ABC, metaclass=AgentMeta):
_serialize_executor_ref, return_type=dict | None, when_used="json"
),
] = Field(default=None, description="An instance of the CrewAgentExecutor class.")
i18n: I18N = Field(
default_factory=get_i18n,
description="Internationalization settings.",
deprecated=(
"Agent.i18n is deprecated and will be removed in a future release. "
"Use crewai.utilities.i18n.get_i18n() or Crew(prompt_file=...) instead."
),
)
llm: Annotated[
str | BaseLLM | None,

View File

@@ -117,8 +117,10 @@ def capture_execution_context(
)
def apply_execution_context(ctx: ExecutionContext) -> None:
def apply_execution_context(ctx: ExecutionContext | dict[str, Any]) -> None:
"""Write an ExecutionContext back into the ContextVars."""
if isinstance(ctx, dict):
ctx = ExecutionContext.model_validate(ctx)
_current_task_id.set(ctx.current_task_id)
current_flow_request_id.set(ctx.flow_request_id)
current_flow_id.set(ctx.flow_id)

View File

@@ -1013,6 +1013,7 @@ class Crew(FlowTrackable, BaseModel):
)
token = attach(baggage_ctx)
runtime_scope = crewai_event_bus._enter_runtime_scope()
try:
inputs = prepare_kickoff(self, inputs, input_files)
@@ -1048,6 +1049,7 @@ class Crew(FlowTrackable, BaseModel):
self._memory.drain_writes()
clear_files(self.id)
detach(token)
crewai_event_bus._exit_runtime_scope(runtime_scope)
def _post_kickoff(self, result: CrewOutput) -> CrewOutput:
return result
@@ -1223,6 +1225,7 @@ class Crew(FlowTrackable, BaseModel):
)
token = attach(baggage_ctx)
runtime_scope = crewai_event_bus._enter_runtime_scope()
try:
inputs = prepare_kickoff(self, inputs, input_files)
@@ -1256,6 +1259,7 @@ class Crew(FlowTrackable, BaseModel):
finally:
clear_files(self.id)
detach(token)
crewai_event_bus._exit_runtime_scope(runtime_scope)
async def akickoff_for_each(
self,

View File

@@ -80,6 +80,17 @@ def is_replaying() -> bool:
return _replaying.get()
_runtime_state_var: contextvars.ContextVar[RuntimeState | None] = (
contextvars.ContextVar("crewai_runtime_state", default=None)
)
_registered_entity_ids_var: contextvars.ContextVar[set[int] | None] = (
contextvars.ContextVar("crewai_registered_entity_ids", default=None)
)
_runtime_scope_depth: contextvars.ContextVar[int] = contextvars.ContextVar(
"crewai_runtime_scope_depth", default=0
)
class CrewAIEventsBus:
"""Singleton event bus for handling events in CrewAI.
@@ -116,7 +127,6 @@ class CrewAIEventsBus:
_futures_lock: threading.Lock
_executor_initialized: bool
_has_pending_events: bool
_runtime_state: RuntimeState | None
def __new__(cls) -> Self:
"""Create or return the singleton instance.
@@ -151,8 +161,6 @@ class CrewAIEventsBus:
self._console = ConsoleFormatter()
self._executor_initialized = False
self._has_pending_events = False
self._runtime_state: RuntimeState | None = None
self._registered_entity_ids: set[int] = set()
def _ensure_executor_initialized(self) -> None:
"""Lazily initialize the thread pool executor and event loop.
@@ -281,6 +289,51 @@ class CrewAIEventsBus:
"""The RuntimeState currently attached to the bus, if any."""
return self._runtime_state
@property
def _runtime_state(self) -> RuntimeState | None:
return _runtime_state_var.get()
@_runtime_state.setter
def _runtime_state(self, value: RuntimeState | None) -> None:
_runtime_state_var.set(value)
@property
def _registered_entity_ids(self) -> set[int]:
ids = _registered_entity_ids_var.get()
if ids is None:
ids = set()
_registered_entity_ids_var.set(ids)
return ids
@_registered_entity_ids.setter
def _registered_entity_ids(self, value: set[int]) -> None:
_registered_entity_ids_var.set(value)
def reset_runtime_state(self) -> None:
"""Detach the RuntimeState and clear the entity registry."""
self._runtime_state = None
self._registered_entity_ids = set()
def _enter_runtime_scope(self) -> bool:
depth = _runtime_scope_depth.get()
_runtime_scope_depth.set(depth + 1)
if depth != 0:
return False
if _runtime_state_var.get() is None:
from crewai import RuntimeState
if RuntimeState is not None:
_runtime_state_var.set(RuntimeState(root=[]))
_registered_entity_ids_var.set(set())
return True
def _exit_runtime_scope(self, outermost: bool) -> None:
depth = _runtime_scope_depth.get()
_runtime_scope_depth.set(depth - 1 if depth > 0 else 0)
if outermost:
_runtime_state_var.set(None)
_registered_entity_ids_var.set(None)
def register_entity(self, entity: Any) -> None:
"""Add an entity to the RuntimeState, creating it if needed.
@@ -349,6 +402,7 @@ class CrewAIEventsBus:
source: Any,
event: BaseEvent,
handlers: SyncHandlerSet,
state: RuntimeState | None,
) -> None:
"""Call provided synchronous handlers.
@@ -356,8 +410,8 @@ class CrewAIEventsBus:
source: The emitting object
event: The event instance
handlers: Frozenset of sync handlers to call
state: The RuntimeState captured on the emitting context
"""
state = self._runtime_state
errors: list[tuple[SyncHandler, Exception]] = [
(handler, error)
for handler in handlers
@@ -376,6 +430,7 @@ class CrewAIEventsBus:
source: Any,
event: BaseEvent,
handlers: AsyncHandlerSet,
state: RuntimeState | None,
) -> None:
"""Asynchronously call provided async handlers.
@@ -383,8 +438,8 @@ class CrewAIEventsBus:
source: The object that emitted the event
event: The event instance
handlers: Frozenset of async handlers to call
state: The RuntimeState captured on the emitting context
"""
state = self._runtime_state
async def _call(handler: AsyncHandler) -> Any:
if _get_param_count(handler) >= 3:
@@ -399,7 +454,9 @@ class CrewAIEventsBus:
f"[CrewAIEventsBus] Async handler error in {getattr(handler, '__name__', handler)}: {result}"
)
async def _emit_with_dependencies(self, source: Any, event: BaseEvent) -> None:
async def _emit_with_dependencies(
self, source: Any, event: BaseEvent, state: RuntimeState | None
) -> None:
"""Emit an event with dependency-aware handler execution.
Handlers are grouped into execution levels based on their dependencies.
@@ -450,18 +507,18 @@ class CrewAIEventsBus:
if level_sync:
if event_type is LLMStreamChunkEvent:
self._call_handlers(source, event, level_sync)
self._call_handlers(source, event, level_sync, state)
else:
ctx = contextvars.copy_context()
future = self._sync_executor.submit(
ctx.run, self._call_handlers, source, event, level_sync
ctx.run, self._call_handlers, source, event, level_sync, state
)
await asyncio.get_running_loop().run_in_executor(
None, future.result
)
if level_async:
await self._acall_handlers(source, event, level_async)
await self._acall_handlers(source, event, level_async, state)
def _register_source(self, source: Any) -> None:
"""Register the source entity in RuntimeState if applicable."""
@@ -556,21 +613,23 @@ class CrewAIEventsBus:
self._ensure_executor_initialized()
self._has_pending_events = True
state = self._runtime_state
if has_dependencies:
return self._track_future(
asyncio.run_coroutine_threadsafe(
self._emit_with_dependencies(source, event),
self._emit_with_dependencies(source, event, state),
self._loop,
)
)
if sync_handlers:
if event_type is LLMStreamChunkEvent:
self._call_handlers(source, event, sync_handlers)
self._call_handlers(source, event, sync_handlers, state)
else:
ctx = contextvars.copy_context()
sync_future = self._sync_executor.submit(
ctx.run, self._call_handlers, source, event, sync_handlers
ctx.run, self._call_handlers, source, event, sync_handlers, state
)
if not async_handlers:
return self._track_future(sync_future)
@@ -578,7 +637,7 @@ class CrewAIEventsBus:
if async_handlers:
return self._track_future(
asyncio.run_coroutine_threadsafe(
self._acall_handlers(source, event, async_handlers),
self._acall_handlers(source, event, async_handlers, state),
self._loop,
)
)
@@ -590,21 +649,22 @@ class CrewAIEventsBus:
source: Any,
event: BaseEvent,
handlers: AsyncHandlerSet,
state: RuntimeState | None,
) -> None:
"""Call async handlers with the replaying flag set on the loop thread."""
token = _replaying.set(True)
try:
await self._acall_handlers(source, event, handlers)
await self._acall_handlers(source, event, handlers, state)
finally:
_replaying.reset(token)
async def _emit_with_dependencies_replaying(
self, source: Any, event: BaseEvent
self, source: Any, event: BaseEvent, state: RuntimeState | None
) -> None:
"""Dependency-aware dispatch with the replaying flag set."""
token = _replaying.set(True)
try:
await self._emit_with_dependencies(source, event)
await self._emit_with_dependencies(source, event, state)
finally:
_replaying.reset(token)
@@ -638,12 +698,13 @@ class CrewAIEventsBus:
self._ensure_executor_initialized()
self._has_pending_events = True
state = self._runtime_state
token = _replaying.set(True)
try:
if has_dependencies:
return self._track_future(
asyncio.run_coroutine_threadsafe(
self._emit_with_dependencies_replaying(source, event),
self._emit_with_dependencies_replaying(source, event, state),
self._loop,
)
)
@@ -651,7 +712,7 @@ class CrewAIEventsBus:
if sync_handlers:
ctx = contextvars.copy_context()
sync_future = self._sync_executor.submit(
ctx.run, self._call_handlers, source, event, sync_handlers
ctx.run, self._call_handlers, source, event, sync_handlers, state
)
self._track_future(sync_future)
if not async_handlers:
@@ -659,7 +720,9 @@ class CrewAIEventsBus:
return self._track_future(
asyncio.run_coroutine_threadsafe(
self._acall_handlers_replaying(source, event, async_handlers),
self._acall_handlers_replaying(
source, event, async_handlers, state
),
self._loop,
)
)
@@ -727,7 +790,9 @@ class CrewAIEventsBus:
async_handlers = self._async_handlers.get(event_type, frozenset())
if async_handlers:
await self._acall_handlers(source, event, async_handlers)
await self._acall_handlers(
source, event, async_handlers, self._runtime_state
)
def register_handler(
self,

View File

@@ -292,7 +292,7 @@ class TraceCollectionListener(BaseEventListener):
@event_bus.on(CrewKickoffCompletedEvent)
def on_crew_completed(source: Any, event: CrewKickoffCompletedEvent) -> None:
self._handle_trace_event("crew_kickoff_completed", source, event)
if self.batch_manager.defer_session_finalization:
if self._should_defer_session_finalization():
return
if self._nested_in_flow_execution():
return
@@ -306,7 +306,7 @@ class TraceCollectionListener(BaseEventListener):
@event_bus.on(CrewKickoffFailedEvent)
def on_crew_failed(source: Any, event: CrewKickoffFailedEvent) -> None:
self._handle_trace_event("crew_kickoff_failed", source, event)
if self.batch_manager.defer_session_finalization:
if self._should_defer_session_finalization():
return
if self._nested_in_flow_execution():
return
@@ -734,7 +734,7 @@ class TraceCollectionListener(BaseEventListener):
if not self.batch_manager.is_batch_initialized():
return
# Multi-turn flows defer batch finalization to finalize_session_traces().
if self.batch_manager.defer_session_finalization:
if self._should_defer_session_finalization():
return
self.batch_manager.finalize_batch()
@@ -745,6 +745,15 @@ class TraceCollectionListener(BaseEventListener):
return current_flow_id.get() is not None
def _should_defer_session_finalization(self) -> bool:
"""True when the active trace belongs to a deferred flow session."""
from crewai.flow.flow_context import current_flow_defer_trace_finalization
return (
self.batch_manager.defer_session_finalization
or current_flow_defer_trace_finalization.get()
)
def _flow_owns_trace_batch(self) -> bool:
"""True when an in-flight conversational flow already owns the trace batch."""
if self.batch_manager.batch_owner_type == "flow":
@@ -786,7 +795,11 @@ class TraceCollectionListener(BaseEventListener):
(``current_flow_id``) to keep LLM/tool events from falling back to an
implicit crew batch.
"""
from crewai.flow.flow_context import current_flow_id, current_flow_name
from crewai.flow.flow_context import (
current_flow_defer_trace_finalization,
current_flow_id,
current_flow_name,
)
flow_id = current_flow_id.get()
if flow_id is None:
@@ -802,6 +815,8 @@ class TraceCollectionListener(BaseEventListener):
}
self.batch_manager.batch_owner_type = "flow"
self.batch_manager.batch_owner_id = flow_id
if current_flow_defer_trace_finalization.get():
self.batch_manager.defer_session_finalization = True
self._initialize_batch(user_context, execution_metadata)
return True

View File

@@ -1,6 +1,6 @@
from typing import Any, Literal
from pydantic import BaseModel, ConfigDict
from pydantic import BaseModel, ConfigDict, field_serializer
from crewai.events.base_events import BaseEvent
@@ -57,6 +57,10 @@ class MethodExecutionFailedEvent(FlowEvent):
model_config = ConfigDict(arbitrary_types_allowed=True)
@field_serializer("error")
def _serialize_error(self, error: Exception) -> str:
return str(error)
class MethodExecutionPausedEvent(FlowEvent):
"""Event emitted when a flow method is paused waiting for human feedback.

View File

@@ -1,15 +1,17 @@
"""Conversational graph + helpers as a mixin for ``Flow`` (experimental).
"""Conversational graph + helpers as an experimental Flow extension.
The experimental conversational chat surface lives here as a mixin so that
``crewai.flow.runtime`` stays focused on the execution engine. ``Flow``
inherits from ``_ConversationalMixin``; the methods only register on
subclasses that opt in via ``conversational = True`` (enforced by the
``_conversational_only`` marker + ``FlowMeta`` gating in
``crewai.flow.runtime``).
The conversational chat surface remains experimental and may change before the
v2 graduation path. It lives here so ``crewai.flow.runtime`` can stay focused
on the execution engine. ``crewai.flow.flow`` composes this mixin onto the
public ``Flow`` class for backwards compatibility.
The built-in conversational graph only registers for subclasses that opt in
with ``conversational = True``. Static conversational metadata is projected
into ``FlowDefinition.conversational`` via the Python DSL builder.
Import surface:
- :class:`_ConversationalMixin` — internal; ``Flow`` mixes it in. Users
don't import it directly.
- :class:`_ConversationalMixin` — internal; the public ``Flow`` class
composes it in. Users don't import it directly.
- The data types this mixin uses live in
:mod:`crewai.experimental.conversational`.
"""
@@ -20,7 +22,7 @@ from collections.abc import Callable, Mapping, Sequence
from enum import Enum
import json
import logging
from typing import TYPE_CHECKING, Any, ClassVar, Literal, cast
from typing import TYPE_CHECKING, Any, ClassVar, Literal, TypeVar, cast
from pydantic import BaseModel, Field, create_model
@@ -44,26 +46,69 @@ from crewai.flow.conversation import (
get_conversation_messages,
receive_user_message as _receive_user_message,
)
from crewai.flow.dsl import listen, router, start
from crewai.flow.dsl import listen, start
from crewai.flow.dsl._utils import _method_action, _set_flow_method_definition
from crewai.flow.flow_definition import FlowMethodDefinition
from crewai.utilities.types import LLMMessage
if TYPE_CHECKING:
from crewai.flow.runtime import Flow
from crewai.llms.base_llm import BaseLLM
logger = logging.getLogger(__name__)
class _ConversationalMixin:
"""Built-in conversational graph for ``Flow`` (gated on ``conversational``).
def _iter_condition_labels(condition: Any) -> set[str]:
if isinstance(condition, str):
return {condition}
if isinstance(condition, dict):
labels: set[str] = set()
for value in condition.values():
if isinstance(value, list):
for item in value:
labels.update(_iter_condition_labels(item))
else:
labels.update(_iter_condition_labels(value))
return labels
return set()
Mixed into ``Flow`` so its execution engine (``runtime.py``) stays focused
on running graphs. The methods here only register on subclasses that set
``conversational = True``; non-chat flows see them as inert attributes.
def _conversation_start_router(func: Callable[..., Any]) -> Any:
wrapper = start()(func)
_set_flow_method_definition(
cast(Any, wrapper),
FlowMethodDefinition(do=_method_action(func), start=True, router=True),
)
return wrapper
class _ConversationalMixin:
"""Experimental conversational graph for ``Flow``.
This mixin owns chat behavior and runtime hooks. Non-chat flows see these
methods as inert attributes unless they opt in with ``conversational = True``.
"""
# === EXPERIMENTAL: conversational mode ===
# When ``conversational = True`` on a Flow subclass, this mixin's built-in
# graph registers and ``handle_turn`` / ``chat`` become chat entry points.
conversational: ClassVar[bool] = False
conversational_config: ClassVar[ConversationConfig | None] = None
builtin_routes: ClassVar[tuple[str, ...]] = ("converse", "end")
internal_routes: ClassVar[tuple[str, ...]] = ("answer_from_history",)
builtin_route_descriptions: ClassVar[dict[str, str]] = {
"converse": (
"Ordinary chat, follow-ups, summaries, clarifications, and "
"questions answerable from prior conversation history."
),
"end": ("User signals the conversation is finished (goodbye, exit, done)."),
"answer_from_history": (
"Answer directly from prior conversation history without invoking "
"tools, agents, or custom routes."
),
}
# The metaclass + state attributes referenced below live on ``Flow`` —
# this mixin is never instantiated standalone. These type-only
# declarations exist so static analyzers don't flag attribute access.
@@ -71,22 +116,15 @@ class _ConversationalMixin:
# (otherwise mypy flags "Cannot override instance variable with class
# variable" when Flow declares them as ``ClassVar``).
if TYPE_CHECKING:
conversational: ClassVar[bool]
conversational_config: ClassVar[ConversationConfig | None]
builtin_routes: ClassVar[tuple[str, ...]]
internal_routes: ClassVar[tuple[str, ...]]
builtin_route_descriptions: ClassVar[dict[str, str]]
# Registry ClassVars populated by ``FlowMeta`` at class creation.
_listeners: ClassVar[dict[Any, Any]]
# Instance attrs from ``Flow``.
state: Any
name: str | None
_completed_methods: set[Any]
_method_outputs: list[Any]
_pending_and_listeners: dict[Any, Any]
_pending_events: dict[Any, Any]
_method_call_counts: dict[Any, int]
_is_execution_resuming: bool
_conversation_messages: list[LLMMessage]
_pending_user_message: str | dict[str, Any] | None
_pending_intents: Sequence[str] | None
_pending_intent_llm: str | BaseLLM | None
@@ -97,8 +135,8 @@ class _ConversationalMixin:
def _collapse_to_outcome(
self,
feedback: str,
outcomes: tuple[str, ...],
llm: str | BaseLLM | Any,
outcomes: Sequence[str],
llm: str | BaseLLM,
) -> str:
pass
@@ -108,23 +146,24 @@ class _ConversationalMixin:
def kickoff(self, *args: Any, **kwargs: Any) -> Any:
pass
@start()
@_conversational_only
def conversation_start(self) -> str | None:
"""Internal Flow entrypoint that hands the user message to the router.
"""Return the current user message for conversational route selection.
In conversational mode, ``Flow.kickoff_async`` runs all ``@start``
methods sequentially and this one is registered last, so any user
``@start`` methods (e.g. permission loading) have already finished
before the returned value triggers ``route_conversation``.
This remains as a plain overridable helper for compatibility. It is not
registered as a Flow method; ``route_conversation`` is the synthetic
built-in start/router that begins a conversational turn.
"""
state = cast(ConversationState, self.state)
return state.current_user_message
@router(conversation_start)
@_conversation_start_router
@_conversational_only
def route_conversation(self) -> str:
"""Route the current turn to a listener label."""
if "conversation_start" not in {
str(method_name) for method_name in self._completed_methods
}:
self.conversation_start()
state = cast(ConversationState, self.state)
context = self.build_router_context()
previous_intent = state.last_intent
@@ -238,8 +277,8 @@ class _ConversationalMixin:
state = cast(ConversationState, self.state)
sid = session_id or state.id
# Stash the pending turn so ``_apply_pending_conversational_turn``
# picks it up AFTER persist restore.
# Stash the pending turn so the kickoff extension hook picks it up
# after persist restore.
self._pending_user_message = message
self._pending_intents = list(intents) if intents else None
self._pending_intent_llm = intent_llm
@@ -286,7 +325,7 @@ class _ConversationalMixin:
callers can customize prompts or exercise the loop without patching
builtins.
"""
if not getattr(type(self), "conversational", False):
if not self._is_conversational_enabled():
raise ValueError("Flow.chat() is only available on conversational flows")
exit_set = {command.lower() for command in exit_commands}
@@ -491,14 +530,14 @@ class _ConversationalMixin:
**extra: Any,
) -> None:
"""Append a message to conversation history (legacy ChatState path)."""
_append_conversation_message(cast("Flow[Any]", self), role, content, **extra)
_append_conversation_message(cast(Any, self), role, content, **extra)
@property
def conversation_messages(self) -> list[LLMMessage]:
"""Message history from state, coerced to LLM-shaped dicts."""
return [
message_to_llm_dict(message)
for message in get_conversation_messages(cast("Flow[Any]", self))
for message in get_conversation_messages(cast(Any, self))
]
def receive_user_message(
@@ -514,7 +553,7 @@ class _ConversationalMixin:
``state.messages`` and preserve ``last_intent`` across turns.
Non-conversational flows fall through to the legacy helper.
"""
if self.conversational:
if self._is_conversational_enabled():
state = cast(ConversationState, self.state)
state.messages.append(ConversationMessage(role="user", content=text))
self._emit_conversation_message_added(
@@ -535,9 +574,7 @@ class _ConversationalMixin:
return intent
return text
return _receive_user_message(
cast("Flow[Any]", self), text, outcomes=outcomes, llm=llm
)
return _receive_user_message(cast(Any, self), text, outcomes=outcomes, llm=llm)
def classify_intent(
self,
@@ -561,27 +598,104 @@ class _ConversationalMixin:
def _conversation_config(self) -> ConversationConfig | None:
return getattr(type(self), "conversational_config", None)
@property
def _conversation_definition(self) -> Any | None:
return self._conversation_flow_definition().conversational
def _conversation_flow_definition(self) -> Any:
flow_definition = getattr(type(self), "flow_definition", None)
if not callable(flow_definition):
raise AttributeError(
f"{type(self).__name__} does not expose flow_definition()"
)
return flow_definition()
@classmethod
def _conversational_definition(cls) -> Any | None:
flow_definition = getattr(cls, "flow_definition", None)
if not callable(flow_definition):
return None
return flow_definition().conversational
@classmethod
def _is_conversational(cls) -> bool:
definition = cls._conversational_definition()
return bool(definition and definition.enabled)
def _is_conversational_enabled(self) -> bool:
definition = self._conversation_definition
return bool(definition and definition.enabled)
def _initialize_runtime_extension_attrs(self) -> None:
if not isinstance(getattr(self, "_conversation_messages", None), list):
object.__setattr__(self, "_conversation_messages", [])
if not hasattr(self, "_pending_user_message"):
object.__setattr__(self, "_pending_user_message", None)
if not hasattr(self, "_pending_intents"):
object.__setattr__(self, "_pending_intents", None)
if not hasattr(self, "_pending_intent_llm"):
object.__setattr__(self, "_pending_intent_llm", None)
def _create_default_extension_state(self) -> ConversationState | None:
initial_state_t = getattr(self, "_initial_state_t", None)
if type(self)._is_conversational() and (
not hasattr(self, "_initial_state_t")
or isinstance(initial_state_t, TypeVar)
):
return ConversationState()
return None
def _should_apply_pending_kickoff_context(self) -> bool:
return (
type(self)._is_conversational() and self._pending_user_message is not None
)
def _apply_pending_kickoff_context(self) -> None:
self._apply_pending_conversational_turn()
def _order_start_methods_for_kickoff(
self,
start_methods: list[Any],
) -> tuple[list[Any], bool]:
if not type(self)._is_conversational():
return start_methods, False
route_conversation = "route_conversation"
if route_conversation not in {str(method) for method in start_methods}:
return start_methods, False
ordered_starts = [
method for method in start_methods if str(method) != route_conversation
]
ordered_starts.append(
next(
method for method in start_methods if str(method) == route_conversation
)
)
return ordered_starts, True
def _should_defer_trace_finalization(self) -> bool:
"""Whether per-turn ``FlowFinished`` + ``finalize_batch`` should be skipped.
True when either:
- ``flow.defer_trace_finalization`` is set on the instance, OR
- the class-level ``ConversationConfig.defer_trace_finalization``
on a conversational subclass is True.
- the static conversational definition enables deferred finalization.
Either source enables the deferred-session pattern. The caller
eventually invokes ``finalize_session_traces()`` to close the batch.
"""
if getattr(self, "defer_trace_finalization", False):
return True
config = self._conversation_config
return bool(config and config.defer_trace_finalization)
definition = self._conversation_definition
return bool(
definition and definition.enabled and definition.defer_trace_finalization
)
def _reset_turn_execution_state(self) -> None:
"""Clear per-execution tracking so the next turn re-runs the graph."""
self._completed_methods.clear()
self._method_outputs.clear()
self._pending_and_listeners.clear()
self._pending_events.clear()
self._method_call_counts.clear()
self._clear_or_listeners()
self._is_execution_resuming = False
@@ -733,11 +847,12 @@ class _ConversationalMixin:
router_config: RouterConfig | None,
) -> dict[str, str]:
label_to_method: dict[str, str] = {}
for listener_name, condition in self._listeners.items():
if isinstance(condition, tuple):
_, trigger_labels = condition
for trigger_label in trigger_labels:
label_to_method.setdefault(str(trigger_label), str(listener_name))
flow_definition = self._conversation_flow_definition()
for listener_name, method_definition in flow_definition.methods.items():
if method_definition.listen is None or method_definition.router:
continue
for trigger_label in _iter_condition_labels(method_definition.listen):
label_to_method.setdefault(trigger_label, listener_name)
routes = self._effective_routes(router_config)
overrides = (
@@ -788,21 +903,31 @@ class _ConversationalMixin:
def _valid_route_labels(self) -> set[str]:
labels: set[str] = set()
for condition in self._listeners.values():
if isinstance(condition, tuple):
_, methods = condition
labels.update(str(method) for method in methods)
flow_definition = self._conversation_flow_definition()
for method_definition in flow_definition.methods.values():
if method_definition.listen is None or method_definition.router:
continue
labels.update(_iter_condition_labels(method_definition.listen))
return labels
def _effective_routes(self, router_config: RouterConfig | None = None) -> set[str]:
custom_routes = set(router_config.routes or ()) if router_config else set()
definition = self._conversation_definition
builtin_routes = (
tuple(definition.builtin_routes)
if definition is not None
else self.builtin_routes
)
internal_routes = (
tuple(definition.internal_routes)
if definition is not None
else self.internal_routes
)
if not custom_routes:
custom_routes = (
self._valid_route_labels()
- set(self.builtin_routes)
- set(self.internal_routes)
self._valid_route_labels() - set(builtin_routes) - set(internal_routes)
)
return custom_routes | set(self.builtin_routes)
return custom_routes | set(builtin_routes)
def _default_conversation_llm(self) -> Any | None:
config = self._conversation_config
@@ -931,12 +1056,15 @@ class _ConversationalMixin:
trace_listener = TraceCollectionListener()
batch_manager = trace_listener.batch_manager
if batch_manager.batch_owner_type == "flow":
if trace_listener.first_time_handler.is_first_time:
trace_listener.first_time_handler.mark_events_collected()
trace_listener.first_time_handler.handle_execution_completion()
else:
batch_manager.finalize_batch()
try:
if batch_manager.batch_owner_type == "flow":
if trace_listener.first_time_handler.is_first_time:
trace_listener.first_time_handler.mark_events_collected()
trace_listener.first_time_handler.handle_execution_completion()
else:
batch_manager.finalize_batch()
finally:
batch_manager.defer_session_finalization = False
__all__ = ["_ConversationalMixin"]

View File

@@ -0,0 +1,48 @@
"""Static conversational Flow definition models.
This module is part of the serializable Flow Definition contract. It should
only contain static data shapes. Experimental conversational runtime behavior
continues to live in ``crewai.experimental.conversational_mixin``.
"""
from __future__ import annotations
from typing import Any, Literal
from pydantic import BaseModel, Field
class FlowConversationalRouterDefinition(BaseModel):
"""Static conversational router configuration."""
prompt: str | None = None
response_format: Any = None
llm: Any = None
routes: list[str] | None = None
route_descriptions: dict[str, str] | None = None
default_intent: str | None = "converse"
fallback_intent: str | None = "converse"
intent_field: str = "intent"
class FlowConversationalDefinition(BaseModel):
"""Static conversational Flow configuration."""
enabled: bool = False
system_prompt: str | None = None
llm: Any = None
router: FlowConversationalRouterDefinition | None = None
answer_from_history_prompt: str | None = None
default_intents: list[str] | None = None
intent_llm: Any = None
answer_from_history_llm: Any = None
visible_agent_outputs: list[str] | Literal["all"] | None = None
defer_trace_finalization: bool = True
builtin_routes: list[str] = Field(default_factory=lambda: ["converse", "end"])
internal_routes: list[str] = Field(default_factory=lambda: ["answer_from_history"])
__all__ = [
"FlowConversationalDefinition",
"FlowConversationalRouterDefinition",
]

View File

@@ -8,6 +8,7 @@ from crewai.flow.dsl._types import FlowMethodDecorator, FlowTrigger
from crewai.flow.dsl._utils import (
P,
R,
_method_action,
_set_flow_method_definition,
)
from crewai.flow.flow_definition import FlowMethodDefinition
@@ -45,7 +46,11 @@ def listen(condition: FlowTrigger) -> FlowMethodDecorator:
wrapper = ListenMethod(func)
_set_flow_method_definition(
wrapper, FlowMethodDefinition(listen=_to_definition_condition(condition))
wrapper,
FlowMethodDefinition(
do=_method_action(func),
listen=_to_definition_condition(condition),
),
)
return wrapper

View File

@@ -19,6 +19,7 @@ from crewai.flow.dsl._types import FlowMethodDecorator, FlowTrigger
from crewai.flow.dsl._utils import (
P,
R,
_method_action,
_set_flow_method_definition,
)
from crewai.flow.flow_definition import FlowMethodDefinition
@@ -148,6 +149,7 @@ def router(
_set_flow_method_definition(
wrapper,
FlowMethodDefinition(
do=_method_action(func),
listen=_to_definition_condition(condition),
router=True,
emit=router_events or None,

View File

@@ -8,6 +8,7 @@ from crewai.flow.dsl._types import FlowMethodDecorator, FlowTrigger
from crewai.flow.dsl._utils import (
P,
R,
_method_action,
_set_flow_method_definition,
)
from crewai.flow.flow_definition import FlowMethodDefinition
@@ -53,13 +54,17 @@ def start(
def decorator(func: Callable[P, R]) -> StartMethod[P, R]:
wrapper = StartMethod(func)
if condition is not None:
_set_flow_method_definition(
wrapper,
FlowMethodDefinition(start=_to_definition_condition(condition)),
)
else:
_set_flow_method_definition(wrapper, FlowMethodDefinition(start=True))
_set_flow_method_definition(
wrapper,
FlowMethodDefinition(
do=_method_action(func),
start=(
_to_definition_condition(condition)
if condition is not None
else True
),
),
)
return wrapper
return cast(FlowMethodDecorator, decorator)

View File

@@ -8,7 +8,10 @@ from pydantic import BaseModel
from typing_extensions import TypeIs
from crewai.flow.flow_definition import (
FlowActionDefinition,
FlowConfigDefinition,
FlowConversationalDefinition,
FlowConversationalRouterDefinition,
FlowDefinition,
FlowDefinitionDiagnostic,
FlowHumanFeedbackDefinition,
@@ -27,6 +30,13 @@ R = TypeVar("R")
logger = logging.getLogger(__name__)
_FLOW_METHOD_DEFINITION_ATTR = "__flow_method_definition__"
_FLOW_METHOD_METADATA_ATTRS = [
"__conversational_only__",
"__flow_method_definition__",
"__flow_persistence_config__",
"__human_feedback_config__",
"_human_feedback_llm",
]
def is_flow_method(obj: Any) -> TypeIs[FlowMethod[Any, Any]]:
@@ -42,6 +52,43 @@ def _should_include_flow_method(flow_class: type, method: Any) -> bool:
return True
def _is_conversational_flow(flow_class: type) -> bool:
return bool(getattr(flow_class, "conversational", False))
def _get_inherited_conversational_method(
flow_class: type,
attr_name: str,
) -> Any | None:
if not _is_conversational_flow(flow_class):
return None
for base in flow_class.__mro__[1:]:
inherited = base.__dict__.get(attr_name)
if inherited is None:
continue
if getattr(inherited, "__conversational_only__", False) and is_flow_method(
inherited
):
return inherited
return None
def _stamp_inherited_conversational_metadata(
method: Any,
inherited: Any,
) -> Any:
for attr in _FLOW_METHOD_METADATA_ATTRS:
if hasattr(inherited, attr):
setattr(method, attr, getattr(inherited, attr))
method.__is_flow_method__ = True
return method
def _method_action(method: Any) -> FlowActionDefinition:
return FlowActionDefinition(ref=f"{method.__module__}:{method.__qualname__}")
def _set_flow_method_definition(
wrapper: FlowMethod[P, R],
definition: FlowMethodDefinition,
@@ -135,6 +182,8 @@ def _build_state_definition(
from pydantic import BaseModel as PydanticBaseModel
state_value = getattr(flow_class, "_initial_state_t", None)
if isinstance(state_value, TypeVar):
state_value = None
initial_state = getattr(flow_class, "initial_state", None)
if initial_state is not None:
state_value = initial_state
@@ -230,6 +279,98 @@ def _build_persistence_definition(
)
def _build_conversational_router_definition(
router_config: Any,
diagnostics: list[FlowDefinitionDiagnostic],
path: str,
) -> FlowConversationalRouterDefinition | None:
if router_config is None:
return None
routes = getattr(router_config, "routes", None)
return FlowConversationalRouterDefinition(
prompt=getattr(router_config, "prompt", None),
response_format=_serialize_static_value(
getattr(router_config, "response_format", None),
diagnostics,
f"{path}.response_format",
),
llm=_serialize_static_value(
getattr(router_config, "llm", None), diagnostics, f"{path}.llm"
),
routes=[str(route) for route in routes] if routes is not None else None,
route_descriptions=getattr(router_config, "route_descriptions", None),
default_intent=getattr(router_config, "default_intent", "converse"),
fallback_intent=getattr(router_config, "fallback_intent", "converse"),
intent_field=str(getattr(router_config, "intent_field", "intent")),
)
def _build_conversational_definition(
flow_class: type,
diagnostics: list[FlowDefinitionDiagnostic],
) -> FlowConversationalDefinition | None:
if not _is_conversational_flow(flow_class):
return None
config = getattr(flow_class, "conversational_config", None)
builtin_routes = getattr(flow_class, "builtin_routes", ("converse", "end"))
internal_routes = getattr(
flow_class,
"internal_routes",
("answer_from_history",),
)
if config is None:
return FlowConversationalDefinition(
enabled=True,
builtin_routes=[str(route) for route in builtin_routes],
internal_routes=[str(route) for route in internal_routes],
)
default_intents = getattr(config, "default_intents", None)
visible_agent_outputs = getattr(config, "visible_agent_outputs", None)
return FlowConversationalDefinition(
enabled=True,
system_prompt=getattr(config, "system_prompt", None),
llm=_serialize_static_value(
getattr(config, "llm", None), diagnostics, "conversational.llm"
),
router=_build_conversational_router_definition(
getattr(config, "router", None),
diagnostics,
"conversational.router",
),
answer_from_history_prompt=getattr(config, "answer_from_history_prompt", None),
default_intents=(
[str(intent) for intent in default_intents]
if default_intents is not None
else None
),
intent_llm=_serialize_static_value(
getattr(config, "intent_llm", None),
diagnostics,
"conversational.intent_llm",
),
answer_from_history_llm=_serialize_static_value(
getattr(config, "answer_from_history_llm", None),
diagnostics,
"conversational.answer_from_history_llm",
),
visible_agent_outputs=(
"all"
if visible_agent_outputs == "all"
else [str(output) for output in visible_agent_outputs]
if visible_agent_outputs is not None
else None
),
defer_trace_finalization=bool(
getattr(config, "defer_trace_finalization", True)
),
builtin_routes=[str(route) for route in builtin_routes],
internal_routes=[str(route) for route in internal_routes],
)
def _build_method_definition(
method: Any,
diagnostics: list[FlowDefinitionDiagnostic],
@@ -237,9 +378,11 @@ def _build_method_definition(
) -> FlowMethodDefinition:
fragment = _get_flow_method_definition(method)
if fragment is None:
method_definition = FlowMethodDefinition()
method_definition = FlowMethodDefinition(do=_method_action(method))
else:
method_definition = fragment.model_copy(deep=True)
method_definition = fragment.model_copy(
deep=True, update={"do": _method_action(method)}
)
human_feedback = _build_human_feedback_definition(
method, diagnostics, f"{path}.human_feedback"
@@ -270,6 +413,29 @@ def _iter_flow_methods(flow_class: type) -> dict[str, Any]:
flow_class, attr_value
):
methods[attr_name] = attr_value
continue
inherited = _get_inherited_conversational_method(flow_class, attr_name)
if inherited is not None and callable(attr_value):
methods[attr_name] = _stamp_inherited_conversational_metadata(
attr_value, inherited
)
if _is_conversational_flow(flow_class):
for base in reversed(flow_class.__mro__[1:]):
for attr_name, raw_value in base.__dict__.items():
if attr_name.startswith("_") or attr_name in methods:
continue
if not getattr(raw_value, "__conversational_only__", False):
continue
try:
attr_value = getattr(flow_class, attr_name)
except AttributeError:
continue
if is_flow_method(attr_value) and _should_include_flow_method(
flow_class, attr_value
):
methods[attr_name] = attr_value
# A wrapped method whose name collides with a base Flow model field
# (e.g. ``checkpoint``) is absorbed by Pydantic as a field; the underlying
@@ -314,6 +480,7 @@ def _build_flow_definition_from_class(
state=_build_state_definition(flow_class, diagnostics),
config=_build_config_definition(flow_class, diagnostics),
persist=_build_persistence_definition(flow_class, diagnostics, "persist"),
conversational=_build_conversational_definition(flow_class, diagnostics),
methods=methods,
diagnostics=diagnostics,
)

View File

@@ -6,15 +6,22 @@ The implementation now lives in three modules, split by concern:
``@router``, ``or_`` / ``and_``) and Python Flow class projection
- ``crewai.flow.flow_definition`` -- the serializable Flow Definition contract
- ``crewai.flow.runtime`` -- the Flow execution engine and state
- ``crewai.experimental.conversational_mixin`` -- experimental conversational
runtime extension composed onto the public ``Flow`` class
Prefer importing from those modules in new code; this module preserves the
historical ``crewai.flow.flow`` import path.
"""
from typing import Any, TypeVar
from pydantic import BaseModel
from crewai.experimental.conversational_mixin import _ConversationalMixin
from crewai.flow.dsl import and_, listen, or_, router, start
from crewai.flow.runtime import (
_INITIAL_STATE_CLASS_MARKER,
Flow,
Flow as RuntimeFlow,
FlowMeta,
FlowState,
LockedDictProxy,
@@ -23,6 +30,13 @@ from crewai.flow.runtime import (
)
T = TypeVar("T", bound=dict[str, Any] | BaseModel)
class Flow(_ConversationalMixin, RuntimeFlow[T]):
"""Public Flow class with experimental conversational extension behavior."""
__all__ = [
"_INITIAL_STATE_CLASS_MARKER",
"Flow",

View File

@@ -15,6 +15,10 @@ current_flow_id: contextvars.ContextVar[str | None] = contextvars.ContextVar(
"flow_id", default=None
)
current_flow_defer_trace_finalization: contextvars.ContextVar[bool] = (
contextvars.ContextVar("flow_defer_trace_finalization", default=False)
)
current_flow_method_name: contextvars.ContextVar[str] = contextvars.ContextVar(
"flow_method_name", default="unknown"
)

View File

@@ -16,13 +16,21 @@ from typing import Any, Literal as TypingLiteral
from pydantic import BaseModel, ConfigDict, Field
import yaml
from crewai.flow.conversational_definition import (
FlowConversationalDefinition,
FlowConversationalRouterDefinition,
)
logger = logging.getLogger(__name__)
FlowDefinitionCondition = str | dict[str, Any]
__all__ = [
"FlowActionDefinition",
"FlowConfigDefinition",
"FlowConversationalDefinition",
"FlowConversationalRouterDefinition",
"FlowDefinition",
"FlowDefinitionCondition",
"FlowDefinitionDiagnostic",
@@ -45,8 +53,9 @@ class FlowDefinitionDiagnostic(BaseModel):
class FlowStateDefinition(BaseModel):
"""Static description of a Flow state contract."""
type: TypingLiteral["dict", "pydantic", "unknown"] = "dict"
type: TypingLiteral["dict", "pydantic", "json_schema", "unknown"] = "dict"
ref: str | None = None
json_schema: dict[str, Any] | None = None
default: Any = None
@@ -83,9 +92,17 @@ class FlowHumanFeedbackDefinition(BaseModel):
learn_strict: bool = False
class FlowActionDefinition(BaseModel):
"""What a Flow method node executes, independent of when it fires."""
call: TypingLiteral["code"] = "code"
ref: str
class FlowMethodDefinition(BaseModel):
"""Static definition of one Flow method and its execution roles."""
do: FlowActionDefinition
start: bool | FlowDefinitionCondition | None = None
listen: FlowDefinitionCondition | None = None
router: bool = False
@@ -115,6 +132,7 @@ class FlowDefinition(BaseModel):
state: FlowStateDefinition | None = None
config: FlowConfigDefinition = Field(default_factory=FlowConfigDefinition)
persist: FlowPersistenceDefinition | None = None
conversational: FlowConversationalDefinition | None = None
methods: dict[str, FlowMethodDefinition] = Field(default_factory=dict)
diagnostics: list[FlowDefinitionDiagnostic] = Field(default_factory=list)

View File

@@ -0,0 +1,43 @@
from __future__ import annotations
from collections.abc import Callable
import importlib
from operator import attrgetter
from typing import TYPE_CHECKING, Any, cast
from crewai.flow.flow_definition import FlowActionDefinition
if TYPE_CHECKING:
from crewai.flow.runtime import Flow
class InvalidActionRefError(ValueError):
def __init__(self, ref: str) -> None:
super().__init__(f"invalid callable {ref!r}; expected 'module:qualname'")
def _resolve_code_action(
flow: Flow[Any], action: FlowActionDefinition
) -> Callable[..., Any]:
ref = action.ref
module_name, _, qualname = ref.partition(":")
if "<" in ref or not module_name or not qualname:
raise InvalidActionRefError(ref)
try:
target = attrgetter(qualname)(importlib.import_module(module_name))
except (ImportError, AttributeError) as e:
raise InvalidActionRefError(ref) from e
if not callable(target):
raise InvalidActionRefError(ref)
handler = cast(Callable[..., Any], target)
if getattr(handler, "__self__", None) is None:
handler = handler.__get__(flow, type(flow))
return handler
def resolve_action(flow: Flow[Any], action: FlowActionDefinition) -> Callable[..., Any]:
"""Turn one `do:` action into the callable the flow runs for that node."""
if action.call == "code":
return _resolve_code_action(flow, action)
raise ValueError(f"unknown call type {action.call!r}")

View File

@@ -16,7 +16,7 @@ R = TypeVar("R", covariant=True)
FlowMethodName = NewType("FlowMethodName", str)
PendingListenerKey = NewType(
"PendingListenerKey",
Annotated[str, "nested flow conditions use 'listener_name:object_id'"],
Annotated[str, "listener method name, or 'start:<method>' for conditional starts"],
)

View File

@@ -259,8 +259,9 @@ class RecallFlow(Flow[RecallState]):
candidates = []
if not candidates:
candidates = [scope_prefix]
self.state.candidate_scopes = candidates[:20]
return self.state.candidate_scopes
selected_scopes = candidates[:20]
self.state.candidate_scopes = selected_scopes
return selected_scopes
@listen(filter_and_chunk)
def search_chunks(self) -> list[Any]:
@@ -368,9 +369,10 @@ class RecallFlow(Flow[RecallState]):
)
)
matches.sort(key=lambda m: m.score, reverse=True)
self.state.final_results = matches[: self.state.limit]
final_results = matches[: self.state.limit]
self.state.final_results = final_results
if self.state.evidence_gaps and self.state.final_results:
self.state.final_results[0].evidence_gaps = list(self.state.evidence_gaps)
return self.state.final_results
return final_results

View File

@@ -30,7 +30,7 @@ from opentelemetry.sdk.trace.export import (
BatchSpanProcessor,
SpanExportResult,
)
from opentelemetry.trace import Span
from opentelemetry.trace import ProxyTracerProvider, Span
from typing_extensions import Self
from crewai.events.event_bus import crewai_event_bus
@@ -162,6 +162,10 @@ class Telemetry:
if self.ready and not self.trace_set:
try:
with suppress_warnings():
existing_provider = trace.get_tracer_provider()
if not isinstance(existing_provider, ProxyTracerProvider):
self.trace_set = True
return
trace.set_tracer_provider(self.provider)
self.trace_set = True
except Exception as e:

View File

@@ -999,7 +999,11 @@ def _json_schema_to_pydantic_field(
if examples:
schema_extra["examples"] = examples
default = ... if is_required else None
default = (
json_schema["default"]
if "default" in json_schema
else (... if is_required else None)
)
if isinstance(type_, type) and issubclass(type_, (int, float)):
if "minimum" in json_schema:

View File

@@ -4,6 +4,7 @@ import os
import threading
from unittest import mock
from unittest.mock import MagicMock, patch
import warnings
from crewai.agents.crew_agent_executor import AgentFinish, CrewAgentExecutor
from crewai.constants import DEFAULT_LLM_MODEL
@@ -77,6 +78,51 @@ def test_agent_creation():
assert agent.backstory == "test backstory"
def test_agent_exposes_i18n_for_backward_compatibility():
from crewai.utilities.i18n import I18N_DEFAULT
agent = Agent(role="test role", goal="test goal", backstory="test backstory")
with pytest.warns(DeprecationWarning, match="Agent.i18n is deprecated"):
i18n = agent.i18n
assert i18n is I18N_DEFAULT
assert isinstance(i18n.slice("role_playing"), str)
def test_agent_accepts_custom_i18n():
from crewai.utilities.i18n import I18N
prompt_file = os.path.join(
os.path.dirname(__file__), "..", "utilities", "prompts.json"
)
i18n = I18N(prompt_file=prompt_file)
agent = Agent(
role="test role",
goal="test goal",
backstory="test backstory",
i18n=i18n,
)
with pytest.warns(DeprecationWarning, match="Agent.i18n is deprecated"):
agent_i18n = agent.i18n
assert agent_i18n is i18n
assert agent_i18n.slice("role_playing") == "Lorem ipsum dolor sit amet"
def test_agent_copy_does_not_emit_i18n_deprecation_warning():
agent = Agent(role="test role", goal="test goal", backstory="test backstory")
with warnings.catch_warnings(record=True) as caught_warnings:
warnings.simplefilter("always", DeprecationWarning)
agent.copy()
assert not any(
"Agent.i18n is deprecated" in str(w.message) for w in caught_warnings
)
def test_agent_with_only_system_template():
"""Test that an agent with only system_template works without errors."""
agent = Agent(

View File

@@ -32,7 +32,7 @@ def _build_executor(**kwargs: Any) -> AgentExecutor:
executor._method_outputs = []
executor._completed_methods = set()
executor._fired_or_listeners = set()
executor._pending_and_listeners = {}
executor._pending_events = {}
executor._method_execution_counts = {}
executor._method_call_counts = {}
executor._event_futures = []

View File

@@ -2,6 +2,7 @@
from __future__ import annotations
import threading
from typing import Any
from unittest.mock import patch
@@ -109,10 +110,79 @@ class TestCheckpointListenerOptsOut:
assert do_cp.call_count == 0
class TestFlowResumeReplaysEvents:
"""End-to-end: a resumed flow emits MethodExecution* events for completed methods."""
class TestCheckpointResumeReplaysEvents:
"""A flow resumed from a checkpoint replays MethodExecution* events for
completed methods and executes the pending ones. The checkpoint persists
the event record, which is reloaded into the per-run runtime state.
def test_resume_dispatches_completed_method_events(self, tmp_path) -> None:
``step_c`` is gated on a threading.Event so the flow is frozen with exactly
``step_a`` and ``step_b`` completed when the checkpoint is written — the
mid-run snapshot is deterministic rather than dependent on write timing.
"""
def test_resume_replays_completed_and_executes_pending(self, tmp_path) -> None:
from crewai.flow.flow import Flow, listen, start
from crewai.state.checkpoint_config import CheckpointConfig
at_step_c = threading.Event()
release = threading.Event()
captured: list[Any] = []
class ThreeStepFlow(Flow[dict]):
@start()
def step_a(self) -> str:
return "a"
@listen(step_a)
def step_b(self) -> str:
return "b"
@listen(step_b)
def step_c(self) -> str:
captured.append(crewai_event_bus.runtime_state)
at_step_c.set()
release.wait(timeout=10)
return "c"
runner = threading.Thread(target=ThreeStepFlow().kickoff)
runner.start()
try:
assert at_step_c.wait(timeout=10)
location = captured[0].checkpoint(str(tmp_path / "cp"))
finally:
release.set()
runner.join(timeout=10)
captured_started: list[str] = []
captured_finished: list[str] = []
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(MethodExecutionStartedEvent)
def _cs(_: Any, event: MethodExecutionStartedEvent) -> None:
captured_started.append(event.method_name)
@crewai_event_bus.on(MethodExecutionFinishedEvent)
def _cf(_: Any, event: MethodExecutionFinishedEvent) -> None:
captured_finished.append(event.method_name)
ThreeStepFlow().kickoff(
from_checkpoint=CheckpointConfig(restore_from=location)
)
assert captured_started == ["step_a", "step_b", "step_c"]
assert captured_finished == ["step_a", "step_b", "step_c"]
class TestPersistResumeDoesNotReplayCompletedEvents:
"""A @persist resume continues from pending methods only.
@persist stores flow state, not the event record, so completed-method
events have no persisted source to replay from. Runtime state is scoped
per run, so flow1's events are not visible to flow2.
"""
def test_persist_resume_executes_only_pending_methods(self, tmp_path) -> None:
from crewai.flow.flow import Flow, listen, start
from crewai.flow.persistence.sqlite import SQLiteFlowPersistence
@@ -132,9 +202,6 @@ class TestFlowResumeReplaysEvents:
def step_c(self) -> str:
return "c"
if crewai_event_bus.runtime_state is not None:
crewai_event_bus.runtime_state.event_record.clear()
flow1 = ThreeStepFlow(persistence=persistence)
flow1.kickoff()
flow_id = flow1.state["id"]
@@ -157,9 +224,5 @@ class TestFlowResumeReplaysEvents:
flow2.kickoff(inputs={"id": flow_id})
assert captured_started.count("step_a") == 1
assert captured_started.count("step_b") == 1
assert captured_started.count("step_c") == 1
assert captured_finished.count("step_a") == 1
assert captured_finished.count("step_b") == 1
assert captured_finished.count("step_c") == 1
assert captured_started == ["step_c"]
assert captured_finished == ["step_c"]

View File

@@ -6,6 +6,7 @@ import pytest
from crewai import Agent, Crew, Task
from crewai.telemetry import Telemetry
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
@pytest.fixture(autouse=True)
@@ -53,6 +54,23 @@ def test_telemetry_enabled_by_default():
assert telemetry.ready is True
def test_set_tracer_skips_when_provider_already_configured():
"""A second telemetry instance must not re-install the global provider."""
with (
patch.dict(os.environ, {}, clear=True),
patch(
"crewai.telemetry.telemetry.trace.get_tracer_provider",
return_value=TracerProvider(),
),
patch("crewai.telemetry.telemetry.trace.set_tracer_provider") as mock_set,
):
telemetry = Telemetry()
telemetry.set_tracer()
mock_set.assert_not_called()
assert telemetry.trace_set is True
@patch("crewai.telemetry.telemetry.logger.error")
@patch(
"opentelemetry.exporter.otlp.proto.http.trace_exporter.OTLPSpanExporter.export",

View File

@@ -2,6 +2,7 @@
from __future__ import annotations
import inspect
import json
import os
import sqlite3
@@ -16,6 +17,7 @@ from pydantic import BaseModel
from crewai.agent.core import Agent
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.crew import Crew
from crewai.llms.base_llm import BaseLLM
from crewai.flow.flow import _INITIAL_STATE_CLASS_MARKER, Flow, start
from crewai.state.checkpoint_config import CheckpointConfig
from crewai.state.checkpoint_listener import (
@@ -682,3 +684,85 @@ class TestAgentCheckpoint:
cfg = CheckpointConfig(restore_from=loc)
restored = Agent.from_checkpoint(cfg)
assert restored._kickoff_event_id == "evt-456"
class _FinalAnswerLLM(BaseLLM):
"""Stub LLM that always returns a final answer without any API calls."""
def __init__(self) -> None:
super().__init__(model="stub")
def call(
self,
messages,
tools=None,
callbacks=None,
available_functions=None,
from_task=None,
from_agent=None,
response_model=None,
):
return "Final Answer: done."
def supports_function_calling(self) -> bool:
return False
def supports_stop_words(self) -> bool:
return False
def get_context_window_size(self) -> int:
return 4096
async def acall(self, *args, **kwargs):
raise NotImplementedError
class TestCheckpointReusedExecutor:
"""Checkpoint serialization stamps every live Flow's completed methods.
The agent executor is a Flow reused across a crew's tasks, so the stamp
must not be read back as a restore signal on the next task — otherwise the
second task replays as a resume and never reaches a final answer.
"""
def test_second_task_runs_with_checkpointing_enabled(self) -> None:
agent = Agent(role="r", goal="g", backstory="b", llm=_FinalAnswerLLM())
task1 = Task(description="first", expected_output="x", agent=agent)
task2 = Task(description="second", expected_output="y", agent=agent)
with tempfile.TemporaryDirectory() as d:
crew = Crew(
agents=[agent],
tasks=[task1, task2],
verbose=False,
checkpoint=CheckpointConfig(
provider=JsonProvider(location=d),
on_events=["task_started", "task_completed"],
),
)
result = crew.kickoff()
assert len(result.tasks_output) == 2
assert result.tasks_output[1].raw
class TestCustomLLMCheckpointRestore:
"""A custom BaseLLM subclass serializes with the inherited llm_type "base".
Restoring it must not try to instantiate the abstract BaseLLM; it is rebuilt
as a concrete LLM from the saved config instead.
"""
def test_restore_does_not_instantiate_abstract_base_llm(self) -> None:
agent = Agent(role="r", goal="g", backstory="b", llm=_FinalAnswerLLM())
task = Task(description="d", expected_output="e", agent=agent)
crew = Crew(agents=[agent], tasks=[task], verbose=False)
raw = RuntimeState(root=[crew]).model_dump_json()
restored = RuntimeState.model_validate_json(
raw, context={"from_checkpoint": True}
)
llm = restored.root[0].agents[0].llm
assert isinstance(llm, BaseLLM)
assert not inspect.isabstract(type(llm))
assert llm.model == "stub"

View File

@@ -409,4 +409,31 @@ class TestRuntimeStateIntegration:
old_json, context={"from_checkpoint": True}
)
assert len(restored.root) == 1
assert len(restored.event_record) == 0
assert len(restored.event_record) == 0
def test_reset_runtime_state_clears_state_and_registry(self):
from crewai import Agent, Crew, RuntimeState
from crewai.events.event_bus import crewai_event_bus
if RuntimeState is None:
pytest.skip("RuntimeState unavailable (model_rebuild failed)")
agent = Agent(role="test", goal="test", backstory="test", llm="gpt-4o-mini")
crew = Crew(agents=[agent], tasks=[], verbose=False)
previous_state = crewai_event_bus._runtime_state
previous_ids = crewai_event_bus._registered_entity_ids
crewai_event_bus._runtime_state = None
crewai_event_bus._registered_entity_ids = set()
try:
crewai_event_bus.register_entity(crew)
assert crewai_event_bus.runtime_state is not None
assert crewai_event_bus._registered_entity_ids
crewai_event_bus.reset_runtime_state()
assert crewai_event_bus.runtime_state is None
assert crewai_event_bus._registered_entity_ids == set()
finally:
crewai_event_bus._runtime_state = previous_state
crewai_event_bus._registered_entity_ids = previous_ids

View File

@@ -1040,7 +1040,7 @@ def test_flow_plotting():
received_events.append(event)
event_received.set()
flow.plot("test_flow")
flow.plot("test_flow", show=False)
assert event_received.wait(timeout=5), "Timeout waiting for plot event"
assert len(received_events) == 1
@@ -1157,6 +1157,26 @@ def test_flow_name():
assert flow.name == "MyFlow"
def test_flow_custom_name_overrides_class_name_in_events():
class InternalFlowClass(Flow):
name = "PublicName"
@start()
def begin(self):
return "done"
received = []
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(FlowStartedEvent)
def handle(source, event):
received.append(event)
InternalFlowClass().kickoff()
assert received[0].flow_name == "PublicName"
def test_nested_and_or_conditions():
"""Test nested conditions like or_(and_(A, B), and_(C, D)).
@@ -1542,40 +1562,63 @@ def test_deeply_nested_conditions():
def test_or_branch_does_not_leave_stale_and_state():
"""or_() over nested and_() branches must not leave stale pending AND state.
Regression: evaluating an or_() condition stopped at the first branch that was
satisfied, so a later and_() branch that the *same* trigger would have completed
never cleared its pending state. On the next cycle that trigger alone then
spuriously re-satisfied the whole condition. Both branches share the final
event ``x`` here, so the shared trigger that completes branch ``(a AND x)`` also
completes branch ``(c AND x)`` and both must be cleared together.
"""
fired = []
class StaleStateFlow(Flow):
@start()
def begin(self):
pass
@listen(or_(and_("a", "x"), and_("c", "x")))
def joined(self):
@listen(begin)
def a(self):
pass
flow = StaleStateFlow()
condition = type(flow)._listen_condition("joined")
@listen(begin)
def c(self):
pass
def fires(trigger):
return flow._evaluate_condition(condition, trigger, "joined")
@listen(and_(a, c))
def x(self):
pass
# First cycle: "a" then "c" arrive, then the shared "x" completes (a AND x).
assert fires("a") is False
assert fires("c") is False
assert fires("x") is True
@listen(or_(and_("a", "x"), and_("c", "y")))
def joined(self):
fired.append("joined")
# Next cycle: "x" alone must NOT re-satisfy the condition. The "c" from the
# previous cycle was consumed when "joined" fired, so neither branch is half
# complete and "x" by itself is insufficient.
assert fires("x") is False
@router(joined)
def emit_y(self):
return "y"
StaleStateFlow().kickoff()
assert fired == ["joined"]
def test_and_branch_inside_or_does_not_race():
execution_order = []
class DiamondWithFallbackFlow(Flow):
@start()
def go(self):
execution_order.append("go")
@listen(go)
def a(self):
execution_order.append("a")
@listen(go)
def b(self):
execution_order.append("b")
@listen(or_(and_(a, b), "fallback"))
def done(self):
execution_order.append("done")
DiamondWithFallbackFlow().kickoff()
assert "done" in execution_order
assert execution_order.index("done") > execution_order.index("a")
assert execution_order.index("done") > execution_order.index("b")
def test_mixed_sync_async_execution_order():

View File

@@ -26,7 +26,11 @@ from crewai.experimental import (
RouterConfig,
)
from crewai.flow import Flow, ChatState, listen, start
from crewai.flow.flow_context import current_flow_id, current_flow_name
from crewai.flow.flow_context import (
current_flow_defer_trace_finalization,
current_flow_id,
current_flow_name,
)
from crewai.flow.conversation import (
append_message,
get_conversation_messages,
@@ -169,9 +173,6 @@ class TestConversationalFlow:
)
@pytest.mark.skip(
reason="Experimental conversational registry behavior is out of scope for the definition-first start migration."
)
def test_handle_turn_routes_to_listener_and_records_public_result(self) -> None:
@ConversationConfig(default_intents=["research"], intent_llm="gpt-4o-mini")
class ResearchFlow(ConversationalFlow):
@@ -595,18 +596,15 @@ class TestConversationalFlow:
assert result == "legacy-searched"
assert flow.state.last_intent == "search"
@pytest.mark.skip(
reason="Experimental conversational sequential-start behavior is out of scope for the definition-first start migration."
)
def test_user_start_methods_run_sequentially_before_router_in_conversational_mode(
self,
) -> None:
"""Conversational flows: user ``@start`` methods finish before router fires.
Non-chat flows run ``@start`` methods in parallel via ``asyncio.gather``,
which would race with ``conversation_start`` and let the router fire
which would race with ``route_conversation`` and let the router fire
before user setup finished. In conversational mode the framework runs
them sequentially, with ``conversation_start`` last.
them sequentially, with ``route_conversation`` last.
"""
order: list[str] = []
@@ -649,18 +647,10 @@ class TestConversationalFlow:
assert "attach_bus" in order # still fires every turn
assert "route_turn" in order
@pytest.mark.skip(
reason="Experimental inherited conversational start registration is out of scope for the definition-first start migration."
)
def test_subclass_can_override_conversation_start_without_redecorating(
def test_subclass_can_override_conversation_start_helper(
self,
) -> None:
"""Overriding an inherited ``@start`` method must not unregister it.
Before the metaclass fix, subclasses had to re-apply ``@start()`` on
every override or the parent's ``conversation_start`` would silently
drop out of the start registry — leaving the flow with nothing to fire.
"""
"""The compatibility helper remains overridable without adding a Flow node."""
bootstrap_calls: list[str] = []
@@ -681,6 +671,38 @@ class TestConversationalFlow:
flow = BootstrapFlow()
flow.handle_turn("hi")
assert bootstrap_calls == ["ran"]
assert "conversation_start" not in BootstrapFlow.flow_definition().methods
route_definition = BootstrapFlow.flow_definition().methods["route_conversation"]
assert route_definition.start is True
assert route_definition.router is True
assert flow.state.messages[-1].content == "worked"
def test_legacy_decorated_conversation_start_runs_once_per_turn(
self,
) -> None:
"""Legacy ``@start`` overrides are not invoked again by the router."""
bootstrap_calls: list[str] = []
@ConversationConfig()
class BootstrapFlow(ConversationalFlow):
@start()
def conversation_start(self) -> str | None:
bootstrap_calls.append("ran")
return super().conversation_start()
def route_turn(self, context: dict[str, Any]) -> str | None:
return "work"
@listen("work")
def do_work(self) -> str:
self.append_assistant_message("worked")
return "worked"
flow = BootstrapFlow()
flow.handle_turn("hi")
assert bootstrap_calls == ["ran"]
assert flow.state.messages[-1].content == "worked"
@@ -1179,6 +1201,40 @@ class TestConversationalFlow:
"finalize_session_traces must finalize the trace batch once"
)
def test_deferred_resume_skips_per_resume_flow_finished_event(self) -> None:
"""Deferred sessions do not emit terminal events while resuming."""
from crewai.events.types.flow_events import FlowFinishedEvent
from crewai.flow.async_feedback.types import PendingFeedbackContext
class DeferredResumeFlow(Flow[ChatState]):
defer_trace_finalization = True
@start()
def begin(self) -> str:
return "started"
flow = DeferredResumeFlow()
flow._pending_feedback_context = PendingFeedbackContext(
flow_id=flow.flow_id,
flow_class="DeferredResumeFlow",
method_name="begin",
method_output="started",
message="Review",
)
finished_events: list[FlowFinishedEvent] = []
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(FlowFinishedEvent)
def capture(_: Any, event: FlowFinishedEvent) -> None:
finished_events.append(event)
flow.resume("approved")
crewai_event_bus.flush()
assert finished_events == []
def test_finalize_session_traces_restores_event_scope(self, capsys) -> None:
"""No ``empty scope stack`` warning when deferred ``flow_finished`` fires.
@@ -1342,6 +1398,12 @@ class TestFlowTracingWhenSuppressed:
class TestDeferTraceFinalization:
def test_bare_conversational_flow_defers_by_default(self) -> None:
class BareChat(ConversationalFlow):
pass
assert BareChat()._should_defer_trace_finalization() is True
def test_conversation_config_drives_defer_flag(self) -> None:
"""``ConversationConfig(defer_trace_finalization=...)`` controls whether
a conversational subclass defers per-turn trace finalization."""
@@ -1474,6 +1536,44 @@ class TestDeferredFlowLifecycleEvents:
listener.batch_manager.finalize_batch()
mock_finalize.assert_not_called()
def test_deferred_flow_kickoff_marks_trace_manager_session_deferred(
self,
) -> None:
class DeferredTraceFlow(Flow[ChatState]):
@start()
def begin(self) -> str:
return "done"
listener = TraceCollectionListener()
listener.batch_manager.defer_session_finalization = False
flow = DeferredTraceFlow()
flow.defer_trace_finalization = True
with patch.object(listener.batch_manager, "finalize_batch"):
flow.kickoff()
assert listener.batch_manager.defer_session_finalization is True
flow.finalize_session_traces()
assert listener.batch_manager.defer_session_finalization is False
def test_non_deferred_flow_kickoff_clears_stale_trace_manager_flag(
self,
) -> None:
class PlainTraceFlow(Flow[ChatState]):
@start()
def begin(self) -> str:
return "done"
listener = TraceCollectionListener()
listener.batch_manager.defer_session_finalization = True
PlainTraceFlow().kickoff()
assert listener.batch_manager.defer_session_finalization is False
class TestNestedCrewTracing:
def test_is_inside_active_flow_context_when_kickoff_running(self) -> None:
@@ -1527,3 +1627,130 @@ class TestNestedCrewTracing:
elif listener.batch_manager.batch_owner_type == "crew":
listener.batch_manager.finalize_batch()
mock_finalize.assert_not_called()
def test_lazy_flow_batch_from_context_preserves_deferred_parent(self) -> None:
from crewai.events.listeners.tracing.trace_listener import (
TraceCollectionListener,
)
listener = TraceCollectionListener()
listener.batch_manager.current_batch = None
listener.batch_manager.batch_owner_type = None
listener.batch_manager.batch_owner_id = None
listener.batch_manager.defer_session_finalization = False
listener.batch_manager.event_buffer.clear()
flow_id_token = current_flow_id.set("parent-flow-id")
flow_name_token = current_flow_name.set("ParentChatFlow")
defer_token = current_flow_defer_trace_finalization.set(True)
try:
initialized = listener._try_initialize_flow_batch_from_context(
type("Event", (), {"timestamp": None})()
)
assert initialized is True
assert listener.batch_manager.batch_owner_type == "flow"
assert listener.batch_manager.batch_owner_id == "parent-flow-id"
assert listener.batch_manager.defer_session_finalization is True
assert listener.batch_manager.current_batch is not None
assert (
listener.batch_manager.current_batch.execution_metadata[
"execution_type"
]
== "flow"
)
assert (
listener.batch_manager.current_batch.execution_metadata["flow_name"]
== "ParentChatFlow"
)
finally:
current_flow_defer_trace_finalization.reset(defer_token)
current_flow_name.reset(flow_name_token)
current_flow_id.reset(flow_id_token)
listener.batch_manager.current_batch = None
listener.batch_manager.batch_owner_type = None
listener.batch_manager.batch_owner_id = None
listener.batch_manager.trace_batch_id = None
listener.batch_manager.defer_session_finalization = False
listener.batch_manager.event_buffer.clear()
def test_nested_agent_executor_flow_does_not_finalize_parent_batch(
self,
) -> None:
from crewai import Agent, Crew, Task
from crewai.llms.base_llm import BaseLLM
class StaticLLM(BaseLLM):
def __init__(self) -> None:
super().__init__(model="debug-static-llm", provider="debug")
def call(
self,
messages: Any,
tools: Any = None,
callbacks: Any = None,
available_functions: Any = None,
from_task: Any = None,
from_agent: Any = None,
response_model: Any = None,
) -> str:
return (
"Thought: I can answer directly.\n"
"Final Answer: nested crew result"
)
class NestedCrewFlow(Flow[ChatState]):
defer_trace_finalization = True
tracing = True
@start()
def begin(self) -> str:
return "run_nested_crew"
@listen(begin)
def run_nested_crew(self, _: str) -> str:
agent = Agent(
role="Debug Agent",
goal="Return a short deterministic result",
backstory="Used only for trace finalization debugging.",
llm=StaticLLM(),
verbose=False,
)
task = Task(
description="Return the deterministic nested crew result.",
expected_output="nested crew result",
agent=agent,
)
return Crew(agents=[agent], tasks=[task], verbose=False).kickoff().raw
listener = TraceCollectionListener()
listener.batch_manager.current_batch = None
listener.batch_manager.batch_owner_type = None
listener.batch_manager.batch_owner_id = None
listener.batch_manager.trace_batch_id = None
listener.batch_manager.defer_session_finalization = False
listener.batch_manager.event_buffer.clear()
listener.first_time_handler.is_first_time = False
def initialize_backend_batch(*_: Any, **__: Any) -> None:
listener.batch_manager.trace_batch_id = "debug-trace-batch"
flow = NestedCrewFlow()
with (
patch.object(
listener.batch_manager,
"_initialize_backend_batch",
side_effect=initialize_backend_batch,
),
patch.object(listener.batch_manager, "finalize_batch") as mock_finalize,
):
flow.kickoff()
crewai_event_bus.flush()
flow.kickoff()
crewai_event_bus.flush()
assert mock_finalize.call_count == 0, (
"nested AgentExecutor flows inside a deferred parent Flow must "
"not finalize the parent trace batch"
)

View File

@@ -13,6 +13,7 @@ from pydantic import BaseModel
import crewai.flow.dsl as flow_dsl
import crewai.flow.flow_definition as flow_definition
import crewai.flow.visualization.builder as visualization_builder
from crewai.experimental import ConversationConfig, RouterConfig
from crewai.flow import Flow, and_, human_feedback, listen, or_, persist, router, start
@@ -35,7 +36,10 @@ def test_flow_public_exports_are_explicit():
"start",
}
assert set(flow_definition.__all__) == {
"FlowActionDefinition",
"FlowConfigDefinition",
"FlowConversationalDefinition",
"FlowConversationalRouterDefinition",
"FlowDefinition",
"FlowDefinitionCondition",
"FlowDefinitionDiagnostic",
@@ -169,6 +173,7 @@ def test_flow_definition_maps_dsl_to_static_contract():
assert definition.state.ref and "ContractState" in definition.state.ref
assert definition.config.stream is True
assert definition.config.max_method_calls == 7
assert definition.conversational is None
assert definition.methods["begin"].start is True
assert definition.methods["process"].listen == "begin"
@@ -201,25 +206,75 @@ def test_flow_definition_excludes_conversational_builtins_for_regular_flows():
methods = RegularFlow.flow_definition().methods
assert RegularFlow.flow_definition().conversational is None
assert set(methods) == {"begin"}
assert "conversation_start" not in methods
assert "route_conversation" not in methods
assert "converse_turn" not in methods
@pytest.mark.skip(
reason="Experimental conversational inherited built-ins are out of scope for the definition-first start migration."
)
def test_flow_definition_includes_conversational_builtins_when_enabled():
class ChatFlow(Flow):
conversational = True
methods = ChatFlow.flow_definition().methods
definition = ChatFlow.flow_definition()
methods = definition.methods
assert "conversation_start" in methods
assert definition.conversational is not None
assert definition.conversational.enabled is True
assert definition.conversational.defer_trace_finalization is True
assert definition.conversational.builtin_routes == ["converse", "end"]
assert "conversation_start" not in methods
assert "route_conversation" in methods
assert "converse_turn" in methods
assert methods["conversation_start"].start is True
assert methods["route_conversation"].start is True
assert methods["route_conversation"].router is True
def test_flow_definition_serializes_conversational_config():
@ConversationConfig(
system_prompt="Be concise.",
llm="gpt-4o-mini",
router=RouterConfig(
prompt="Pick a route.",
routes=["research"],
default_intent="converse",
fallback_intent="end",
),
default_intents=["research"],
visible_agent_outputs=["researcher"],
defer_trace_finalization=False,
)
class ChatFlow(Flow):
conversational = True
conversational = ChatFlow.flow_definition().conversational
assert conversational is not None
assert conversational.system_prompt == "Be concise."
assert conversational.llm == "gpt-4o-mini"
assert conversational.default_intents == ["research"]
assert conversational.visible_agent_outputs == ["researcher"]
assert conversational.defer_trace_finalization is False
assert conversational.router is not None
assert conversational.router.prompt == "Pick a route."
assert conversational.router.routes == ["research"]
assert conversational.router.fallback_intent == "end"
def test_flow_definition_uses_collapsed_conversational_router_start():
class ChatFlow(Flow):
conversational = True
def conversation_start(self) -> str | None:
return "custom"
methods = ChatFlow.flow_definition().methods
assert "conversation_start" not in methods
assert "route_conversation" in methods
assert methods["route_conversation"].start is True
assert methods["route_conversation"].router is True
def test_flow_definition_serializes_human_feedback_metadata():
@@ -575,6 +630,7 @@ def test_flow_definition_preserves_diagnostics_loaded_from_contract():
"name": "LoadedDiagnosticsFlow",
"methods": {
"decision": {
"do": {"ref": "loaded_flows:LoadedDiagnosticsFlow.decision"},
"router": True,
"emit": ["continue"],
}
@@ -608,6 +664,7 @@ def test_router_start_false_without_listen_reports_missing_trigger():
"name": "LoadedFlow",
"methods": {
"decision": {
"do": {"ref": "loaded_flows:LoadedFlow.decision"},
"router": True,
"start": False,
"emit": ["continue"],
@@ -686,8 +743,14 @@ def test_static_string_listener_is_allowed_by_contract():
"schema": "crewai.flow/v1",
"name": "TypoFlow",
"methods": {
"begin": {"start": True},
"handle": {"listen": "begni"},
"begin": {
"do": {"ref": "loaded_flows:TypoFlow.begin"},
"start": True,
},
"handle": {
"do": {"ref": "loaded_flows:TypoFlow.handle"},
"listen": "begni",
},
},
}
)
@@ -700,8 +763,15 @@ def test_start_false_not_classified_as_start_method():
"schema": "crewai.flow/v1",
"name": "ExplicitNonStartFlow",
"methods": {
"begin": {"start": True},
"handle": {"start": False, "listen": "begin"},
"begin": {
"do": {"ref": "loaded_flows:ExplicitNonStartFlow.begin"},
"start": True,
},
"handle": {
"do": {"ref": "loaded_flows:ExplicitNonStartFlow.handle"},
"start": False,
"listen": "begin",
},
},
}
)
@@ -758,6 +828,7 @@ def test_flow_definition_logs_diagnostics_when_loaded_from_contract(caplog):
"name": "LoadedFlow",
"methods": {
"decision": {
"do": {"ref": "loaded_flows:LoadedFlow.decision"},
"router": True,
"emit": ["continue"],
}

View File

@@ -0,0 +1,552 @@
from __future__ import annotations
import pytest
from pydantic import ValidationError
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.flow_events import (
MethodExecutionFinishedEvent,
MethodExecutionStartedEvent,
)
from crewai.flow import Flow, and_, listen, or_, router, start
from crewai.flow.flow import FlowState
from crewai.flow.flow_definition import FlowDefinition
class ChainFlow(Flow):
@start()
def begin(self):
self.state["begin_ran"] = True
return "hello"
@listen(begin)
def shout(self, result):
return result.upper()
@listen(shout)
def confirm(self):
self.state["confirmed"] = True
return f"confirmed:{self.state['confirmed']}"
CHAIN_YAML = f"""
schema: crewai.flow/v1
name: ChainFlow
methods:
begin:
do:
call: code
ref: {__name__}:ChainFlow.begin
start: true
shout:
do:
ref: {__name__}:ChainFlow.shout
listen: begin
confirm:
do:
ref: {__name__}:ChainFlow.confirm
listen: shout
"""
class MergeFlow(Flow):
@start()
def begin(self):
return "go"
@listen(begin)
def left(self):
return "left"
@listen(begin)
def right(self):
return "right"
@listen(or_(left, right))
def either(self):
self.state["either_ran"] = True
return "either"
@listen(and_(left, right, either))
def join(self):
self.state["joined"] = True
return "joined"
MERGE_YAML = f"""
schema: crewai.flow/v1
name: MergeFlow
methods:
begin:
do:
ref: {__name__}:MergeFlow.begin
start: true
left:
do:
ref: {__name__}:MergeFlow.left
listen: begin
right:
do:
ref: {__name__}:MergeFlow.right
listen: begin
either:
do:
ref: {__name__}:MergeFlow.either
listen:
or: [left, right]
join:
do:
ref: {__name__}:MergeFlow.join
listen:
and: [left, right, either]
"""
class RouteFlow(Flow):
@start()
def begin(self):
return "go"
@router(begin)
def decide(self):
return "left" if self.state.get("direction") == "left" else "right"
@listen("left")
def take_left(self):
return "took-left"
@listen("right")
def take_right(self):
return "took-right"
ROUTE_YAML = f"""
schema: crewai.flow/v1
name: RouteFlow
methods:
begin:
do:
ref: {__name__}:RouteFlow.begin
start: true
decide:
do:
ref: {__name__}:RouteFlow.decide
listen: begin
router: true
take_left:
do:
ref: {__name__}:RouteFlow.take_left
listen: left
take_right:
do:
ref: {__name__}:RouteFlow.take_right
listen: right
"""
class LoopFlow(Flow):
@start("retry")
def step(self):
self.state["count"] = self.state.get("count", 0) + 1
return self.state["count"]
@router(step)
def decide(self):
if self.state["count"] < 3:
return "retry"
return "done"
@listen("done")
def finish(self):
return f"finished:{self.state['count']}"
LOOP_YAML = f"""
schema: crewai.flow/v1
name: LoopFlow
methods:
step:
do:
ref: {__name__}:LoopFlow.step
start: retry
decide:
do:
ref: {__name__}:LoopFlow.decide
listen: step
router: true
finish:
do:
ref: {__name__}:LoopFlow.finish
listen: done
"""
class CounterState(FlowState):
count: int = 0
label: str = "none"
class PydanticStateFlow(Flow[CounterState]):
@start()
def begin(self):
self.state.count += 1
return self.state.count
@listen(begin)
def finish(self):
self.state.label = f"count={self.state.count}"
return self.state.label
PYDANTIC_STATE_YAML = f"""
schema: crewai.flow/v1
name: PydanticStateFlow
state:
type: pydantic
ref: {__name__}:CounterState
methods:
begin:
do:
ref: {__name__}:PydanticStateFlow.begin
start: true
finish:
do:
ref: {__name__}:PydanticStateFlow.finish
listen: begin
"""
PYDANTIC_STATE_OVERLAY_YAML = f"""
schema: crewai.flow/v1
name: PydanticStateFlow
state:
type: pydantic
ref: {__name__}:CounterState
default:
count: 5
methods:
begin:
do:
ref: {__name__}:PydanticStateFlow.begin
start: true
finish:
do:
ref: {__name__}:PydanticStateFlow.finish
listen: begin
"""
JSON_SCHEMA_STATE_YAML = f"""
schema: crewai.flow/v1
name: JsonSchemaStateFlow
state:
type: json_schema
json_schema:
title: CounterState
type: object
properties:
count:
type: integer
default: 0
label:
type: string
default: none
methods:
begin:
do:
ref: {__name__}:PydanticStateFlow.begin
start: true
finish:
do:
ref: {__name__}:PydanticStateFlow.finish
listen: begin
"""
PYDANTIC_REF_WITH_SCHEMA_FALLBACK_YAML = f"""
schema: crewai.flow/v1
name: SchemaFallbackFlow
state:
type: pydantic
ref: definitely_not_a_module_xyz:MissingState
json_schema:
title: CounterState
type: object
properties:
count:
type: integer
default: 0
label:
type: string
default: none
methods:
begin:
do:
ref: {__name__}:PydanticStateFlow.begin
start: true
finish:
do:
ref: {__name__}:PydanticStateFlow.finish
listen: begin
"""
UNRESOLVABLE_STATE_YAML = f"""
schema: crewai.flow/v1
name: UnresolvableStateFlow
state:
type: pydantic
ref: definitely_not_a_module_xyz:MissingState
methods:
begin:
do:
ref: {__name__}:ChainFlow.begin
start: true
"""
DICT_STATE_YAML = f"""
schema: crewai.flow/v1
name: DictStateFlow
state:
type: dict
default:
count: 5
methods:
begin:
do:
ref: {__name__}:ChainFlow.begin
start: true
"""
UNKNOWN_STATE_YAML = f"""
schema: crewai.flow/v1
name: UnknownStateFlow
state:
type: unknown
ref: somewhere:Something
methods:
begin:
do:
ref: {__name__}:ChainFlow.begin
start: true
"""
def _run_with_events(flow, inputs=None):
events = []
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(MethodExecutionStartedEvent)
def on_started(source, event):
events.append(event)
@crewai_event_bus.on(MethodExecutionFinishedEvent)
def on_finished(source, event):
events.append(event)
result = flow.kickoff(inputs=inputs)
events.sort(key=lambda e: e.timestamp)
return result, [
(type(e).__name__, str(e.method_name), e.flow_name) for e in events
]
def _state_without_id(flow):
snapshot = dict(flow.state.model_dump())
snapshot.pop("id", None)
return snapshot
def assert_parity(flow_cls, yaml_str, inputs=None, ordered=True):
class_flow = flow_cls()
class_result, class_events = _run_with_events(class_flow, inputs)
definition = FlowDefinition.from_yaml(yaml_str)
definition_flow = Flow.from_definition(definition)
definition_result, definition_events = _run_with_events(definition_flow, inputs)
assert definition_result == class_result
assert _state_without_id(definition_flow) == _state_without_id(class_flow)
if ordered:
assert definition_flow.method_outputs == class_flow.method_outputs
assert definition_events == class_events
else:
assert sorted(map(repr, definition_flow.method_outputs)) == sorted(
map(repr, class_flow.method_outputs)
)
assert sorted(definition_events) == sorted(class_events)
return definition_flow, definition_result
def test_simple_chain_parity():
flow, result = assert_parity(ChainFlow, CHAIN_YAML)
assert result == "confirmed:True"
assert flow.method_outputs == ["hello", "HELLO", "confirmed:True"]
def test_and_or_merge_parity():
flow, _ = assert_parity(MergeFlow, MERGE_YAML, ordered=False)
assert flow.state["joined"] is True
assert flow.state["either_ran"] is True
def test_router_label_parity_for_each_branch():
left_flow, _ = assert_parity(RouteFlow, ROUTE_YAML, inputs={"direction": "left"})
assert "took-left" in left_flow.method_outputs
assert "took-right" not in left_flow.method_outputs
right_flow, _ = assert_parity(RouteFlow, ROUTE_YAML, inputs={"direction": "right"})
assert "took-right" in right_flow.method_outputs
def test_cyclic_flow_parity():
flow, result = assert_parity(LoopFlow, LOOP_YAML)
assert result == "finished:3"
assert flow.state["count"] == 3
def test_definition_flow_events_use_definition_name():
definition = FlowDefinition.from_yaml(CHAIN_YAML)
flow = Flow.from_definition(definition)
_, events = _run_with_events(flow)
assert events
assert all(flow_name == "ChainFlow" for _, _, flow_name in events)
def test_definition_method_without_action_is_invalid():
with pytest.raises(ValidationError, match="do"):
FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "NoActions",
"methods": {"begin": {"start": True}},
}
)
def test_from_definition_unresolvable_ref_raises():
definition = FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "BadRefs",
"methods": {
"begin": {
"start": True,
"do": {"ref": "definitely_not_a_module_xyz:nope"},
}
},
}
)
with pytest.raises(ValueError, match="unresolvable actions.*begin"):
Flow.from_definition(definition)
def test_from_definition_malformed_ref_raises():
definition = FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "MalformedRefs",
"methods": {"begin": {"start": True, "do": {"ref": "no-colon-here"}}},
}
)
with pytest.raises(ValueError, match="expected 'module:qualname'"):
Flow.from_definition(definition)
def test_from_definition_local_scope_ref_raises():
definition = FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "LocalRefs",
"methods": {
"begin": {
"start": True,
"do": {"ref": f"{__name__}:make.<locals>.LocalFlow.begin"},
}
},
}
)
with pytest.raises(ValueError, match="expected 'module:qualname'"):
Flow.from_definition(definition)
def test_flow_definition_stamps_refs():
definition = ChainFlow.flow_definition()
assert definition.methods["begin"].do.ref == f"{__name__}:ChainFlow.begin"
assert definition.methods["shout"].do.ref == f"{__name__}:ChainFlow.shout"
def test_pydantic_state_from_ref_parity():
flow, result = assert_parity(PydanticStateFlow, PYDANTIC_STATE_YAML)
assert result == "count=1"
assert flow.state.count == 1
assert flow.state.label == "count=1"
assert flow.state.id
def test_pydantic_state_default_overlay():
flow = Flow.from_definition(FlowDefinition.from_yaml(PYDANTIC_STATE_OVERLAY_YAML))
result = flow.kickoff()
assert result == "count=6"
assert flow.state.count == 6
def test_json_schema_state():
flow = Flow.from_definition(FlowDefinition.from_yaml(JSON_SCHEMA_STATE_YAML))
result = flow.kickoff()
assert result == "count=1"
assert flow.state.count == 1
assert flow.state.label == "count=1"
assert flow.state.id
def test_json_schema_state_validates_inputs():
flow = Flow.from_definition(FlowDefinition.from_yaml(JSON_SCHEMA_STATE_YAML))
with pytest.raises(ValueError, match="Invalid inputs"):
flow.kickoff(inputs={"count": "not-a-number"})
def test_pydantic_state_falls_back_to_json_schema_when_ref_unimportable():
flow = Flow.from_definition(
FlowDefinition.from_yaml(PYDANTIC_REF_WITH_SCHEMA_FALLBACK_YAML)
)
result = flow.kickoff()
assert result == "count=1"
assert flow.state.count == 1
def test_pydantic_state_without_ref_or_schema_falls_back_to_dict(caplog):
with caplog.at_level("ERROR"):
flow = Flow.from_definition(FlowDefinition.from_yaml(UNRESOLVABLE_STATE_YAML))
assert "falling back to dict state" in caplog.text
result = flow.kickoff()
assert result == "hello"
assert flow.state["begin_ran"] is True
assert flow.state["id"]
def test_dict_state_is_a_copy_of_default_plus_id():
definition = FlowDefinition.from_yaml(DICT_STATE_YAML)
flow = Flow.from_definition(definition)
assert flow.state["count"] == 5
assert flow.state["id"]
flow.kickoff()
assert flow.state["begin_ran"] is True
second = Flow.from_definition(definition)
assert second.state["count"] == 5
assert "begin_ran" not in second.state
assert second.state["id"] != flow.state["id"]
assert definition.state.default == {"count": 5}
def test_unknown_state_type_falls_back_to_dict(caplog):
with caplog.at_level("WARNING"):
flow = Flow.from_definition(FlowDefinition.from_yaml(UNKNOWN_STATE_YAML))
assert "falling back to dict state" in caplog.text
result = flow.kickoff()
assert result == "hello"
assert flow.state["begin_ran"] is True

View File

@@ -77,12 +77,22 @@ class ComplexFlow(Flow):
return "complete"
def _attach_flow_definition(flow_class: type[Flow], methods: dict[str, object]) -> None:
def _attach_flow_definition(
flow_class: type[Flow], methods: dict[str, dict[str, object]]
) -> None:
flow_class._flow_definition = FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": flow_class.__name__,
"methods": methods,
"methods": {
name: {
"do": {
"ref": f"{flow_class.__module__}:{flow_class.__name__}.{name}"
},
**spec,
}
for name, spec in methods.items()
},
}
)
@@ -125,13 +135,20 @@ def test_build_flow_structure_from_flow_definition():
"schema": "crewai.flow/v1",
"name": "DefinedFlow",
"methods": {
"begin": {"start": True},
"begin": {
"do": {"ref": "defined_flows:DefinedFlow.begin"},
"start": True,
},
"decide": {
"do": {"ref": "defined_flows:DefinedFlow.decide"},
"listen": "begin",
"router": True,
"emit": ["done"],
},
"finish": {"listen": "done"},
"finish": {
"do": {"ref": "defined_flows:DefinedFlow.finish"},
"listen": "done",
},
},
}
)

View File

@@ -1,3 +1,3 @@
"""CrewAI development tools."""
__version__ = "1.14.7a4"
__version__ = "1.14.7"