Compare commits

..

19 Commits

Author SHA1 Message Date
Vini Brasil
64438cba37 Wire config and persistence from FlowDefinition into the runtime (#6132)
Some checks are pending
CodeQL Advanced / Analyze (actions) (push) Waiting to run
CodeQL Advanced / Analyze (python) (push) Waiting to run
Check Documentation Broken Links / Check broken links (push) Waiting to run
Vulnerability Scan / pip-audit (push) Waiting to run
* Wire config and persistence from FlowDefinition into the runtime

`from_definition` was silently dropping all config fields; it now passes
`config.model_dump()` so suppress_flow_events, max_method_calls, etc.
actually apply.

Persistence is now engine-driven: `_persist_method_completion` fires
after every method using the definition's persist metadata, so
`@persist` no longer needs to wrap methods — it just stamps them.

* Address code review comments
2026-06-12 11:51:44 -07:00
Lucas Gomide
887adafd2c fix: aggregate token usage across all LLM calls (#6122)
* feat: aggregate LLM token usage at the flow level

Introduces `flow.usage_metrics`, a snapshot of every LLMCallCompletedEvent
emitted under the flow's `current_flow_id` for the duration of one kickoff
(or resume) call. Aggregation happens on the singleton event bus so it
covers crews, direct `LLM.call`s, and nested listener calls — solving the
mismatch where the SDK reported only the last crew's usage while the
Enterprise UI showed the correct full total.

Co-authored-by: Cursor <cursoragent@cursor.com>

* refactor: centralize provider key normalization in UsageMetrics

Add UsageMetrics.from_provider_dict to normalize raw LLM usage dicts
across providers (LiteLLM, native Anthropic, native Gemini, OpenAI
nested cached). BaseLLM._track_token_usage_internal and the flow-level
aggregator now share this single source of truth, so `flow.usage_metrics`
agrees with per-LLM totals on every provider — including the native
Anthropic path that emits `input_tokens`/`output_tokens` instead of
`prompt_tokens`/`completion_tokens`.

* fix: flush event bus before reading aggregated usage_metrics

`crewai_event_bus.emit` dispatches LLMCallCompletedEvent handlers on a
ThreadPoolExecutor (fire-and-forget), so a flow whose last LLM call
completes right before kickoff_async/resume_async returns can detach
the usage listener while that handler is still queued, leaving its
tokens off `flow.usage_metrics`. Match `Crew.kickoff()` and call
`crewai_event_bus.flush()` in both finally blocks so every handler
drains before the listener is detached.

---------

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-12 12:55:22 -04:00
Rip&Tear
d3fc0d31f8 [codex] Redact file tool paths (#6134)
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Vulnerability Scan / pip-audit (push) Has been cancelled
* Redact file tool paths

* Fix for pull request finding 'Empty except'

* Potential fix for pull request finding

---------
2026-06-12 15:50:40 +08:00
Vini Brasil
373dca3d04 Run flows from a definition without a Python subclass (#6104)
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Vulnerability Scan / pip-audit (push) Has been cancelled
* Read flow dispatch from FlowDefinition

Store the definition in a `_definition` PrivateAttr at post-init and
convert the dispatch helpers (`_start_method_names`, `_listener_methods`,
`_start_condition`, `_listen_condition`, `_is_router`) from classmethods
to instance methods that read it. Event names now fall back to
`self._definition.name` instead of `self.__class__.__name__`.

Behavior is identical for decorator subclasses, but the engine no longer
assumes the definition comes from the class. This is the seam for
`Flow.from_definition`, where an instance runs a definition that was
loaded rather than built from a Python subclass.

* Add Flow.from_definition to run flows without a subclass

A FlowDefinition (e.g. loaded from YAML) was only usable for dispatch on
decorator-authored subclasses. Now each method definition records an
importable `module:qualname` handler ref, and `Flow.from_definition`
resolves and binds those handlers to build a runnable flow directly.

* Build flow state from FlowDefinition

Definition-driven flows previously always started with a bare dict
state.

* Replace handler string with structured FlowActionDefinition

`handler: str | None` was optional and opaque — missing handlers only
surfaced at kickoff time. `do: FlowActionDefinition` is required, so
Pydantic rejects invalid definitions at parse time.

The `call: "code"` discriminator prepares the schema for future
non-Python action types (e.g. MCP tool, crew) without touching
`FlowMethodDefinition`. Resolution logic is extracted to
`runtime/_action_resolvers.py` to keep the dispatch point isolated.

* Fix conversational start router missing required do field

FlowMethodDefinition.do became required when the handler string was
replaced with FlowActionDefinition, but _conversation_start_router still
built its fragment without it, breaking crewai import entirely.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

* Add event scoping to flow test

* Change lib/crewai/tests/test_flow_from_definition.py

---------

Co-authored-by: Claude Fable 5 <noreply@anthropic.com>
2026-06-11 14:18:49 -07:00
Greyson LaLonde
21fa8e32d9 docs: update changelog and version for v1.14.7
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Vulnerability Scan / pip-audit (push) Has been cancelled
Check Documentation Broken Links / Check broken links (push) Has been cancelled
2026-06-11 10:13:40 -07:00
Greyson LaLonde
f18c03cd8f feat: bump versions to 1.14.7 2026-06-11 10:06:07 -07:00
Greyson LaLonde
50b9c02272 fix(checkpoint): rebuild custom BaseLLM as concrete LLM on restore
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Check Documentation Broken Links / Check broken links (push) Has been cancelled
Vulnerability Scan / pip-audit (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
A custom BaseLLM subclass serializes with the inherited llm_type "base",
which the registry maps to the abstract BaseLLM. Restore then crashed on
cls(**value). Rebuild a concrete LLM from the saved config when the
resolved class is abstract.
2026-06-10 22:21:35 -07:00
Greyson LaLonde
c55334be5f docs: update changelog and version for v1.14.7rc2 2026-06-10 20:52:56 -07:00
Greyson LaLonde
05a2ba9ca4 feat: bump versions to 1.14.7rc2 2026-06-10 20:45:29 -07:00
Greyson LaLonde
fbafe1f0d3 fix(flow): gate restore on a flag so live snapshots don't replay as resume
Checkpoint serialization stamps checkpoint_completed_methods onto every live
Flow in RuntimeState.root, including the agent executor reused across a crew's
tasks. kickoff_async read that stamp as a restore signal, so the second task
replayed the first task's completed methods and never reached a final answer.

Gate is_restoring on _restored_from_checkpoint, set only by
_restore_from_checkpoint, and consume it single-shot.
2026-06-10 20:40:08 -07:00
Greyson LaLonde
5267c059f5 test(flow): pass show=False in test_flow_plotting to not open a browser
flow.plot defaults to show=True, which calls webbrowser.open on every run.
The test only asserts FlowPlotEvent is emitted, so disable the browser open.
2026-06-10 20:36:14 -07:00
Greyson LaLonde
243c9edc1c docs: update changelog and version for v1.14.7rc1
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Check Documentation Broken Links / Check broken links (push) Has been cancelled
Vulnerability Scan / pip-audit (push) Has been cancelled
Nightly Canary Release / Check for new commits (push) Has been cancelled
Nightly Canary Release / Build nightly packages (push) Has been cancelled
Nightly Canary Release / Publish nightly to PyPI (push) Has been cancelled
2026-06-10 18:56:52 -07:00
Greyson LaLonde
68910b70c0 feat: bump versions to 1.14.7rc1 2026-06-10 18:50:54 -07:00
Greyson LaLonde
299782765c ci: ignore GHSA-rrmf-rvhw-rf47 (torch alias of PYSEC-2025-194)
* ci: ignore GHSA-rrmf-rvhw-rf47 (torch alias of PYSEC-2025-194)

pip-audit reports CVE-2025-3000 under its GHSA id, which the existing
PYSEC-2025-194 ignore does not match. Same advisory: memory corruption
in torch.jit.script, CVSS 1.9, local-only, no fix for torch 2.11.0.

* ci: sync GHSA-rrmf-rvhw-rf47 ignore into pre-commit pip-audit
2026-06-10 18:45:42 -07:00
Greyson LaLonde
a1f44eb272 fix(events): scope runtime state per run to bound growth and isolate concurrent runs 2026-06-10 18:39:05 -07:00
Lorenze Jay
036b032ab6 handle supporting both custom prompts (#6108)
* handle supporting both custom prompts

* handle translations

* handle deprecation warnings better
2026-06-10 17:52:53 -07:00
Lorenze Jay
f88ae54f96 fix telemetry setup on crewai-login (#6106)
* fix telemetry setup on crewai-login

* type check fix
2026-06-10 17:03:25 -07:00
Lorenze Jay
b6e5d632c1 improve convo routing cycle with one less route (#6102)
* improve one less route

* flows in flows, new agent executor causing early trace batch finalization

* addressing comments

* addressing comments pt2

* lint and typecheck fix
2026-06-10 16:49:16 -07:00
Greyson LaLonde
0d971e5bc5 feat(events): add reset_runtime_state to release accumulated bus state 2026-06-10 16:12:28 -07:00
67 changed files with 5883 additions and 442 deletions

View File

@@ -64,6 +64,7 @@ jobs:
--ignore-vuln PYSEC-2025-197 \
--ignore-vuln PYSEC-2025-210 \
--ignore-vuln PYSEC-2026-139 \
--ignore-vuln GHSA-rrmf-rvhw-rf47 \
--ignore-vuln PYSEC-2025-211 \
--ignore-vuln PYSEC-2025-212 \
--ignore-vuln PYSEC-2025-213 \
@@ -81,6 +82,7 @@ jobs:
# PYSEC-2025-183 - pyjwt 2.12.1: disputed weak-encryption claim; key length is application-chosen
# PYSEC-2025-189..197 - torch 2.11.0: memory-corruption/DoS in functions only reachable via untrusted models; no fix available
# PYSEC-2025-210, PYSEC-2026-139 - torch 2.11.0: profiler/deserialization issues; no fix available
# GHSA-rrmf-rvhw-rf47 - torch 2.11.0 (CVE-2025-3000, alias of PYSEC-2025-194): memory corruption in torch.jit.script, CVSS 1.9, local-only; affected <=2.12.0, no fix available. pip-audit reports it under the GHSA id so the PYSEC ignore above does not catch it.
# PYSEC-2025-211..218 - transformers 5.5.4: deserialization/code injection via malicious model checkpoints; no fix available
# GHSA-f4j7-r4q5-qw2c - chromadb 1.1.1 (CVE-2026-45829): pre-auth RCE via /api/v2/tenants/{tenant}/databases/{db}/collections when trust_remote_code=true.
# Advisory: vulnerable >=1.0.0,<=1.5.9, firstPatchedVersion=none. We only use chromadb.PersistentClient (lib/crewai/src/crewai/rag/chromadb/factory.py)

View File

@@ -47,6 +47,7 @@ repos:
--ignore-vuln PYSEC-2025-197
--ignore-vuln PYSEC-2025-210
--ignore-vuln PYSEC-2026-139
--ignore-vuln GHSA-rrmf-rvhw-rf47
--ignore-vuln PYSEC-2025-211
--ignore-vuln PYSEC-2025-212
--ignore-vuln PYSEC-2025-213

View File

@@ -4,6 +4,106 @@ description: "تحديثات المنتج والتحسينات وإصلاحات
icon: "clock"
mode: "wide"
---
<Update label="11 يونيو 2026">
## v1.14.7
[عرض الإصدار على GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7)
## ما الذي تغير
### الميزات
- إضافة واجهات خلفية افتراضية قابلة للتوصيل للذاكرة، والمعرفة، وrag، وflow.
- عرض السبب الحقيقي للإنهاء، ومعلمات العينة، وresponse.id في أحداث LLM.
- تصنيف مشغلات DSL كزخارف واعية للمسار.
- إضافة واجهة برمجة تطبيقات الدردشة لتدفقات المحادثة.
- جعل واجهة القفل قابلة للتجاوز.
- بناء FlowDefinition من بيانات التعريف الخاصة بـ Flow DSL.
- إضافة مزود LLM من Snowflake Cortex الأصلي.
- إضافة دعم لملفات الوكلاء المدربين من crew.
### إصلاحات الأخطاء
- إصلاح نقطة التحقق لإعادة بناء BaseLLM مخصص كـ LLM ملموس عند الاستعادة.
- تقييد الاستعادة على علامة لمنع اللقطات الحية من إعادة التشغيل كاستئناف.
- تحديد حالة وقت التشغيل لكل تشغيل للحد من النمو وعزل التشغيل المتزامن.
- إصلاح إعدادات التتبع على crewai-login.
- احترام suppress_flow_events لأحداث تنفيذ الطريقة.
- استعادة [project.scripts] في حزمة crewai لتثبيت أداة uv.
- حل مشكلات CVE الخاصة بـ pip-audit لـ aiohttp وdocling وdocling-core.
- إصلاح إدخال الملفات الذي لا يعمل بشكل موثوق.
- إصلاح تاريخ نتائج أدوات Snowflake Claude غير المكتملة.
### الوثائق
- تحديث سجل التغييرات والإصدار لـ v1.14.7.
- تحديث وثائق جامع OpenTelemetry.
- تحديث دليل NVIDIA Nemotron LLM.
- إضافة دليل تكامل Databricks.
- إضافة دليل تكامل Snowflake.
### الأداء
- تحسين سرعة استيراد crewai من خلال تحميل مستندات docling بشكل كسول.
### إعادة الهيكلة
- تبسيط تقييم شروط التدفق ليكون بلا حالة لكل حدث.
- فصل منطق المحادثة عن وقت التشغيل وإضافة تعريف المحادثة.
- تقسيم `flow.py` إلى DSL، وتعريف، ووقت تشغيل.
## المساهمون
@Luzk, @alex-clawd, @devin-ai-integration[bot], @greysonlalonde, @gvieira, @jessemiller, @lorenzejay, @lucasgomide, @mattatcha, @vinibrsl
</Update>
<Update label="10 يونيو 2026">
## v1.14.7rc2
[عرض الإصدار على GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7rc2)
## ما الذي تغير
### إصلاحات الأخطاء
- استعادة البوابة على علامة لمنع اللقطات الحية من إعادة التشغيل كاستئناف
### الوثائق
- تحديث سجل التغييرات والإصدار لـ v1.14.7rc1
## المساهمون
@greysonlalonde
</Update>
<Update label="10 يونيو 2026">
## v1.14.7rc1
[عرض الإصدار على GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7rc1)
## ما الذي تغير
### الميزات
- إضافة `reset_runtime_state` لإطلاق حالة الحافلة المتراكمة
- التعامل مع دعم كل من الموجهات المخصصة
- فصل منطق المحادثة عن وقت التشغيل وإضافة `conversational_definition`
### إصلاحات الأخطاء
- إصلاح نطاق حالة وقت التشغيل لكل تشغيل للحد من النمو وعزل التشغيلات المتزامنة
- إصلاح إعدادات القياس عن بُعد على `crewai-login`
- إصلاح احترام `suppress_flow_events` لفعاليات تنفيذ الأساليب
### الوثائق
- تحديث صور OpenTelemetry
- تحديث الوثائق لتعكس الحالة الجديدة لجمع بيانات OpenTelemetry
- تحديث سجل التغييرات والإصدار لـ v1.14.7a4
### إعادة الهيكلة
- تبسيط تقييم شرط التدفق ليكون بلا حالة لكل حدث
- تحسين دورة توجيه المحادثة مع تقليل مسار واحد
## المساهمون
@greysonlalonde, @lorenzejay, @lucasgomide, @vinibrsl
</Update>
<Update label="9 يونيو 2026">
## v1.14.7a4

View File

@@ -226,6 +226,48 @@ counter=2 message='Hello from first_method - updated by second_method'
من خلال ضمان إعادة مخرجات الدالة الأخيرة وتوفير الوصول إلى الحالة، تجعل تدفقات CrewAI من السهل دمج نتائج سير عمل الذكاء الاصطناعي في التطبيقات أو الأنظمة الأكبر،
مع الحفاظ على الوصول إلى الحالة طوال تنفيذ التدفق.
## مقاييس استخدام التدفق
بعد اكتمال تنفيذ التدفق، يمكنك الوصول إلى الخاصية `usage_metrics` لعرض إجمالي استخدام التوكنات عبر **كل استدعاء لنموذج اللغة** يتم خلال التشغيل — بما في ذلك الاستدعاءات من كل فريق (Crew) ينظمه التدفق، والاستدعاءات داخل أدوات الـ Agents، والاستدعاءات المباشرة لـ `LLM.call(...)` من دوال التدفق. هذا هو المكافئ على جانب الـ SDK للإجماليات المعروضة في واجهة CrewAI Enterprise.
```python Code
from crewai import LLM
from crewai.flow.flow import Flow, listen, start
class UsageMetricsFlow(Flow):
@start()
def run_first_crew(self):
self.state.first_result = FirstCrew().crew().kickoff()
@listen(run_first_crew)
def call_llm_directly(self):
# استدعاء مباشر لنموذج اللغة — يُحسب أيضًا ضمن flow.usage_metrics
llm = LLM(model="openai/gpt-4o-mini")
self.state.summary = llm.call("لخّص النقاط الرئيسية.")
@listen(call_llm_directly)
def run_second_crew(self):
self.state.second_result = SecondCrew().crew().kickoff()
flow = UsageMetricsFlow()
flow.kickoff()
print(flow.usage_metrics)
# UsageMetrics(total_tokens=8579, prompt_tokens=6210, completion_tokens=2369,
# cached_prompt_tokens=0, reasoning_tokens=0,
# cache_creation_tokens=0, successful_requests=5)
```
<Note>
`flow.usage_metrics` **ليست** نفس `flow.kickoff().token_usage`. هذه الأخيرة
ترجع فقط `CrewOutput.token_usage` لـ **آخر** دالة `@listen` أعادت
`CrewOutput`، مما يعني أنها تعكس فقط الفريق الأخير وتتجاهل الفرق السابقة
وكذلك أي استدعاءات مباشرة لـ `LLM.call(...)`. استخدم `flow.usage_metrics`
كلما احتجت إلى الإجمالي **الكامل** للتوكنات لتنفيذ التدفق.
</Note>
كل حقل في [`UsageMetrics`](https://github.com/crewAIInc/crewAI/blob/main/lib/crewai/src/crewai/types/usage_metrics.py) المُعاد هو مجموع جميع استدعاءات نموذج اللغة التي حدثت خلال استدعاء واحد لـ `flow.kickoff()`. تتم إعادة تعيين العدادات عند الاستدعاء التالي لـ `kickoff()` (وفي كل تكرار من `kickoff_for_each`)، لذلك لن تتكرر العدّات عبر التشغيلات المتتالية. يمكن قراءة هذه الخاصية بأمان في أي وقت بعد اكتمال `kickoff()`؛ قراءتها أثناء التنفيذ تُرجع المجموع الجزئي المتراكم حتى تلك اللحظة.
## إدارة حالة التدفق
إدارة الحالة بفعالية أمر بالغ الأهمية لبناء سير عمل ذكاء اصطناعي موثوق وقابل للصيانة. توفر تدفقات CrewAI آليات قوية لإدارة الحالة غير المهيكلة والمهيكلة،

View File

@@ -161,6 +161,18 @@ crew = Crew(
)
```
<Note>
يُحتفظ بـ `agent.i18n` للتوافق مع الإصدارات السابقة فقط، وقد تم إهماله. لتخصيص المطالبات أثناء التشغيل، مرّر `prompt_file` إلى `Crew`. وللوصول البرمجي المباشر إلى شرائح المطالبات، استخدم أداة i18n مباشرة:
</Note>
```python
from crewai.utilities.i18n import get_i18n
i18n = get_i18n("custom_prompts.json")
format_slice = i18n.slice("format")
tool_prompt = i18n.tools("ask_question")
```
#### الخيار 3: تعطيل مطالبات النظام لنماذج o1
```python
agent = Agent(
@@ -208,6 +220,8 @@ agent = Agent(
يدمج CrewAI بعد ذلك تخصيصاتك مع الإعدادات الافتراضية، فلا تحتاج لإعادة تعريف كل مطالبة. إليك الطريقة:
بالنسبة للكود الذي يحتاج إلى قراءة شرائح المطالبات مباشرة، استخدم `crewai.utilities.i18n.get_i18n()` مع ملف المطالبات نفسه بدلًا من قراءة `agent.i18n`.
### مثال: تخصيص أساسي للمطالبات
أنشئ ملف `custom_prompts.json` بالمطالبات التي تريد تعديلها. تأكد من إدراج جميع المطالبات عالية المستوى التي يجب أن يحتويها، وليس فقط تغييراتك:

File diff suppressed because it is too large Load Diff

View File

@@ -4,6 +4,106 @@ description: "Product updates, improvements, and bug fixes for CrewAI"
icon: "clock"
mode: "wide"
---
<Update label="Jun 11, 2026">
## v1.14.7
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7)
## What's Changed
### Features
- Add pluggable default backends for memory, knowledge, rag, and flow.
- Surface real finish_reason, sampling params, and response.id on LLM events.
- Type DSL triggers as route-aware decorators.
- Add chat API for conversational flows.
- Make locking backend overridable.
- Build FlowDefinition from Flow DSL metadata.
- Add native Snowflake Cortex LLM provider.
- Add crew trained agents file support.
### Bug Fixes
- Fix checkpoint to rebuild custom BaseLLM as concrete LLM on restore.
- Gate restore on a flag to prevent live snapshots from replaying as resume.
- Scope runtime state per run to bound growth and isolate concurrent runs.
- Fix telemetry setup on crewai-login.
- Respect suppress_flow_events for method-execution events.
- Restore [project.scripts] in crewai package for uv tool install.
- Resolve pip-audit CVEs for aiohttp, docling, and docling-core.
- Fix file input not working reliably.
- Fix Snowflake Claude incomplete tool result histories.
### Documentation
- Update changelog and version for v1.14.7.
- Update OpenTelemetry collector documentation.
- Update NVIDIA Nemotron LLM guide.
- Add Databricks integration guide.
- Add Snowflake integration guide.
### Performance
- Improve crewai import speed by lazy-loading docling imports.
### Refactoring
- Simplify flow condition evaluation to be stateless per event.
- Decouple convo logic from runtime and add a conversational_definition.
- Split `flow.py` into DSL, definition, and runtime.
## Contributors
@Luzk, @alex-clawd, @devin-ai-integration[bot], @greysonlalonde, @gvieira, @jessemiller, @lorenzejay, @lucasgomide, @mattatcha, @vinibrsl
</Update>
<Update label="Jun 10, 2026">
## v1.14.7rc2
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7rc2)
## What's Changed
### Bug Fixes
- Gate restore on a flag to prevent live snapshots from replaying as resume
### Documentation
- Update changelog and version for v1.14.7rc1
## Contributors
@greysonlalonde
</Update>
<Update label="Jun 10, 2026">
## v1.14.7rc1
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7rc1)
## What's Changed
### Features
- Add `reset_runtime_state` to release accumulated bus state
- Handle supporting both custom prompts
- Decouple conversation logic from runtime and add a `conversational_definition`
### Bug Fixes
- Fix scope of runtime state per run to bound growth and isolate concurrent runs
- Fix telemetry setup on `crewai-login`
- Fix respect for `suppress_flow_events` for method-execution events
### Documentation
- Update OpenTelemetry images
- Update documentation to reflect new state of OpenTelemetry collector
- Update changelog and version for v1.14.7a4
### Refactoring
- Simplify flow condition evaluation to be stateless per event
- Improve conversation routing cycle with one less route
## Contributors
@greysonlalonde, @lorenzejay, @lucasgomide, @vinibrsl
</Update>
<Update label="Jun 09, 2026">
## v1.14.7a4

View File

@@ -226,6 +226,49 @@ After the Flow has run, you can access the final state to see the updates made b
By ensuring that the final method's output is returned and providing access to the state, CrewAI Flows make it easy to integrate the results of your AI workflows into larger applications or systems,
while also maintaining and accessing the state throughout the Flow's execution.
## Flow Usage Metrics
After a Flow execution completes, you can access the `usage_metrics` property to view aggregated token usage across **every LLM call** made during the run — including calls from every Crew the Flow orchestrated, calls inside Agent tools, and bare `LLM.call(...)` invocations from Flow methods. This is the SDK-side equivalent of the totals shown in the CrewAI Enterprise UI.
```python Code
from crewai import LLM
from crewai.flow.flow import Flow, listen, start
class UsageMetricsFlow(Flow):
@start()
def run_first_crew(self):
self.state.first_result = FirstCrew().crew().kickoff()
@listen(run_first_crew)
def call_llm_directly(self):
# Bare LLM call — still counted by flow.usage_metrics
llm = LLM(model="openai/gpt-4o-mini")
self.state.summary = llm.call("Summarize the key takeaways.")
@listen(call_llm_directly)
def run_second_crew(self):
self.state.second_result = SecondCrew().crew().kickoff()
flow = UsageMetricsFlow()
flow.kickoff()
print(flow.usage_metrics)
# UsageMetrics(total_tokens=8579, prompt_tokens=6210, completion_tokens=2369,
# cached_prompt_tokens=0, reasoning_tokens=0,
# cache_creation_tokens=0, successful_requests=5)
```
<Note>
`flow.usage_metrics` is **not** the same as `flow.kickoff().token_usage`. The
latter returns the `CrewOutput.token_usage` of the **last** `@listen` method
that returned a `CrewOutput`, which means it only reflects the final Crew and
ignores prior Crews and bare `LLM.call(...)` invocations entirely. Use
`flow.usage_metrics` whenever you need the **full** token rollup for the Flow
execution.
</Note>
Each entry in the returned [`UsageMetrics`](https://github.com/crewAIInc/crewAI/blob/main/lib/crewai/src/crewai/types/usage_metrics.py) is the sum across all LLM calls made within a single `flow.kickoff()` invocation. Counters reset on the next `kickoff()` call (or on each iteration of `kickoff_for_each`), so successive runs don't double-count. The property is safe to read at any point after `kickoff()` completes; reading it during execution returns the partial total accumulated so far.
## Flow State Management
Managing state effectively is crucial for building reliable and maintainable AI workflows. CrewAI Flows provides robust mechanisms for both unstructured and structured state management,

View File

@@ -161,6 +161,18 @@ crew = Crew(
)
```
<Note>
`agent.i18n` is maintained only for backward compatibility and is deprecated. For runtime prompt customization, pass `prompt_file` to `Crew`. For programmatic access to prompt slices, use the i18n utility directly:
</Note>
```python
from crewai.utilities.i18n import get_i18n
i18n = get_i18n("custom_prompts.json")
format_slice = i18n.slice("format")
tool_prompt = i18n.tools("ask_question")
```
#### Option 3: Disable System Prompts for o1 Models
```python
agent = Agent(
@@ -208,6 +220,8 @@ One straightforward approach is to create a JSON file for the prompts you want t
CrewAI then merges your customizations with the defaults, so you don't have to redefine every prompt. Here's how:
For code that needs to read prompt slices directly, use `crewai.utilities.i18n.get_i18n()` with the same prompt file instead of reading `agent.i18n`.
### Example: Basic Prompt Customization
Create a `custom_prompts.json` file with the prompts you want to modify. Ensure you list all top-level prompts it should contain, not just your changes:

View File

@@ -4,6 +4,106 @@ description: "CrewAI의 제품 업데이트, 개선 사항 및 버그 수정"
icon: "clock"
mode: "wide"
---
<Update label="2026년 6월 11일">
## v1.14.7
[GitHub 릴리스 보기](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7)
## 변경 사항
### 기능
- 메모리, 지식, RAG 및 흐름에 대한 플러그 가능한 기본 백엔드를 추가했습니다.
- LLM 이벤트에서 실제 finish_reason, 샘플링 매개변수 및 response.id를 표시합니다.
- 경로 인식 장식자로서의 타입 DSL 트리거를 설정합니다.
- 대화 흐름을 위한 채팅 API를 추가했습니다.
- 잠금 백엔드를 재정의 가능하도록 만듭니다.
- Flow DSL 메타데이터에서 FlowDefinition을 빌드합니다.
- 네이티브 Snowflake Cortex LLM 공급자를 추가했습니다.
- 훈련된 에이전트 파일 지원을 추가했습니다.
### 버그 수정
- 복원 시 사용자 정의 BaseLLM을 구체적인 LLM으로 재구성하도록 체크포인트를 수정했습니다.
- 라이브 스냅샷이 재개로 재생되지 않도록 플래그를 사용하여 복원을 제한합니다.
- 실행마다 런타임 상태의 범위를 설정하여 성장을 제한하고 동시 실행을 격리합니다.
- crewai-login에서 텔레메트리 설정을 수정했습니다.
- 메서드 실행 이벤트에 대해 suppress_flow_events를 존중합니다.
- uv 도구 설치를 위해 crewai 패키지에서 [project.scripts]를 복원합니다.
- aiohttp, docling 및 docling-core에 대한 pip-audit CVE를 해결합니다.
- 파일 입력이 신뢰할 수 없게 작동하는 문제를 수정했습니다.
- Snowflake Claude의 불완전한 도구 결과 기록을 수정했습니다.
### 문서
- v1.14.7에 대한 변경 로그 및 버전을 업데이트했습니다.
- OpenTelemetry 수집기 문서를 업데이트했습니다.
- NVIDIA Nemotron LLM 가이드를 업데이트했습니다.
- Databricks 통합 가이드를 추가했습니다.
- Snowflake 통합 가이드를 추가했습니다.
### 성능
- docling 가져오기를 지연 로딩하여 crewai 가져오기 속도를 개선했습니다.
### 리팩토링
- 흐름 조건 평가를 이벤트별로 상태 비저장으로 단순화했습니다.
- 대화 논리를 런타임에서 분리하고 conversational_definition을 추가했습니다.
- `flow.py`를 DSL, 정의 및 런타임으로 분리했습니다.
## 기여자
@Luzk, @alex-clawd, @devin-ai-integration[bot], @greysonlalonde, @gvieira, @jessemiller, @lorenzejay, @lucasgomide, @mattatcha, @vinibrsl
</Update>
<Update label="2026년 6월 10일">
## v1.14.7rc2
[GitHub 릴리스 보기](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7rc2)
## 변경 사항
### 버그 수정
- 라이브 스냅샷이 재개로 재생되는 것을 방지하기 위한 플래그에서 게이트 복원
### 문서
- v1.14.7rc1에 대한 변경 로그 및 버전 업데이트
## 기여자
@greysonlalonde
</Update>
<Update label="2026년 6월 10일">
## v1.14.7rc1
[GitHub 릴리스 보기](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7rc1)
## 변경 사항
### 기능
- 누적된 버스 상태를 해제하기 위해 `reset_runtime_state` 추가
- 사용자 정의 프롬프트를 모두 지원하도록 처리
- 대화 논리를 런타임과 분리하고 `conversational_definition` 추가
### 버그 수정
- 실행당 런타임 상태의 범위를 수정하여 성장 제한 및 동시 실행 격리
- `crewai-login`에서 원격 측정 설정 수정
- 메서드 실행 이벤트에 대한 `suppress_flow_events` 존중 수정
### 문서
- OpenTelemetry 이미지 업데이트
- OpenTelemetry 수집기의 새로운 상태를 반영하도록 문서 업데이트
- v1.14.7a4에 대한 변경 로그 및 버전 업데이트
### 리팩토링
- 이벤트당 상태 비저장 방식으로 흐름 조건 평가 단순화
- 경로를 하나 줄여 대화 라우팅 사이클 개선
## 기여자
@greysonlalonde, @lorenzejay, @lucasgomide, @vinibrsl
</Update>
<Update label="2026년 6월 9일">
## v1.14.7a4

View File

@@ -221,6 +221,48 @@ Flow가 실행된 후, 이러한 메소드들에 의해 수행된 업데이트
최종 메소드의 출력이 반환되고 상태에 접근할 수 있도록 함으로써, CrewAI Flow는 AI 워크플로우의 결과를 더 큰 애플리케이션이나 시스템에 쉽게 통합할 수 있게 하며,
Flow 실행 과정 전반에 걸쳐 상태를 유지하고 접근하면서도 이를 용이하게 만듭니다.
## 플로우 사용 메트릭
Flow 실행이 완료된 후, `usage_metrics` 속성에 접근하여 실행 동안 발생한 **모든 LLM 호출**의 토큰 사용량 집계를 확인할 수 있습니다. 여기에는 Flow가 오케스트레이션한 모든 Crew의 호출, Agent의 도구 내부에서 발생한 호출, 그리고 Flow 메서드에서 직접 호출한 `LLM.call(...)`이 모두 포함됩니다. 이는 CrewAI Enterprise UI에 표시되는 총량과 동등한 SDK 측 값입니다.
```python Code
from crewai import LLM
from crewai.flow.flow import Flow, listen, start
class UsageMetricsFlow(Flow):
@start()
def run_first_crew(self):
self.state.first_result = FirstCrew().crew().kickoff()
@listen(run_first_crew)
def call_llm_directly(self):
# 직접 LLM 호출 — flow.usage_metrics에서도 집계됩니다
llm = LLM(model="openai/gpt-4o-mini")
self.state.summary = llm.call("핵심 내용을 요약해 주세요.")
@listen(call_llm_directly)
def run_second_crew(self):
self.state.second_result = SecondCrew().crew().kickoff()
flow = UsageMetricsFlow()
flow.kickoff()
print(flow.usage_metrics)
# UsageMetrics(total_tokens=8579, prompt_tokens=6210, completion_tokens=2369,
# cached_prompt_tokens=0, reasoning_tokens=0,
# cache_creation_tokens=0, successful_requests=5)
```
<Note>
`flow.usage_metrics`는 `flow.kickoff().token_usage`와 **동일하지 않습니다**.
후자는 `CrewOutput`을 반환한 **마지막** `@listen` 메서드의
`CrewOutput.token_usage`만 반환하므로, 이전에 실행된 Crew들과 Flow 메서드에서
직접 호출한 `LLM.call(...)`은 전혀 포함되지 않습니다. Flow 실행에 대한
**전체** 토큰 집계가 필요할 때는 항상 `flow.usage_metrics`를 사용하십시오.
</Note>
반환되는 [`UsageMetrics`](https://github.com/crewAIInc/crewAI/blob/main/lib/crewai/src/crewai/types/usage_metrics.py)의 각 항목은 단일 `flow.kickoff()` 실행 동안 발생한 모든 LLM 호출의 합계입니다. 다음 `kickoff()` 호출(및 `kickoff_for_each`의 각 반복)에서 카운터가 초기화되므로 연속 실행이 이중으로 집계되지 않습니다. 이 속성은 `kickoff()` 완료 후 언제든지 안전하게 읽을 수 있으며, 실행 중에 읽으면 그 시점까지 누적된 부분 합계를 반환합니다.
## 플로우 상태 관리
상태를 효과적으로 관리하는 것은 신뢰할 수 있고 유지 보수가 용이한 AI 워크플로를 구축하는 데 매우 중요합니다. CrewAI 플로우는 비정형 및 정형 상태 관리를 위한 강력한 메커니즘을 제공하여, 개발자가 자신의 애플리케이션에 가장 적합한 접근 방식을 선택할 수 있도록 합니다.

View File

@@ -161,6 +161,18 @@ crew = Crew(
)
```
<Note>
`agent.i18n`은 이전 버전과의 호환성을 위해서만 유지되며 사용이 중단될 예정입니다. 런타임 프롬프트 커스터마이징에는 `Crew`에 `prompt_file`을 전달하세요. 프롬프트 슬라이스를 코드에서 직접 읽어야 한다면 i18n 유틸리티를 직접 사용하세요:
</Note>
```python
from crewai.utilities.i18n import get_i18n
i18n = get_i18n("custom_prompts.json")
format_slice = i18n.slice("format")
tool_prompt = i18n.tools("ask_question")
```
#### 옵션 3: o1 모델에 대한 시스템 프롬프트 비활성화
```python
agent = Agent(
@@ -208,6 +220,8 @@ agent = Agent(
그러면 CrewAI가 기본값과 사용자가 지정한 내용을 병합하므로, 모든 프롬프트를 다시 정의할 필요가 없습니다. 방법은 다음과 같습니다:
프롬프트 슬라이스를 코드에서 직접 읽어야 하는 경우에는 `agent.i18n`을 읽는 대신 동일한 프롬프트 파일로 `crewai.utilities.i18n.get_i18n()`을 사용하세요.
### 예시: 기본 프롬프트 커스터마이징
수정하고 싶은 프롬프트를 포함하는 `custom_prompts.json` 파일을 생성하세요. 변경 사항만이 아니라 포함해야 하는 모든 최상위 프롬프트를 반드시 나열해야 합니다:
@@ -314,4 +328,4 @@ CrewAI에서의 저수준 prompt 커스터마이제이션은 매우 맞춤화되
<Check>
이제 CrewAI에서 고급 prompt 커스터마이징을 위한 기초를 갖추었습니다. 모델별 구조나 도메인별 제약에 맞춰 적용하든, 이러한 저수준 접근 방식은 agent 상호작용을 매우 전문적으로 조정할 수 있게 해줍니다.
</Check>
</Check>

View File

@@ -4,6 +4,106 @@ description: "Atualizações de produto, melhorias e correções do CrewAI"
icon: "clock"
mode: "wide"
---
<Update label="11 jun 2026">
## v1.14.7
[Ver release no GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7)
## O que Mudou
### Recursos
- Adicionar backends padrão plugáveis para memória, conhecimento, rag e fluxo.
- Exibir o verdadeiro finish_reason, parâmetros de amostragem e response.id em eventos LLM.
- Tipar os gatilhos DSL como decoradores cientes de rotas.
- Adicionar API de chat para fluxos de conversa.
- Tornar o backend de bloqueio substituível.
- Construir FlowDefinition a partir de metadados Flow DSL.
- Adicionar provedor nativo Snowflake Cortex LLM.
- Adicionar suporte a arquivos de agentes treinados pela equipe.
### Correções de Bugs
- Corrigir checkpoint para reconstruir BaseLLM personalizado como LLM concreto na restauração.
- Controlar a restauração com uma flag para evitar que snapshots ao vivo sejam reproduzidos como retomar.
- Escopar o estado de execução por execução para limitar o crescimento e isolar execuções concorrentes.
- Corrigir configuração de telemetria no crewai-login.
- Respeitar suppress_flow_events para eventos de execução de método.
- Restaurar [project.scripts] no pacote crewai para instalação da ferramenta uv.
- Resolver CVEs de pip-audit para aiohttp, docling e docling-core.
- Corrigir entrada de arquivo que não estava funcionando de forma confiável.
- Corrigir histórias de resultados de ferramentas incompletas do Snowflake Claude.
### Documentação
- Atualizar changelog e versão para v1.14.7.
- Atualizar documentação do coletor OpenTelemetry.
- Atualizar guia do LLM NVIDIA Nemotron.
- Adicionar guia de integração do Databricks.
- Adicionar guia de integração do Snowflake.
### Desempenho
- Melhorar a velocidade de importação do crewai através do carregamento preguiçoso de imports do docling.
### Refatoração
- Simplificar a avaliação de condições de fluxo para ser sem estado por evento.
- Desacoplar a lógica de conversa da execução e adicionar uma conversational_definition.
- Dividir `flow.py` em DSL, definição e execução.
## Contribuidores
@Luzk, @alex-clawd, @devin-ai-integration[bot], @greysonlalonde, @gvieira, @jessemiller, @lorenzejay, @lucasgomide, @mattatcha, @vinibrsl
</Update>
<Update label="10 jun 2026">
## v1.14.7rc2
[Ver release no GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7rc2)
## O que Mudou
### Correções de Bugs
- Restauração de portão em uma flag para evitar que snapshots ao vivo sejam reproduzidos como retomar
### Documentação
- Atualizar changelog e versão para v1.14.7rc1
## Contributors
@greysonlalonde
</Update>
<Update label="10 jun 2026">
## v1.14.7rc1
[Ver release no GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7rc1)
## O que Mudou
### Recursos
- Adicionar `reset_runtime_state` para liberar o estado acumulado do barramento
- Lidar com suporte a ambos os prompts personalizados
- Desacoplar a lógica de conversa do tempo de execução e adicionar uma `conversational_definition`
### Correções de Bugs
- Corrigir o escopo do estado de tempo de execução por execução para limitar o crescimento e isolar execuções concorrentes
- Corrigir a configuração de telemetria em `crewai-login`
- Corrigir o respeito a `suppress_flow_events` para eventos de execução de método
### Documentação
- Atualizar imagens do OpenTelemetry
- Atualizar a documentação para refletir o novo estado do coletor OpenTelemetry
- Atualizar o changelog e a versão para v1.14.7a4
### Refatoração
- Simplificar a avaliação da condição de fluxo para ser sem estado por evento
- Melhorar o ciclo de roteamento de conversas com uma rota a menos
## Contribuidores
@greysonlalonde, @lorenzejay, @lucasgomide, @vinibrsl
</Update>
<Update label="09 jun 2026">
## v1.14.7a4

View File

@@ -219,6 +219,49 @@ Após o término da execução, é possível acessar o estado final e observar a
Ao garantir que a saída do método final seja retornada e oferecer acesso ao estado, o CrewAI Flows facilita a integração dos resultados dos seus workflows de IA em aplicações maiores,
além de permitir o gerenciamento e o acesso ao estado durante toda a execução do Flow.
## Métricas de Uso do Flow
Após a execução de um Flow, você pode acessar a propriedade `usage_metrics` para visualizar o consumo agregado de tokens em **todas as chamadas de LLM** realizadas durante a execução — incluindo chamadas das Crews orquestradas pelo Flow, chamadas dentro de tools de Agents, e invocações diretas de `LLM.call(...)` feitas a partir de métodos do Flow. Esse é o equivalente, do lado do SDK, ao total exibido na interface do CrewAI Enterprise.
```python Code
from crewai import LLM
from crewai.flow.flow import Flow, listen, start
class UsageMetricsFlow(Flow):
@start()
def run_first_crew(self):
self.state.first_result = FirstCrew().crew().kickoff()
@listen(run_first_crew)
def call_llm_directly(self):
# Chamada direta de LLM — também contabilizada por flow.usage_metrics
llm = LLM(model="openai/gpt-4o-mini")
self.state.summary = llm.call("Resuma os principais pontos.")
@listen(call_llm_directly)
def run_second_crew(self):
self.state.second_result = SecondCrew().crew().kickoff()
flow = UsageMetricsFlow()
flow.kickoff()
print(flow.usage_metrics)
# UsageMetrics(total_tokens=8579, prompt_tokens=6210, completion_tokens=2369,
# cached_prompt_tokens=0, reasoning_tokens=0,
# cache_creation_tokens=0, successful_requests=5)
```
<Note>
`flow.usage_metrics` **não** é o mesmo que `flow.kickoff().token_usage`. Este
último retorna apenas o `CrewOutput.token_usage` do **último** método
`@listen` que retornou um `CrewOutput`, ou seja, reflete somente a Crew
final e ignora completamente as Crews anteriores e quaisquer chamadas
diretas de `LLM.call(...)`. Use `flow.usage_metrics` sempre que precisar do
rollup **completo** de tokens da execução do Flow.
</Note>
Cada campo do [`UsageMetrics`](https://github.com/crewAIInc/crewAI/blob/main/lib/crewai/src/crewai/types/usage_metrics.py) retornado representa a soma de todas as chamadas de LLM feitas em uma única invocação de `flow.kickoff()`. Os contadores são resetados a cada novo `kickoff()` (e em cada iteração de `kickoff_for_each`), de modo que execuções sucessivas não duplicam o total. A propriedade é segura para ser lida em qualquer momento após o `kickoff()`; lê-la durante a execução retorna o total parcial acumulado até aquele instante.
## Gerenciamento de Estado em Flows
Gerenciar o estado de forma eficaz é fundamental para construir fluxos de trabalho de IA confiáveis e de fácil manutenção. O CrewAI Flows oferece mecanismos robustos para o gerenciamento de estado tanto não estruturado quanto estruturado,

View File

@@ -161,6 +161,18 @@ crew = Crew(
)
```
<Note>
`agent.i18n` é mantido apenas para compatibilidade retroativa e está obsoleto. Para customização de prompts em tempo de execução, passe `prompt_file` para `Crew`. Para acesso programático aos slices de prompt, use diretamente o utilitário de i18n:
</Note>
```python
from crewai.utilities.i18n import get_i18n
i18n = get_i18n("custom_prompts.json")
format_slice = i18n.slice("format")
tool_prompt = i18n.tools("ask_question")
```
#### Opção 3: Desativar Prompts de Sistema para Modelos o1
```python
agent = Agent(
@@ -208,6 +220,8 @@ Uma abordagem direta é criar um arquivo JSON para os prompts que deseja sobresc
O CrewAI então mescla suas customizações com os padrões, assim você não precisa redefinir todos os prompts. Veja como:
Para código que precisa ler slices de prompt diretamente, use `crewai.utilities.i18n.get_i18n()` com o mesmo arquivo de prompts em vez de ler `agent.i18n`.
### Exemplo: Customização Básica de Prompt
Crie um arquivo `custom_prompts.json` com os prompts que deseja modificar. Certifique-se de listar todos os prompts de nível superior que ele deve conter, não apenas suas alterações:

View File

@@ -8,7 +8,7 @@ authors = [
]
requires-python = ">=3.10, <3.14"
dependencies = [
"crewai-core==1.14.7a4",
"crewai-core==1.14.7",
"click>=8.1.7,<9",
"pydantic>=2.11.9,<2.13",
"pydantic-settings~=2.10.1",

View File

@@ -1 +1 @@
__version__ = "1.14.7a4"
__version__ = "1.14.7"

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.14.7a4"
"crewai[tools]==1.14.7"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.14.7a4"
"crewai[tools]==1.14.7"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "Power up your crews with {{folder_name}}"
readme = "README.md"
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.14.7a4"
"crewai[tools]==1.14.7"
]
[tool.crewai]

View File

@@ -1 +1 @@
__version__ = "1.14.7a4"
__version__ = "1.14.7"

View File

@@ -17,7 +17,7 @@ import contextlib
import logging
import os
import threading
from typing import Any, Final
from typing import Any, ClassVar, Final
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
@@ -27,7 +27,7 @@ from opentelemetry.sdk.trace.export import (
BatchSpanProcessor,
SpanExportResult,
)
from opentelemetry.trace import Span, Status, StatusCode
from opentelemetry.trace import ProxyTracerProvider, Span, Status, StatusCode
from typing_extensions import Self
@@ -72,8 +72,8 @@ class Telemetry:
and event-bus signal handlers (see ``crewai.telemetry.telemetry``).
"""
_instance = None
_lock = threading.Lock()
_instance: ClassVar[Self | None] = None
_lock: ClassVar[threading.Lock] = threading.Lock()
def __new__(cls) -> Self:
if cls._instance is None:
@@ -149,6 +149,10 @@ class Telemetry:
if self.ready and not self.trace_set:
try:
with suppress_warnings():
existing_provider = trace.get_tracer_provider()
if not isinstance(existing_provider, ProxyTracerProvider):
self.trace_set = True
return
trace.set_tracer_provider(self.provider)
self.trace_set = True
except Exception as e:

View File

@@ -14,6 +14,7 @@ from crewai_core import (
version,
)
import pytest
from opentelemetry.sdk.trace import TracerProvider
def test_version_returns_string() -> None:
@@ -94,3 +95,36 @@ def test_user_data_decline_blocks(
def test_unused_var_warning_silenced() -> None:
# Touch os to keep the import (used by env-var fixtures above)
assert os.environ is not None
def test_core_telemetry_skips_duplicate_tracer_provider(
monkeypatch: pytest.MonkeyPatch,
) -> None:
from crewai_core.telemetry import Telemetry
Telemetry._instance = None
monkeypatch.delenv("OTEL_SDK_DISABLED", raising=False)
monkeypatch.delenv("CREWAI_DISABLE_TELEMETRY", raising=False)
monkeypatch.delenv("CREWAI_DISABLE_TRACKING", raising=False)
monkeypatch.setattr(
"crewai_core.telemetry.trace.get_tracer_provider",
lambda: TracerProvider(),
)
called = False
def fail_if_called(provider: object) -> None:
nonlocal called
called = True
monkeypatch.setattr(
"crewai_core.telemetry.trace.set_tracer_provider",
fail_if_called,
)
telemetry = Telemetry()
telemetry.set_tracer()
assert called is False
assert telemetry.trace_set is True

View File

@@ -152,4 +152,4 @@ __all__ = [
"wrap_file_source",
]
__version__ = "1.14.7a4"
__version__ = "1.14.7"

View File

@@ -10,7 +10,7 @@ requires-python = ">=3.10, <3.14"
dependencies = [
"pytube~=15.0.0",
"requests>=2.33.0,<3",
"crewai==1.14.7a4",
"crewai==1.14.7",
"tiktoken>=0.8.0,<0.13",
"beautifulsoup4~=4.13.4",
"python-docx~=1.2.0",

View File

@@ -330,4 +330,4 @@ __all__ = [
"ZapierActionTools",
]
__version__ = "1.14.7a4"
__version__ = "1.14.7"

View File

@@ -22,6 +22,31 @@ logger = logging.getLogger(__name__)
_UNSAFE_PATHS_ENV = "CREWAI_TOOLS_ALLOW_UNSAFE_PATHS"
def format_path_for_display(path: str, base_dir: str | None = None) -> str:
"""Return a path label that does not expose absolute directory prefixes."""
if base_dir is None:
base_dir = os.getcwd()
try:
resolved_base = os.path.realpath(base_dir)
resolved_path = os.path.realpath(
os.path.join(resolved_base, path) if not os.path.isabs(path) else path
)
if os.path.commonpath([resolved_base, resolved_path]) == resolved_base:
return os.path.relpath(resolved_path, resolved_base)
except (OSError, ValueError) as exc:
logger.debug("Falling back to basename for display path formatting: %s", exc)
return os.path.basename(os.path.realpath(path)) or "[redacted path]"
def format_error_for_display(error: Exception) -> str:
"""Return exception details without OS-added absolute path context."""
if isinstance(error, OSError):
return error.strerror or error.__class__.__name__
return str(error)
def _is_escape_hatch_enabled() -> bool:
"""Check if the unsafe paths escape hatch is enabled."""
return os.environ.get(_UNSAFE_PATHS_ENV, "").lower() in ("true", "1", "yes")
@@ -66,8 +91,8 @@ def validate_file_path(path: str, base_dir: str | None = None) -> str:
prefix = resolved_base if resolved_base.endswith(os.sep) else resolved_base + os.sep
if not resolved_path.startswith(prefix) and resolved_path != resolved_base:
raise ValueError(
f"Path '{path}' resolves to '{resolved_path}' which is outside "
f"the allowed directory '{resolved_base}'. "
f"Path '{format_path_for_display(resolved_path, resolved_base)}' is "
f"outside the allowed directory. "
f"Set {_UNSAFE_PATHS_ENV}=true to bypass this check."
)

View File

@@ -3,7 +3,11 @@ from typing import Any
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
from crewai_tools.security.safe_path import validate_file_path
from crewai_tools.security.safe_path import (
format_error_for_display,
format_path_for_display,
validate_file_path,
)
class FileReadToolSchema(BaseModel):
@@ -58,8 +62,9 @@ class FileReadTool(BaseTool):
**kwargs: Additional keyword arguments passed to BaseTool.
"""
if file_path is not None:
display_path = format_path_for_display(file_path)
kwargs["description"] = (
f"A tool that reads file content. The default file is {file_path}, but you can provide a different 'file_path' parameter to read another file. You can also specify 'start_line' and 'line_count' to read specific parts of the file."
f"A tool that reads file content. The default file is {display_path}, but you can provide a different 'file_path' parameter to read another file. You can also specify 'start_line' and 'line_count' to read specific parts of the file."
)
super().__init__(**kwargs)
@@ -78,7 +83,12 @@ class FileReadTool(BaseTool):
if file_path is None:
return "Error: No file path provided. Please provide a file path either in the constructor or as an argument."
file_path = validate_file_path(file_path)
try:
file_path = validate_file_path(file_path)
except ValueError as e:
return f"Error: Invalid file path: {e!s}"
display_path = format_path_for_display(file_path)
try:
with open(file_path, "r") as file:
if start_line == 1 and line_count is None:
@@ -98,8 +108,11 @@ class FileReadTool(BaseTool):
return "".join(selected_lines)
except FileNotFoundError:
return f"Error: File not found at path: {file_path}"
return f"Error: File not found at path: {display_path}"
except PermissionError:
return f"Error: Permission denied when trying to read file: {file_path}"
return f"Error: Permission denied when trying to read file: {display_path}"
except Exception as e:
return f"Error: Failed to read file {file_path}. {e!s}"
return (
f"Error: Failed to read file {display_path}. "
f"{format_error_for_display(e)}"
)

View File

@@ -5,6 +5,11 @@ from typing import Any
from crewai.tools import BaseTool
from pydantic import BaseModel
from crewai_tools.security.safe_path import (
format_error_for_display,
format_path_for_display,
)
def strtobool(val: str | bool) -> bool:
if isinstance(val, bool):
@@ -44,6 +49,9 @@ class FileWriterTool(BaseTool):
# itself, since that is not a valid file target.
real_directory = Path(directory).resolve()
real_filepath = Path(filepath).resolve()
display_filepath = format_path_for_display(
str(real_filepath), str(real_directory)
)
if (
not real_filepath.is_relative_to(real_directory)
or real_filepath == real_directory
@@ -56,15 +64,18 @@ class FileWriterTool(BaseTool):
kwargs["overwrite"] = strtobool(kwargs["overwrite"])
if os.path.exists(real_filepath) and not kwargs["overwrite"]:
return f"File {real_filepath} already exists and overwrite option was not passed."
return f"File {display_filepath} already exists and overwrite option was not passed."
mode = "w" if kwargs["overwrite"] else "x"
with open(real_filepath, mode) as file:
file.write(kwargs["content"])
return f"Content successfully written to {real_filepath}"
return f"Content successfully written to {display_filepath}"
except FileExistsError:
return f"File {real_filepath} already exists and overwrite option was not passed."
return f"File {display_filepath} already exists and overwrite option was not passed."
except KeyError as e:
return f"An error occurred while accessing key: {e!s}"
except Exception as e:
return f"An error occurred while writing to the file: {e!s}"
return (
"An error occurred while writing to the file: "
f"{format_error_for_display(e)}"
)

View File

@@ -1,4 +1,3 @@
import os
from unittest.mock import mock_open, patch
from crewai_tools import FileReadTool
@@ -6,21 +5,16 @@ from crewai_tools import FileReadTool
def test_file_read_tool_constructor():
"""Test FileReadTool initialization with file_path."""
test_file = "/tmp/test_file.txt"
test_content = "Hello, World!"
with open(test_file, "w") as f:
f.write(test_content)
test_file = "test_file.txt"
tool = FileReadTool(file_path=test_file)
assert tool.file_path == test_file
assert "test_file.txt" in tool.description
os.remove(test_file)
def test_file_read_tool_run():
"""Test FileReadTool _run method with file_path at runtime."""
test_file = "/tmp/test_file.txt"
test_file = "test_file.txt"
test_content = "Hello, World!"
# Use mock_open to mock file operations
@@ -36,18 +30,18 @@ def test_file_read_tool_error_handling():
result = tool._run()
assert "Error: No file path provided" in result
result = tool._run(file_path="/nonexistent/file.txt")
result = tool._run(file_path="nonexistent/file.txt")
assert "Error: File not found at path:" in result
with patch("builtins.open", side_effect=PermissionError()):
result = tool._run(file_path="/tmp/no_permission.txt")
result = tool._run(file_path="no_permission.txt")
assert "Error: Permission denied" in result
def test_file_read_tool_constructor_and_run():
"""Test FileReadTool using both constructor and runtime file paths."""
test_file1 = "/tmp/test1.txt"
test_file2 = "/tmp/test2.txt"
test_file1 = "test1.txt"
test_file2 = "test2.txt"
content1 = "File 1 content"
content2 = "File 2 content"
@@ -64,7 +58,7 @@ def test_file_read_tool_constructor_and_run():
def test_file_read_tool_chunk_reading():
"""Test FileReadTool reading specific chunks of a file."""
test_file = "/tmp/multiline_test.txt"
test_file = "multiline_test.txt"
lines = [
"Line 1\n",
"Line 2\n",
@@ -104,7 +98,7 @@ def test_file_read_tool_chunk_reading():
def test_file_read_tool_chunk_error_handling():
"""Test error handling for chunk reading."""
test_file = "/tmp/short_test.txt"
test_file = "short_test.txt"
lines = ["Line 1\n", "Line 2\n", "Line 3\n"]
file_content = "".join(lines)
@@ -122,7 +116,7 @@ def test_file_read_tool_chunk_error_handling():
def test_file_read_tool_zero_or_negative_start_line():
"""Test that start_line values of 0 or negative read from the start of the file."""
test_file = "/tmp/negative_test.txt"
test_file = "negative_test.txt"
lines = ["Line 1\n", "Line 2\n", "Line 3\n", "Line 4\n", "Line 5\n"]
file_content = "".join(lines)
@@ -150,3 +144,45 @@ def test_file_read_tool_zero_or_negative_start_line():
result = tool._run(file_path=test_file, start_line=-10, line_count=2)
expected = "".join(lines[0:2]) # Should read first 2 lines
assert result == expected
def test_file_read_tool_error_messages_do_not_disclose_absolute_paths(
tmp_path, monkeypatch
):
"""FileReadTool should redact absolute prefixes from user-visible errors."""
monkeypatch.chdir(tmp_path)
tool = FileReadTool()
target = tmp_path / "secret.txt"
result = tool._run(file_path=str(target))
assert "secret.txt" in result
assert str(tmp_path) not in result
target.touch()
with patch("builtins.open", side_effect=PermissionError()):
result = tool._run(file_path=str(target))
assert "secret.txt" in result
assert str(tmp_path) not in result
with patch(
"builtins.open",
side_effect=OSError(5, "Input/output error", str(target)),
):
result = tool._run(file_path=str(target))
assert "secret.txt" in result
assert str(tmp_path) not in result
def test_file_read_tool_invalid_path_error_does_not_disclose_workspace(
tmp_path, monkeypatch
):
"""Validation errors should not echo the resolved workspace path."""
monkeypatch.chdir(tmp_path)
outside = tmp_path.parent / "outside.txt"
result = FileReadTool()._run(file_path=str(outside))
assert "Invalid file path" in result
assert "outside.txt" in result
assert str(tmp_path) not in result
assert str(tmp_path.parent) not in result

View File

@@ -47,6 +47,8 @@ def test_basic_file_write(tool, temp_env):
assert os.path.exists(path)
assert read_file(path) == temp_env["test_content"]
assert "successfully written" in result
assert temp_env["test_file"] in result
assert temp_env["temp_dir"] not in result
def test_directory_creation(tool, temp_env):
@@ -62,6 +64,8 @@ def test_directory_creation(tool, temp_env):
assert os.path.exists(new_dir)
assert os.path.exists(path)
assert "successfully written" in result
assert temp_env["test_file"] in result
assert new_dir not in result
@pytest.mark.parametrize(
@@ -134,6 +138,8 @@ def test_file_exists_error_handling(tool, temp_env, overwrite):
)
assert "already exists and overwrite option was not passed" in result
assert temp_env["test_file"] in result
assert temp_env["temp_dir"] not in result
assert read_file(path) == "Pre-existing content"

View File

@@ -7,6 +7,7 @@ import os
import pytest
from crewai_tools.security.safe_path import (
format_path_for_display,
validate_directory_path,
validate_file_path,
validate_url,
@@ -66,6 +67,37 @@ class TestValidateFilePath:
result = validate_file_path("/etc/passwd", str(tmp_path))
assert result == os.path.realpath("/etc/passwd")
def test_rejection_message_redacts_absolute_prefixes(self, tmp_path):
outside = tmp_path.parent / "outside.txt"
with pytest.raises(ValueError) as exc_info:
validate_file_path(str(outside), str(tmp_path))
message = str(exc_info.value)
assert "outside.txt" in message
assert str(tmp_path) not in message
assert str(tmp_path.parent) not in message
class TestFormatPathForDisplay:
"""Tests for user-visible path labels."""
def test_returns_relative_path_inside_base(self, tmp_path):
nested_file = tmp_path / "nested" / "file.txt"
nested_file.parent.mkdir()
nested_file.touch()
result = format_path_for_display(str(nested_file), str(tmp_path))
assert result == os.path.join("nested", "file.txt")
def test_redacts_absolute_prefix_outside_base(self, tmp_path):
outside_file = tmp_path.parent / "outside.txt"
result = format_path_for_display(str(outside_file), str(tmp_path))
assert result == "outside.txt"
class TestValidateDirectoryPath:
"""Tests for validate_directory_path."""

View File

@@ -8,8 +8,8 @@ authors = [
]
requires-python = ">=3.10, <3.14"
dependencies = [
"crewai-core==1.14.7a4",
"crewai-cli==1.14.7a4",
"crewai-core==1.14.7",
"crewai-cli==1.14.7",
# Core Dependencies
"pydantic>=2.11.9,<2.13",
"openai>=2.30.0,<3",
@@ -54,7 +54,7 @@ Repository = "https://github.com/crewAIInc/crewAI"
[project.optional-dependencies]
tools = [
"crewai-tools==1.14.7a4",
"crewai-tools==1.14.7",
]
embeddings = [
"tiktoken>=0.8.0,<0.13"

View File

@@ -48,7 +48,7 @@ def _suppress_pydantic_deprecation_warnings() -> None:
_suppress_pydantic_deprecation_warnings()
__version__ = "1.14.7a4"
__version__ = "1.14.7"
_LAZY_IMPORTS: dict[str, tuple[str, str]] = {
"Memory": ("crewai.memory.unified_memory", "Memory"),

View File

@@ -46,6 +46,7 @@ from crewai.state.checkpoint_config import CheckpointConfig, _coerce_checkpoint
from crewai.tools.base_tool import BaseTool, Tool
from crewai.types.callback import SerializableCallable
from crewai.utilities.config import process_config
from crewai.utilities.i18n import I18N, get_i18n
from crewai.utilities.logger import Logger
from crewai.utilities.rpm_controller import RPMController
from crewai.utilities.string_utils import interpolate_only
@@ -81,6 +82,7 @@ _LLM_TYPE_REGISTRY: dict[str, str] = {
def _validate_llm_ref(value: Any) -> Any:
if isinstance(value, dict):
import importlib
import inspect
llm_type = value.get("llm_type")
if not llm_type or llm_type not in _LLM_TYPE_REGISTRY:
@@ -91,6 +93,12 @@ def _validate_llm_ref(value: Any) -> Any:
dotted = _LLM_TYPE_REGISTRY[llm_type]
mod_path, cls_name = dotted.rsplit(".", 1)
cls = getattr(importlib.import_module(mod_path), cls_name)
if inspect.isabstract(cls):
from crewai.llm import LLM
return LLM(
**{k: v for k, v in value.items() if v is not None and k != "llm_type"}
)
return cls(**value)
return value
@@ -186,6 +194,7 @@ class BaseAgent(BaseModel, ABC, metaclass=AgentMeta):
tools (list[Any] | None): Tools at the agent's disposal.
max_iter (int): Maximum iterations for an agent to execute a task.
agent_executor: An instance of the CrewAgentExecutor class.
i18n (I18N): Internationalization settings.
llm (Any): Language model that will run the agent.
crew (Any): Crew to which the agent belongs.
@@ -265,6 +274,14 @@ class BaseAgent(BaseModel, ABC, metaclass=AgentMeta):
_serialize_executor_ref, return_type=dict | None, when_used="json"
),
] = Field(default=None, description="An instance of the CrewAgentExecutor class.")
i18n: I18N = Field(
default_factory=get_i18n,
description="Internationalization settings.",
deprecated=(
"Agent.i18n is deprecated and will be removed in a future release. "
"Use crewai.utilities.i18n.get_i18n() or Crew(prompt_file=...) instead."
),
)
llm: Annotated[
str | BaseLLM | None,

View File

@@ -117,8 +117,10 @@ def capture_execution_context(
)
def apply_execution_context(ctx: ExecutionContext) -> None:
def apply_execution_context(ctx: ExecutionContext | dict[str, Any]) -> None:
"""Write an ExecutionContext back into the ContextVars."""
if isinstance(ctx, dict):
ctx = ExecutionContext.model_validate(ctx)
_current_task_id.set(ctx.current_task_id)
current_flow_request_id.set(ctx.flow_request_id)
current_flow_id.set(ctx.flow_id)

View File

@@ -1013,6 +1013,7 @@ class Crew(FlowTrackable, BaseModel):
)
token = attach(baggage_ctx)
runtime_scope = crewai_event_bus._enter_runtime_scope()
try:
inputs = prepare_kickoff(self, inputs, input_files)
@@ -1048,6 +1049,7 @@ class Crew(FlowTrackable, BaseModel):
self._memory.drain_writes()
clear_files(self.id)
detach(token)
crewai_event_bus._exit_runtime_scope(runtime_scope)
def _post_kickoff(self, result: CrewOutput) -> CrewOutput:
return result
@@ -1223,6 +1225,7 @@ class Crew(FlowTrackable, BaseModel):
)
token = attach(baggage_ctx)
runtime_scope = crewai_event_bus._enter_runtime_scope()
try:
inputs = prepare_kickoff(self, inputs, input_files)
@@ -1256,6 +1259,7 @@ class Crew(FlowTrackable, BaseModel):
finally:
clear_files(self.id)
detach(token)
crewai_event_bus._exit_runtime_scope(runtime_scope)
async def akickoff_for_each(
self,

View File

@@ -80,6 +80,17 @@ def is_replaying() -> bool:
return _replaying.get()
_runtime_state_var: contextvars.ContextVar[RuntimeState | None] = (
contextvars.ContextVar("crewai_runtime_state", default=None)
)
_registered_entity_ids_var: contextvars.ContextVar[set[int] | None] = (
contextvars.ContextVar("crewai_registered_entity_ids", default=None)
)
_runtime_scope_depth: contextvars.ContextVar[int] = contextvars.ContextVar(
"crewai_runtime_scope_depth", default=0
)
class CrewAIEventsBus:
"""Singleton event bus for handling events in CrewAI.
@@ -116,7 +127,6 @@ class CrewAIEventsBus:
_futures_lock: threading.Lock
_executor_initialized: bool
_has_pending_events: bool
_runtime_state: RuntimeState | None
def __new__(cls) -> Self:
"""Create or return the singleton instance.
@@ -151,8 +161,6 @@ class CrewAIEventsBus:
self._console = ConsoleFormatter()
self._executor_initialized = False
self._has_pending_events = False
self._runtime_state: RuntimeState | None = None
self._registered_entity_ids: set[int] = set()
def _ensure_executor_initialized(self) -> None:
"""Lazily initialize the thread pool executor and event loop.
@@ -281,6 +289,51 @@ class CrewAIEventsBus:
"""The RuntimeState currently attached to the bus, if any."""
return self._runtime_state
@property
def _runtime_state(self) -> RuntimeState | None:
return _runtime_state_var.get()
@_runtime_state.setter
def _runtime_state(self, value: RuntimeState | None) -> None:
_runtime_state_var.set(value)
@property
def _registered_entity_ids(self) -> set[int]:
ids = _registered_entity_ids_var.get()
if ids is None:
ids = set()
_registered_entity_ids_var.set(ids)
return ids
@_registered_entity_ids.setter
def _registered_entity_ids(self, value: set[int]) -> None:
_registered_entity_ids_var.set(value)
def reset_runtime_state(self) -> None:
"""Detach the RuntimeState and clear the entity registry."""
self._runtime_state = None
self._registered_entity_ids = set()
def _enter_runtime_scope(self) -> bool:
depth = _runtime_scope_depth.get()
_runtime_scope_depth.set(depth + 1)
if depth != 0:
return False
if _runtime_state_var.get() is None:
from crewai import RuntimeState
if RuntimeState is not None:
_runtime_state_var.set(RuntimeState(root=[]))
_registered_entity_ids_var.set(set())
return True
def _exit_runtime_scope(self, outermost: bool) -> None:
depth = _runtime_scope_depth.get()
_runtime_scope_depth.set(depth - 1 if depth > 0 else 0)
if outermost:
_runtime_state_var.set(None)
_registered_entity_ids_var.set(None)
def register_entity(self, entity: Any) -> None:
"""Add an entity to the RuntimeState, creating it if needed.
@@ -349,6 +402,7 @@ class CrewAIEventsBus:
source: Any,
event: BaseEvent,
handlers: SyncHandlerSet,
state: RuntimeState | None,
) -> None:
"""Call provided synchronous handlers.
@@ -356,8 +410,8 @@ class CrewAIEventsBus:
source: The emitting object
event: The event instance
handlers: Frozenset of sync handlers to call
state: The RuntimeState captured on the emitting context
"""
state = self._runtime_state
errors: list[tuple[SyncHandler, Exception]] = [
(handler, error)
for handler in handlers
@@ -376,6 +430,7 @@ class CrewAIEventsBus:
source: Any,
event: BaseEvent,
handlers: AsyncHandlerSet,
state: RuntimeState | None,
) -> None:
"""Asynchronously call provided async handlers.
@@ -383,8 +438,8 @@ class CrewAIEventsBus:
source: The object that emitted the event
event: The event instance
handlers: Frozenset of async handlers to call
state: The RuntimeState captured on the emitting context
"""
state = self._runtime_state
async def _call(handler: AsyncHandler) -> Any:
if _get_param_count(handler) >= 3:
@@ -399,7 +454,9 @@ class CrewAIEventsBus:
f"[CrewAIEventsBus] Async handler error in {getattr(handler, '__name__', handler)}: {result}"
)
async def _emit_with_dependencies(self, source: Any, event: BaseEvent) -> None:
async def _emit_with_dependencies(
self, source: Any, event: BaseEvent, state: RuntimeState | None
) -> None:
"""Emit an event with dependency-aware handler execution.
Handlers are grouped into execution levels based on their dependencies.
@@ -450,18 +507,18 @@ class CrewAIEventsBus:
if level_sync:
if event_type is LLMStreamChunkEvent:
self._call_handlers(source, event, level_sync)
self._call_handlers(source, event, level_sync, state)
else:
ctx = contextvars.copy_context()
future = self._sync_executor.submit(
ctx.run, self._call_handlers, source, event, level_sync
ctx.run, self._call_handlers, source, event, level_sync, state
)
await asyncio.get_running_loop().run_in_executor(
None, future.result
)
if level_async:
await self._acall_handlers(source, event, level_async)
await self._acall_handlers(source, event, level_async, state)
def _register_source(self, source: Any) -> None:
"""Register the source entity in RuntimeState if applicable."""
@@ -556,21 +613,23 @@ class CrewAIEventsBus:
self._ensure_executor_initialized()
self._has_pending_events = True
state = self._runtime_state
if has_dependencies:
return self._track_future(
asyncio.run_coroutine_threadsafe(
self._emit_with_dependencies(source, event),
self._emit_with_dependencies(source, event, state),
self._loop,
)
)
if sync_handlers:
if event_type is LLMStreamChunkEvent:
self._call_handlers(source, event, sync_handlers)
self._call_handlers(source, event, sync_handlers, state)
else:
ctx = contextvars.copy_context()
sync_future = self._sync_executor.submit(
ctx.run, self._call_handlers, source, event, sync_handlers
ctx.run, self._call_handlers, source, event, sync_handlers, state
)
if not async_handlers:
return self._track_future(sync_future)
@@ -578,7 +637,7 @@ class CrewAIEventsBus:
if async_handlers:
return self._track_future(
asyncio.run_coroutine_threadsafe(
self._acall_handlers(source, event, async_handlers),
self._acall_handlers(source, event, async_handlers, state),
self._loop,
)
)
@@ -590,21 +649,22 @@ class CrewAIEventsBus:
source: Any,
event: BaseEvent,
handlers: AsyncHandlerSet,
state: RuntimeState | None,
) -> None:
"""Call async handlers with the replaying flag set on the loop thread."""
token = _replaying.set(True)
try:
await self._acall_handlers(source, event, handlers)
await self._acall_handlers(source, event, handlers, state)
finally:
_replaying.reset(token)
async def _emit_with_dependencies_replaying(
self, source: Any, event: BaseEvent
self, source: Any, event: BaseEvent, state: RuntimeState | None
) -> None:
"""Dependency-aware dispatch with the replaying flag set."""
token = _replaying.set(True)
try:
await self._emit_with_dependencies(source, event)
await self._emit_with_dependencies(source, event, state)
finally:
_replaying.reset(token)
@@ -638,12 +698,13 @@ class CrewAIEventsBus:
self._ensure_executor_initialized()
self._has_pending_events = True
state = self._runtime_state
token = _replaying.set(True)
try:
if has_dependencies:
return self._track_future(
asyncio.run_coroutine_threadsafe(
self._emit_with_dependencies_replaying(source, event),
self._emit_with_dependencies_replaying(source, event, state),
self._loop,
)
)
@@ -651,7 +712,7 @@ class CrewAIEventsBus:
if sync_handlers:
ctx = contextvars.copy_context()
sync_future = self._sync_executor.submit(
ctx.run, self._call_handlers, source, event, sync_handlers
ctx.run, self._call_handlers, source, event, sync_handlers, state
)
self._track_future(sync_future)
if not async_handlers:
@@ -659,7 +720,9 @@ class CrewAIEventsBus:
return self._track_future(
asyncio.run_coroutine_threadsafe(
self._acall_handlers_replaying(source, event, async_handlers),
self._acall_handlers_replaying(
source, event, async_handlers, state
),
self._loop,
)
)
@@ -727,7 +790,9 @@ class CrewAIEventsBus:
async_handlers = self._async_handlers.get(event_type, frozenset())
if async_handlers:
await self._acall_handlers(source, event, async_handlers)
await self._acall_handlers(
source, event, async_handlers, self._runtime_state
)
def register_handler(
self,

View File

@@ -292,7 +292,7 @@ class TraceCollectionListener(BaseEventListener):
@event_bus.on(CrewKickoffCompletedEvent)
def on_crew_completed(source: Any, event: CrewKickoffCompletedEvent) -> None:
self._handle_trace_event("crew_kickoff_completed", source, event)
if self.batch_manager.defer_session_finalization:
if self._should_defer_session_finalization():
return
if self._nested_in_flow_execution():
return
@@ -306,7 +306,7 @@ class TraceCollectionListener(BaseEventListener):
@event_bus.on(CrewKickoffFailedEvent)
def on_crew_failed(source: Any, event: CrewKickoffFailedEvent) -> None:
self._handle_trace_event("crew_kickoff_failed", source, event)
if self.batch_manager.defer_session_finalization:
if self._should_defer_session_finalization():
return
if self._nested_in_flow_execution():
return
@@ -734,7 +734,7 @@ class TraceCollectionListener(BaseEventListener):
if not self.batch_manager.is_batch_initialized():
return
# Multi-turn flows defer batch finalization to finalize_session_traces().
if self.batch_manager.defer_session_finalization:
if self._should_defer_session_finalization():
return
self.batch_manager.finalize_batch()
@@ -745,6 +745,15 @@ class TraceCollectionListener(BaseEventListener):
return current_flow_id.get() is not None
def _should_defer_session_finalization(self) -> bool:
"""True when the active trace belongs to a deferred flow session."""
from crewai.flow.flow_context import current_flow_defer_trace_finalization
return (
self.batch_manager.defer_session_finalization
or current_flow_defer_trace_finalization.get()
)
def _flow_owns_trace_batch(self) -> bool:
"""True when an in-flight conversational flow already owns the trace batch."""
if self.batch_manager.batch_owner_type == "flow":
@@ -786,7 +795,11 @@ class TraceCollectionListener(BaseEventListener):
(``current_flow_id``) to keep LLM/tool events from falling back to an
implicit crew batch.
"""
from crewai.flow.flow_context import current_flow_id, current_flow_name
from crewai.flow.flow_context import (
current_flow_defer_trace_finalization,
current_flow_id,
current_flow_name,
)
flow_id = current_flow_id.get()
if flow_id is None:
@@ -802,6 +815,8 @@ class TraceCollectionListener(BaseEventListener):
}
self.batch_manager.batch_owner_type = "flow"
self.batch_manager.batch_owner_id = flow_id
if current_flow_defer_trace_finalization.get():
self.batch_manager.defer_session_finalization = True
self._initialize_batch(user_context, execution_metadata)
return True

View File

@@ -1,6 +1,6 @@
from typing import Any, Literal
from pydantic import BaseModel, ConfigDict
from pydantic import BaseModel, ConfigDict, field_serializer
from crewai.events.base_events import BaseEvent
@@ -57,6 +57,10 @@ class MethodExecutionFailedEvent(FlowEvent):
model_config = ConfigDict(arbitrary_types_allowed=True)
@field_serializer("error")
def _serialize_error(self, error: Exception) -> str:
return str(error)
class MethodExecutionPausedEvent(FlowEvent):
"""Event emitted when a flow method is paused waiting for human feedback.

View File

@@ -46,7 +46,9 @@ from crewai.flow.conversation import (
get_conversation_messages,
receive_user_message as _receive_user_message,
)
from crewai.flow.dsl import listen, router, start
from crewai.flow.dsl import listen, start
from crewai.flow.dsl._utils import _method_action, _set_flow_method_definition
from crewai.flow.flow_definition import FlowMethodDefinition
from crewai.utilities.types import LLMMessage
@@ -72,6 +74,15 @@ def _iter_condition_labels(condition: Any) -> set[str]:
return set()
def _conversation_start_router(func: Callable[..., Any]) -> Any:
wrapper = start()(func)
_set_flow_method_definition(
cast(Any, wrapper),
FlowMethodDefinition(do=_method_action(func), start=True, router=True),
)
return wrapper
class _ConversationalMixin:
"""Experimental conversational graph for ``Flow``.
@@ -85,10 +96,7 @@ class _ConversationalMixin:
conversational: ClassVar[bool] = False
conversational_config: ClassVar[ConversationConfig | None] = None
builtin_routes: ClassVar[tuple[str, ...]] = ("converse", "end")
internal_routes: ClassVar[tuple[str, ...]] = (
"answer_from_history",
"conversation_start",
)
internal_routes: ClassVar[tuple[str, ...]] = ("answer_from_history",)
builtin_route_descriptions: ClassVar[dict[str, str]] = {
"converse": (
"Ordinary chat, follow-ups, summaries, clarifications, and "
@@ -138,23 +146,24 @@ class _ConversationalMixin:
def kickoff(self, *args: Any, **kwargs: Any) -> Any:
pass
@start()
@_conversational_only
def conversation_start(self) -> str | None:
"""Internal Flow entrypoint that hands the user message to the router.
"""Return the current user message for conversational route selection.
In conversational mode, ``Flow.kickoff_async`` runs all ``@start``
methods sequentially and this one is registered last, so any user
``@start`` methods (e.g. permission loading) have already finished
before the returned value triggers ``route_conversation``.
This remains as a plain overridable helper for compatibility. It is not
registered as a Flow method; ``route_conversation`` is the synthetic
built-in start/router that begins a conversational turn.
"""
state = cast(ConversationState, self.state)
return state.current_user_message
@router(conversation_start)
@_conversation_start_router
@_conversational_only
def route_conversation(self) -> str:
"""Route the current turn to a listener label."""
if "conversation_start" not in {
str(method_name) for method_name in self._completed_methods
}:
self.conversation_start()
state = cast(ConversationState, self.state)
context = self.build_router_context()
previous_intent = state.last_intent
@@ -651,16 +660,16 @@ class _ConversationalMixin:
if not type(self)._is_conversational():
return start_methods, False
conversation_start = "conversation_start"
if conversation_start not in {str(method) for method in start_methods}:
route_conversation = "route_conversation"
if route_conversation not in {str(method) for method in start_methods}:
return start_methods, False
ordered_starts = [
method for method in start_methods if str(method) != conversation_start
method for method in start_methods if str(method) != route_conversation
]
ordered_starts.append(
next(
method for method in start_methods if str(method) == conversation_start
method for method in start_methods if str(method) == route_conversation
)
)
return ordered_starts, True
@@ -1047,12 +1056,15 @@ class _ConversationalMixin:
trace_listener = TraceCollectionListener()
batch_manager = trace_listener.batch_manager
if batch_manager.batch_owner_type == "flow":
if trace_listener.first_time_handler.is_first_time:
trace_listener.first_time_handler.mark_events_collected()
trace_listener.first_time_handler.handle_execution_completion()
else:
batch_manager.finalize_batch()
try:
if batch_manager.batch_owner_type == "flow":
if trace_listener.first_time_handler.is_first_time:
trace_listener.first_time_handler.mark_events_collected()
trace_listener.first_time_handler.handle_execution_completion()
else:
batch_manager.finalize_batch()
finally:
batch_manager.defer_session_finalization = False
__all__ = ["_ConversationalMixin"]

View File

@@ -39,9 +39,7 @@ class FlowConversationalDefinition(BaseModel):
visible_agent_outputs: list[str] | Literal["all"] | None = None
defer_trace_finalization: bool = True
builtin_routes: list[str] = Field(default_factory=lambda: ["converse", "end"])
internal_routes: list[str] = Field(
default_factory=lambda: ["answer_from_history", "conversation_start"]
)
internal_routes: list[str] = Field(default_factory=lambda: ["answer_from_history"])
__all__ = [

View File

@@ -8,6 +8,7 @@ from crewai.flow.dsl._types import FlowMethodDecorator, FlowTrigger
from crewai.flow.dsl._utils import (
P,
R,
_method_action,
_set_flow_method_definition,
)
from crewai.flow.flow_definition import FlowMethodDefinition
@@ -45,7 +46,11 @@ def listen(condition: FlowTrigger) -> FlowMethodDecorator:
wrapper = ListenMethod(func)
_set_flow_method_definition(
wrapper, FlowMethodDefinition(listen=_to_definition_condition(condition))
wrapper,
FlowMethodDefinition(
do=_method_action(func),
listen=_to_definition_condition(condition),
),
)
return wrapper

View File

@@ -19,6 +19,7 @@ from crewai.flow.dsl._types import FlowMethodDecorator, FlowTrigger
from crewai.flow.dsl._utils import (
P,
R,
_method_action,
_set_flow_method_definition,
)
from crewai.flow.flow_definition import FlowMethodDefinition
@@ -148,6 +149,7 @@ def router(
_set_flow_method_definition(
wrapper,
FlowMethodDefinition(
do=_method_action(func),
listen=_to_definition_condition(condition),
router=True,
emit=router_events or None,

View File

@@ -8,6 +8,7 @@ from crewai.flow.dsl._types import FlowMethodDecorator, FlowTrigger
from crewai.flow.dsl._utils import (
P,
R,
_method_action,
_set_flow_method_definition,
)
from crewai.flow.flow_definition import FlowMethodDefinition
@@ -53,13 +54,17 @@ def start(
def decorator(func: Callable[P, R]) -> StartMethod[P, R]:
wrapper = StartMethod(func)
if condition is not None:
_set_flow_method_definition(
wrapper,
FlowMethodDefinition(start=_to_definition_condition(condition)),
)
else:
_set_flow_method_definition(wrapper, FlowMethodDefinition(start=True))
_set_flow_method_definition(
wrapper,
FlowMethodDefinition(
do=_method_action(func),
start=(
_to_definition_condition(condition)
if condition is not None
else True
),
),
)
return wrapper
return cast(FlowMethodDecorator, decorator)

View File

@@ -8,6 +8,7 @@ from pydantic import BaseModel
from typing_extensions import TypeIs
from crewai.flow.flow_definition import (
FlowActionDefinition,
FlowConfigDefinition,
FlowConversationalDefinition,
FlowConversationalRouterDefinition,
@@ -84,6 +85,10 @@ def _stamp_inherited_conversational_metadata(
return method
def _method_action(method: Any) -> FlowActionDefinition:
return FlowActionDefinition(ref=f"{method.__module__}:{method.__qualname__}")
def _set_flow_method_definition(
wrapper: FlowMethod[P, R],
definition: FlowMethodDefinition,
@@ -214,16 +219,19 @@ def _build_config_definition(
) -> FlowConfigDefinition:
config_field_names = set(FlowConfigDefinition.model_fields)
field_defaults = {
name: field.default
name: field.get_default(call_default_factory=True)
for name, field in getattr(flow_class, "model_fields", {}).items()
if name in config_field_names
}
values: dict[str, Any] = {}
for field_name, default in field_defaults.items():
value = getattr(flow_class, field_name, default)
values[field_name] = _serialize_static_value(
value, diagnostics, f"config.{field_name}"
)
if field_name == "input_provider":
values[field_name] = None if value is None else _object_ref(value)
else:
values[field_name] = _serialize_static_value(
value, diagnostics, f"config.{field_name}"
)
return FlowConfigDefinition(**values)
@@ -313,7 +321,7 @@ def _build_conversational_definition(
internal_routes = getattr(
flow_class,
"internal_routes",
("answer_from_history", "conversation_start"),
("answer_from_history",),
)
if config is None:
return FlowConversationalDefinition(
@@ -373,9 +381,11 @@ def _build_method_definition(
) -> FlowMethodDefinition:
fragment = _get_flow_method_definition(method)
if fragment is None:
method_definition = FlowMethodDefinition()
method_definition = FlowMethodDefinition(do=_method_action(method))
else:
method_definition = fragment.model_copy(deep=True)
method_definition = fragment.model_copy(
deep=True, update={"do": _method_action(method)}
)
human_feedback = _build_human_feedback_definition(
method, diagnostics, f"{path}.human_feedback"

View File

@@ -15,6 +15,10 @@ current_flow_id: contextvars.ContextVar[str | None] = contextvars.ContextVar(
"flow_id", default=None
)
current_flow_defer_trace_finalization: contextvars.ContextVar[bool] = (
contextvars.ContextVar("flow_defer_trace_finalization", default=False)
)
current_flow_method_name: contextvars.ContextVar[str] = contextvars.ContextVar(
"flow_method_name", default="unknown"
)

View File

@@ -27,6 +27,7 @@ logger = logging.getLogger(__name__)
FlowDefinitionCondition = str | dict[str, Any]
__all__ = [
"FlowActionDefinition",
"FlowConfigDefinition",
"FlowConversationalDefinition",
"FlowConversationalRouterDefinition",
@@ -52,8 +53,9 @@ class FlowDefinitionDiagnostic(BaseModel):
class FlowStateDefinition(BaseModel):
"""Static description of a Flow state contract."""
type: TypingLiteral["dict", "pydantic", "unknown"] = "dict"
type: TypingLiteral["dict", "pydantic", "json_schema", "unknown"] = "dict"
ref: str | None = None
json_schema: dict[str, Any] | None = None
default: Any = None
@@ -62,10 +64,12 @@ class FlowConfigDefinition(BaseModel):
tracing: bool | None = None
stream: bool = False
memory: Any = None
input_provider: Any = None
memory: dict[str, Any] | None = None
input_provider: str | None = None
suppress_flow_events: bool = False
max_method_calls: int = 100
defer_trace_finalization: bool = False
checkpoint: bool | dict[str, Any] | None = None
class FlowPersistenceDefinition(BaseModel):
@@ -73,7 +77,7 @@ class FlowPersistenceDefinition(BaseModel):
enabled: bool = False
verbose: bool = False
persistence: Any = None
persistence: dict[str, Any] | None = None
class FlowHumanFeedbackDefinition(BaseModel):
@@ -90,9 +94,17 @@ class FlowHumanFeedbackDefinition(BaseModel):
learn_strict: bool = False
class FlowActionDefinition(BaseModel):
"""What a Flow method node executes, independent of when it fires."""
call: TypingLiteral["code"] = "code"
ref: str
class FlowMethodDefinition(BaseModel):
"""Static definition of one Flow method and its execution roles."""
do: FlowActionDefinition
start: bool | FlowDefinitionCondition | None = None
listen: FlowDefinitionCondition | None = None
router: bool = False
@@ -116,7 +128,9 @@ class FlowDefinition(BaseModel):
model_config = ConfigDict(populate_by_name=True, arbitrary_types_allowed=True)
schema_: str = Field(default="crewai.flow/v1", alias="schema")
schema_: TypingLiteral["crewai.flow/v1"] = Field(
default="crewai.flow/v1", alias="schema"
)
name: str
description: str | None = None
state: FlowStateDefinition | None = None

View File

@@ -24,12 +24,10 @@ Example:
from __future__ import annotations
import asyncio
from collections.abc import Callable
import functools
import logging
from types import SimpleNamespace
from typing import TYPE_CHECKING, Any, Final, TypeVar, cast
from typing import TYPE_CHECKING, Any, Final, TypeVar
from crewai_core.printer import PRINTER
from pydantic import BaseModel
@@ -39,7 +37,7 @@ from crewai.flow.persistence.factory import default_flow_persistence
if TYPE_CHECKING:
from crewai.flow.flow import Flow
from crewai.flow.runtime import Flow
logger = logging.getLogger(__name__)
@@ -66,14 +64,6 @@ def _stamp_persistence_metadata(
)
_PRESERVED_FLOW_ATTRS: Final[tuple[str, ...]] = (
"__human_feedback_config__",
"__flow_persistence_config__",
"__flow_method_definition__",
"_human_feedback_llm",
)
class PersistenceDecorator:
"""Class to handle flow state persistence with consistent logging."""
@@ -164,6 +154,10 @@ def persist(
states. When applied at the method level, it persists only that method's
state.
The decorator is a pure metadata stamper: it records the persistence
configuration on the class or method, and the Flow engine saves state
after each persisted method completes, driven by the flow's definition.
Args:
persistence: Optional FlowPersistence implementation to use.
If not provided, uses ``default_flow_persistence()`` (the
@@ -193,120 +187,10 @@ def persist(
if isinstance(target, type):
_stamp_persistence_metadata(target, actual_persistence, verbose)
original_init = target.__init__ # type: ignore[misc]
@functools.wraps(original_init)
def new_init(self: Any, *args: Any, **kwargs: Any) -> None:
if "persistence" not in kwargs:
kwargs["persistence"] = actual_persistence
original_init(self, *args, **kwargs)
target.__init__ = new_init # type: ignore[misc]
# Preserve original methods' decorators
original_methods = {
name: method
for name, method in target.__dict__.items()
if callable(method)
and (
hasattr(method, "__is_flow_method__")
or hasattr(method, "__flow_method_definition__")
)
}
for name, method in original_methods.items():
if asyncio.iscoroutinefunction(method):
# Closure captures the current name and method
def create_async_wrapper(
method_name: str, original_method: Callable[..., Any]
) -> Callable[..., Any]:
@functools.wraps(original_method)
async def method_wrapper(
self: Any, *args: Any, **kwargs: Any
) -> Any:
result = await original_method(self, *args, **kwargs)
PersistenceDecorator.persist_state(
self, method_name, actual_persistence, verbose
)
return result
return method_wrapper
wrapped = create_async_wrapper(name, method)
for attr in _PRESERVED_FLOW_ATTRS:
if hasattr(method, attr):
setattr(wrapped, attr, getattr(method, attr))
wrapped.__is_flow_method__ = True # type: ignore[attr-defined]
setattr(target, name, wrapped)
else:
def create_sync_wrapper(
method_name: str, original_method: Callable[..., Any]
) -> Callable[..., Any]:
@functools.wraps(original_method)
def method_wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
result = original_method(self, *args, **kwargs)
PersistenceDecorator.persist_state(
self, method_name, actual_persistence, verbose
)
return result
return method_wrapper
wrapped = create_sync_wrapper(name, method)
for attr in _PRESERVED_FLOW_ATTRS:
if hasattr(method, attr):
setattr(wrapped, attr, getattr(method, attr))
wrapped.__is_flow_method__ = True # type: ignore[attr-defined]
setattr(target, name, wrapped)
return target
method = target
method.__is_flow_method__ = True # type: ignore[attr-defined]
_stamp_persistence_metadata(method, actual_persistence, verbose)
if asyncio.iscoroutinefunction(method):
@functools.wraps(method)
async def method_async_wrapper(
flow_instance: Any, *args: Any, **kwargs: Any
) -> T:
method_coro = method(flow_instance, *args, **kwargs)
if asyncio.iscoroutine(method_coro):
result = await method_coro
else:
result = method_coro
PersistenceDecorator.persist_state(
flow_instance, method.__name__, actual_persistence, verbose
)
return cast(T, result)
for attr in _PRESERVED_FLOW_ATTRS:
if hasattr(method, attr):
setattr(method_async_wrapper, attr, getattr(method, attr))
method_async_wrapper.__is_flow_method__ = True # type: ignore[attr-defined]
_stamp_persistence_metadata(
method_async_wrapper, actual_persistence, verbose
)
return cast(Callable[..., T], method_async_wrapper)
@functools.wraps(method)
def method_sync_wrapper(flow_instance: Any, *args: Any, **kwargs: Any) -> T:
result = method(flow_instance, *args, **kwargs)
PersistenceDecorator.persist_state(
flow_instance, method.__name__, actual_persistence, verbose
)
return result
for attr in _PRESERVED_FLOW_ATTRS:
if hasattr(method, attr):
setattr(method_sync_wrapper, attr, getattr(method, attr))
method_sync_wrapper.__is_flow_method__ = True # type: ignore[attr-defined]
_stamp_persistence_metadata(method_sync_wrapper, actual_persistence, verbose)
return cast(Callable[..., T], method_sync_wrapper)
target.__is_flow_method__ = True # type: ignore[attr-defined]
_stamp_persistence_metadata(target, actual_persistence, verbose)
return target
return decorator

View File

@@ -22,6 +22,7 @@ from concurrent.futures import Future, ThreadPoolExecutor
import contextvars
import copy
import enum
import importlib
import inspect
import logging
import threading
@@ -84,12 +85,20 @@ from crewai.events.types.flow_events import (
MethodExecutionPausedEvent,
MethodExecutionStartedEvent,
)
from crewai.events.types.llm_events import LLMCallCompletedEvent
from crewai.flow.dsl._utils import build_flow_definition
from crewai.flow.flow_context import current_flow_id, current_flow_request_id
from crewai.flow.flow_context import (
current_flow_defer_trace_finalization,
current_flow_id,
current_flow_name,
current_flow_request_id,
)
from crewai.flow.flow_definition import (
FlowDefinition,
FlowDefinitionCondition,
FlowMethodDefinition,
FlowPersistenceDefinition,
FlowStateDefinition,
)
from crewai.flow.flow_wrappers import (
FlowMethod,
@@ -100,6 +109,7 @@ from crewai.flow.flow_wrappers import (
from crewai.flow.human_feedback import HumanFeedbackResult
from crewai.flow.input_provider import InputProvider
from crewai.flow.persistence.base import FlowPersistence
from crewai.flow.runtime._action_resolvers import resolve_action
from crewai.flow.types import (
FlowExecutionData,
FlowMethodName,
@@ -124,6 +134,7 @@ if TYPE_CHECKING:
from crewai.flow.visualization import build_flow_structure, render_interactive
from crewai.types.streaming import CrewStreamingOutput, FlowStreamingOutput
from crewai.types.usage_metrics import UsageMetrics
from crewai.utilities.env import get_env_context
from crewai.utilities.streaming import (
TaskInfo,
@@ -164,6 +175,57 @@ def _condition_satisfied(condition: FlowDefinitionCondition, events: set[str]) -
return combine(_condition_satisfied(branch, events) for branch in branches)
def _build_definition_state_model(
state_definition: FlowStateDefinition,
) -> BaseModel | None:
kwargs = (
dict(state_definition.default)
if isinstance(state_definition.default, dict)
else {}
)
model_class: type[BaseModel] | None = None
if state_definition.ref:
try:
module_name, _, qualname = state_definition.ref.partition(":")
resolved: Any = importlib.import_module(module_name)
for part in qualname.split("."):
resolved = getattr(resolved, part)
except Exception:
logger.warning(
"Could not import state ref %r", state_definition.ref, exc_info=True
)
else:
if isinstance(resolved, type) and issubclass(resolved, BaseModel):
model_class = resolved
else:
logger.warning(
"State ref %r is not a pydantic model", state_definition.ref
)
if model_class is None and state_definition.json_schema:
from crewai.utilities.pydantic_schema_utils import create_model_from_schema
try:
model_class = create_model_from_schema(state_definition.json_schema)
except Exception:
logger.warning(
"Could not build a state model from the declared json_schema",
exc_info=True,
)
if model_class is None:
return None
if not issubclass(model_class, FlowState):
class StateWithId(FlowState, model_class): # type: ignore[misc, valid-type]
pass
model_class = StateWithId
return model_class(**kwargs)
def _iter_condition_events(condition: FlowDefinitionCondition) -> Iterator[str]:
if isinstance(condition, str):
yield condition
@@ -196,6 +258,16 @@ def _is_multi_event_or(
return operator == "or" and len(branches) > 1
def _usage_dict_to_metrics(usage: dict[str, Any] | None) -> UsageMetrics | None:
"""Normalize an LLM call's raw usage dict into ``UsageMetrics``.
Thin wrapper around ``UsageMetrics.from_provider_dict`` so the flow
aggregator and ``BaseLLM._track_token_usage_internal`` agree on the
set of provider key aliases (LiteLLM, Anthropic, Gemini).
"""
return UsageMetrics.from_provider_dict(usage)
def _resolve_persistence(value: Any) -> Any:
if value is None or isinstance(value, FlowPersistence):
return value
@@ -223,9 +295,12 @@ def _serialize_persistence(value: Any) -> dict[str, Any] | None:
def _validate_input_provider(value: Any) -> Any:
if value is None or isinstance(value, InputProvider):
return value
from crewai.types.callback import _dotted_path_to_instance
if isinstance(value, str) and ":" in value:
resolved = _resolve_input_provider_ref(value)
else:
from crewai.types.callback import _dotted_path_to_instance
resolved = _dotted_path_to_instance(value)
resolved = _dotted_path_to_instance(value)
if resolved is None or isinstance(resolved, InputProvider):
return resolved
raise ValueError(
@@ -234,6 +309,15 @@ def _validate_input_provider(value: Any) -> Any:
)
def _resolve_input_provider_ref(ref: str) -> Any:
from crewai.flow.runtime._action_resolvers import import_ref
target = import_ref(ref)
if inspect.isclass(target):
return target()
return target
def _serialize_input_provider(value: Any) -> str | None:
if value is None:
return None
@@ -690,21 +774,27 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
return flow_definition
@classmethod
def _start_method_names(cls) -> list[FlowMethodName]:
def from_definition(cls, definition: FlowDefinition) -> Flow[Any]:
"""Build a runnable Flow directly from a definition; no subclass required."""
return cls.model_validate(
definition.config.model_dump(),
context={"flow_definition": definition},
)
def _start_method_names(self) -> list[FlowMethodName]:
return [
FlowMethodName(method_name)
for method_name, method_definition in cls.flow_definition().methods.items()
for method_name, method_definition in self._definition.methods.items()
if method_definition.is_start
]
@classmethod
def _listener_methods(
cls,
self,
) -> Iterator[tuple[FlowMethodName, FlowMethodDefinition, FlowDefinitionCondition]]:
# (name, definition, condition) for every non-start method that listens.
# Routers are included (they listen too); callers wanting only plain
# listeners filter on definition.router.
for method_name, method_definition in cls.flow_definition().methods.items():
for method_name, method_definition in self._definition.methods.items():
if method_definition.listen is not None and not method_definition.is_start:
yield (
FlowMethodName(method_name),
@@ -712,25 +802,22 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
method_definition.listen,
)
@classmethod
def _start_condition(
cls, method_name: FlowMethodName
self, method_name: FlowMethodName
) -> FlowDefinitionCondition | None:
method_definition = cls.flow_definition().methods[str(method_name)]
method_definition = self._definition.methods[str(method_name)]
start = method_definition.start
if isinstance(start, (str, dict)):
return start
return None
@classmethod
def _listen_condition(
cls, method_name: FlowMethodName
self, method_name: FlowMethodName
) -> FlowDefinitionCondition | None:
return cls.flow_definition().methods[str(method_name)].listen
return self._definition.methods[str(method_name)].listen
@classmethod
def _is_router(cls, method_name: FlowMethodName) -> bool:
return cls.flow_definition().methods[str(method_name)].router
def _is_router(self, method_name: FlowMethodName) -> bool:
return self._definition.methods[str(method_name)].router
initial_state: Annotated[ # type: ignore[type-arg]
type[BaseModel] | type[dict] | dict[str, Any] | BaseModel | None,
@@ -857,6 +944,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
self._completed_methods = {
FlowMethodName(m) for m in self.checkpoint_completed_methods
}
self._restored_from_checkpoint = True
if self.checkpoint_method_outputs is not None:
self._method_outputs = list(self.checkpoint_method_outputs)
if self.checkpoint_method_counts is not None:
@@ -873,7 +961,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
restore_event_scope(())
reset_last_event_id()
_methods: dict[FlowMethodName, FlowMethod[Any, Any]] = PrivateAttr(
_methods: dict[FlowMethodName, Callable[..., Any]] = PrivateAttr(
default_factory=dict
)
_method_execution_counts: dict[FlowMethodName, int] = PrivateAttr(
@@ -887,17 +975,25 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
PrivateAttr(default=None)
)
_method_outputs: list[Any] = PrivateAttr(default_factory=list)
_definition: FlowDefinition = PrivateAttr()
_state_lock: threading.Lock = PrivateAttr(default_factory=threading.Lock)
_or_listeners_lock: threading.Lock = PrivateAttr(default_factory=threading.Lock)
_completed_methods: set[FlowMethodName] = PrivateAttr(default_factory=set)
_method_call_counts: dict[FlowMethodName, int] = PrivateAttr(default_factory=dict)
_is_execution_resuming: bool = PrivateAttr(default=False)
_restored_from_checkpoint: bool = PrivateAttr(default=False)
_event_futures: list[Future[None]] = PrivateAttr(default_factory=list)
_pending_feedback_context: PendingFeedbackContext | None = PrivateAttr(default=None)
_human_feedback_method_outputs: dict[str, Any] = PrivateAttr(default_factory=dict)
_input_history: list[InputHistoryEntry] = PrivateAttr(default_factory=list)
_state: Any = PrivateAttr(default=None)
_deferred_flow_started_event_id: str | None = PrivateAttr(default=None)
_aggregated_usage_metrics: UsageMetrics = PrivateAttr(default_factory=UsageMetrics)
_usage_metrics_lock: threading.Lock = PrivateAttr(default_factory=threading.Lock)
_flow_match_id: str | None = PrivateAttr(default=None)
_usage_aggregation_handler: Callable[..., Any] | None = PrivateAttr(default=None)
_persist_backends: dict[int, FlowPersistence] = PrivateAttr(default_factory=dict)
_instance_persistence: bool = PrivateAttr(default=False)
def __class_getitem__(cls: type[Flow[T]], item: type[T]) -> type[Flow[T]]: # type: ignore[override]
class _FlowGeneric(cls): # type: ignore[valid-type,misc]
@@ -915,15 +1011,36 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
object.__setattr__(self, name, value)
def model_post_init(self, __context: Any) -> None:
self._flow_post_init()
definition = (
__context.get("flow_definition") if isinstance(__context, dict) else None
)
self._flow_post_init(definition)
def _flow_post_init(self) -> None:
def _flow_post_init(self, definition: FlowDefinition | None = None) -> None:
"""Heavy initialization: state creation, events, memory, method registration."""
if getattr(self, "_flow_post_init_done", False):
return
object.__setattr__(self, "_flow_post_init_done", True)
self._initialize_runtime_extension_attrs()
self._definition = definition or type(self).flow_definition()
if self.name and self.name != self._definition.name:
self._definition = self._definition.model_copy(update={"name": self.name})
methods = (
self._action_bound_methods()
if definition is not None
else self._class_bound_methods()
)
flow_persist = self._definition.persist
self._instance_persistence = self.persistence is not None
if (
self.persistence is None
and flow_persist is not None
and flow_persist.enabled
):
self.persistence = self._persist_backend_for(flow_persist)
if self._state is None:
self._state = self._create_initial_state()
@@ -938,7 +1055,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
self,
FlowCreatedEvent(
type="flow_created",
flow_name=self.name or self.__class__.__name__,
flow_name=self._definition.name,
),
)
@@ -948,17 +1065,107 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
if self.memory is None and not getattr(self, "_skip_auto_memory", False):
from crewai.memory.utils import sanitize_scope_name
flow_name = sanitize_scope_name(self.name or self.__class__.__name__)
flow_name = sanitize_scope_name(self._definition.name)
self.memory = Memory(root_scope=f"/flow/{flow_name}")
# Build the runtime method lookup from the static FlowDefinition.
for method_name in type(self).flow_definition().methods:
self._methods.update(methods)
def _action_bound_methods(self) -> dict[FlowMethodName, Callable[..., Any]]:
def resolve(name: str, definition: FlowMethodDefinition) -> Callable[..., Any]:
try:
return resolve_action(self, definition.do)
except Exception as e:
unresolved.append(f"{name}: {e}")
return lambda *args, **kwargs: None
methods: dict[FlowMethodName, Callable[..., Any]] = {}
unresolved: list[str] = []
for method_name, method_definition in self._definition.methods.items():
methods[FlowMethodName(method_name)] = resolve(
method_name, method_definition
)
if unresolved:
raise ValueError(
f"Cannot build flow {self._definition.name!r} from its definition; "
"methods with unresolvable actions: " + "; ".join(unresolved)
)
return methods
def _class_bound_methods(self) -> dict[FlowMethodName, Callable[..., Any]]:
methods: dict[FlowMethodName, Callable[..., Any]] = {}
for method_name in self._definition.methods:
method = getattr(self, method_name, None)
if method is None:
continue
if not hasattr(method, "__self__"):
method = method.__get__(self, self.__class__)
self._methods[FlowMethodName(method_name)] = method
method = method.__get__(self, type(self))
methods[FlowMethodName(method_name)] = method
return methods
def _attach_usage_aggregation_listener(self) -> None:
"""Wire an ``LLMCallCompletedEvent`` listener for the duration of one
``kickoff_async`` call.
"""
if self._usage_aggregation_handler is not None:
return
# Capture the accumulator object in the closure so a stale handler
# still queued in the bus thread pool from a prior kickoff writes
# into its own (orphaned) UsageMetrics instead of the next kickoff's
# fresh one.
accumulator = self._aggregated_usage_metrics
match_id = self._flow_match_id
lock = self._usage_metrics_lock
def _accumulate(source: Any, event: LLMCallCompletedEvent) -> None:
if current_flow_id.get() != match_id:
return
metrics = _usage_dict_to_metrics(event.usage)
if metrics is None:
return
with lock:
accumulator.add_usage_metrics(metrics)
crewai_event_bus.on(LLMCallCompletedEvent)(_accumulate)
self._usage_aggregation_handler = _accumulate
def _detach_usage_aggregation_listener(self) -> None:
handler = self._usage_aggregation_handler
if handler is None:
return
crewai_event_bus.off(LLMCallCompletedEvent, handler)
self._usage_aggregation_handler = None
@property
def usage_metrics(self) -> UsageMetrics:
"""Aggregated LLM token usage for the most recent kickoff (or
resume) of this flow instance.
Aggregation is correlated by the ``current_flow_id`` contextvar
captured at kickoff time. Nested kickoffs (a parent flow calling
a child flow's ``kickoff``) intentionally roll the child's
tokens up into the parent because the contextvar is inherited.
Sibling kickoffs that run in parallel under the same parent
contextvar share the same correlation id and may therefore
over-count each other; if you need strict per-flow isolation
in that pattern, run the children in separate tasks that
explicitly set their own ``current_flow_id`` before kickoff.
LLM calls that complete without exposing token usage (e.g.
structured-output / Instructor paths) are not counted in
``successful_requests`` either, since we never see the call's
token data the metric stays a faithful summary of usage we
actually observed rather than a partial count.
Cross-process pause/resume (``Flow.from_pending`` in a new
process) starts aggregation from zero on the restored instance
because pre-pause totals are not yet persisted alongside the
pending feedback context. Same-process pause/resume where the
caller keeps the flow instance and calls ``resume`` on it
preserves the running totals end-to-end.
"""
with self._usage_metrics_lock:
return self._aggregated_usage_metrics.model_copy()
def recall(self, query: str, **kwargs: Any) -> Any:
"""Recall relevant memories. Delegates to this flow's memory.
@@ -1036,7 +1243,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
def _start_condition_triggered_by(
self, method_name: FlowMethodName, trigger: FlowMethodName
) -> bool:
condition = type(self)._start_condition(method_name)
condition = self._start_condition(method_name)
if condition is None:
return False
return self._condition_met(
@@ -1064,7 +1271,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
trigger_str = str(trigger)
to_discard: list[FlowMethodName] = []
for listener_name in candidates:
condition = type(self)._listen_condition(listener_name)
condition = self._listen_condition(listener_name)
if condition is None:
continue
if trigger_str in _iter_condition_events(condition):
@@ -1086,9 +1293,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
racing_groups: dict[frozenset[FlowMethodName], FlowMethodName] = {}
listener_conditions: dict[FlowMethodName, FlowDefinitionCondition] = {
listener_name: condition
for listener_name, method_definition, condition in type(
self
)._listener_methods()
for listener_name, method_definition, condition in self._listener_methods()
if not method_definition.router
}
@@ -1254,6 +1459,10 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
instance._initialize_state(state_data)
instance._pending_feedback_context = pending_context
instance._is_execution_resuming = True
# Seed the match id so the resume-phase listener filters its own
# LLM events (which run with `current_flow_id == instance.flow_id`)
# instead of dropping or absorbing unrelated ones.
instance._flow_match_id = instance.flow_id
return instance
@@ -1343,15 +1552,34 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
Raises:
ValueError: If no pending feedback context exists
"""
from datetime import datetime
from crewai.flow.human_feedback import HumanFeedbackResult
if self._pending_feedback_context is None:
raise ValueError(
"No pending feedback context. Use from_pending() to restore a paused flow."
)
# Force `current_flow_id` to this flow's match id for the
# duration of the resume so the usage listener's filter passes
# even when resume runs under another flow's active context.
flow_id_token = None
if self._flow_match_id is not None:
flow_id_token = current_flow_id.set(self._flow_match_id)
self._attach_usage_aggregation_listener()
try:
return await self._resume_async_body(feedback)
finally:
# Match kickoff_async: drain pending handlers so the resumed
# phase's LLM events all hit `_aggregated_usage_metrics`
# before the listener is detached.
crewai_event_bus.flush()
self._detach_usage_aggregation_listener()
if flow_id_token is not None:
current_flow_id.reset(flow_id_token)
async def _resume_async_body(self, feedback: str = "") -> Any:
from datetime import datetime
from crewai.flow.human_feedback import HumanFeedbackResult
if get_current_parent_id() is None:
reset_emission_counter()
reset_last_event_id()
@@ -1361,7 +1589,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
self,
FlowStartedEvent(
type="flow_started",
flow_name=self.name or self.__class__.__name__,
flow_name=self._definition.name,
inputs=None,
),
)
@@ -1374,6 +1602,10 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
get_env_context()
context = self._pending_feedback_context
if context is None:
raise ValueError(
"No pending feedback context. Use from_pending() to restore a paused flow."
)
emit = context.emit
default_outcome = context.default_outcome
@@ -1427,6 +1659,8 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
self._completed_methods.add(FlowMethodName(context.method_name))
self._persist_method_completion(FlowMethodName(context.method_name))
self._pending_feedback_context = None
if self.persistence is not None:
@@ -1437,7 +1671,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
self,
MethodExecutionFinishedEvent(
type="method_execution_finished",
flow_name=self.name or self.__class__.__name__,
flow_name=self._definition.name,
method_name=context.method_name,
result=collapsed_outcome if emit else result,
state=self._state,
@@ -1491,7 +1725,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
self,
FlowPausedEvent(
type="flow_paused",
flow_name=self.name or self.__class__.__name__,
flow_name=self._definition.name,
flow_id=e.context.flow_id,
method_name=e.context.method_name,
state=self._copy_and_serialize_state(),
@@ -1514,12 +1748,15 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
)
self._event_futures.clear()
if not self.suppress_flow_events:
if (
not self.suppress_flow_events
and not self._should_defer_trace_finalization()
):
future = crewai_event_bus.emit(
self,
FlowFinishedEvent(
type="flow_finished",
flow_name=self.name or self.__class__.__name__,
flow_name=self._definition.name,
result=final_result,
state=self._copy_and_serialize_state(),
),
@@ -1531,7 +1768,12 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
logger.warning("FlowFinishedEvent handler failed", exc_info=True)
trace_listener = TraceCollectionListener()
if trace_listener.batch_manager.batch_owner_type == "flow":
if (
trace_listener.batch_manager.batch_owner_type == "flow"
and current_flow_id.get() == self.flow_id
and not trace_listener.batch_manager.defer_session_finalization
and not current_flow_defer_trace_finalization.get()
):
if trace_listener.first_time_handler.is_first_time:
trace_listener.first_time_handler.mark_events_collected()
trace_listener.first_time_handler.handle_execution_completion()
@@ -1580,7 +1822,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
return cast(T, {"id": str(uuid4())})
if init_state is None:
return cast(T, {"id": str(uuid4())})
return cast(T, self._create_definition_state())
if isinstance(init_state, type):
state_class = init_state
@@ -1622,6 +1864,34 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
f"Initial state must be dict or BaseModel, got {type(self.initial_state)}"
)
def _create_definition_state(self) -> dict[str, Any] | BaseModel:
state_definition = self._definition.state
if state_definition is None:
return {"id": str(uuid4())}
if state_definition.type in ("pydantic", "json_schema"):
state = _build_definition_state_model(state_definition)
if state is not None:
return state
logger.error(
"Flow %r declares %s state but neither ref nor json_schema "
"produced a model; falling back to dict state",
self._definition.name,
state_definition.type,
)
elif state_definition.type == "unknown":
logger.warning(
"Flow %r declares state of unknown type; falling back to dict state",
self._definition.name,
)
dict_state: dict[str, Any] = (
dict(state_definition.default)
if isinstance(state_definition.default, dict)
else {}
)
if "id" not in dict_state:
dict_state["id"] = str(uuid4())
return dict_state
def _copy_state(self) -> T:
"""Create a copy of the current state.
@@ -1922,13 +2192,17 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
restore_from_state_id=restore_from_state_id,
)
runtime_scope = crewai_event_bus._enter_runtime_scope()
try:
asyncio.get_running_loop()
ctx = contextvars.copy_context()
with ThreadPoolExecutor(max_workers=1) as pool:
return pool.submit(ctx.run, asyncio.run, _run_flow()).result()
except RuntimeError:
return asyncio.run(_run_flow())
try:
asyncio.get_running_loop()
ctx = contextvars.copy_context()
with ThreadPoolExecutor(max_workers=1) as pool:
return pool.submit(ctx.run, asyncio.run, _run_flow()).result()
except RuntimeError:
return asyncio.run(_run_flow())
finally:
crewai_event_bus._exit_runtime_scope(runtime_scope)
async def kickoff_async(
self,
@@ -2020,17 +2294,38 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
flow_token = attach(ctx)
flow_id_token = None
flow_name_token = None
flow_defer_trace_finalization_token = None
request_id_token = None
if current_flow_id.get() is None:
flow_id_token = current_flow_id.set(self.flow_id)
flow_name_token = current_flow_name.set(
self.name or self.__class__.__name__
)
flow_defer_trace_finalization_token = (
current_flow_defer_trace_finalization.set(
self._should_defer_trace_finalization()
)
)
if current_flow_request_id.get() is None:
request_id_token = current_flow_request_id.set(self.flow_id)
runtime_scope = crewai_event_bus._enter_runtime_scope()
# Reentrant kickoffs on the same Flow share the outer call's
# listener and accumulator; only the outermost call wires usage
# aggregation.
owns_usage_aggregation = self._usage_aggregation_handler is None
if owns_usage_aggregation:
self._flow_match_id = current_flow_id.get()
self._aggregated_usage_metrics = UsageMetrics()
self._attach_usage_aggregation_listener()
try:
# Reset flow state for fresh execution unless restoring from persistence
is_restoring = (
inputs and "id" in inputs and self.persistence is not None
) or self.checkpoint_completed_methods is not None
) or self._restored_from_checkpoint
if not is_restoring:
# Clear completed methods and outputs for a fresh start
self._completed_methods.clear()
@@ -2047,6 +2342,10 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
if self._completed_methods:
self._is_execution_resuming = True
# Restore is single-shot: a later kickoff on the same instance
# starts fresh.
self._restored_from_checkpoint = False
# Fork hydration: when restore_from_state_id is set and persistence is
# available, hydrate self._state from the source UUID's latest snapshot
# and reassign state.id to a fresh value so subsequent @persist writes
@@ -2117,6 +2416,10 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
should_emit_flow_started = not (
defer_trace_finalization and deferred_started_event_id
)
if current_flow_id.get() == self.flow_id:
TraceCollectionListener().batch_manager.defer_session_finalization = (
defer_trace_finalization
)
if (
defer_trace_finalization
@@ -2134,7 +2437,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
# explicit finalization call closes the batch.
started_event = FlowStartedEvent(
type="flow_started",
flow_name=self.name or self.__class__.__name__,
flow_name=self._definition.name,
inputs=inputs,
)
future = crewai_event_bus.emit(self, started_event)
@@ -2174,11 +2477,11 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
# Determine which start methods to execute at kickoff
# Conditional start methods are only triggered by their conditions
# UNLESS there are no unconditional starts (then all starts run as entry points)
start_methods = type(self)._start_method_names()
start_methods = self._start_method_names()
unconditional_starts = [
start_method
for start_method in start_methods
if type(self)._start_condition(start_method) is None
if self._start_condition(start_method) is None
]
# If there are unconditional starts, only run those at kickoff
# If there are NO unconditional starts, run all starts (including conditional ones)
@@ -2226,7 +2529,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
self,
FlowPausedEvent(
type="flow_paused",
flow_name=self.name or self.__class__.__name__,
flow_name=self._definition.name,
flow_id=e.context.flow_id,
method_name=e.context.method_name,
state=self._copy_and_serialize_state(),
@@ -2276,7 +2579,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
self,
FlowFinishedEvent(
type="flow_finished",
flow_name=self.name or self.__class__.__name__,
flow_name=self._definition.name,
result=final_output,
state=self._copy_and_serialize_state(),
),
@@ -2290,7 +2593,12 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
)
trace_listener = TraceCollectionListener()
if trace_listener.batch_manager.batch_owner_type == "flow":
if (
trace_listener.batch_manager.batch_owner_type == "flow"
and current_flow_id.get() == self.flow_id
and not trace_listener.batch_manager.defer_session_finalization
and not current_flow_defer_trace_finalization.get()
):
if trace_listener.first_time_handler.is_first_time:
trace_listener.first_time_handler.mark_events_collected()
trace_listener.first_time_handler.handle_execution_completion()
@@ -2302,11 +2610,26 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
# Ensure all background memory saves complete before returning
if self.memory is not None and hasattr(self.memory, "drain_writes"):
self.memory.drain_writes()
# Drain pending LLMCallCompletedEvent handlers before
# detaching so `flow.usage_metrics` reflects every call
# emitted during this kickoff — mirrors `Crew.kickoff()`,
# which flushes before reporting `token_usage`. Resume paths
# re-attach a fresh listener via `resume_async`.
if owns_usage_aggregation:
crewai_event_bus.flush()
self._detach_usage_aggregation_listener()
if request_id_token is not None:
current_flow_request_id.reset(request_id_token)
if flow_defer_trace_finalization_token is not None:
current_flow_defer_trace_finalization.reset(
flow_defer_trace_finalization_token
)
if flow_name_token is not None:
current_flow_name.reset(flow_name_token)
if flow_id_token is not None:
current_flow_id.reset(flow_id_token)
detach(flow_token)
crewai_event_bus._exit_runtime_scope(runtime_scope)
async def akickoff(
self,
@@ -2350,7 +2673,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
MethodExecutionFinishedEvent,
MethodExecutionFailedEvent,
)
flow_name = self.name or self.__class__.__name__
flow_name = self._definition.name
nodes = sorted(
(
n
@@ -2409,7 +2732,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
)
# If start method is a router, use its result as an additional trigger
if type(self)._is_router(start_method_name) and result is not None:
if self._is_router(start_method_name) and result is not None:
# Execute listeners for the start method name first
await self._execute_listeners(start_method_name, result, finished_event_id)
# Then execute listeners for the router result (e.g., "approved")
@@ -2429,15 +2752,16 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
def _inject_trigger_payload_for_start_method(
self, original_method: Callable[..., Any]
) -> Callable[..., Any]:
accepts_trigger_payload = (
"crewai_trigger_payload" in inspect.signature(original_method).parameters
)
def prepare_kwargs(
*args: Any, **kwargs: Any
) -> tuple[tuple[Any, ...], dict[str, Any]]:
inputs = cast(dict[str, Any], baggage.get_baggage("flow_inputs") or {})
trigger_payload = inputs.get("crewai_trigger_payload")
sig = inspect.signature(original_method)
accepts_trigger_payload = "crewai_trigger_payload" in sig.parameters
if trigger_payload is not None and accepts_trigger_payload:
kwargs["crewai_trigger_payload"] = trigger_payload
elif trigger_payload is not None:
@@ -2487,7 +2811,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
MethodExecutionStartedEvent(
type="method_execution_started",
method_name=method_name,
flow_name=self.name or self.__class__.__name__,
flow_name=self._definition.name,
params=dumped_params,
state=self._copy_and_serialize_state(),
),
@@ -2534,12 +2858,14 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
self._completed_methods.add(method_name)
self._persist_method_completion(method_name)
finished_event_id: str | None = None
if not self.suppress_flow_events:
finished_event = MethodExecutionFinishedEvent(
type="method_execution_finished",
method_name=method_name,
flow_name=self.name or self.__class__.__name__,
flow_name=self._definition.name,
state=self._copy_and_serialize_state(),
result=result,
)
@@ -2568,7 +2894,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
MethodExecutionPausedEvent(
type="method_execution_paused",
method_name=method_name,
flow_name=self.name or self.__class__.__name__,
flow_name=self._definition.name,
state=self._copy_and_serialize_state(),
flow_id=e.context.flow_id,
message=e.context.message,
@@ -2584,7 +2910,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
MethodExecutionFailedEvent(
type="method_execution_failed",
method_name=method_name,
flow_name=self.name or self.__class__.__name__,
flow_name=self._definition.name,
error=e,
),
)
@@ -2592,6 +2918,55 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
self._event_futures.append(future)
raise e
def _persist_method_completion(self, method_name: FlowMethodName) -> None:
method_definition = self._definition.methods.get(method_name)
persist_definition = (
method_definition.persist
if method_definition is not None and method_definition.persist is not None
else self._definition.persist
)
if persist_definition is None or not persist_definition.enabled:
return
from crewai.flow.persistence.decorators import PersistenceDecorator
# An instance-supplied backend overrides definition backends; one the
# engine derived from the flow-level definition must not shadow a
# method-scoped persist config.
backend = (
self.persistence
if self._instance_persistence and self.persistence is not None
else self._persist_backend_for(persist_definition)
)
PersistenceDecorator.persist_state(
self, method_name, backend, verbose=persist_definition.verbose
)
def _persist_backend_for(
self, persist_definition: FlowPersistenceDefinition
) -> FlowPersistence:
cached = self._persist_backends.get(id(persist_definition))
if cached is None:
cached = self._resolve_persist_backend(persist_definition)
self._persist_backends[id(persist_definition)] = cached
return cached
def _resolve_persist_backend(
self, persist_definition: FlowPersistenceDefinition
) -> FlowPersistence:
if persist_definition.persistence is None:
from crewai.flow.persistence.factory import default_flow_persistence
return default_flow_persistence()
resolved = _resolve_persistence(persist_definition.persistence)
if not isinstance(resolved, FlowPersistence):
raise ValueError(
f"Cannot resolve persistence backend "
f"{persist_definition.persistence!r} from the flow definition "
f"for flow {self._definition.name!r}."
)
return resolved
def _copy_and_serialize_state(self) -> dict[str, Any]:
state_copy = self._copy_state()
if isinstance(state_copy, BaseModel):
@@ -2716,7 +3091,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
await asyncio.gather(*tasks)
if current_trigger in router_results:
for method_name in type(self)._start_method_names():
for method_name in self._start_method_names():
if self._start_condition_triggered_by(
method_name, current_trigger
):
@@ -2747,9 +3122,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
) -> list[FlowMethodName]:
triggered: list[FlowMethodName] = []
for listener_name, method_definition, condition in type(
self
)._listener_methods():
for listener_name, method_definition, condition in self._listener_methods():
is_router = method_definition.router
if router_only != is_router:
continue
@@ -2815,10 +3188,10 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
# For routers, also check if any conditional starts they triggered are completed
# If so, continue their chains
if type(self)._is_router(listener_name):
for start_method_name in type(self)._start_method_names():
if self._is_router(listener_name):
for start_method_name in self._start_method_names():
if (
type(self)._start_condition(start_method_name) is not None
self._start_condition(start_method_name) is not None
and start_method_name in self._completed_methods
):
# This conditional start was executed, continue its chain
@@ -2837,8 +3210,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
method = self._methods[listener_name]
sig = inspect.signature(method)
params = list(sig.parameters.values())
method_params = [p for p in params if p.name != "self"]
method_params = [p for p in sig.parameters.values() if p.name != "self"]
if triggering_event_id:
with triggered_by_scope(triggering_event_id):
@@ -2994,7 +3366,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
self,
FlowInputRequestedEvent(
type="flow_input_requested",
flow_name=self.name or self.__class__.__name__,
flow_name=self._definition.name,
method_name=method_name,
message=message,
metadata=metadata,
@@ -3061,7 +3433,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
self,
FlowInputReceivedEvent(
type="flow_input_received",
flow_name=self.name or self.__class__.__name__,
flow_name=self._definition.name,
method_name=method_name,
message=message,
response=response,
@@ -3099,7 +3471,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
self,
HumanFeedbackRequestedEvent(
type="human_feedback_requested",
flow_name=self.name or self.__class__.__name__,
flow_name=self._definition.name,
method_name="", # Will be set by decorator if needed
output=output,
message=message,
@@ -3128,7 +3500,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
self,
HumanFeedbackReceivedEvent(
type="human_feedback_received",
flow_name=self.name or self.__class__.__name__,
flow_name=self._definition.name,
method_name="", # Will be set by decorator if needed
feedback=feedback,
outcome=None, # Will be determined after collapsing
@@ -3303,7 +3675,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
self,
FlowPlotEvent(
type="flow_plot",
flow_name=self.name or self.__class__.__name__,
flow_name=self._definition.name,
),
)
structure = build_flow_structure(cast(Any, self))

View File

@@ -0,0 +1,48 @@
from __future__ import annotations
from collections.abc import Callable
import importlib
from operator import attrgetter
from typing import TYPE_CHECKING, Any, cast
from crewai.flow.flow_definition import FlowActionDefinition
if TYPE_CHECKING:
from crewai.flow.runtime import Flow
class InvalidActionRefError(ValueError):
def __init__(self, ref: str) -> None:
super().__init__(f"invalid callable {ref!r}; expected 'module:qualname'")
def import_ref(ref: str) -> Any:
"""Import the object a `module:qualname` reference points to."""
module_name, _, qualname = ref.partition(":")
if "<" in ref or not module_name or not qualname:
raise InvalidActionRefError(ref)
try:
return attrgetter(qualname)(importlib.import_module(module_name))
except (ImportError, AttributeError) as e:
raise InvalidActionRefError(ref) from e
def _resolve_code_action(
flow: Flow[Any], action: FlowActionDefinition
) -> Callable[..., Any]:
ref = action.ref
target = import_ref(ref)
if not callable(target):
raise InvalidActionRefError(ref)
handler = cast(Callable[..., Any], target)
if getattr(handler, "__self__", None) is None:
handler = handler.__get__(flow, type(flow))
return handler
def resolve_action(flow: Flow[Any], action: FlowActionDefinition) -> Callable[..., Any]:
"""Turn one `do:` action into the callable the flow runs for that node."""
if action.call == "code":
return _resolve_code_action(flow, action)
raise ValueError(f"unknown call type {action.call!r}")

View File

@@ -890,41 +890,17 @@ class BaseLLM(BaseModel, ABC):
Args:
usage_data: Token usage data from the API response
"""
prompt_tokens = (
usage_data.get("prompt_tokens")
or usage_data.get("prompt_token_count")
or usage_data.get("input_tokens")
or 0
)
metrics = UsageMetrics.from_provider_dict(usage_data)
if metrics is None:
return
completion_tokens = (
usage_data.get("completion_tokens")
or usage_data.get("candidates_token_count")
or usage_data.get("output_tokens")
or 0
)
cached_tokens = (
usage_data.get("cached_tokens")
or usage_data.get("cached_prompt_tokens")
or usage_data.get("cache_read_input_tokens")
or 0
)
if not cached_tokens:
prompt_details = usage_data.get("prompt_tokens_details")
if isinstance(prompt_details, dict):
cached_tokens = prompt_details.get("cached_tokens", 0) or 0
reasoning_tokens = usage_data.get("reasoning_tokens", 0) or 0
cache_creation_tokens = usage_data.get("cache_creation_tokens", 0) or 0
self._token_usage["prompt_tokens"] += prompt_tokens
self._token_usage["completion_tokens"] += completion_tokens
self._token_usage["total_tokens"] += prompt_tokens + completion_tokens
self._token_usage["successful_requests"] += 1
self._token_usage["cached_prompt_tokens"] += cached_tokens
self._token_usage["reasoning_tokens"] += reasoning_tokens
self._token_usage["cache_creation_tokens"] += cache_creation_tokens
self._token_usage["prompt_tokens"] += metrics.prompt_tokens
self._token_usage["completion_tokens"] += metrics.completion_tokens
self._token_usage["total_tokens"] += metrics.total_tokens
self._token_usage["successful_requests"] += metrics.successful_requests
self._token_usage["cached_prompt_tokens"] += metrics.cached_prompt_tokens
self._token_usage["reasoning_tokens"] += metrics.reasoning_tokens
self._token_usage["cache_creation_tokens"] += metrics.cache_creation_tokens
def get_token_usage_summary(self) -> UsageMetrics:
"""Get summary of token usage for this LLM instance.

View File

@@ -30,7 +30,7 @@ from opentelemetry.sdk.trace.export import (
BatchSpanProcessor,
SpanExportResult,
)
from opentelemetry.trace import Span
from opentelemetry.trace import ProxyTracerProvider, Span
from typing_extensions import Self
from crewai.events.event_bus import crewai_event_bus
@@ -162,6 +162,10 @@ class Telemetry:
if self.ready and not self.trace_set:
try:
with suppress_warnings():
existing_provider = trace.get_tracer_provider()
if not isinstance(existing_provider, ProxyTracerProvider):
self.trace_set = True
return
trace.set_tracer_provider(self.provider)
self.trace_set = True
except Exception as e:

View File

@@ -4,10 +4,31 @@ This module provides models for tracking token usage and request metrics
during crew and agent execution.
"""
from typing import Any
from pydantic import BaseModel, Field
from typing_extensions import Self
def _coerce_int(value: Any) -> int:
if value is None:
return 0
try:
return int(value)
except (TypeError, ValueError):
return 0
def _first_int(usage_data: dict[str, Any], *keys: str) -> int:
"""Return the first integer-coercible value from ``usage_data`` under any
of ``keys``. Falls back to ``0`` when nothing matches."""
for key in keys:
coerced = _coerce_int(usage_data.get(key))
if coerced:
return coerced
return 0
class UsageMetrics(BaseModel):
"""Track usage metrics for crew execution.
@@ -54,3 +75,50 @@ class UsageMetrics(BaseModel):
self.reasoning_tokens += usage_metrics.reasoning_tokens
self.cache_creation_tokens += usage_metrics.cache_creation_tokens
self.successful_requests += usage_metrics.successful_requests
@classmethod
def from_provider_dict(cls, usage_data: dict[str, Any] | None) -> Self | None:
"""Normalize a provider's raw usage dict into a ``UsageMetrics``.
Accepts the full set of key aliases CrewAI providers emit:
``prompt_tokens`` / ``prompt_token_count`` (Gemini) / ``input_tokens``
(Anthropic), and the equivalent completion / cached-prompt aliases.
Mirrors ``BaseLLM._track_token_usage_internal`` so per-LLM totals,
flow-level aggregation, and OTel spans agree on every provider.
Returns ``None`` for missing/empty input so callers can decide
whether to skip the event entirely or treat it as a zero-token
successful request.
"""
if not usage_data:
return None
prompt_tokens = _first_int(
usage_data, "prompt_tokens", "prompt_token_count", "input_tokens"
)
completion_tokens = _first_int(
usage_data,
"completion_tokens",
"candidates_token_count",
"output_tokens",
)
cached_prompt_tokens = _first_int(
usage_data,
"cached_tokens",
"cached_prompt_tokens",
"cache_read_input_tokens",
)
if not cached_prompt_tokens:
details = usage_data.get("prompt_tokens_details")
if isinstance(details, dict):
cached_prompt_tokens = _coerce_int(details.get("cached_tokens"))
return cls(
total_tokens=prompt_tokens + completion_tokens,
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
cached_prompt_tokens=cached_prompt_tokens,
reasoning_tokens=_coerce_int(usage_data.get("reasoning_tokens")),
cache_creation_tokens=_coerce_int(usage_data.get("cache_creation_tokens")),
successful_requests=1,
)

View File

@@ -999,7 +999,11 @@ def _json_schema_to_pydantic_field(
if examples:
schema_extra["examples"] = examples
default = ... if is_required else None
default = (
json_schema["default"]
if "default" in json_schema
else (... if is_required else None)
)
if isinstance(type_, type) and issubclass(type_, (int, float)):
if "minimum" in json_schema:

View File

@@ -4,6 +4,7 @@ import os
import threading
from unittest import mock
from unittest.mock import MagicMock, patch
import warnings
from crewai.agents.crew_agent_executor import AgentFinish, CrewAgentExecutor
from crewai.constants import DEFAULT_LLM_MODEL
@@ -77,6 +78,51 @@ def test_agent_creation():
assert agent.backstory == "test backstory"
def test_agent_exposes_i18n_for_backward_compatibility():
from crewai.utilities.i18n import I18N_DEFAULT
agent = Agent(role="test role", goal="test goal", backstory="test backstory")
with pytest.warns(DeprecationWarning, match="Agent.i18n is deprecated"):
i18n = agent.i18n
assert i18n is I18N_DEFAULT
assert isinstance(i18n.slice("role_playing"), str)
def test_agent_accepts_custom_i18n():
from crewai.utilities.i18n import I18N
prompt_file = os.path.join(
os.path.dirname(__file__), "..", "utilities", "prompts.json"
)
i18n = I18N(prompt_file=prompt_file)
agent = Agent(
role="test role",
goal="test goal",
backstory="test backstory",
i18n=i18n,
)
with pytest.warns(DeprecationWarning, match="Agent.i18n is deprecated"):
agent_i18n = agent.i18n
assert agent_i18n is i18n
assert agent_i18n.slice("role_playing") == "Lorem ipsum dolor sit amet"
def test_agent_copy_does_not_emit_i18n_deprecation_warning():
agent = Agent(role="test role", goal="test goal", backstory="test backstory")
with warnings.catch_warnings(record=True) as caught_warnings:
warnings.simplefilter("always", DeprecationWarning)
agent.copy()
assert not any(
"Agent.i18n is deprecated" in str(w.message) for w in caught_warnings
)
def test_agent_with_only_system_template():
"""Test that an agent with only system_template works without errors."""
agent = Agent(

View File

@@ -2,6 +2,7 @@
from __future__ import annotations
import threading
from typing import Any
from unittest.mock import patch
@@ -109,10 +110,79 @@ class TestCheckpointListenerOptsOut:
assert do_cp.call_count == 0
class TestFlowResumeReplaysEvents:
"""End-to-end: a resumed flow emits MethodExecution* events for completed methods."""
class TestCheckpointResumeReplaysEvents:
"""A flow resumed from a checkpoint replays MethodExecution* events for
completed methods and executes the pending ones. The checkpoint persists
the event record, which is reloaded into the per-run runtime state.
def test_resume_dispatches_completed_method_events(self, tmp_path) -> None:
``step_c`` is gated on a threading.Event so the flow is frozen with exactly
``step_a`` and ``step_b`` completed when the checkpoint is written — the
mid-run snapshot is deterministic rather than dependent on write timing.
"""
def test_resume_replays_completed_and_executes_pending(self, tmp_path) -> None:
from crewai.flow.flow import Flow, listen, start
from crewai.state.checkpoint_config import CheckpointConfig
at_step_c = threading.Event()
release = threading.Event()
captured: list[Any] = []
class ThreeStepFlow(Flow[dict]):
@start()
def step_a(self) -> str:
return "a"
@listen(step_a)
def step_b(self) -> str:
return "b"
@listen(step_b)
def step_c(self) -> str:
captured.append(crewai_event_bus.runtime_state)
at_step_c.set()
release.wait(timeout=10)
return "c"
runner = threading.Thread(target=ThreeStepFlow().kickoff)
runner.start()
try:
assert at_step_c.wait(timeout=10)
location = captured[0].checkpoint(str(tmp_path / "cp"))
finally:
release.set()
runner.join(timeout=10)
captured_started: list[str] = []
captured_finished: list[str] = []
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(MethodExecutionStartedEvent)
def _cs(_: Any, event: MethodExecutionStartedEvent) -> None:
captured_started.append(event.method_name)
@crewai_event_bus.on(MethodExecutionFinishedEvent)
def _cf(_: Any, event: MethodExecutionFinishedEvent) -> None:
captured_finished.append(event.method_name)
ThreeStepFlow().kickoff(
from_checkpoint=CheckpointConfig(restore_from=location)
)
assert captured_started == ["step_a", "step_b", "step_c"]
assert captured_finished == ["step_a", "step_b", "step_c"]
class TestPersistResumeDoesNotReplayCompletedEvents:
"""A @persist resume continues from pending methods only.
@persist stores flow state, not the event record, so completed-method
events have no persisted source to replay from. Runtime state is scoped
per run, so flow1's events are not visible to flow2.
"""
def test_persist_resume_executes_only_pending_methods(self, tmp_path) -> None:
from crewai.flow.flow import Flow, listen, start
from crewai.flow.persistence.sqlite import SQLiteFlowPersistence
@@ -132,9 +202,6 @@ class TestFlowResumeReplaysEvents:
def step_c(self) -> str:
return "c"
if crewai_event_bus.runtime_state is not None:
crewai_event_bus.runtime_state.event_record.clear()
flow1 = ThreeStepFlow(persistence=persistence)
flow1.kickoff()
flow_id = flow1.state["id"]
@@ -157,9 +224,5 @@ class TestFlowResumeReplaysEvents:
flow2.kickoff(inputs={"id": flow_id})
assert captured_started.count("step_a") == 1
assert captured_started.count("step_b") == 1
assert captured_started.count("step_c") == 1
assert captured_finished.count("step_a") == 1
assert captured_finished.count("step_b") == 1
assert captured_finished.count("step_c") == 1
assert captured_started == ["step_c"]
assert captured_finished == ["step_c"]

View File

@@ -6,6 +6,7 @@ import pytest
from crewai import Agent, Crew, Task
from crewai.telemetry import Telemetry
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
@pytest.fixture(autouse=True)
@@ -53,6 +54,23 @@ def test_telemetry_enabled_by_default():
assert telemetry.ready is True
def test_set_tracer_skips_when_provider_already_configured():
"""A second telemetry instance must not re-install the global provider."""
with (
patch.dict(os.environ, {}, clear=True),
patch(
"crewai.telemetry.telemetry.trace.get_tracer_provider",
return_value=TracerProvider(),
),
patch("crewai.telemetry.telemetry.trace.set_tracer_provider") as mock_set,
):
telemetry = Telemetry()
telemetry.set_tracer()
mock_set.assert_not_called()
assert telemetry.trace_set is True
@patch("crewai.telemetry.telemetry.logger.error")
@patch(
"opentelemetry.exporter.otlp.proto.http.trace_exporter.OTLPSpanExporter.export",

View File

@@ -2,6 +2,7 @@
from __future__ import annotations
import inspect
import json
import os
import sqlite3
@@ -16,6 +17,7 @@ from pydantic import BaseModel
from crewai.agent.core import Agent
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.crew import Crew
from crewai.llms.base_llm import BaseLLM
from crewai.flow.flow import _INITIAL_STATE_CLASS_MARKER, Flow, start
from crewai.state.checkpoint_config import CheckpointConfig
from crewai.state.checkpoint_listener import (
@@ -682,3 +684,85 @@ class TestAgentCheckpoint:
cfg = CheckpointConfig(restore_from=loc)
restored = Agent.from_checkpoint(cfg)
assert restored._kickoff_event_id == "evt-456"
class _FinalAnswerLLM(BaseLLM):
"""Stub LLM that always returns a final answer without any API calls."""
def __init__(self) -> None:
super().__init__(model="stub")
def call(
self,
messages,
tools=None,
callbacks=None,
available_functions=None,
from_task=None,
from_agent=None,
response_model=None,
):
return "Final Answer: done."
def supports_function_calling(self) -> bool:
return False
def supports_stop_words(self) -> bool:
return False
def get_context_window_size(self) -> int:
return 4096
async def acall(self, *args, **kwargs):
raise NotImplementedError
class TestCheckpointReusedExecutor:
"""Checkpoint serialization stamps every live Flow's completed methods.
The agent executor is a Flow reused across a crew's tasks, so the stamp
must not be read back as a restore signal on the next task — otherwise the
second task replays as a resume and never reaches a final answer.
"""
def test_second_task_runs_with_checkpointing_enabled(self) -> None:
agent = Agent(role="r", goal="g", backstory="b", llm=_FinalAnswerLLM())
task1 = Task(description="first", expected_output="x", agent=agent)
task2 = Task(description="second", expected_output="y", agent=agent)
with tempfile.TemporaryDirectory() as d:
crew = Crew(
agents=[agent],
tasks=[task1, task2],
verbose=False,
checkpoint=CheckpointConfig(
provider=JsonProvider(location=d),
on_events=["task_started", "task_completed"],
),
)
result = crew.kickoff()
assert len(result.tasks_output) == 2
assert result.tasks_output[1].raw
class TestCustomLLMCheckpointRestore:
"""A custom BaseLLM subclass serializes with the inherited llm_type "base".
Restoring it must not try to instantiate the abstract BaseLLM; it is rebuilt
as a concrete LLM from the saved config instead.
"""
def test_restore_does_not_instantiate_abstract_base_llm(self) -> None:
agent = Agent(role="r", goal="g", backstory="b", llm=_FinalAnswerLLM())
task = Task(description="d", expected_output="e", agent=agent)
crew = Crew(agents=[agent], tasks=[task], verbose=False)
raw = RuntimeState(root=[crew]).model_dump_json()
restored = RuntimeState.model_validate_json(
raw, context={"from_checkpoint": True}
)
llm = restored.root[0].agents[0].llm
assert isinstance(llm, BaseLLM)
assert not inspect.isabstract(type(llm))
assert llm.model == "stub"

View File

@@ -409,4 +409,31 @@ class TestRuntimeStateIntegration:
old_json, context={"from_checkpoint": True}
)
assert len(restored.root) == 1
assert len(restored.event_record) == 0
assert len(restored.event_record) == 0
def test_reset_runtime_state_clears_state_and_registry(self):
from crewai import Agent, Crew, RuntimeState
from crewai.events.event_bus import crewai_event_bus
if RuntimeState is None:
pytest.skip("RuntimeState unavailable (model_rebuild failed)")
agent = Agent(role="test", goal="test", backstory="test", llm="gpt-4o-mini")
crew = Crew(agents=[agent], tasks=[], verbose=False)
previous_state = crewai_event_bus._runtime_state
previous_ids = crewai_event_bus._registered_entity_ids
crewai_event_bus._runtime_state = None
crewai_event_bus._registered_entity_ids = set()
try:
crewai_event_bus.register_entity(crew)
assert crewai_event_bus.runtime_state is not None
assert crewai_event_bus._registered_entity_ids
crewai_event_bus.reset_runtime_state()
assert crewai_event_bus.runtime_state is None
assert crewai_event_bus._registered_entity_ids == set()
finally:
crewai_event_bus._runtime_state = previous_state
crewai_event_bus._registered_entity_ids = previous_ids

View File

@@ -1040,7 +1040,7 @@ def test_flow_plotting():
received_events.append(event)
event_received.set()
flow.plot("test_flow")
flow.plot("test_flow", show=False)
assert event_received.wait(timeout=5), "Timeout waiting for plot event"
assert len(received_events) == 1
@@ -1157,6 +1157,26 @@ def test_flow_name():
assert flow.name == "MyFlow"
def test_flow_custom_name_overrides_class_name_in_events():
class InternalFlowClass(Flow):
name = "PublicName"
@start()
def begin(self):
return "done"
received = []
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(FlowStartedEvent)
def handle(source, event):
received.append(event)
InternalFlowClass().kickoff()
assert received[0].flow_name == "PublicName"
def test_nested_and_or_conditions():
"""Test nested conditions like or_(and_(A, B), and_(C, D)).

View File

@@ -26,7 +26,11 @@ from crewai.experimental import (
RouterConfig,
)
from crewai.flow import Flow, ChatState, listen, start
from crewai.flow.flow_context import current_flow_id, current_flow_name
from crewai.flow.flow_context import (
current_flow_defer_trace_finalization,
current_flow_id,
current_flow_name,
)
from crewai.flow.conversation import (
append_message,
get_conversation_messages,
@@ -598,9 +602,9 @@ class TestConversationalFlow:
"""Conversational flows: user ``@start`` methods finish before router fires.
Non-chat flows run ``@start`` methods in parallel via ``asyncio.gather``,
which would race with ``conversation_start`` and let the router fire
which would race with ``route_conversation`` and let the router fire
before user setup finished. In conversational mode the framework runs
them sequentially, with ``conversation_start`` last.
them sequentially, with ``route_conversation`` last.
"""
order: list[str] = []
@@ -643,15 +647,10 @@ class TestConversationalFlow:
assert "attach_bus" in order # still fires every turn
assert "route_turn" in order
def test_subclass_can_override_conversation_start_without_redecorating(
def test_subclass_can_override_conversation_start_helper(
self,
) -> None:
"""Overriding an inherited ``@start`` method must not unregister it.
Before the metaclass fix, subclasses had to re-apply ``@start()`` on
every override or the parent's ``conversation_start`` would silently
drop out of the start registry — leaving the flow with nothing to fire.
"""
"""The compatibility helper remains overridable without adding a Flow node."""
bootstrap_calls: list[str] = []
@@ -672,6 +671,38 @@ class TestConversationalFlow:
flow = BootstrapFlow()
flow.handle_turn("hi")
assert bootstrap_calls == ["ran"]
assert "conversation_start" not in BootstrapFlow.flow_definition().methods
route_definition = BootstrapFlow.flow_definition().methods["route_conversation"]
assert route_definition.start is True
assert route_definition.router is True
assert flow.state.messages[-1].content == "worked"
def test_legacy_decorated_conversation_start_runs_once_per_turn(
self,
) -> None:
"""Legacy ``@start`` overrides are not invoked again by the router."""
bootstrap_calls: list[str] = []
@ConversationConfig()
class BootstrapFlow(ConversationalFlow):
@start()
def conversation_start(self) -> str | None:
bootstrap_calls.append("ran")
return super().conversation_start()
def route_turn(self, context: dict[str, Any]) -> str | None:
return "work"
@listen("work")
def do_work(self) -> str:
self.append_assistant_message("worked")
return "worked"
flow = BootstrapFlow()
flow.handle_turn("hi")
assert bootstrap_calls == ["ran"]
assert flow.state.messages[-1].content == "worked"
@@ -1170,6 +1201,40 @@ class TestConversationalFlow:
"finalize_session_traces must finalize the trace batch once"
)
def test_deferred_resume_skips_per_resume_flow_finished_event(self) -> None:
"""Deferred sessions do not emit terminal events while resuming."""
from crewai.events.types.flow_events import FlowFinishedEvent
from crewai.flow.async_feedback.types import PendingFeedbackContext
class DeferredResumeFlow(Flow[ChatState]):
defer_trace_finalization = True
@start()
def begin(self) -> str:
return "started"
flow = DeferredResumeFlow()
flow._pending_feedback_context = PendingFeedbackContext(
flow_id=flow.flow_id,
flow_class="DeferredResumeFlow",
method_name="begin",
method_output="started",
message="Review",
)
finished_events: list[FlowFinishedEvent] = []
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(FlowFinishedEvent)
def capture(_: Any, event: FlowFinishedEvent) -> None:
finished_events.append(event)
flow.resume("approved")
crewai_event_bus.flush()
assert finished_events == []
def test_finalize_session_traces_restores_event_scope(self, capsys) -> None:
"""No ``empty scope stack`` warning when deferred ``flow_finished`` fires.
@@ -1471,6 +1536,44 @@ class TestDeferredFlowLifecycleEvents:
listener.batch_manager.finalize_batch()
mock_finalize.assert_not_called()
def test_deferred_flow_kickoff_marks_trace_manager_session_deferred(
self,
) -> None:
class DeferredTraceFlow(Flow[ChatState]):
@start()
def begin(self) -> str:
return "done"
listener = TraceCollectionListener()
listener.batch_manager.defer_session_finalization = False
flow = DeferredTraceFlow()
flow.defer_trace_finalization = True
with patch.object(listener.batch_manager, "finalize_batch"):
flow.kickoff()
assert listener.batch_manager.defer_session_finalization is True
flow.finalize_session_traces()
assert listener.batch_manager.defer_session_finalization is False
def test_non_deferred_flow_kickoff_clears_stale_trace_manager_flag(
self,
) -> None:
class PlainTraceFlow(Flow[ChatState]):
@start()
def begin(self) -> str:
return "done"
listener = TraceCollectionListener()
listener.batch_manager.defer_session_finalization = True
PlainTraceFlow().kickoff()
assert listener.batch_manager.defer_session_finalization is False
class TestNestedCrewTracing:
def test_is_inside_active_flow_context_when_kickoff_running(self) -> None:
@@ -1524,3 +1627,130 @@ class TestNestedCrewTracing:
elif listener.batch_manager.batch_owner_type == "crew":
listener.batch_manager.finalize_batch()
mock_finalize.assert_not_called()
def test_lazy_flow_batch_from_context_preserves_deferred_parent(self) -> None:
from crewai.events.listeners.tracing.trace_listener import (
TraceCollectionListener,
)
listener = TraceCollectionListener()
listener.batch_manager.current_batch = None
listener.batch_manager.batch_owner_type = None
listener.batch_manager.batch_owner_id = None
listener.batch_manager.defer_session_finalization = False
listener.batch_manager.event_buffer.clear()
flow_id_token = current_flow_id.set("parent-flow-id")
flow_name_token = current_flow_name.set("ParentChatFlow")
defer_token = current_flow_defer_trace_finalization.set(True)
try:
initialized = listener._try_initialize_flow_batch_from_context(
type("Event", (), {"timestamp": None})()
)
assert initialized is True
assert listener.batch_manager.batch_owner_type == "flow"
assert listener.batch_manager.batch_owner_id == "parent-flow-id"
assert listener.batch_manager.defer_session_finalization is True
assert listener.batch_manager.current_batch is not None
assert (
listener.batch_manager.current_batch.execution_metadata[
"execution_type"
]
== "flow"
)
assert (
listener.batch_manager.current_batch.execution_metadata["flow_name"]
== "ParentChatFlow"
)
finally:
current_flow_defer_trace_finalization.reset(defer_token)
current_flow_name.reset(flow_name_token)
current_flow_id.reset(flow_id_token)
listener.batch_manager.current_batch = None
listener.batch_manager.batch_owner_type = None
listener.batch_manager.batch_owner_id = None
listener.batch_manager.trace_batch_id = None
listener.batch_manager.defer_session_finalization = False
listener.batch_manager.event_buffer.clear()
def test_nested_agent_executor_flow_does_not_finalize_parent_batch(
self,
) -> None:
from crewai import Agent, Crew, Task
from crewai.llms.base_llm import BaseLLM
class StaticLLM(BaseLLM):
def __init__(self) -> None:
super().__init__(model="debug-static-llm", provider="debug")
def call(
self,
messages: Any,
tools: Any = None,
callbacks: Any = None,
available_functions: Any = None,
from_task: Any = None,
from_agent: Any = None,
response_model: Any = None,
) -> str:
return (
"Thought: I can answer directly.\n"
"Final Answer: nested crew result"
)
class NestedCrewFlow(Flow[ChatState]):
defer_trace_finalization = True
tracing = True
@start()
def begin(self) -> str:
return "run_nested_crew"
@listen(begin)
def run_nested_crew(self, _: str) -> str:
agent = Agent(
role="Debug Agent",
goal="Return a short deterministic result",
backstory="Used only for trace finalization debugging.",
llm=StaticLLM(),
verbose=False,
)
task = Task(
description="Return the deterministic nested crew result.",
expected_output="nested crew result",
agent=agent,
)
return Crew(agents=[agent], tasks=[task], verbose=False).kickoff().raw
listener = TraceCollectionListener()
listener.batch_manager.current_batch = None
listener.batch_manager.batch_owner_type = None
listener.batch_manager.batch_owner_id = None
listener.batch_manager.trace_batch_id = None
listener.batch_manager.defer_session_finalization = False
listener.batch_manager.event_buffer.clear()
listener.first_time_handler.is_first_time = False
def initialize_backend_batch(*_: Any, **__: Any) -> None:
listener.batch_manager.trace_batch_id = "debug-trace-batch"
flow = NestedCrewFlow()
with (
patch.object(
listener.batch_manager,
"_initialize_backend_batch",
side_effect=initialize_backend_batch,
),
patch.object(listener.batch_manager, "finalize_batch") as mock_finalize,
):
flow.kickoff()
crewai_event_bus.flush()
flow.kickoff()
crewai_event_bus.flush()
assert mock_finalize.call_count == 0, (
"nested AgentExecutor flows inside a deferred parent Flow must "
"not finalize the parent trace batch"
)

View File

@@ -36,6 +36,7 @@ def test_flow_public_exports_are_explicit():
"start",
}
assert set(flow_definition.__all__) == {
"FlowActionDefinition",
"FlowConfigDefinition",
"FlowConversationalDefinition",
"FlowConversationalRouterDefinition",
@@ -223,10 +224,11 @@ def test_flow_definition_includes_conversational_builtins_when_enabled():
assert definition.conversational.enabled is True
assert definition.conversational.defer_trace_finalization is True
assert definition.conversational.builtin_routes == ["converse", "end"]
assert "conversation_start" in methods
assert "conversation_start" not in methods
assert "route_conversation" in methods
assert "converse_turn" in methods
assert methods["conversation_start"].start is True
assert methods["route_conversation"].start is True
assert methods["route_conversation"].router is True
def test_flow_definition_serializes_conversational_config():
@@ -260,7 +262,7 @@ def test_flow_definition_serializes_conversational_config():
assert conversational.router.fallback_intent == "end"
def test_flow_definition_preserves_undecorated_conversational_override():
def test_flow_definition_uses_collapsed_conversational_router_start():
class ChatFlow(Flow):
conversational = True
@@ -269,8 +271,10 @@ def test_flow_definition_preserves_undecorated_conversational_override():
methods = ChatFlow.flow_definition().methods
assert methods["conversation_start"].start is True
assert "conversation_start" not in methods
assert "route_conversation" in methods
assert methods["route_conversation"].start is True
assert methods["route_conversation"].router is True
def test_flow_definition_serializes_human_feedback_metadata():
@@ -626,6 +630,7 @@ def test_flow_definition_preserves_diagnostics_loaded_from_contract():
"name": "LoadedDiagnosticsFlow",
"methods": {
"decision": {
"do": {"ref": "loaded_flows:LoadedDiagnosticsFlow.decision"},
"router": True,
"emit": ["continue"],
}
@@ -659,6 +664,7 @@ def test_router_start_false_without_listen_reports_missing_trigger():
"name": "LoadedFlow",
"methods": {
"decision": {
"do": {"ref": "loaded_flows:LoadedFlow.decision"},
"router": True,
"start": False,
"emit": ["continue"],
@@ -737,8 +743,14 @@ def test_static_string_listener_is_allowed_by_contract():
"schema": "crewai.flow/v1",
"name": "TypoFlow",
"methods": {
"begin": {"start": True},
"handle": {"listen": "begni"},
"begin": {
"do": {"ref": "loaded_flows:TypoFlow.begin"},
"start": True,
},
"handle": {
"do": {"ref": "loaded_flows:TypoFlow.handle"},
"listen": "begni",
},
},
}
)
@@ -751,8 +763,15 @@ def test_start_false_not_classified_as_start_method():
"schema": "crewai.flow/v1",
"name": "ExplicitNonStartFlow",
"methods": {
"begin": {"start": True},
"handle": {"start": False, "listen": "begin"},
"begin": {
"do": {"ref": "loaded_flows:ExplicitNonStartFlow.begin"},
"start": True,
},
"handle": {
"do": {"ref": "loaded_flows:ExplicitNonStartFlow.handle"},
"start": False,
"listen": "begin",
},
},
}
)
@@ -809,6 +828,7 @@ def test_flow_definition_logs_diagnostics_when_loaded_from_contract(caplog):
"name": "LoadedFlow",
"methods": {
"decision": {
"do": {"ref": "loaded_flows:LoadedFlow.decision"},
"router": True,
"emit": ["continue"],
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,511 @@
"""Tests for flow-level token usage aggregation
``flow.usage_metrics`` listens to ``LLMCallCompletedEvent`` for the duration
of ``kickoff_async`` so it covers every LLM call inside the flow — crew-led,
tool-led, AND bare ``LLM.call(...)`` from a flow method. We exercise the
aggregator end-to-end through the real event bus with fabricated events and
explicit contextvar control; no live LLM provider is required.
"""
from __future__ import annotations
import contextvars
import os
import tempfile
from typing import Any, Callable
from uuid import uuid4
import pytest
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.llm_events import LLMCallCompletedEvent, LLMCallType
from crewai.flow.async_feedback.types import PendingFeedbackContext
from crewai.flow.flow import Flow, listen, start
from crewai.flow.flow_context import current_flow_id
from crewai.flow.persistence.sqlite import SQLiteFlowPersistence
from crewai.flow.runtime import _usage_dict_to_metrics
from crewai.types.usage_metrics import UsageMetrics
def _emit_llm_call(
*,
flow_id: str | None,
prompt_tokens: int = 0,
completion_tokens: int = 0,
cached_prompt_tokens: int = 0,
reasoning_tokens: int = 0,
cache_creation_tokens: int = 0,
) -> None:
"""Emit one fake ``LLMCallCompletedEvent`` with ``current_flow_id`` pinned
to ``flow_id``.
Runs in a freshly-copied context so the value the bus snapshots at emit
time is exactly ``flow_id`` — independent of the calling thread's outer
context. Mirrors how the real ``LLM.call`` emits events at runtime.
"""
usage: dict[str, Any] = {
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
"total_tokens": prompt_tokens + completion_tokens,
}
for key, value in (
("cached_prompt_tokens", cached_prompt_tokens),
("reasoning_tokens", reasoning_tokens),
("cache_creation_tokens", cache_creation_tokens),
):
if value:
usage[key] = value
event = LLMCallCompletedEvent(
call_id=str(uuid4()),
model="gpt-4o-mini",
response="ok",
call_type=LLMCallType.LLM_CALL,
usage=usage,
)
ctx = contextvars.copy_context()
def _emit() -> None:
current_flow_id.set(flow_id)
future = crewai_event_bus.emit(object(), event)
if future is not None:
future.result(timeout=5.0)
ctx.run(_emit)
class _ScriptedFlow(Flow):
"""A Flow whose ``@start`` delegates to a per-instance ``_script`` closure.
Each test attaches a script with ``flow._script = lambda f: ...`` so we
don't redefine a Flow subclass for every scenario.
"""
@start()
def run(self) -> None:
script: Callable[[Flow], None] = getattr(self, "_script", lambda _f: None)
script(self)
def _run(script: Callable[[Flow], None] = lambda _f: None) -> Flow:
"""Build a ``_ScriptedFlow``, attach ``script``, kickoff. Returns the flow."""
flow = _ScriptedFlow()
flow._script = script
flow.kickoff()
return flow
class TestUsageDictToMetrics:
"""Unit tests for the dict-to-UsageMetrics normalizer."""
@pytest.mark.parametrize(
"usage, expected",
[
(None, None),
({}, None),
(
{"prompt_tokens": 10, "completion_tokens": 20, "total_tokens": 30},
UsageMetrics(
prompt_tokens=10,
completion_tokens=20,
total_tokens=30,
successful_requests=1,
),
),
# total_tokens missing → derived from prompt + completion
(
{"prompt_tokens": 4, "completion_tokens": 6},
UsageMetrics(
prompt_tokens=4,
completion_tokens=6,
total_tokens=10,
successful_requests=1,
),
),
# Extended provider-specific keys flow through normalization
(
{
"prompt_tokens": 100,
"completion_tokens": 80,
"total_tokens": 180,
"cached_prompt_tokens": 40,
"reasoning_tokens": 25,
"cache_creation_tokens": 10,
},
UsageMetrics(
prompt_tokens=100,
completion_tokens=80,
total_tokens=180,
cached_prompt_tokens=40,
reasoning_tokens=25,
cache_creation_tokens=10,
successful_requests=1,
),
),
# Garbage / non-int values coerce to 0 instead of crashing
(
{"prompt_tokens": "n/a", "completion_tokens": None, "total_tokens": 7},
UsageMetrics(
prompt_tokens=0,
completion_tokens=0,
total_tokens=0,
successful_requests=1,
),
),
# Native Anthropic provider emits input_tokens/output_tokens
(
{"input_tokens": 12, "output_tokens": 8},
UsageMetrics(
prompt_tokens=12,
completion_tokens=8,
total_tokens=20,
successful_requests=1,
),
),
# Native Gemini provider emits prompt_token_count/candidates_token_count
(
{
"prompt_token_count": 30,
"candidates_token_count": 20,
"reasoning_tokens": 5,
},
UsageMetrics(
prompt_tokens=30,
completion_tokens=20,
total_tokens=50,
reasoning_tokens=5,
successful_requests=1,
),
),
# OpenAI nests cached_tokens under prompt_tokens_details
(
{
"prompt_tokens": 100,
"completion_tokens": 50,
"prompt_tokens_details": {"cached_tokens": 30},
},
UsageMetrics(
prompt_tokens=100,
completion_tokens=50,
total_tokens=150,
cached_prompt_tokens=30,
successful_requests=1,
),
),
],
ids=[
"none",
"empty",
"all_keys",
"no_total",
"extended_keys",
"garbage",
"anthropic_aliases",
"gemini_aliases",
"openai_nested_cached",
],
)
def test_normalization(
self, usage: dict[str, Any] | None, expected: UsageMetrics | None
) -> None:
assert _usage_dict_to_metrics(usage) == expected
class TestFlowUsageAggregation:
"""End-to-end tests driving the listener through the real event bus."""
def test_sums_every_llm_call_in_the_flow(self) -> None:
"""Multiple LLM calls — including bare ``LLM.call(...)`` made outside
any crew — accumulate; ``successful_requests`` tracks the call count."""
def script(flow: Flow) -> None:
_emit_llm_call(flow_id=flow._flow_match_id, prompt_tokens=300, completion_tokens=300)
_emit_llm_call(flow_id=flow._flow_match_id, prompt_tokens=200, completion_tokens=100)
_emit_llm_call(flow_id=flow._flow_match_id, prompt_tokens=20, completion_tokens=20)
flow = _run(script)
assert flow.usage_metrics.total_tokens == 940
assert flow.usage_metrics.prompt_tokens == 520
assert flow.usage_metrics.completion_tokens == 420
assert flow.usage_metrics.successful_requests == 3
def test_returns_zero_when_no_calls_happen(self) -> None:
flow = _run()
assert flow.usage_metrics == UsageMetrics()
def test_ignores_events_from_other_flows(self) -> None:
"""Concurrent flow runs share the singleton bus, so the listener must
scope itself to its own flow via the contextvar match."""
def script(flow: Flow) -> None:
_emit_llm_call(flow_id=flow._flow_match_id, prompt_tokens=50, completion_tokens=50)
_emit_llm_call(flow_id="some-other-flow", prompt_tokens=49_000, completion_tokens=50_999)
flow = _run(script)
assert flow.usage_metrics.total_tokens == 100
assert flow.usage_metrics.successful_requests == 1
def test_resets_between_kickoffs(self) -> None:
flow = _ScriptedFlow()
flow._script = lambda f: _emit_llm_call(
flow_id=f._flow_match_id, prompt_tokens=250, completion_tokens=250
)
flow.kickoff()
flow.kickoff()
assert flow.usage_metrics.total_tokens == 500
assert flow.usage_metrics.successful_requests == 1
def test_usage_metrics_returns_independent_copy(self) -> None:
"""``usage_metrics`` must return a copy, not the internal instance —
otherwise callers can clobber the in-flight accumulator."""
flow = _run(
lambda f: _emit_llm_call(
flow_id=f._flow_match_id, prompt_tokens=50, completion_tokens=50
)
)
snapshot = flow.usage_metrics
snapshot.total_tokens = 999_999
assert flow.usage_metrics.total_tokens == 100
def test_handler_is_unregistered_after_kickoff(self) -> None:
"""Long-lived workers (Celery, devkit) must not leak one handler per
kickoff on the singleton bus, on either the success or failure path."""
def handler_count() -> int:
return len(
crewai_event_bus._sync_handlers.get(LLMCallCompletedEvent, frozenset())
)
before = handler_count()
flow = _ScriptedFlow()
flow._script = lambda f: _emit_llm_call(
flow_id=f._flow_match_id, prompt_tokens=5, completion_tokens=5
)
for _ in range(3):
flow.kickoff()
assert handler_count() == before
def boom(_f: Flow) -> None:
raise RuntimeError("boom")
failing = _ScriptedFlow()
failing._script = boom
with pytest.raises(RuntimeError, match="boom"):
failing.kickoff()
assert handler_count() == before
def test_kickoff_flushes_event_bus_before_returning(
self, monkeypatch: pytest.MonkeyPatch
) -> None:
"""`kickoff_async` must drain pending LLMCallCompletedEvent handlers
before detaching the listener — otherwise late handlers landing on
the threadpool would be lost on short flows. Mirrors the flush
``Crew.kickoff()`` performs before reporting ``token_usage``."""
flush_calls: list[None] = []
original_flush = crewai_event_bus.flush
def tracked_flush(*args: Any, **kwargs: Any) -> bool:
flush_calls.append(None)
return original_flush(*args, **kwargs)
monkeypatch.setattr(crewai_event_bus, "flush", tracked_flush)
flow = _ScriptedFlow()
flow._script = lambda f: _emit_llm_call(
flow_id=f._flow_match_id, prompt_tokens=3, completion_tokens=4
)
flow.kickoff()
assert flush_calls, "kickoff did not flush the event bus before returning"
assert flow.usage_metrics.total_tokens == 7
def test_stale_handler_from_prior_kickoff_does_not_contaminate(self) -> None:
"""A handler still queued from a prior kickoff must not write into
a later kickoff's accumulator. The handler's closure captures its
own accumulator object, so any late writes land on an orphaned
instance and the live ``usage_metrics`` is unaffected."""
captured: dict[str, Any] = {}
def script(flow: Flow) -> None:
_emit_llm_call(flow_id=flow._flow_match_id, prompt_tokens=10, completion_tokens=10)
captured["handler"] = flow._usage_aggregation_handler
captured["match_id"] = flow._flow_match_id
flow = _run(script)
assert flow.usage_metrics.total_tokens == 20
flow._script = lambda f: None
flow.kickoff()
assert flow.usage_metrics.total_tokens == 0
stale_handler = captured["handler"]
assert stale_handler is not None
stale_event = LLMCallCompletedEvent(
call_id=str(uuid4()),
model="gpt-4o-mini",
response="ok",
call_type=LLMCallType.LLM_CALL,
usage={"prompt_tokens": 999, "completion_tokens": 999, "total_tokens": 1998},
)
ctx = contextvars.copy_context()
ctx.run(lambda: (current_flow_id.set(captured["match_id"]), stale_handler(object(), stale_event)))
assert flow.usage_metrics.total_tokens == 0
def test_pause_detaches_listener_and_does_not_leak(self) -> None:
"""When ``kickoff_async`` pauses for human feedback, the listener
must be detached from the singleton bus to avoid leaking handlers
across abandoned paused instances. Pre-pause LLM events still
count because the bus snapshots handlers at emit time. Late
events emitted after the pause returns do not count for this
instance — resume paths re-attach a fresh listener."""
from crewai.flow.async_feedback.types import HumanFeedbackPending
captured: dict[str, Any] = {}
class _PausingFlow(Flow):
@start()
def begin(self) -> None:
_emit_llm_call(
flow_id=self._flow_match_id,
prompt_tokens=10,
completion_tokens=20,
)
captured["pre_pause_total"] = self.usage_metrics.total_tokens
raise HumanFeedbackPending(
context=PendingFeedbackContext(
flow_id=self.flow_id,
flow_class="_PausingFlow",
method_name="begin",
method_output="content",
message="Review:",
)
)
with tempfile.TemporaryDirectory() as tmpdir:
persistence = SQLiteFlowPersistence(os.path.join(tmpdir, "f.db"))
flow = _PausingFlow(persistence=persistence)
result = flow.kickoff()
assert isinstance(result, HumanFeedbackPending)
assert captured["pre_pause_total"] == 30
assert flow._usage_aggregation_handler is None
# A late event emitted after the pause does not reach the
# detached listener, so the running total is unchanged.
_emit_llm_call(
flow_id=flow._flow_match_id,
prompt_tokens=2,
completion_tokens=3,
)
assert flow.usage_metrics.total_tokens == 30
def test_aggregates_resume_after_from_pending(self) -> None:
"""A flow restored via ``from_pending`` is a fresh instance with no
``_flow_match_id``; without seeding it, the listener attached in
``resume_async`` either ignores its own LLM calls or absorbs unrelated
ones. ``from_pending`` must seed the match id so the resume-phase
aggregator counts our own calls and only our own calls."""
class _ResumeFlow(Flow):
@start()
def begin(self) -> str:
return "content"
@listen(begin)
def on_begin(self, _feedback: Any) -> str:
_emit_llm_call(
flow_id=self._flow_match_id,
prompt_tokens=100,
completion_tokens=50,
)
_emit_llm_call(
flow_id="some-other-flow",
prompt_tokens=9_999,
completion_tokens=9_999,
)
return "done"
with tempfile.TemporaryDirectory() as tmpdir:
persistence = SQLiteFlowPersistence(os.path.join(tmpdir, "f.db"))
flow_id = "usage-resume-test"
persistence.save_pending_feedback(
flow_uuid=flow_id,
context=PendingFeedbackContext(
flow_id=flow_id,
flow_class="_ResumeFlow",
method_name="begin",
method_output="content",
message="Review:",
),
state_data={"id": flow_id},
)
flow = _ResumeFlow.from_pending(flow_id, persistence)
assert flow._flow_match_id == flow.flow_id
flow.resume("ok")
assert flow.usage_metrics.total_tokens == 150
assert flow.usage_metrics.prompt_tokens == 100
assert flow.usage_metrics.completion_tokens == 50
assert flow.usage_metrics.successful_requests == 1
def test_resume_aggregates_under_foreign_flow_context(self) -> None:
"""Resume must override an already-set ``current_flow_id`` so its
own LLM events match the listener's filter even when invoked from
inside another flow's active context."""
class _ResumeFlow(Flow):
@start()
def begin(self) -> str:
return "content"
@listen(begin)
def on_begin(self, _feedback: Any) -> str:
_emit_llm_call(
flow_id=self._flow_match_id,
prompt_tokens=42,
completion_tokens=8,
)
return "done"
with tempfile.TemporaryDirectory() as tmpdir:
persistence = SQLiteFlowPersistence(os.path.join(tmpdir, "f.db"))
flow_id = "resume-foreign-context"
persistence.save_pending_feedback(
flow_uuid=flow_id,
context=PendingFeedbackContext(
flow_id=flow_id,
flow_class="_ResumeFlow",
method_name="begin",
method_output="content",
message="Review:",
),
state_data={"id": flow_id},
)
foreign_token = current_flow_id.set("some-parent-flow")
try:
flow = _ResumeFlow.from_pending(flow_id, persistence)
flow.resume("ok")
finally:
current_flow_id.reset(foreign_token)
assert flow.usage_metrics.total_tokens == 50
assert flow.usage_metrics.successful_requests == 1

View File

@@ -77,12 +77,22 @@ class ComplexFlow(Flow):
return "complete"
def _attach_flow_definition(flow_class: type[Flow], methods: dict[str, object]) -> None:
def _attach_flow_definition(
flow_class: type[Flow], methods: dict[str, dict[str, object]]
) -> None:
flow_class._flow_definition = FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": flow_class.__name__,
"methods": methods,
"methods": {
name: {
"do": {
"ref": f"{flow_class.__module__}:{flow_class.__name__}.{name}"
},
**spec,
}
for name, spec in methods.items()
},
}
)
@@ -125,13 +135,20 @@ def test_build_flow_structure_from_flow_definition():
"schema": "crewai.flow/v1",
"name": "DefinedFlow",
"methods": {
"begin": {"start": True},
"begin": {
"do": {"ref": "defined_flows:DefinedFlow.begin"},
"start": True,
},
"decide": {
"do": {"ref": "defined_flows:DefinedFlow.decide"},
"listen": "begin",
"router": True,
"emit": ["done"],
},
"finish": {"listen": "done"},
"finish": {
"do": {"ref": "defined_flows:DefinedFlow.finish"},
"listen": "done",
},
},
}
)

View File

@@ -1,3 +1,3 @@
"""CrewAI development tools."""
__version__ = "1.14.7a4"
__version__ = "1.14.7"