Compare commits

...

8 Commits

Author SHA1 Message Date
Lucas Gomide
fb2170407e Merge branch 'main' into luzk/flow-token-usage 2026-06-11 17:34:31 -03:00
Lucas Gomide
c48501ae38 fix: address PR review on flow.usage_metrics 2026-06-11 17:08:08 -03:00
Greyson LaLonde
21fa8e32d9 docs: update changelog and version for v1.14.7
Some checks failed
Check Documentation Broken Links / Check broken links (push) Waiting to run
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Vulnerability Scan / pip-audit (push) Has been cancelled
2026-06-11 10:13:40 -07:00
Greyson LaLonde
f18c03cd8f feat: bump versions to 1.14.7 2026-06-11 10:06:07 -07:00
Lucas Gomide
b720139eca chore: ruff format runtime.py after PR review changes 2026-06-11 13:46:13 -03:00
Lucas Gomide
540f5df767 fix: address PR review on flow.usage_metrics
- Protect _aggregated_usage_metrics with a lock so concurrent
  LLMCallCompletedEvent handlers can't race the read-modify-write
  inside add_usage_metrics, and so usage_metrics snapshots are
  consistent.
- Wire the usage aggregation listener into resume_async so LLM
  calls during outcome collapsing and downstream crews continue
  to roll up into flow.usage_metrics after a paused-then-resumed
  kickoff. Restores current_flow_id to the original kickoff's
  match id when none is set, and detaches in finally.
- Guard against reentrant kickoff on the same Flow instance:
  only the outer kickoff captures _flow_match_id, resets the
  accumulator, and owns the listener lifecycle. Inner reentrant
  calls pass through and no longer wipe outer state or detach
  the shared handler.
- Rename test_snapshot_is_immutable to
  test_usage_metrics_returns_independent_copy to reflect that
  the property returns a copy of a (still-mutable) UsageMetrics.
- Extend test_handler_is_unregistered_after_kickoff to also
  cover the failure path, confirming the handler is removed
  when kickoff raises.
2026-06-11 13:39:46 -03:00
Lucas Gomide
c4476366ff fix: aggregate token usage across all LLM calls
`flow.kickoff().token_usage` only returned the last @listen method's
`CrewOutput.token_usage`, so multi-crew flows under-reported by a
factor of N and bare `LLM.call(...)` invocations were ignored
entirely. SDK totals therefore disagreed with the CrewAI Enterprise UI
(Wharf), which aggregates every LLM span.

Add a new `flow.usage_metrics` property that wires an
`LLMCallCompletedEvent` listener for the duration of `kickoff_async`.
The listener scopes to the active flow via the `current_flow_id`
contextvar (the event bus copies the context at emit time, so the
value the handler sees is the one set when the LLM call fired) and
normalizes the provider-specific usage dict into a `UsageMetrics`.
This covers every LLM call inside the flow — crew-led, tool-led, and
bare `LLM.call(...)` — and matches the UI totals 1:1.
2026-06-11 13:29:38 -03:00
Greyson LaLonde
50b9c02272 fix(checkpoint): rebuild custom BaseLLM as concrete LLM on restore
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Check Documentation Broken Links / Check broken links (push) Has been cancelled
Vulnerability Scan / pip-audit (push) Has been cancelled
A custom BaseLLM subclass serializes with the inherited llm_type "base",
which the registry maps to the abstract BaseLLM. Restore then crashed on
cls(**value). Rebuild a concrete LLM from the saved config when the
resolved class is abstract.
2026-06-10 22:21:35 -07:00
25 changed files with 3233 additions and 245 deletions

View File

@@ -4,6 +4,55 @@ description: "تحديثات المنتج والتحسينات وإصلاحات
icon: "clock"
mode: "wide"
---
<Update label="11 يونيو 2026">
## v1.14.7
[عرض الإصدار على GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7)
## ما الذي تغير
### الميزات
- إضافة واجهات خلفية افتراضية قابلة للتوصيل للذاكرة، والمعرفة، وrag، وflow.
- عرض السبب الحقيقي للإنهاء، ومعلمات العينة، وresponse.id في أحداث LLM.
- تصنيف مشغلات DSL كزخارف واعية للمسار.
- إضافة واجهة برمجة تطبيقات الدردشة لتدفقات المحادثة.
- جعل واجهة القفل قابلة للتجاوز.
- بناء FlowDefinition من بيانات التعريف الخاصة بـ Flow DSL.
- إضافة مزود LLM من Snowflake Cortex الأصلي.
- إضافة دعم لملفات الوكلاء المدربين من crew.
### إصلاحات الأخطاء
- إصلاح نقطة التحقق لإعادة بناء BaseLLM مخصص كـ LLM ملموس عند الاستعادة.
- تقييد الاستعادة على علامة لمنع اللقطات الحية من إعادة التشغيل كاستئناف.
- تحديد حالة وقت التشغيل لكل تشغيل للحد من النمو وعزل التشغيل المتزامن.
- إصلاح إعدادات التتبع على crewai-login.
- احترام suppress_flow_events لأحداث تنفيذ الطريقة.
- استعادة [project.scripts] في حزمة crewai لتثبيت أداة uv.
- حل مشكلات CVE الخاصة بـ pip-audit لـ aiohttp وdocling وdocling-core.
- إصلاح إدخال الملفات الذي لا يعمل بشكل موثوق.
- إصلاح تاريخ نتائج أدوات Snowflake Claude غير المكتملة.
### الوثائق
- تحديث سجل التغييرات والإصدار لـ v1.14.7.
- تحديث وثائق جامع OpenTelemetry.
- تحديث دليل NVIDIA Nemotron LLM.
- إضافة دليل تكامل Databricks.
- إضافة دليل تكامل Snowflake.
### الأداء
- تحسين سرعة استيراد crewai من خلال تحميل مستندات docling بشكل كسول.
### إعادة الهيكلة
- تبسيط تقييم شروط التدفق ليكون بلا حالة لكل حدث.
- فصل منطق المحادثة عن وقت التشغيل وإضافة تعريف المحادثة.
- تقسيم `flow.py` إلى DSL، وتعريف، ووقت تشغيل.
## المساهمون
@Luzk, @alex-clawd, @devin-ai-integration[bot], @greysonlalonde, @gvieira, @jessemiller, @lorenzejay, @lucasgomide, @mattatcha, @vinibrsl
</Update>
<Update label="10 يونيو 2026">
## v1.14.7rc2

View File

@@ -226,6 +226,48 @@ counter=2 message='Hello from first_method - updated by second_method'
من خلال ضمان إعادة مخرجات الدالة الأخيرة وتوفير الوصول إلى الحالة، تجعل تدفقات CrewAI من السهل دمج نتائج سير عمل الذكاء الاصطناعي في التطبيقات أو الأنظمة الأكبر،
مع الحفاظ على الوصول إلى الحالة طوال تنفيذ التدفق.
## مقاييس استخدام التدفق
بعد اكتمال تنفيذ التدفق، يمكنك الوصول إلى الخاصية `usage_metrics` لعرض إجمالي استخدام التوكنات عبر **كل استدعاء لنموذج اللغة** يتم خلال التشغيل — بما في ذلك الاستدعاءات من كل فريق (Crew) ينظمه التدفق، والاستدعاءات داخل أدوات الـ Agents، والاستدعاءات المباشرة لـ `LLM.call(...)` من دوال التدفق. هذا هو المكافئ على جانب الـ SDK للإجماليات المعروضة في واجهة CrewAI Enterprise.
```python Code
from crewai import LLM
from crewai.flow.flow import Flow, listen, start
class UsageMetricsFlow(Flow):
@start()
def run_first_crew(self):
self.state.first_result = FirstCrew().crew().kickoff()
@listen(run_first_crew)
def call_llm_directly(self):
# استدعاء مباشر لنموذج اللغة — يُحسب أيضًا ضمن flow.usage_metrics
llm = LLM(model="openai/gpt-4o-mini")
self.state.summary = llm.call("لخّص النقاط الرئيسية.")
@listen(call_llm_directly)
def run_second_crew(self):
self.state.second_result = SecondCrew().crew().kickoff()
flow = UsageMetricsFlow()
flow.kickoff()
print(flow.usage_metrics)
# UsageMetrics(total_tokens=8579, prompt_tokens=6210, completion_tokens=2369,
# cached_prompt_tokens=0, reasoning_tokens=0,
# cache_creation_tokens=0, successful_requests=5)
```
<Note>
`flow.usage_metrics` **ليست** نفس `flow.kickoff().token_usage`. هذه الأخيرة
ترجع فقط `CrewOutput.token_usage` لـ **آخر** دالة `@listen` أعادت
`CrewOutput`، مما يعني أنها تعكس فقط الفريق الأخير وتتجاهل الفرق السابقة
وكذلك أي استدعاءات مباشرة لـ `LLM.call(...)`. استخدم `flow.usage_metrics`
كلما احتجت إلى الإجمالي **الكامل** للتوكنات لتنفيذ التدفق.
</Note>
كل حقل في [`UsageMetrics`](https://github.com/crewAIInc/crewAI/blob/main/lib/crewai/src/crewai/types/usage_metrics.py) المُعاد هو مجموع جميع استدعاءات نموذج اللغة التي حدثت خلال استدعاء واحد لـ `flow.kickoff()`. تتم إعادة تعيين العدادات عند الاستدعاء التالي لـ `kickoff()` (وفي كل تكرار من `kickoff_for_each`)، لذلك لن تتكرر العدّات عبر التشغيلات المتتالية. يمكن قراءة هذه الخاصية بأمان في أي وقت بعد اكتمال `kickoff()`؛ قراءتها أثناء التنفيذ تُرجع المجموع الجزئي المتراكم حتى تلك اللحظة.
## إدارة حالة التدفق
إدارة الحالة بفعالية أمر بالغ الأهمية لبناء سير عمل ذكاء اصطناعي موثوق وقابل للصيانة. توفر تدفقات CrewAI آليات قوية لإدارة الحالة غير المهيكلة والمهيكلة،

File diff suppressed because it is too large Load Diff

View File

@@ -4,6 +4,55 @@ description: "Product updates, improvements, and bug fixes for CrewAI"
icon: "clock"
mode: "wide"
---
<Update label="Jun 11, 2026">
## v1.14.7
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7)
## What's Changed
### Features
- Add pluggable default backends for memory, knowledge, rag, and flow.
- Surface real finish_reason, sampling params, and response.id on LLM events.
- Type DSL triggers as route-aware decorators.
- Add chat API for conversational flows.
- Make locking backend overridable.
- Build FlowDefinition from Flow DSL metadata.
- Add native Snowflake Cortex LLM provider.
- Add crew trained agents file support.
### Bug Fixes
- Fix checkpoint to rebuild custom BaseLLM as concrete LLM on restore.
- Gate restore on a flag to prevent live snapshots from replaying as resume.
- Scope runtime state per run to bound growth and isolate concurrent runs.
- Fix telemetry setup on crewai-login.
- Respect suppress_flow_events for method-execution events.
- Restore [project.scripts] in crewai package for uv tool install.
- Resolve pip-audit CVEs for aiohttp, docling, and docling-core.
- Fix file input not working reliably.
- Fix Snowflake Claude incomplete tool result histories.
### Documentation
- Update changelog and version for v1.14.7.
- Update OpenTelemetry collector documentation.
- Update NVIDIA Nemotron LLM guide.
- Add Databricks integration guide.
- Add Snowflake integration guide.
### Performance
- Improve crewai import speed by lazy-loading docling imports.
### Refactoring
- Simplify flow condition evaluation to be stateless per event.
- Decouple convo logic from runtime and add a conversational_definition.
- Split `flow.py` into DSL, definition, and runtime.
## Contributors
@Luzk, @alex-clawd, @devin-ai-integration[bot], @greysonlalonde, @gvieira, @jessemiller, @lorenzejay, @lucasgomide, @mattatcha, @vinibrsl
</Update>
<Update label="Jun 10, 2026">
## v1.14.7rc2

View File

@@ -226,6 +226,49 @@ After the Flow has run, you can access the final state to see the updates made b
By ensuring that the final method's output is returned and providing access to the state, CrewAI Flows make it easy to integrate the results of your AI workflows into larger applications or systems,
while also maintaining and accessing the state throughout the Flow's execution.
## Flow Usage Metrics
After a Flow execution completes, you can access the `usage_metrics` property to view aggregated token usage across **every LLM call** made during the run — including calls from every Crew the Flow orchestrated, calls inside Agent tools, and bare `LLM.call(...)` invocations from Flow methods. This is the SDK-side equivalent of the totals shown in the CrewAI Enterprise UI.
```python Code
from crewai import LLM
from crewai.flow.flow import Flow, listen, start
class UsageMetricsFlow(Flow):
@start()
def run_first_crew(self):
self.state.first_result = FirstCrew().crew().kickoff()
@listen(run_first_crew)
def call_llm_directly(self):
# Bare LLM call — still counted by flow.usage_metrics
llm = LLM(model="openai/gpt-4o-mini")
self.state.summary = llm.call("Summarize the key takeaways.")
@listen(call_llm_directly)
def run_second_crew(self):
self.state.second_result = SecondCrew().crew().kickoff()
flow = UsageMetricsFlow()
flow.kickoff()
print(flow.usage_metrics)
# UsageMetrics(total_tokens=8579, prompt_tokens=6210, completion_tokens=2369,
# cached_prompt_tokens=0, reasoning_tokens=0,
# cache_creation_tokens=0, successful_requests=5)
```
<Note>
`flow.usage_metrics` is **not** the same as `flow.kickoff().token_usage`. The
latter returns the `CrewOutput.token_usage` of the **last** `@listen` method
that returned a `CrewOutput`, which means it only reflects the final Crew and
ignores prior Crews and bare `LLM.call(...)` invocations entirely. Use
`flow.usage_metrics` whenever you need the **full** token rollup for the Flow
execution.
</Note>
Each entry in the returned [`UsageMetrics`](https://github.com/crewAIInc/crewAI/blob/main/lib/crewai/src/crewai/types/usage_metrics.py) is the sum across all LLM calls made within a single `flow.kickoff()` invocation. Counters reset on the next `kickoff()` call (or on each iteration of `kickoff_for_each`), so successive runs don't double-count. The property is safe to read at any point after `kickoff()` completes; reading it during execution returns the partial total accumulated so far.
## Flow State Management
Managing state effectively is crucial for building reliable and maintainable AI workflows. CrewAI Flows provides robust mechanisms for both unstructured and structured state management,

View File

@@ -4,6 +4,55 @@ description: "CrewAI의 제품 업데이트, 개선 사항 및 버그 수정"
icon: "clock"
mode: "wide"
---
<Update label="2026년 6월 11일">
## v1.14.7
[GitHub 릴리스 보기](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7)
## 변경 사항
### 기능
- 메모리, 지식, RAG 및 흐름에 대한 플러그 가능한 기본 백엔드를 추가했습니다.
- LLM 이벤트에서 실제 finish_reason, 샘플링 매개변수 및 response.id를 표시합니다.
- 경로 인식 장식자로서의 타입 DSL 트리거를 설정합니다.
- 대화 흐름을 위한 채팅 API를 추가했습니다.
- 잠금 백엔드를 재정의 가능하도록 만듭니다.
- Flow DSL 메타데이터에서 FlowDefinition을 빌드합니다.
- 네이티브 Snowflake Cortex LLM 공급자를 추가했습니다.
- 훈련된 에이전트 파일 지원을 추가했습니다.
### 버그 수정
- 복원 시 사용자 정의 BaseLLM을 구체적인 LLM으로 재구성하도록 체크포인트를 수정했습니다.
- 라이브 스냅샷이 재개로 재생되지 않도록 플래그를 사용하여 복원을 제한합니다.
- 실행마다 런타임 상태의 범위를 설정하여 성장을 제한하고 동시 실행을 격리합니다.
- crewai-login에서 텔레메트리 설정을 수정했습니다.
- 메서드 실행 이벤트에 대해 suppress_flow_events를 존중합니다.
- uv 도구 설치를 위해 crewai 패키지에서 [project.scripts]를 복원합니다.
- aiohttp, docling 및 docling-core에 대한 pip-audit CVE를 해결합니다.
- 파일 입력이 신뢰할 수 없게 작동하는 문제를 수정했습니다.
- Snowflake Claude의 불완전한 도구 결과 기록을 수정했습니다.
### 문서
- v1.14.7에 대한 변경 로그 및 버전을 업데이트했습니다.
- OpenTelemetry 수집기 문서를 업데이트했습니다.
- NVIDIA Nemotron LLM 가이드를 업데이트했습니다.
- Databricks 통합 가이드를 추가했습니다.
- Snowflake 통합 가이드를 추가했습니다.
### 성능
- docling 가져오기를 지연 로딩하여 crewai 가져오기 속도를 개선했습니다.
### 리팩토링
- 흐름 조건 평가를 이벤트별로 상태 비저장으로 단순화했습니다.
- 대화 논리를 런타임에서 분리하고 conversational_definition을 추가했습니다.
- `flow.py`를 DSL, 정의 및 런타임으로 분리했습니다.
## 기여자
@Luzk, @alex-clawd, @devin-ai-integration[bot], @greysonlalonde, @gvieira, @jessemiller, @lorenzejay, @lucasgomide, @mattatcha, @vinibrsl
</Update>
<Update label="2026년 6월 10일">
## v1.14.7rc2

View File

@@ -221,6 +221,48 @@ Flow가 실행된 후, 이러한 메소드들에 의해 수행된 업데이트
최종 메소드의 출력이 반환되고 상태에 접근할 수 있도록 함으로써, CrewAI Flow는 AI 워크플로우의 결과를 더 큰 애플리케이션이나 시스템에 쉽게 통합할 수 있게 하며,
Flow 실행 과정 전반에 걸쳐 상태를 유지하고 접근하면서도 이를 용이하게 만듭니다.
## 플로우 사용 메트릭
Flow 실행이 완료된 후, `usage_metrics` 속성에 접근하여 실행 동안 발생한 **모든 LLM 호출**의 토큰 사용량 집계를 확인할 수 있습니다. 여기에는 Flow가 오케스트레이션한 모든 Crew의 호출, Agent의 도구 내부에서 발생한 호출, 그리고 Flow 메서드에서 직접 호출한 `LLM.call(...)`이 모두 포함됩니다. 이는 CrewAI Enterprise UI에 표시되는 총량과 동등한 SDK 측 값입니다.
```python Code
from crewai import LLM
from crewai.flow.flow import Flow, listen, start
class UsageMetricsFlow(Flow):
@start()
def run_first_crew(self):
self.state.first_result = FirstCrew().crew().kickoff()
@listen(run_first_crew)
def call_llm_directly(self):
# 직접 LLM 호출 — flow.usage_metrics에서도 집계됩니다
llm = LLM(model="openai/gpt-4o-mini")
self.state.summary = llm.call("핵심 내용을 요약해 주세요.")
@listen(call_llm_directly)
def run_second_crew(self):
self.state.second_result = SecondCrew().crew().kickoff()
flow = UsageMetricsFlow()
flow.kickoff()
print(flow.usage_metrics)
# UsageMetrics(total_tokens=8579, prompt_tokens=6210, completion_tokens=2369,
# cached_prompt_tokens=0, reasoning_tokens=0,
# cache_creation_tokens=0, successful_requests=5)
```
<Note>
`flow.usage_metrics`는 `flow.kickoff().token_usage`와 **동일하지 않습니다**.
후자는 `CrewOutput`을 반환한 **마지막** `@listen` 메서드의
`CrewOutput.token_usage`만 반환하므로, 이전에 실행된 Crew들과 Flow 메서드에서
직접 호출한 `LLM.call(...)`은 전혀 포함되지 않습니다. Flow 실행에 대한
**전체** 토큰 집계가 필요할 때는 항상 `flow.usage_metrics`를 사용하십시오.
</Note>
반환되는 [`UsageMetrics`](https://github.com/crewAIInc/crewAI/blob/main/lib/crewai/src/crewai/types/usage_metrics.py)의 각 항목은 단일 `flow.kickoff()` 실행 동안 발생한 모든 LLM 호출의 합계입니다. 다음 `kickoff()` 호출(및 `kickoff_for_each`의 각 반복)에서 카운터가 초기화되므로 연속 실행이 이중으로 집계되지 않습니다. 이 속성은 `kickoff()` 완료 후 언제든지 안전하게 읽을 수 있으며, 실행 중에 읽으면 그 시점까지 누적된 부분 합계를 반환합니다.
## 플로우 상태 관리
상태를 효과적으로 관리하는 것은 신뢰할 수 있고 유지 보수가 용이한 AI 워크플로를 구축하는 데 매우 중요합니다. CrewAI 플로우는 비정형 및 정형 상태 관리를 위한 강력한 메커니즘을 제공하여, 개발자가 자신의 애플리케이션에 가장 적합한 접근 방식을 선택할 수 있도록 합니다.

View File

@@ -4,6 +4,55 @@ description: "Atualizações de produto, melhorias e correções do CrewAI"
icon: "clock"
mode: "wide"
---
<Update label="11 jun 2026">
## v1.14.7
[Ver release no GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7)
## O que Mudou
### Recursos
- Adicionar backends padrão plugáveis para memória, conhecimento, rag e fluxo.
- Exibir o verdadeiro finish_reason, parâmetros de amostragem e response.id em eventos LLM.
- Tipar os gatilhos DSL como decoradores cientes de rotas.
- Adicionar API de chat para fluxos de conversa.
- Tornar o backend de bloqueio substituível.
- Construir FlowDefinition a partir de metadados Flow DSL.
- Adicionar provedor nativo Snowflake Cortex LLM.
- Adicionar suporte a arquivos de agentes treinados pela equipe.
### Correções de Bugs
- Corrigir checkpoint para reconstruir BaseLLM personalizado como LLM concreto na restauração.
- Controlar a restauração com uma flag para evitar que snapshots ao vivo sejam reproduzidos como retomar.
- Escopar o estado de execução por execução para limitar o crescimento e isolar execuções concorrentes.
- Corrigir configuração de telemetria no crewai-login.
- Respeitar suppress_flow_events para eventos de execução de método.
- Restaurar [project.scripts] no pacote crewai para instalação da ferramenta uv.
- Resolver CVEs de pip-audit para aiohttp, docling e docling-core.
- Corrigir entrada de arquivo que não estava funcionando de forma confiável.
- Corrigir histórias de resultados de ferramentas incompletas do Snowflake Claude.
### Documentação
- Atualizar changelog e versão para v1.14.7.
- Atualizar documentação do coletor OpenTelemetry.
- Atualizar guia do LLM NVIDIA Nemotron.
- Adicionar guia de integração do Databricks.
- Adicionar guia de integração do Snowflake.
### Desempenho
- Melhorar a velocidade de importação do crewai através do carregamento preguiçoso de imports do docling.
### Refatoração
- Simplificar a avaliação de condições de fluxo para ser sem estado por evento.
- Desacoplar a lógica de conversa da execução e adicionar uma conversational_definition.
- Dividir `flow.py` em DSL, definição e execução.
## Contribuidores
@Luzk, @alex-clawd, @devin-ai-integration[bot], @greysonlalonde, @gvieira, @jessemiller, @lorenzejay, @lucasgomide, @mattatcha, @vinibrsl
</Update>
<Update label="10 jun 2026">
## v1.14.7rc2

View File

@@ -219,6 +219,49 @@ Após o término da execução, é possível acessar o estado final e observar a
Ao garantir que a saída do método final seja retornada e oferecer acesso ao estado, o CrewAI Flows facilita a integração dos resultados dos seus workflows de IA em aplicações maiores,
além de permitir o gerenciamento e o acesso ao estado durante toda a execução do Flow.
## Métricas de Uso do Flow
Após a execução de um Flow, você pode acessar a propriedade `usage_metrics` para visualizar o consumo agregado de tokens em **todas as chamadas de LLM** realizadas durante a execução — incluindo chamadas das Crews orquestradas pelo Flow, chamadas dentro de tools de Agents, e invocações diretas de `LLM.call(...)` feitas a partir de métodos do Flow. Esse é o equivalente, do lado do SDK, ao total exibido na interface do CrewAI Enterprise.
```python Code
from crewai import LLM
from crewai.flow.flow import Flow, listen, start
class UsageMetricsFlow(Flow):
@start()
def run_first_crew(self):
self.state.first_result = FirstCrew().crew().kickoff()
@listen(run_first_crew)
def call_llm_directly(self):
# Chamada direta de LLM — também contabilizada por flow.usage_metrics
llm = LLM(model="openai/gpt-4o-mini")
self.state.summary = llm.call("Resuma os principais pontos.")
@listen(call_llm_directly)
def run_second_crew(self):
self.state.second_result = SecondCrew().crew().kickoff()
flow = UsageMetricsFlow()
flow.kickoff()
print(flow.usage_metrics)
# UsageMetrics(total_tokens=8579, prompt_tokens=6210, completion_tokens=2369,
# cached_prompt_tokens=0, reasoning_tokens=0,
# cache_creation_tokens=0, successful_requests=5)
```
<Note>
`flow.usage_metrics` **não** é o mesmo que `flow.kickoff().token_usage`. Este
último retorna apenas o `CrewOutput.token_usage` do **último** método
`@listen` que retornou um `CrewOutput`, ou seja, reflete somente a Crew
final e ignora completamente as Crews anteriores e quaisquer chamadas
diretas de `LLM.call(...)`. Use `flow.usage_metrics` sempre que precisar do
rollup **completo** de tokens da execução do Flow.
</Note>
Cada campo do [`UsageMetrics`](https://github.com/crewAIInc/crewAI/blob/main/lib/crewai/src/crewai/types/usage_metrics.py) retornado representa a soma de todas as chamadas de LLM feitas em uma única invocação de `flow.kickoff()`. Os contadores são resetados a cada novo `kickoff()` (e em cada iteração de `kickoff_for_each`), de modo que execuções sucessivas não duplicam o total. A propriedade é segura para ser lida em qualquer momento após o `kickoff()`; lê-la durante a execução retorna o total parcial acumulado até aquele instante.
## Gerenciamento de Estado em Flows
Gerenciar o estado de forma eficaz é fundamental para construir fluxos de trabalho de IA confiáveis e de fácil manutenção. O CrewAI Flows oferece mecanismos robustos para o gerenciamento de estado tanto não estruturado quanto estruturado,

View File

@@ -8,7 +8,7 @@ authors = [
]
requires-python = ">=3.10, <3.14"
dependencies = [
"crewai-core==1.14.7rc2",
"crewai-core==1.14.7",
"click>=8.1.7,<9",
"pydantic>=2.11.9,<2.13",
"pydantic-settings~=2.10.1",

View File

@@ -1 +1 @@
__version__ = "1.14.7rc2"
__version__ = "1.14.7"

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.14.7rc2"
"crewai[tools]==1.14.7"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.14.7rc2"
"crewai[tools]==1.14.7"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "Power up your crews with {{folder_name}}"
readme = "README.md"
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.14.7rc2"
"crewai[tools]==1.14.7"
]
[tool.crewai]

View File

@@ -1 +1 @@
__version__ = "1.14.7rc2"
__version__ = "1.14.7"

View File

@@ -152,4 +152,4 @@ __all__ = [
"wrap_file_source",
]
__version__ = "1.14.7rc2"
__version__ = "1.14.7"

View File

@@ -10,7 +10,7 @@ requires-python = ">=3.10, <3.14"
dependencies = [
"pytube~=15.0.0",
"requests>=2.33.0,<3",
"crewai==1.14.7rc2",
"crewai==1.14.7",
"tiktoken>=0.8.0,<0.13",
"beautifulsoup4~=4.13.4",
"python-docx~=1.2.0",

View File

@@ -330,4 +330,4 @@ __all__ = [
"ZapierActionTools",
]
__version__ = "1.14.7rc2"
__version__ = "1.14.7"

View File

@@ -8,8 +8,8 @@ authors = [
]
requires-python = ">=3.10, <3.14"
dependencies = [
"crewai-core==1.14.7rc2",
"crewai-cli==1.14.7rc2",
"crewai-core==1.14.7",
"crewai-cli==1.14.7",
# Core Dependencies
"pydantic>=2.11.9,<2.13",
"openai>=2.30.0,<3",
@@ -54,7 +54,7 @@ Repository = "https://github.com/crewAIInc/crewAI"
[project.optional-dependencies]
tools = [
"crewai-tools==1.14.7rc2",
"crewai-tools==1.14.7",
]
embeddings = [
"tiktoken>=0.8.0,<0.13"

View File

@@ -48,7 +48,7 @@ def _suppress_pydantic_deprecation_warnings() -> None:
_suppress_pydantic_deprecation_warnings()
__version__ = "1.14.7rc2"
__version__ = "1.14.7"
_LAZY_IMPORTS: dict[str, tuple[str, str]] = {
"Memory": ("crewai.memory.unified_memory", "Memory"),

View File

@@ -82,6 +82,7 @@ _LLM_TYPE_REGISTRY: dict[str, str] = {
def _validate_llm_ref(value: Any) -> Any:
if isinstance(value, dict):
import importlib
import inspect
llm_type = value.get("llm_type")
if not llm_type or llm_type not in _LLM_TYPE_REGISTRY:
@@ -92,6 +93,12 @@ def _validate_llm_ref(value: Any) -> Any:
dotted = _LLM_TYPE_REGISTRY[llm_type]
mod_path, cls_name = dotted.rsplit(".", 1)
cls = getattr(importlib.import_module(mod_path), cls_name)
if inspect.isabstract(cls):
from crewai.llm import LLM
return LLM(
**{k: v for k, v in value.items() if v is not None and k != "llm_type"}
)
return cls(**value)
return value

View File

@@ -84,6 +84,7 @@ from crewai.events.types.flow_events import (
MethodExecutionPausedEvent,
MethodExecutionStartedEvent,
)
from crewai.events.types.llm_events import LLMCallCompletedEvent
from crewai.flow.dsl._utils import build_flow_definition
from crewai.flow.flow_context import (
current_flow_defer_trace_finalization,
@@ -129,6 +130,7 @@ if TYPE_CHECKING:
from crewai.flow.visualization import build_flow_structure, render_interactive
from crewai.types.streaming import CrewStreamingOutput, FlowStreamingOutput
from crewai.types.usage_metrics import UsageMetrics
from crewai.utilities.env import get_env_context
from crewai.utilities.streaming import (
TaskInfo,
@@ -153,6 +155,32 @@ ExecutionContext = Any # type: ignore[assignment,misc]
logger = logging.getLogger(__name__)
def _usage_dict_to_metrics(usage: dict[str, Any] | None) -> UsageMetrics | None:
if not usage:
return None
def _int(key: str) -> int:
value = usage.get(key)
try:
return int(value) if value is not None else 0
except (TypeError, ValueError):
return 0
prompt_tokens = _int("prompt_tokens")
completion_tokens = _int("completion_tokens")
total_tokens = _int("total_tokens") or (prompt_tokens + completion_tokens)
return UsageMetrics(
total_tokens=total_tokens,
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
cached_prompt_tokens=_int("cached_prompt_tokens"),
reasoning_tokens=_int("reasoning_tokens"),
cache_creation_tokens=_int("cache_creation_tokens"),
successful_requests=1,
)
def _condition_branches(
condition: dict[str, Any],
) -> tuple[Literal["and", "or"], list[FlowDefinitionCondition]]:
@@ -905,6 +933,20 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
_input_history: list[InputHistoryEntry] = PrivateAttr(default_factory=list)
_state: Any = PrivateAttr(default=None)
_deferred_flow_started_event_id: str | None = PrivateAttr(default=None)
_aggregated_usage_metrics: UsageMetrics = PrivateAttr(default_factory=UsageMetrics)
# Serializes mutations and snapshot reads on `_aggregated_usage_metrics`.
# The bus dispatches sync handlers from a `ThreadPoolExecutor`, so two
# concurrent `LLMCallCompletedEvent`s can race the read-modify-write
# inside `add_usage_metrics`.
_usage_metrics_lock: threading.Lock = PrivateAttr(default_factory=threading.Lock)
_flow_match_id: str | None = PrivateAttr(default=None)
_usage_aggregation_handler: Callable[..., Any] | None = PrivateAttr(default=None)
# Incremented on every kickoff that takes ownership of usage aggregation.
# The listener closure snapshots the epoch at attach time; a stale
# handler still queued in the bus thread pool from a prior kickoff
# compares its snapshot against the current value and bails out so it
# cannot contaminate a later kickoff's accumulator.
_usage_epoch: int = PrivateAttr(default=0)
def __class_getitem__(cls: type[Flow[T]], item: type[T]) -> type[Flow[T]]: # type: ignore[override]
class _FlowGeneric(cls): # type: ignore[valid-type,misc]
@@ -967,6 +1009,60 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
method = method.__get__(self, self.__class__)
self._methods[FlowMethodName(method_name)] = method
def _attach_usage_aggregation_listener(self) -> None:
"""Wire an ``LLMCallCompletedEvent`` listener for the duration of one
``kickoff_async`` call.
"""
if self._usage_aggregation_handler is not None:
return
flow_ref = self
captured_epoch = self._usage_epoch
def _accumulate(source: Any, event: LLMCallCompletedEvent) -> None:
# Stale-handler guard: the bus dispatches sync handlers on a
# thread pool that `emit` does not wait on, so a handler from
# a prior kickoff can still be queued when a later kickoff
# bumps the epoch and resets the accumulator. Bail out so we
# don't leak prior-run usage into the new accumulator.
if captured_epoch != flow_ref._usage_epoch:
return
if current_flow_id.get() != flow_ref._flow_match_id:
return
metrics = _usage_dict_to_metrics(event.usage)
if metrics is None:
return
with flow_ref._usage_metrics_lock:
flow_ref._aggregated_usage_metrics.add_usage_metrics(metrics)
crewai_event_bus.on(LLMCallCompletedEvent)(_accumulate)
self._usage_aggregation_handler = _accumulate
def _detach_usage_aggregation_listener(self) -> None:
handler = self._usage_aggregation_handler
if handler is None:
return
crewai_event_bus.off(LLMCallCompletedEvent, handler)
self._usage_aggregation_handler = None
@property
def usage_metrics(self) -> UsageMetrics:
"""Aggregated LLM token usage for the most recent kickoff (or
resume) of this flow instance.
Aggregation is correlated by the ``current_flow_id`` contextvar
captured at kickoff time. Nested kickoffs (a parent flow calling
a child flow's ``kickoff``) intentionally roll the child's
tokens up into the parent because the contextvar is inherited.
Sibling kickoffs that run in parallel under the same parent
contextvar share the same correlation id and may therefore
over-count each other; if you need strict per-flow isolation
in that pattern, run the children in separate tasks that
explicitly set their own ``current_flow_id`` before kickoff.
"""
with self._usage_metrics_lock:
return self._aggregated_usage_metrics.model_copy()
def recall(self, query: str, **kwargs: Any) -> Any:
"""Recall relevant memories. Delegates to this flow's memory.
@@ -1261,6 +1357,14 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
instance._initialize_state(state_data)
instance._pending_feedback_context = pending_context
instance._is_execution_resuming = True
# Seed the usage-aggregation match id so `resume_async` can wire its
# listener and restore `current_flow_id` correctly. Without this,
# a restored flow has a None match id and the handler would either
# ignore its own LLM calls or absorb unrelated ones from sibling
# flows. The accumulator itself starts at zero — any usage from
# before the pause was only observable on the original kickoff
# instance.
instance._flow_match_id = instance.flow_id
return instance
@@ -1359,201 +1463,228 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
"No pending feedback context. Use from_pending() to restore a paused flow."
)
if get_current_parent_id() is None:
reset_emission_counter()
reset_last_event_id()
if not self.suppress_flow_events:
future = crewai_event_bus.emit(
self,
FlowStartedEvent(
type="flow_started",
flow_name=self.name or self.__class__.__name__,
inputs=None,
),
)
if future and isinstance(future, Future):
try:
await asyncio.wrap_future(future)
except Exception:
logger.warning("FlowStartedEvent handler failed", exc_info=True)
get_env_context()
context = self._pending_feedback_context
emit = context.emit
default_outcome = context.default_outcome
# Try to get the live LLM from the re-imported decorator first.
# This preserves the fully-configured object (credentials, safety_settings, etc.)
# for same-process resume. For cross-process resume, fall back to the
# serialized context.llm which is now a dict with full config (or a legacy string).
from crewai.flow.human_feedback import _deserialize_llm_from_context
llm = None
method = self._methods.get(FlowMethodName(context.method_name))
if method is not None:
live_llm = getattr(method, "_human_feedback_llm", None)
if live_llm is not None:
from crewai.llms.base_llm import BaseLLM as BaseLLMClass
if isinstance(live_llm, BaseLLMClass):
llm = live_llm
if llm is None:
llm = _deserialize_llm_from_context(context.llm)
collapsed_outcome: str | None = None
if not feedback.strip():
if default_outcome:
collapsed_outcome = default_outcome
elif emit:
collapsed_outcome = emit[0]
elif emit:
if llm is not None:
collapsed_outcome = self._collapse_to_outcome(
feedback=feedback,
outcomes=emit,
llm=llm,
)
else:
collapsed_outcome = emit[0]
result = HumanFeedbackResult(
output=context.method_output,
feedback=feedback,
outcome=collapsed_outcome,
timestamp=datetime.now(),
method_name=context.method_name,
metadata=context.metadata,
)
self.human_feedback_history.append(result)
self.last_human_feedback = result
self._completed_methods.add(FlowMethodName(context.method_name))
self._pending_feedback_context = None
if self.persistence is not None:
self.persistence.clear_pending_feedback(context.flow_id)
if not self.suppress_flow_events:
crewai_event_bus.emit(
self,
MethodExecutionFinishedEvent(
type="method_execution_finished",
flow_name=self.name or self.__class__.__name__,
method_name=context.method_name,
result=collapsed_outcome if emit else result,
state=self._state,
),
)
# Clear resumption flag before triggering listeners
# This allows methods to re-execute in loops (e.g., implement_changes → suggest_changes → implement_changes)
self._is_execution_resuming = False
if emit and collapsed_outcome is None:
collapsed_outcome = default_outcome or emit[0]
result.outcome = collapsed_outcome
# Wire usage aggregation for the resume phase. The previous
# `kickoff_async`/`resume_async` already detached its listener,
# and `_attach_usage_aggregation_listener` is idempotent, so we
# can always call it here. We also restore `current_flow_id`
# when missing so the handler's filter passes for LLM calls
# made during outcome collapsing and downstream listener
# execution.
flow_id_token = None
if current_flow_id.get() is None and self._flow_match_id is not None:
flow_id_token = current_flow_id.set(self._flow_match_id)
self._attach_usage_aggregation_listener()
try:
if emit and collapsed_outcome:
self._method_outputs.append(collapsed_outcome)
await self._execute_listeners(
FlowMethodName(collapsed_outcome),
result,
)
else:
await self._execute_listeners(
FlowMethodName(context.method_name),
result,
)
except Exception as e:
# Check if flow was paused again for human feedback (loop case)
from crewai.flow.async_feedback.types import HumanFeedbackPending
if get_current_parent_id() is None:
reset_emission_counter()
reset_last_event_id()
if isinstance(e, HumanFeedbackPending):
self._pending_feedback_context = e.context
if self.persistence is None:
from crewai.flow.persistence.factory import default_flow_persistence
self.persistence = default_flow_persistence()
state_data = (
self._state
if isinstance(self._state, dict)
else self._state.model_dump()
)
self.persistence.save_pending_feedback(
flow_uuid=e.context.flow_id,
context=e.context,
state_data=state_data,
)
crewai_event_bus.emit(
if not self.suppress_flow_events:
future = crewai_event_bus.emit(
self,
FlowPausedEvent(
type="flow_paused",
FlowStartedEvent(
type="flow_started",
flow_name=self.name or self.__class__.__name__,
flow_id=e.context.flow_id,
method_name=e.context.method_name,
state=self._copy_and_serialize_state(),
message=e.context.message,
emit=e.context.emit,
inputs=None,
),
)
return e
raise
if future and isinstance(future, Future):
try:
await asyncio.wrap_future(future)
except Exception:
logger.warning("FlowStartedEvent handler failed", exc_info=True)
final_result = self._method_outputs[-1] if self._method_outputs else result
get_env_context()
if self._event_futures:
await asyncio.gather(
*[
asyncio.wrap_future(f)
for f in self._event_futures
if isinstance(f, Future)
]
)
self._event_futures.clear()
context = self._pending_feedback_context
emit = context.emit
default_outcome = context.default_outcome
if (
not self.suppress_flow_events
and not self._should_defer_trace_finalization()
):
future = crewai_event_bus.emit(
self,
FlowFinishedEvent(
type="flow_finished",
flow_name=self.name or self.__class__.__name__,
result=final_result,
state=self._copy_and_serialize_state(),
),
)
if future and isinstance(future, Future):
try:
await asyncio.wrap_future(future)
except Exception:
logger.warning("FlowFinishedEvent handler failed", exc_info=True)
# Try to get the live LLM from the re-imported decorator first.
# This preserves the fully-configured object (credentials, safety_settings, etc.)
# for same-process resume. For cross-process resume, fall back to the
# serialized context.llm which is now a dict with full config (or a legacy string).
from crewai.flow.human_feedback import _deserialize_llm_from_context
trace_listener = TraceCollectionListener()
if (
trace_listener.batch_manager.batch_owner_type == "flow"
and current_flow_id.get() == self.flow_id
and not trace_listener.batch_manager.defer_session_finalization
and not current_flow_defer_trace_finalization.get()
):
if trace_listener.first_time_handler.is_first_time:
trace_listener.first_time_handler.mark_events_collected()
trace_listener.first_time_handler.handle_execution_completion()
llm = None
method = self._methods.get(FlowMethodName(context.method_name))
if method is not None:
live_llm = getattr(method, "_human_feedback_llm", None)
if live_llm is not None:
from crewai.llms.base_llm import BaseLLM as BaseLLMClass
if isinstance(live_llm, BaseLLMClass):
llm = live_llm
if llm is None:
llm = _deserialize_llm_from_context(context.llm)
collapsed_outcome: str | None = None
if not feedback.strip():
if default_outcome:
collapsed_outcome = default_outcome
elif emit:
collapsed_outcome = emit[0]
elif emit:
if llm is not None:
collapsed_outcome = self._collapse_to_outcome(
feedback=feedback,
outcomes=emit,
llm=llm,
)
else:
trace_listener.batch_manager.finalize_batch()
collapsed_outcome = emit[0]
return final_result
result = HumanFeedbackResult(
output=context.method_output,
feedback=feedback,
outcome=collapsed_outcome,
timestamp=datetime.now(),
method_name=context.method_name,
metadata=context.metadata,
)
self.human_feedback_history.append(result)
self.last_human_feedback = result
self._completed_methods.add(FlowMethodName(context.method_name))
self._pending_feedback_context = None
if self.persistence is not None:
self.persistence.clear_pending_feedback(context.flow_id)
if not self.suppress_flow_events:
crewai_event_bus.emit(
self,
MethodExecutionFinishedEvent(
type="method_execution_finished",
flow_name=self.name or self.__class__.__name__,
method_name=context.method_name,
result=collapsed_outcome if emit else result,
state=self._state,
),
)
# Clear resumption flag before triggering listeners
# This allows methods to re-execute in loops (e.g., implement_changes → suggest_changes → implement_changes)
self._is_execution_resuming = False
if emit and collapsed_outcome is None:
collapsed_outcome = default_outcome or emit[0]
result.outcome = collapsed_outcome
try:
if emit and collapsed_outcome:
self._method_outputs.append(collapsed_outcome)
await self._execute_listeners(
FlowMethodName(collapsed_outcome),
result,
)
else:
await self._execute_listeners(
FlowMethodName(context.method_name),
result,
)
except Exception as e:
# Check if flow was paused again for human feedback (loop case)
from crewai.flow.async_feedback.types import HumanFeedbackPending
if isinstance(e, HumanFeedbackPending):
self._pending_feedback_context = e.context
if self.persistence is None:
from crewai.flow.persistence.factory import (
default_flow_persistence,
)
self.persistence = default_flow_persistence()
state_data = (
self._state
if isinstance(self._state, dict)
else self._state.model_dump()
)
self.persistence.save_pending_feedback(
flow_uuid=e.context.flow_id,
context=e.context,
state_data=state_data,
)
crewai_event_bus.emit(
self,
FlowPausedEvent(
type="flow_paused",
flow_name=self.name or self.__class__.__name__,
flow_id=e.context.flow_id,
method_name=e.context.method_name,
state=self._copy_and_serialize_state(),
message=e.context.message,
emit=e.context.emit,
),
)
return e
raise
final_result = self._method_outputs[-1] if self._method_outputs else result
if self._event_futures:
await asyncio.gather(
*[
asyncio.wrap_future(f)
for f in self._event_futures
if isinstance(f, Future)
]
)
self._event_futures.clear()
if (
not self.suppress_flow_events
and not self._should_defer_trace_finalization()
):
future = crewai_event_bus.emit(
self,
FlowFinishedEvent(
type="flow_finished",
flow_name=self.name or self.__class__.__name__,
result=final_result,
state=self._copy_and_serialize_state(),
),
)
if future and isinstance(future, Future):
try:
await asyncio.wrap_future(future)
except Exception:
logger.warning(
"FlowFinishedEvent handler failed", exc_info=True
)
trace_listener = TraceCollectionListener()
if (
trace_listener.batch_manager.batch_owner_type == "flow"
and current_flow_id.get() == self.flow_id
and not trace_listener.batch_manager.defer_session_finalization
and not current_flow_defer_trace_finalization.get()
):
if trace_listener.first_time_handler.is_first_time:
trace_listener.first_time_handler.mark_events_collected()
trace_listener.first_time_handler.handle_execution_completion()
else:
trace_listener.batch_manager.finalize_batch()
return final_result
finally:
# Always release the singleton-bus listener at the end of
# this resume call. In-flight handlers the bus already
# snapshotted continue updating `_aggregated_usage_metrics`;
# only future emits stop reaching this flow. A subsequent
# `resume_async` on the same instance re-attaches a fresh
# listener.
self._detach_usage_aggregation_listener()
if flow_id_token is not None:
current_flow_id.reset(flow_id_token)
def _create_initial_state(self) -> T:
"""Create and initialize flow state with UUID and default values.
@@ -2056,6 +2187,25 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
request_id_token = current_flow_request_id.set(self.flow_id)
runtime_scope = crewai_event_bus._enter_runtime_scope()
# Guard against a reentrant kickoff on the same Flow instance: only
# the outermost call captures `_flow_match_id`, resets the accumulator,
# and owns the listener lifecycle. An inner reentrant call passes
# through so it doesn't wipe outer's state or detach the shared handler.
owns_usage_aggregation = self._usage_aggregation_handler is None
if owns_usage_aggregation:
# Capture the flow id seen by `FlowTrackable._set_flow_context` so
# we can match LLM call events back to this flow even if `state.id`
# gets overwritten later by `inputs["id"]`.
self._flow_match_id = current_flow_id.get()
self._aggregated_usage_metrics = UsageMetrics()
# Bump the epoch BEFORE attaching so any in-flight handler from
# a prior kickoff queued in the bus thread pool sees its stale
# snapshot and bails out instead of writing into the fresh
# accumulator.
self._usage_epoch += 1
self._attach_usage_aggregation_listener()
try:
# Reset flow state for fresh execution unless restoring from persistence
is_restoring = (
@@ -2345,6 +2495,14 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
# Ensure all background memory saves complete before returning
if self.memory is not None and hasattr(self.memory, "drain_writes"):
self.memory.drain_writes()
# Detach the singleton-bus listener as soon as this kickoff
# finishes (whether by completion, pause, or error). Handlers
# the bus already snapshotted via `_sync_executor.submit` keep
# running and updating `_aggregated_usage_metrics`; only
# subsequent emits stop reaching this flow. Resume paths
# re-attach a fresh listener via `resume_async`.
if owns_usage_aggregation:
self._detach_usage_aggregation_listener()
if request_id_token is not None:
current_flow_request_id.reset(request_id_token)
if flow_defer_trace_finalization_token is not None:

View File

@@ -2,6 +2,7 @@
from __future__ import annotations
import inspect
import json
import os
import sqlite3
@@ -742,3 +743,26 @@ class TestCheckpointReusedExecutor:
assert len(result.tasks_output) == 2
assert result.tasks_output[1].raw
class TestCustomLLMCheckpointRestore:
"""A custom BaseLLM subclass serializes with the inherited llm_type "base".
Restoring it must not try to instantiate the abstract BaseLLM; it is rebuilt
as a concrete LLM from the saved config instead.
"""
def test_restore_does_not_instantiate_abstract_base_llm(self) -> None:
agent = Agent(role="r", goal="g", backstory="b", llm=_FinalAnswerLLM())
task = Task(description="d", expected_output="e", agent=agent)
crew = Crew(agents=[agent], tasks=[task], verbose=False)
raw = RuntimeState(root=[crew]).model_dump_json()
restored = RuntimeState.model_validate_json(
raw, context={"from_checkpoint": True}
)
llm = restored.root[0].agents[0].llm
assert isinstance(llm, BaseLLM)
assert not inspect.isabstract(type(llm))
assert llm.model == "stub"

View File

@@ -0,0 +1,394 @@
"""Tests for flow-level token usage aggregation
``flow.usage_metrics`` listens to ``LLMCallCompletedEvent`` for the duration
of ``kickoff_async`` so it covers every LLM call inside the flow — crew-led,
tool-led, AND bare ``LLM.call(...)`` from a flow method. We exercise the
aggregator end-to-end through the real event bus with fabricated events and
explicit contextvar control; no live LLM provider is required.
"""
from __future__ import annotations
import contextvars
import os
import tempfile
from typing import Any, Callable
from uuid import uuid4
import pytest
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.llm_events import LLMCallCompletedEvent, LLMCallType
from crewai.flow.async_feedback.types import PendingFeedbackContext
from crewai.flow.flow import Flow, listen, start
from crewai.flow.flow_context import current_flow_id
from crewai.flow.persistence.sqlite import SQLiteFlowPersistence
from crewai.flow.runtime import _usage_dict_to_metrics
from crewai.types.usage_metrics import UsageMetrics
def _emit_llm_call(
*,
flow_id: str | None,
prompt_tokens: int = 0,
completion_tokens: int = 0,
cached_prompt_tokens: int = 0,
reasoning_tokens: int = 0,
cache_creation_tokens: int = 0,
) -> None:
"""Emit one fake ``LLMCallCompletedEvent`` with ``current_flow_id`` pinned
to ``flow_id``.
Runs in a freshly-copied context so the value the bus snapshots at emit
time is exactly ``flow_id`` — independent of the calling thread's outer
context. Mirrors how the real ``LLM.call`` emits events at runtime.
"""
usage: dict[str, Any] = {
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
"total_tokens": prompt_tokens + completion_tokens,
}
for key, value in (
("cached_prompt_tokens", cached_prompt_tokens),
("reasoning_tokens", reasoning_tokens),
("cache_creation_tokens", cache_creation_tokens),
):
if value:
usage[key] = value
event = LLMCallCompletedEvent(
call_id=str(uuid4()),
model="gpt-4o-mini",
response="ok",
call_type=LLMCallType.LLM_CALL,
usage=usage,
)
ctx = contextvars.copy_context()
def _emit() -> None:
current_flow_id.set(flow_id)
future = crewai_event_bus.emit(object(), event)
if future is not None:
future.result(timeout=5.0)
ctx.run(_emit)
class _ScriptedFlow(Flow):
"""A Flow whose ``@start`` delegates to a per-instance ``_script`` closure.
Each test attaches a script with ``flow._script = lambda f: ...`` so we
don't redefine a Flow subclass for every scenario.
"""
@start()
def run(self) -> None:
script: Callable[[Flow], None] = getattr(self, "_script", lambda _f: None)
script(self)
def _run(script: Callable[[Flow], None] = lambda _f: None) -> Flow:
"""Build a ``_ScriptedFlow``, attach ``script``, kickoff. Returns the flow."""
flow = _ScriptedFlow()
flow._script = script
flow.kickoff()
return flow
class TestUsageDictToMetrics:
"""Unit tests for the dict-to-UsageMetrics normalizer."""
@pytest.mark.parametrize(
"usage, expected",
[
(None, None),
({}, None),
(
{"prompt_tokens": 10, "completion_tokens": 20, "total_tokens": 30},
UsageMetrics(
prompt_tokens=10,
completion_tokens=20,
total_tokens=30,
successful_requests=1,
),
),
# total_tokens missing → derived from prompt + completion
(
{"prompt_tokens": 4, "completion_tokens": 6},
UsageMetrics(
prompt_tokens=4,
completion_tokens=6,
total_tokens=10,
successful_requests=1,
),
),
# Extended provider-specific keys flow through normalization
(
{
"prompt_tokens": 100,
"completion_tokens": 80,
"total_tokens": 180,
"cached_prompt_tokens": 40,
"reasoning_tokens": 25,
"cache_creation_tokens": 10,
},
UsageMetrics(
prompt_tokens=100,
completion_tokens=80,
total_tokens=180,
cached_prompt_tokens=40,
reasoning_tokens=25,
cache_creation_tokens=10,
successful_requests=1,
),
),
# Garbage / non-int values coerce to 0 instead of crashing
(
{"prompt_tokens": "n/a", "completion_tokens": None, "total_tokens": 7},
UsageMetrics(
prompt_tokens=0,
completion_tokens=0,
total_tokens=7,
successful_requests=1,
),
),
],
ids=["none", "empty", "all_keys", "no_total", "extended_keys", "garbage"],
)
def test_normalization(
self, usage: dict[str, Any] | None, expected: UsageMetrics | None
) -> None:
assert _usage_dict_to_metrics(usage) == expected
class TestFlowUsageAggregation:
"""End-to-end tests driving the listener through the real event bus."""
def test_sums_every_llm_call_in_the_flow(self) -> None:
"""Multiple LLM calls — including bare ``LLM.call(...)`` made outside
any crew — accumulate; ``successful_requests`` tracks the call count."""
def script(flow: Flow) -> None:
_emit_llm_call(flow_id=flow._flow_match_id, prompt_tokens=300, completion_tokens=300)
_emit_llm_call(flow_id=flow._flow_match_id, prompt_tokens=200, completion_tokens=100)
_emit_llm_call(flow_id=flow._flow_match_id, prompt_tokens=20, completion_tokens=20)
flow = _run(script)
assert flow.usage_metrics.total_tokens == 940
assert flow.usage_metrics.prompt_tokens == 520
assert flow.usage_metrics.completion_tokens == 420
assert flow.usage_metrics.successful_requests == 3
def test_returns_zero_when_no_calls_happen(self) -> None:
flow = _run()
assert flow.usage_metrics == UsageMetrics()
def test_ignores_events_from_other_flows(self) -> None:
"""Concurrent flow runs share the singleton bus, so the listener must
scope itself to its own flow via the contextvar match."""
def script(flow: Flow) -> None:
_emit_llm_call(flow_id=flow._flow_match_id, prompt_tokens=50, completion_tokens=50)
_emit_llm_call(flow_id="some-other-flow", prompt_tokens=49_000, completion_tokens=50_999)
flow = _run(script)
assert flow.usage_metrics.total_tokens == 100
assert flow.usage_metrics.successful_requests == 1
def test_resets_between_kickoffs(self) -> None:
flow = _ScriptedFlow()
flow._script = lambda f: _emit_llm_call(
flow_id=f._flow_match_id, prompt_tokens=250, completion_tokens=250
)
flow.kickoff()
flow.kickoff()
assert flow.usage_metrics.total_tokens == 500
assert flow.usage_metrics.successful_requests == 1
def test_usage_metrics_returns_independent_copy(self) -> None:
"""``usage_metrics`` must return a copy, not the internal instance —
otherwise callers can clobber the in-flight accumulator."""
flow = _run(
lambda f: _emit_llm_call(
flow_id=f._flow_match_id, prompt_tokens=50, completion_tokens=50
)
)
snapshot = flow.usage_metrics
snapshot.total_tokens = 999_999
assert flow.usage_metrics.total_tokens == 100
def test_handler_is_unregistered_after_kickoff(self) -> None:
"""Long-lived workers (Celery, devkit) must not leak one handler per
kickoff on the singleton bus, on either the success or failure path."""
def handler_count() -> int:
return len(
crewai_event_bus._sync_handlers.get(LLMCallCompletedEvent, frozenset())
)
before = handler_count()
flow = _ScriptedFlow()
flow._script = lambda f: _emit_llm_call(
flow_id=f._flow_match_id, prompt_tokens=5, completion_tokens=5
)
for _ in range(3):
flow.kickoff()
assert handler_count() == before
def boom(_f: Flow) -> None:
raise RuntimeError("boom")
failing = _ScriptedFlow()
failing._script = boom
with pytest.raises(RuntimeError, match="boom"):
failing.kickoff()
assert handler_count() == before
def test_stale_handler_from_prior_kickoff_does_not_contaminate(self) -> None:
"""The bus dispatches sync handlers on a thread pool that ``emit``
does not wait on. A handler still queued from a prior kickoff
must not write into a later kickoff's accumulator — the epoch
snapshot in the handler closure bails out on mismatch."""
captured: dict[str, Any] = {}
def script(flow: Flow) -> None:
_emit_llm_call(flow_id=flow._flow_match_id, prompt_tokens=10, completion_tokens=10)
captured["handler"] = flow._usage_aggregation_handler
captured["match_id"] = flow._flow_match_id
flow = _run(script)
first_total = flow.usage_metrics.total_tokens
assert first_total == 20
# A second kickoff bumps the epoch and resets the accumulator.
flow._script = lambda f: None
flow.kickoff()
assert flow.usage_metrics.total_tokens == 0
stale_handler = captured["handler"]
assert stale_handler is not None
stale_event = LLMCallCompletedEvent(
call_id=str(uuid4()),
model="gpt-4o-mini",
response="ok",
call_type=LLMCallType.LLM_CALL,
usage={"prompt_tokens": 999, "completion_tokens": 999, "total_tokens": 1998},
)
ctx = contextvars.copy_context()
ctx.run(lambda: (current_flow_id.set(captured["match_id"]), stale_handler(object(), stale_event)))
# Stale handler bailed: second kickoff's accumulator is still zero.
assert flow.usage_metrics.total_tokens == 0
def test_pause_detaches_listener_and_does_not_leak(self) -> None:
"""When ``kickoff_async`` pauses for human feedback, the listener
must be detached from the singleton bus to avoid leaking handlers
across abandoned paused instances. Pre-pause LLM events still
count because the bus snapshots handlers at emit time. Late
events emitted after the pause returns do not count for this
instance — resume paths re-attach a fresh listener."""
from crewai.flow.async_feedback.types import HumanFeedbackPending
captured: dict[str, Any] = {}
class _PausingFlow(Flow):
@start()
def begin(self) -> None:
_emit_llm_call(
flow_id=self._flow_match_id,
prompt_tokens=10,
completion_tokens=20,
)
captured["pre_pause_total"] = self.usage_metrics.total_tokens
raise HumanFeedbackPending(
context=PendingFeedbackContext(
flow_id=self.flow_id,
flow_class="_PausingFlow",
method_name="begin",
method_output="content",
message="Review:",
)
)
with tempfile.TemporaryDirectory() as tmpdir:
persistence = SQLiteFlowPersistence(os.path.join(tmpdir, "f.db"))
flow = _PausingFlow(persistence=persistence)
result = flow.kickoff()
assert isinstance(result, HumanFeedbackPending)
assert captured["pre_pause_total"] == 30
assert flow._usage_aggregation_handler is None
# A late event emitted after the pause does not reach the
# detached listener, so the running total is unchanged.
_emit_llm_call(
flow_id=flow._flow_match_id,
prompt_tokens=2,
completion_tokens=3,
)
assert flow.usage_metrics.total_tokens == 30
def test_aggregates_resume_after_from_pending(self) -> None:
"""A flow restored via ``from_pending`` is a fresh instance with no
``_flow_match_id``; without seeding it, the listener attached in
``resume_async`` either ignores its own LLM calls or absorbs unrelated
ones. ``from_pending`` must seed the match id so the resume-phase
aggregator counts our own calls and only our own calls."""
class _ResumeFlow(Flow):
@start()
def begin(self) -> str:
return "content"
@listen(begin)
def on_begin(self, _feedback: Any) -> str:
_emit_llm_call(
flow_id=self._flow_match_id,
prompt_tokens=100,
completion_tokens=50,
)
_emit_llm_call(
flow_id="some-other-flow",
prompt_tokens=9_999,
completion_tokens=9_999,
)
return "done"
with tempfile.TemporaryDirectory() as tmpdir:
persistence = SQLiteFlowPersistence(os.path.join(tmpdir, "f.db"))
flow_id = "usage-resume-test"
persistence.save_pending_feedback(
flow_uuid=flow_id,
context=PendingFeedbackContext(
flow_id=flow_id,
flow_class="_ResumeFlow",
method_name="begin",
method_output="content",
message="Review:",
),
state_data={"id": flow_id},
)
flow = _ResumeFlow.from_pending(flow_id, persistence)
assert flow._flow_match_id == flow.flow_id
flow.resume("ok")
assert flow.usage_metrics.total_tokens == 150
assert flow.usage_metrics.prompt_tokens == 100
assert flow.usage_metrics.completion_tokens == 50
assert flow.usage_metrics.successful_requests == 1

View File

@@ -1,3 +1,3 @@
"""CrewAI development tools."""
__version__ = "1.14.7rc2"
__version__ = "1.14.7"