From 887adafd2c569606816f3a8d356128ff6af9ffef Mon Sep 17 00:00:00 2001 From: Lucas Gomide Date: Fri, 12 Jun 2026 13:55:22 -0300 Subject: [PATCH] fix: aggregate token usage across all LLM calls (#6122) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: aggregate LLM token usage at the flow level Introduces `flow.usage_metrics`, a snapshot of every LLMCallCompletedEvent emitted under the flow's `current_flow_id` for the duration of one kickoff (or resume) call. Aggregation happens on the singleton event bus so it covers crews, direct `LLM.call`s, and nested listener calls — solving the mismatch where the SDK reported only the last crew's usage while the Enterprise UI showed the correct full total. Co-authored-by: Cursor * refactor: centralize provider key normalization in UsageMetrics Add UsageMetrics.from_provider_dict to normalize raw LLM usage dicts across providers (LiteLLM, native Anthropic, native Gemini, OpenAI nested cached). BaseLLM._track_token_usage_internal and the flow-level aggregator now share this single source of truth, so `flow.usage_metrics` agrees with per-LLM totals on every provider — including the native Anthropic path that emits `input_tokens`/`output_tokens` instead of `prompt_tokens`/`completion_tokens`. * fix: flush event bus before reading aggregated usage_metrics `crewai_event_bus.emit` dispatches LLMCallCompletedEvent handlers on a ThreadPoolExecutor (fire-and-forget), so a flow whose last LLM call completes right before kickoff_async/resume_async returns can detach the usage listener while that handler is still queued, leaving its tokens off `flow.usage_metrics`. Match `Crew.kickoff()` and call `crewai_event_bus.flush()` in both finally blocks so every handler drains before the listener is detached. --------- Co-authored-by: Cursor --- docs/ar/concepts/flows.mdx | 42 ++ docs/en/concepts/flows.mdx | 43 ++ docs/ko/concepts/flows.mdx | 42 ++ docs/pt-BR/concepts/flows.mdx | 43 ++ .../src/crewai/flow/runtime/__init__.py | 134 ++++- lib/crewai/src/crewai/llms/base_llm.py | 44 +- lib/crewai/src/crewai/types/usage_metrics.py | 68 +++ lib/crewai/tests/test_flow_usage_metrics.py | 511 ++++++++++++++++++ 8 files changed, 889 insertions(+), 38 deletions(-) create mode 100644 lib/crewai/tests/test_flow_usage_metrics.py diff --git a/docs/ar/concepts/flows.mdx b/docs/ar/concepts/flows.mdx index 000c550de..ccab43d9d 100644 --- a/docs/ar/concepts/flows.mdx +++ b/docs/ar/concepts/flows.mdx @@ -226,6 +226,48 @@ counter=2 message='Hello from first_method - updated by second_method' من خلال ضمان إعادة مخرجات الدالة الأخيرة وتوفير الوصول إلى الحالة، تجعل تدفقات CrewAI من السهل دمج نتائج سير عمل الذكاء الاصطناعي في التطبيقات أو الأنظمة الأكبر، مع الحفاظ على الوصول إلى الحالة طوال تنفيذ التدفق. +## مقاييس استخدام التدفق + +بعد اكتمال تنفيذ التدفق، يمكنك الوصول إلى الخاصية `usage_metrics` لعرض إجمالي استخدام التوكنات عبر **كل استدعاء لنموذج اللغة** يتم خلال التشغيل — بما في ذلك الاستدعاءات من كل فريق (Crew) ينظمه التدفق، والاستدعاءات داخل أدوات الـ Agents، والاستدعاءات المباشرة لـ `LLM.call(...)` من دوال التدفق. هذا هو المكافئ على جانب الـ SDK للإجماليات المعروضة في واجهة CrewAI Enterprise. + +```python Code +from crewai import LLM +from crewai.flow.flow import Flow, listen, start + +class UsageMetricsFlow(Flow): + @start() + def run_first_crew(self): + self.state.first_result = FirstCrew().crew().kickoff() + + @listen(run_first_crew) + def call_llm_directly(self): + # استدعاء مباشر لنموذج اللغة — يُحسب أيضًا ضمن flow.usage_metrics + llm = LLM(model="openai/gpt-4o-mini") + self.state.summary = llm.call("لخّص النقاط الرئيسية.") + + @listen(call_llm_directly) + def run_second_crew(self): + self.state.second_result = SecondCrew().crew().kickoff() + +flow = UsageMetricsFlow() +flow.kickoff() + +print(flow.usage_metrics) +# UsageMetrics(total_tokens=8579, prompt_tokens=6210, completion_tokens=2369, +# cached_prompt_tokens=0, reasoning_tokens=0, +# cache_creation_tokens=0, successful_requests=5) +``` + + + `flow.usage_metrics` **ليست** نفس `flow.kickoff().token_usage`. هذه الأخيرة + ترجع فقط `CrewOutput.token_usage` لـ **آخر** دالة `@listen` أعادت + `CrewOutput`، مما يعني أنها تعكس فقط الفريق الأخير وتتجاهل الفرق السابقة + وكذلك أي استدعاءات مباشرة لـ `LLM.call(...)`. استخدم `flow.usage_metrics` + كلما احتجت إلى الإجمالي **الكامل** للتوكنات لتنفيذ التدفق. + + +كل حقل في [`UsageMetrics`](https://github.com/crewAIInc/crewAI/blob/main/lib/crewai/src/crewai/types/usage_metrics.py) المُعاد هو مجموع جميع استدعاءات نموذج اللغة التي حدثت خلال استدعاء واحد لـ `flow.kickoff()`. تتم إعادة تعيين العدادات عند الاستدعاء التالي لـ `kickoff()` (وفي كل تكرار من `kickoff_for_each`)، لذلك لن تتكرر العدّات عبر التشغيلات المتتالية. يمكن قراءة هذه الخاصية بأمان في أي وقت بعد اكتمال `kickoff()`؛ قراءتها أثناء التنفيذ تُرجع المجموع الجزئي المتراكم حتى تلك اللحظة. + ## إدارة حالة التدفق إدارة الحالة بفعالية أمر بالغ الأهمية لبناء سير عمل ذكاء اصطناعي موثوق وقابل للصيانة. توفر تدفقات CrewAI آليات قوية لإدارة الحالة غير المهيكلة والمهيكلة، diff --git a/docs/en/concepts/flows.mdx b/docs/en/concepts/flows.mdx index cd10afa40..0ac429d95 100644 --- a/docs/en/concepts/flows.mdx +++ b/docs/en/concepts/flows.mdx @@ -226,6 +226,49 @@ After the Flow has run, you can access the final state to see the updates made b By ensuring that the final method's output is returned and providing access to the state, CrewAI Flows make it easy to integrate the results of your AI workflows into larger applications or systems, while also maintaining and accessing the state throughout the Flow's execution. +## Flow Usage Metrics + +After a Flow execution completes, you can access the `usage_metrics` property to view aggregated token usage across **every LLM call** made during the run — including calls from every Crew the Flow orchestrated, calls inside Agent tools, and bare `LLM.call(...)` invocations from Flow methods. This is the SDK-side equivalent of the totals shown in the CrewAI Enterprise UI. + +```python Code +from crewai import LLM +from crewai.flow.flow import Flow, listen, start + +class UsageMetricsFlow(Flow): + @start() + def run_first_crew(self): + self.state.first_result = FirstCrew().crew().kickoff() + + @listen(run_first_crew) + def call_llm_directly(self): + # Bare LLM call — still counted by flow.usage_metrics + llm = LLM(model="openai/gpt-4o-mini") + self.state.summary = llm.call("Summarize the key takeaways.") + + @listen(call_llm_directly) + def run_second_crew(self): + self.state.second_result = SecondCrew().crew().kickoff() + +flow = UsageMetricsFlow() +flow.kickoff() + +print(flow.usage_metrics) +# UsageMetrics(total_tokens=8579, prompt_tokens=6210, completion_tokens=2369, +# cached_prompt_tokens=0, reasoning_tokens=0, +# cache_creation_tokens=0, successful_requests=5) +``` + + + `flow.usage_metrics` is **not** the same as `flow.kickoff().token_usage`. The + latter returns the `CrewOutput.token_usage` of the **last** `@listen` method + that returned a `CrewOutput`, which means it only reflects the final Crew and + ignores prior Crews and bare `LLM.call(...)` invocations entirely. Use + `flow.usage_metrics` whenever you need the **full** token rollup for the Flow + execution. + + +Each entry in the returned [`UsageMetrics`](https://github.com/crewAIInc/crewAI/blob/main/lib/crewai/src/crewai/types/usage_metrics.py) is the sum across all LLM calls made within a single `flow.kickoff()` invocation. Counters reset on the next `kickoff()` call (or on each iteration of `kickoff_for_each`), so successive runs don't double-count. The property is safe to read at any point after `kickoff()` completes; reading it during execution returns the partial total accumulated so far. + ## Flow State Management Managing state effectively is crucial for building reliable and maintainable AI workflows. CrewAI Flows provides robust mechanisms for both unstructured and structured state management, diff --git a/docs/ko/concepts/flows.mdx b/docs/ko/concepts/flows.mdx index 368efd701..56278dcc9 100644 --- a/docs/ko/concepts/flows.mdx +++ b/docs/ko/concepts/flows.mdx @@ -221,6 +221,48 @@ Flow가 실행된 후, 이러한 메소드들에 의해 수행된 업데이트 최종 메소드의 출력이 반환되고 상태에 접근할 수 있도록 함으로써, CrewAI Flow는 AI 워크플로우의 결과를 더 큰 애플리케이션이나 시스템에 쉽게 통합할 수 있게 하며, Flow 실행 과정 전반에 걸쳐 상태를 유지하고 접근하면서도 이를 용이하게 만듭니다. +## 플로우 사용 메트릭 + +Flow 실행이 완료된 후, `usage_metrics` 속성에 접근하여 실행 동안 발생한 **모든 LLM 호출**의 토큰 사용량 집계를 확인할 수 있습니다. 여기에는 Flow가 오케스트레이션한 모든 Crew의 호출, Agent의 도구 내부에서 발생한 호출, 그리고 Flow 메서드에서 직접 호출한 `LLM.call(...)`이 모두 포함됩니다. 이는 CrewAI Enterprise UI에 표시되는 총량과 동등한 SDK 측 값입니다. + +```python Code +from crewai import LLM +from crewai.flow.flow import Flow, listen, start + +class UsageMetricsFlow(Flow): + @start() + def run_first_crew(self): + self.state.first_result = FirstCrew().crew().kickoff() + + @listen(run_first_crew) + def call_llm_directly(self): + # 직접 LLM 호출 — flow.usage_metrics에서도 집계됩니다 + llm = LLM(model="openai/gpt-4o-mini") + self.state.summary = llm.call("핵심 내용을 요약해 주세요.") + + @listen(call_llm_directly) + def run_second_crew(self): + self.state.second_result = SecondCrew().crew().kickoff() + +flow = UsageMetricsFlow() +flow.kickoff() + +print(flow.usage_metrics) +# UsageMetrics(total_tokens=8579, prompt_tokens=6210, completion_tokens=2369, +# cached_prompt_tokens=0, reasoning_tokens=0, +# cache_creation_tokens=0, successful_requests=5) +``` + + + `flow.usage_metrics`는 `flow.kickoff().token_usage`와 **동일하지 않습니다**. + 후자는 `CrewOutput`을 반환한 **마지막** `@listen` 메서드의 + `CrewOutput.token_usage`만 반환하므로, 이전에 실행된 Crew들과 Flow 메서드에서 + 직접 호출한 `LLM.call(...)`은 전혀 포함되지 않습니다. Flow 실행에 대한 + **전체** 토큰 집계가 필요할 때는 항상 `flow.usage_metrics`를 사용하십시오. + + +반환되는 [`UsageMetrics`](https://github.com/crewAIInc/crewAI/blob/main/lib/crewai/src/crewai/types/usage_metrics.py)의 각 항목은 단일 `flow.kickoff()` 실행 동안 발생한 모든 LLM 호출의 합계입니다. 다음 `kickoff()` 호출(및 `kickoff_for_each`의 각 반복)에서 카운터가 초기화되므로 연속 실행이 이중으로 집계되지 않습니다. 이 속성은 `kickoff()` 완료 후 언제든지 안전하게 읽을 수 있으며, 실행 중에 읽으면 그 시점까지 누적된 부분 합계를 반환합니다. + ## 플로우 상태 관리 상태를 효과적으로 관리하는 것은 신뢰할 수 있고 유지 보수가 용이한 AI 워크플로를 구축하는 데 매우 중요합니다. CrewAI 플로우는 비정형 및 정형 상태 관리를 위한 강력한 메커니즘을 제공하여, 개발자가 자신의 애플리케이션에 가장 적합한 접근 방식을 선택할 수 있도록 합니다. diff --git a/docs/pt-BR/concepts/flows.mdx b/docs/pt-BR/concepts/flows.mdx index 8c0deff15..a33a0af09 100644 --- a/docs/pt-BR/concepts/flows.mdx +++ b/docs/pt-BR/concepts/flows.mdx @@ -219,6 +219,49 @@ Após o término da execução, é possível acessar o estado final e observar a Ao garantir que a saída do método final seja retornada e oferecer acesso ao estado, o CrewAI Flows facilita a integração dos resultados dos seus workflows de IA em aplicações maiores, além de permitir o gerenciamento e o acesso ao estado durante toda a execução do Flow. +## Métricas de Uso do Flow + +Após a execução de um Flow, você pode acessar a propriedade `usage_metrics` para visualizar o consumo agregado de tokens em **todas as chamadas de LLM** realizadas durante a execução — incluindo chamadas das Crews orquestradas pelo Flow, chamadas dentro de tools de Agents, e invocações diretas de `LLM.call(...)` feitas a partir de métodos do Flow. Esse é o equivalente, do lado do SDK, ao total exibido na interface do CrewAI Enterprise. + +```python Code +from crewai import LLM +from crewai.flow.flow import Flow, listen, start + +class UsageMetricsFlow(Flow): + @start() + def run_first_crew(self): + self.state.first_result = FirstCrew().crew().kickoff() + + @listen(run_first_crew) + def call_llm_directly(self): + # Chamada direta de LLM — também contabilizada por flow.usage_metrics + llm = LLM(model="openai/gpt-4o-mini") + self.state.summary = llm.call("Resuma os principais pontos.") + + @listen(call_llm_directly) + def run_second_crew(self): + self.state.second_result = SecondCrew().crew().kickoff() + +flow = UsageMetricsFlow() +flow.kickoff() + +print(flow.usage_metrics) +# UsageMetrics(total_tokens=8579, prompt_tokens=6210, completion_tokens=2369, +# cached_prompt_tokens=0, reasoning_tokens=0, +# cache_creation_tokens=0, successful_requests=5) +``` + + + `flow.usage_metrics` **não** é o mesmo que `flow.kickoff().token_usage`. Este + último retorna apenas o `CrewOutput.token_usage` do **último** método + `@listen` que retornou um `CrewOutput`, ou seja, reflete somente a Crew + final e ignora completamente as Crews anteriores e quaisquer chamadas + diretas de `LLM.call(...)`. Use `flow.usage_metrics` sempre que precisar do + rollup **completo** de tokens da execução do Flow. + + +Cada campo do [`UsageMetrics`](https://github.com/crewAIInc/crewAI/blob/main/lib/crewai/src/crewai/types/usage_metrics.py) retornado representa a soma de todas as chamadas de LLM feitas em uma única invocação de `flow.kickoff()`. Os contadores são resetados a cada novo `kickoff()` (e em cada iteração de `kickoff_for_each`), de modo que execuções sucessivas não duplicam o total. A propriedade é segura para ser lida em qualquer momento após o `kickoff()`; lê-la durante a execução retorna o total parcial acumulado até aquele instante. + ## Gerenciamento de Estado em Flows Gerenciar o estado de forma eficaz é fundamental para construir fluxos de trabalho de IA confiáveis e de fácil manutenção. O CrewAI Flows oferece mecanismos robustos para o gerenciamento de estado tanto não estruturado quanto estruturado, diff --git a/lib/crewai/src/crewai/flow/runtime/__init__.py b/lib/crewai/src/crewai/flow/runtime/__init__.py index 33d399da5..c0a0109fc 100644 --- a/lib/crewai/src/crewai/flow/runtime/__init__.py +++ b/lib/crewai/src/crewai/flow/runtime/__init__.py @@ -85,6 +85,7 @@ from crewai.events.types.flow_events import ( MethodExecutionPausedEvent, MethodExecutionStartedEvent, ) +from crewai.events.types.llm_events import LLMCallCompletedEvent from crewai.flow.dsl._utils import build_flow_definition from crewai.flow.flow_context import ( current_flow_defer_trace_finalization, @@ -132,6 +133,7 @@ if TYPE_CHECKING: from crewai.flow.visualization import build_flow_structure, render_interactive from crewai.types.streaming import CrewStreamingOutput, FlowStreamingOutput +from crewai.types.usage_metrics import UsageMetrics from crewai.utilities.env import get_env_context from crewai.utilities.streaming import ( TaskInfo, @@ -255,6 +257,16 @@ def _is_multi_event_or( return operator == "or" and len(branches) > 1 +def _usage_dict_to_metrics(usage: dict[str, Any] | None) -> UsageMetrics | None: + """Normalize an LLM call's raw usage dict into ``UsageMetrics``. + + Thin wrapper around ``UsageMetrics.from_provider_dict`` so the flow + aggregator and ``BaseLLM._track_token_usage_internal`` agree on the + set of provider key aliases (LiteLLM, Anthropic, Gemini). + """ + return UsageMetrics.from_provider_dict(usage) + + def _resolve_persistence(value: Any) -> Any: if value is None or isinstance(value, FlowPersistence): return value @@ -960,6 +972,10 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): _input_history: list[InputHistoryEntry] = PrivateAttr(default_factory=list) _state: Any = PrivateAttr(default=None) _deferred_flow_started_event_id: str | None = PrivateAttr(default=None) + _aggregated_usage_metrics: UsageMetrics = PrivateAttr(default_factory=UsageMetrics) + _usage_metrics_lock: threading.Lock = PrivateAttr(default_factory=threading.Lock) + _flow_match_id: str | None = PrivateAttr(default=None) + _usage_aggregation_handler: Callable[..., Any] | None = PrivateAttr(default=None) def __class_getitem__(cls: type[Flow[T]], item: type[T]) -> type[Flow[T]]: # type: ignore[override] class _FlowGeneric(cls): # type: ignore[valid-type,misc] @@ -1059,6 +1075,71 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): methods[FlowMethodName(method_name)] = method return methods + def _attach_usage_aggregation_listener(self) -> None: + """Wire an ``LLMCallCompletedEvent`` listener for the duration of one + ``kickoff_async`` call. + """ + if self._usage_aggregation_handler is not None: + return + + # Capture the accumulator object in the closure so a stale handler + # still queued in the bus thread pool from a prior kickoff writes + # into its own (orphaned) UsageMetrics instead of the next kickoff's + # fresh one. + accumulator = self._aggregated_usage_metrics + match_id = self._flow_match_id + lock = self._usage_metrics_lock + + def _accumulate(source: Any, event: LLMCallCompletedEvent) -> None: + if current_flow_id.get() != match_id: + return + metrics = _usage_dict_to_metrics(event.usage) + if metrics is None: + return + with lock: + accumulator.add_usage_metrics(metrics) + + crewai_event_bus.on(LLMCallCompletedEvent)(_accumulate) + self._usage_aggregation_handler = _accumulate + + def _detach_usage_aggregation_listener(self) -> None: + handler = self._usage_aggregation_handler + if handler is None: + return + crewai_event_bus.off(LLMCallCompletedEvent, handler) + self._usage_aggregation_handler = None + + @property + def usage_metrics(self) -> UsageMetrics: + """Aggregated LLM token usage for the most recent kickoff (or + resume) of this flow instance. + + Aggregation is correlated by the ``current_flow_id`` contextvar + captured at kickoff time. Nested kickoffs (a parent flow calling + a child flow's ``kickoff``) intentionally roll the child's + tokens up into the parent because the contextvar is inherited. + Sibling kickoffs that run in parallel under the same parent + contextvar share the same correlation id and may therefore + over-count each other; if you need strict per-flow isolation + in that pattern, run the children in separate tasks that + explicitly set their own ``current_flow_id`` before kickoff. + + LLM calls that complete without exposing token usage (e.g. + structured-output / Instructor paths) are not counted in + ``successful_requests`` either, since we never see the call's + token data — the metric stays a faithful summary of usage we + actually observed rather than a partial count. + + Cross-process pause/resume (``Flow.from_pending`` in a new + process) starts aggregation from zero on the restored instance + because pre-pause totals are not yet persisted alongside the + pending feedback context. Same-process pause/resume — where the + caller keeps the flow instance and calls ``resume`` on it — + preserves the running totals end-to-end. + """ + with self._usage_metrics_lock: + return self._aggregated_usage_metrics.model_copy() + def recall(self, query: str, **kwargs: Any) -> Any: """Recall relevant memories. Delegates to this flow's memory. @@ -1351,6 +1432,10 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): instance._initialize_state(state_data) instance._pending_feedback_context = pending_context instance._is_execution_resuming = True + # Seed the match id so the resume-phase listener filters its own + # LLM events (which run with `current_flow_id == instance.flow_id`) + # instead of dropping or absorbing unrelated ones. + instance._flow_match_id = instance.flow_id return instance @@ -1440,15 +1525,34 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): Raises: ValueError: If no pending feedback context exists """ - from datetime import datetime - - from crewai.flow.human_feedback import HumanFeedbackResult - if self._pending_feedback_context is None: raise ValueError( "No pending feedback context. Use from_pending() to restore a paused flow." ) + # Force `current_flow_id` to this flow's match id for the + # duration of the resume so the usage listener's filter passes + # even when resume runs under another flow's active context. + flow_id_token = None + if self._flow_match_id is not None: + flow_id_token = current_flow_id.set(self._flow_match_id) + self._attach_usage_aggregation_listener() + try: + return await self._resume_async_body(feedback) + finally: + # Match kickoff_async: drain pending handlers so the resumed + # phase's LLM events all hit `_aggregated_usage_metrics` + # before the listener is detached. + crewai_event_bus.flush() + self._detach_usage_aggregation_listener() + if flow_id_token is not None: + current_flow_id.reset(flow_id_token) + + async def _resume_async_body(self, feedback: str = "") -> Any: + from datetime import datetime + + from crewai.flow.human_feedback import HumanFeedbackResult + if get_current_parent_id() is None: reset_emission_counter() reset_last_event_id() @@ -1471,6 +1575,10 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): get_env_context() context = self._pending_feedback_context + if context is None: + raise ValueError( + "No pending feedback context. Use from_pending() to restore a paused flow." + ) emit = context.emit default_outcome = context.default_outcome @@ -2174,6 +2282,16 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): request_id_token = current_flow_request_id.set(self.flow_id) runtime_scope = crewai_event_bus._enter_runtime_scope() + + # Reentrant kickoffs on the same Flow share the outer call's + # listener and accumulator; only the outermost call wires usage + # aggregation. + owns_usage_aggregation = self._usage_aggregation_handler is None + if owns_usage_aggregation: + self._flow_match_id = current_flow_id.get() + self._aggregated_usage_metrics = UsageMetrics() + self._attach_usage_aggregation_listener() + try: # Reset flow state for fresh execution unless restoring from persistence is_restoring = ( @@ -2463,6 +2581,14 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): # Ensure all background memory saves complete before returning if self.memory is not None and hasattr(self.memory, "drain_writes"): self.memory.drain_writes() + # Drain pending LLMCallCompletedEvent handlers before + # detaching so `flow.usage_metrics` reflects every call + # emitted during this kickoff — mirrors `Crew.kickoff()`, + # which flushes before reporting `token_usage`. Resume paths + # re-attach a fresh listener via `resume_async`. + if owns_usage_aggregation: + crewai_event_bus.flush() + self._detach_usage_aggregation_listener() if request_id_token is not None: current_flow_request_id.reset(request_id_token) if flow_defer_trace_finalization_token is not None: diff --git a/lib/crewai/src/crewai/llms/base_llm.py b/lib/crewai/src/crewai/llms/base_llm.py index 03f277855..94d5eb6b9 100644 --- a/lib/crewai/src/crewai/llms/base_llm.py +++ b/lib/crewai/src/crewai/llms/base_llm.py @@ -890,41 +890,17 @@ class BaseLLM(BaseModel, ABC): Args: usage_data: Token usage data from the API response """ - prompt_tokens = ( - usage_data.get("prompt_tokens") - or usage_data.get("prompt_token_count") - or usage_data.get("input_tokens") - or 0 - ) + metrics = UsageMetrics.from_provider_dict(usage_data) + if metrics is None: + return - completion_tokens = ( - usage_data.get("completion_tokens") - or usage_data.get("candidates_token_count") - or usage_data.get("output_tokens") - or 0 - ) - - cached_tokens = ( - usage_data.get("cached_tokens") - or usage_data.get("cached_prompt_tokens") - or usage_data.get("cache_read_input_tokens") - or 0 - ) - if not cached_tokens: - prompt_details = usage_data.get("prompt_tokens_details") - if isinstance(prompt_details, dict): - cached_tokens = prompt_details.get("cached_tokens", 0) or 0 - - reasoning_tokens = usage_data.get("reasoning_tokens", 0) or 0 - cache_creation_tokens = usage_data.get("cache_creation_tokens", 0) or 0 - - self._token_usage["prompt_tokens"] += prompt_tokens - self._token_usage["completion_tokens"] += completion_tokens - self._token_usage["total_tokens"] += prompt_tokens + completion_tokens - self._token_usage["successful_requests"] += 1 - self._token_usage["cached_prompt_tokens"] += cached_tokens - self._token_usage["reasoning_tokens"] += reasoning_tokens - self._token_usage["cache_creation_tokens"] += cache_creation_tokens + self._token_usage["prompt_tokens"] += metrics.prompt_tokens + self._token_usage["completion_tokens"] += metrics.completion_tokens + self._token_usage["total_tokens"] += metrics.total_tokens + self._token_usage["successful_requests"] += metrics.successful_requests + self._token_usage["cached_prompt_tokens"] += metrics.cached_prompt_tokens + self._token_usage["reasoning_tokens"] += metrics.reasoning_tokens + self._token_usage["cache_creation_tokens"] += metrics.cache_creation_tokens def get_token_usage_summary(self) -> UsageMetrics: """Get summary of token usage for this LLM instance. diff --git a/lib/crewai/src/crewai/types/usage_metrics.py b/lib/crewai/src/crewai/types/usage_metrics.py index 76fa7dca0..aa1597229 100644 --- a/lib/crewai/src/crewai/types/usage_metrics.py +++ b/lib/crewai/src/crewai/types/usage_metrics.py @@ -4,10 +4,31 @@ This module provides models for tracking token usage and request metrics during crew and agent execution. """ +from typing import Any + from pydantic import BaseModel, Field from typing_extensions import Self +def _coerce_int(value: Any) -> int: + if value is None: + return 0 + try: + return int(value) + except (TypeError, ValueError): + return 0 + + +def _first_int(usage_data: dict[str, Any], *keys: str) -> int: + """Return the first integer-coercible value from ``usage_data`` under any + of ``keys``. Falls back to ``0`` when nothing matches.""" + for key in keys: + coerced = _coerce_int(usage_data.get(key)) + if coerced: + return coerced + return 0 + + class UsageMetrics(BaseModel): """Track usage metrics for crew execution. @@ -54,3 +75,50 @@ class UsageMetrics(BaseModel): self.reasoning_tokens += usage_metrics.reasoning_tokens self.cache_creation_tokens += usage_metrics.cache_creation_tokens self.successful_requests += usage_metrics.successful_requests + + @classmethod + def from_provider_dict(cls, usage_data: dict[str, Any] | None) -> Self | None: + """Normalize a provider's raw usage dict into a ``UsageMetrics``. + + Accepts the full set of key aliases CrewAI providers emit: + ``prompt_tokens`` / ``prompt_token_count`` (Gemini) / ``input_tokens`` + (Anthropic), and the equivalent completion / cached-prompt aliases. + Mirrors ``BaseLLM._track_token_usage_internal`` so per-LLM totals, + flow-level aggregation, and OTel spans agree on every provider. + + Returns ``None`` for missing/empty input so callers can decide + whether to skip the event entirely or treat it as a zero-token + successful request. + """ + if not usage_data: + return None + + prompt_tokens = _first_int( + usage_data, "prompt_tokens", "prompt_token_count", "input_tokens" + ) + completion_tokens = _first_int( + usage_data, + "completion_tokens", + "candidates_token_count", + "output_tokens", + ) + cached_prompt_tokens = _first_int( + usage_data, + "cached_tokens", + "cached_prompt_tokens", + "cache_read_input_tokens", + ) + if not cached_prompt_tokens: + details = usage_data.get("prompt_tokens_details") + if isinstance(details, dict): + cached_prompt_tokens = _coerce_int(details.get("cached_tokens")) + + return cls( + total_tokens=prompt_tokens + completion_tokens, + prompt_tokens=prompt_tokens, + completion_tokens=completion_tokens, + cached_prompt_tokens=cached_prompt_tokens, + reasoning_tokens=_coerce_int(usage_data.get("reasoning_tokens")), + cache_creation_tokens=_coerce_int(usage_data.get("cache_creation_tokens")), + successful_requests=1, + ) diff --git a/lib/crewai/tests/test_flow_usage_metrics.py b/lib/crewai/tests/test_flow_usage_metrics.py new file mode 100644 index 000000000..48d2274b4 --- /dev/null +++ b/lib/crewai/tests/test_flow_usage_metrics.py @@ -0,0 +1,511 @@ +"""Tests for flow-level token usage aggregation + +``flow.usage_metrics`` listens to ``LLMCallCompletedEvent`` for the duration +of ``kickoff_async`` so it covers every LLM call inside the flow — crew-led, +tool-led, AND bare ``LLM.call(...)`` from a flow method. We exercise the +aggregator end-to-end through the real event bus with fabricated events and +explicit contextvar control; no live LLM provider is required. +""" + +from __future__ import annotations + +import contextvars +import os +import tempfile +from typing import Any, Callable +from uuid import uuid4 + +import pytest + +from crewai.events.event_bus import crewai_event_bus +from crewai.events.types.llm_events import LLMCallCompletedEvent, LLMCallType +from crewai.flow.async_feedback.types import PendingFeedbackContext +from crewai.flow.flow import Flow, listen, start +from crewai.flow.flow_context import current_flow_id +from crewai.flow.persistence.sqlite import SQLiteFlowPersistence +from crewai.flow.runtime import _usage_dict_to_metrics +from crewai.types.usage_metrics import UsageMetrics + + +def _emit_llm_call( + *, + flow_id: str | None, + prompt_tokens: int = 0, + completion_tokens: int = 0, + cached_prompt_tokens: int = 0, + reasoning_tokens: int = 0, + cache_creation_tokens: int = 0, +) -> None: + """Emit one fake ``LLMCallCompletedEvent`` with ``current_flow_id`` pinned + to ``flow_id``. + + Runs in a freshly-copied context so the value the bus snapshots at emit + time is exactly ``flow_id`` — independent of the calling thread's outer + context. Mirrors how the real ``LLM.call`` emits events at runtime. + """ + usage: dict[str, Any] = { + "prompt_tokens": prompt_tokens, + "completion_tokens": completion_tokens, + "total_tokens": prompt_tokens + completion_tokens, + } + for key, value in ( + ("cached_prompt_tokens", cached_prompt_tokens), + ("reasoning_tokens", reasoning_tokens), + ("cache_creation_tokens", cache_creation_tokens), + ): + if value: + usage[key] = value + event = LLMCallCompletedEvent( + call_id=str(uuid4()), + model="gpt-4o-mini", + response="ok", + call_type=LLMCallType.LLM_CALL, + usage=usage, + ) + + ctx = contextvars.copy_context() + + def _emit() -> None: + current_flow_id.set(flow_id) + future = crewai_event_bus.emit(object(), event) + if future is not None: + future.result(timeout=5.0) + + ctx.run(_emit) + + +class _ScriptedFlow(Flow): + """A Flow whose ``@start`` delegates to a per-instance ``_script`` closure. + + Each test attaches a script with ``flow._script = lambda f: ...`` so we + don't redefine a Flow subclass for every scenario. + """ + + @start() + def run(self) -> None: + script: Callable[[Flow], None] = getattr(self, "_script", lambda _f: None) + script(self) + + +def _run(script: Callable[[Flow], None] = lambda _f: None) -> Flow: + """Build a ``_ScriptedFlow``, attach ``script``, kickoff. Returns the flow.""" + flow = _ScriptedFlow() + flow._script = script + flow.kickoff() + return flow + + +class TestUsageDictToMetrics: + """Unit tests for the dict-to-UsageMetrics normalizer.""" + + @pytest.mark.parametrize( + "usage, expected", + [ + (None, None), + ({}, None), + ( + {"prompt_tokens": 10, "completion_tokens": 20, "total_tokens": 30}, + UsageMetrics( + prompt_tokens=10, + completion_tokens=20, + total_tokens=30, + successful_requests=1, + ), + ), + # total_tokens missing → derived from prompt + completion + ( + {"prompt_tokens": 4, "completion_tokens": 6}, + UsageMetrics( + prompt_tokens=4, + completion_tokens=6, + total_tokens=10, + successful_requests=1, + ), + ), + # Extended provider-specific keys flow through normalization + ( + { + "prompt_tokens": 100, + "completion_tokens": 80, + "total_tokens": 180, + "cached_prompt_tokens": 40, + "reasoning_tokens": 25, + "cache_creation_tokens": 10, + }, + UsageMetrics( + prompt_tokens=100, + completion_tokens=80, + total_tokens=180, + cached_prompt_tokens=40, + reasoning_tokens=25, + cache_creation_tokens=10, + successful_requests=1, + ), + ), + # Garbage / non-int values coerce to 0 instead of crashing + ( + {"prompt_tokens": "n/a", "completion_tokens": None, "total_tokens": 7}, + UsageMetrics( + prompt_tokens=0, + completion_tokens=0, + total_tokens=0, + successful_requests=1, + ), + ), + # Native Anthropic provider emits input_tokens/output_tokens + ( + {"input_tokens": 12, "output_tokens": 8}, + UsageMetrics( + prompt_tokens=12, + completion_tokens=8, + total_tokens=20, + successful_requests=1, + ), + ), + # Native Gemini provider emits prompt_token_count/candidates_token_count + ( + { + "prompt_token_count": 30, + "candidates_token_count": 20, + "reasoning_tokens": 5, + }, + UsageMetrics( + prompt_tokens=30, + completion_tokens=20, + total_tokens=50, + reasoning_tokens=5, + successful_requests=1, + ), + ), + # OpenAI nests cached_tokens under prompt_tokens_details + ( + { + "prompt_tokens": 100, + "completion_tokens": 50, + "prompt_tokens_details": {"cached_tokens": 30}, + }, + UsageMetrics( + prompt_tokens=100, + completion_tokens=50, + total_tokens=150, + cached_prompt_tokens=30, + successful_requests=1, + ), + ), + ], + ids=[ + "none", + "empty", + "all_keys", + "no_total", + "extended_keys", + "garbage", + "anthropic_aliases", + "gemini_aliases", + "openai_nested_cached", + ], + ) + def test_normalization( + self, usage: dict[str, Any] | None, expected: UsageMetrics | None + ) -> None: + assert _usage_dict_to_metrics(usage) == expected + + +class TestFlowUsageAggregation: + """End-to-end tests driving the listener through the real event bus.""" + + def test_sums_every_llm_call_in_the_flow(self) -> None: + """Multiple LLM calls — including bare ``LLM.call(...)`` made outside + any crew — accumulate; ``successful_requests`` tracks the call count.""" + + def script(flow: Flow) -> None: + _emit_llm_call(flow_id=flow._flow_match_id, prompt_tokens=300, completion_tokens=300) + _emit_llm_call(flow_id=flow._flow_match_id, prompt_tokens=200, completion_tokens=100) + _emit_llm_call(flow_id=flow._flow_match_id, prompt_tokens=20, completion_tokens=20) + + flow = _run(script) + + assert flow.usage_metrics.total_tokens == 940 + assert flow.usage_metrics.prompt_tokens == 520 + assert flow.usage_metrics.completion_tokens == 420 + assert flow.usage_metrics.successful_requests == 3 + + def test_returns_zero_when_no_calls_happen(self) -> None: + flow = _run() + assert flow.usage_metrics == UsageMetrics() + + def test_ignores_events_from_other_flows(self) -> None: + """Concurrent flow runs share the singleton bus, so the listener must + scope itself to its own flow via the contextvar match.""" + + def script(flow: Flow) -> None: + _emit_llm_call(flow_id=flow._flow_match_id, prompt_tokens=50, completion_tokens=50) + _emit_llm_call(flow_id="some-other-flow", prompt_tokens=49_000, completion_tokens=50_999) + + flow = _run(script) + + assert flow.usage_metrics.total_tokens == 100 + assert flow.usage_metrics.successful_requests == 1 + + def test_resets_between_kickoffs(self) -> None: + flow = _ScriptedFlow() + flow._script = lambda f: _emit_llm_call( + flow_id=f._flow_match_id, prompt_tokens=250, completion_tokens=250 + ) + + flow.kickoff() + flow.kickoff() + + assert flow.usage_metrics.total_tokens == 500 + assert flow.usage_metrics.successful_requests == 1 + + def test_usage_metrics_returns_independent_copy(self) -> None: + """``usage_metrics`` must return a copy, not the internal instance — + otherwise callers can clobber the in-flight accumulator.""" + + flow = _run( + lambda f: _emit_llm_call( + flow_id=f._flow_match_id, prompt_tokens=50, completion_tokens=50 + ) + ) + + snapshot = flow.usage_metrics + snapshot.total_tokens = 999_999 + + assert flow.usage_metrics.total_tokens == 100 + + def test_handler_is_unregistered_after_kickoff(self) -> None: + """Long-lived workers (Celery, devkit) must not leak one handler per + kickoff on the singleton bus, on either the success or failure path.""" + + def handler_count() -> int: + return len( + crewai_event_bus._sync_handlers.get(LLMCallCompletedEvent, frozenset()) + ) + + before = handler_count() + + flow = _ScriptedFlow() + flow._script = lambda f: _emit_llm_call( + flow_id=f._flow_match_id, prompt_tokens=5, completion_tokens=5 + ) + for _ in range(3): + flow.kickoff() + + assert handler_count() == before + + def boom(_f: Flow) -> None: + raise RuntimeError("boom") + + failing = _ScriptedFlow() + failing._script = boom + + with pytest.raises(RuntimeError, match="boom"): + failing.kickoff() + + assert handler_count() == before + + def test_kickoff_flushes_event_bus_before_returning( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + """`kickoff_async` must drain pending LLMCallCompletedEvent handlers + before detaching the listener — otherwise late handlers landing on + the threadpool would be lost on short flows. Mirrors the flush + ``Crew.kickoff()`` performs before reporting ``token_usage``.""" + + flush_calls: list[None] = [] + original_flush = crewai_event_bus.flush + + def tracked_flush(*args: Any, **kwargs: Any) -> bool: + flush_calls.append(None) + return original_flush(*args, **kwargs) + + monkeypatch.setattr(crewai_event_bus, "flush", tracked_flush) + + flow = _ScriptedFlow() + flow._script = lambda f: _emit_llm_call( + flow_id=f._flow_match_id, prompt_tokens=3, completion_tokens=4 + ) + flow.kickoff() + + assert flush_calls, "kickoff did not flush the event bus before returning" + assert flow.usage_metrics.total_tokens == 7 + + def test_stale_handler_from_prior_kickoff_does_not_contaminate(self) -> None: + """A handler still queued from a prior kickoff must not write into + a later kickoff's accumulator. The handler's closure captures its + own accumulator object, so any late writes land on an orphaned + instance and the live ``usage_metrics`` is unaffected.""" + + captured: dict[str, Any] = {} + + def script(flow: Flow) -> None: + _emit_llm_call(flow_id=flow._flow_match_id, prompt_tokens=10, completion_tokens=10) + captured["handler"] = flow._usage_aggregation_handler + captured["match_id"] = flow._flow_match_id + + flow = _run(script) + assert flow.usage_metrics.total_tokens == 20 + + flow._script = lambda f: None + flow.kickoff() + assert flow.usage_metrics.total_tokens == 0 + + stale_handler = captured["handler"] + assert stale_handler is not None + + stale_event = LLMCallCompletedEvent( + call_id=str(uuid4()), + model="gpt-4o-mini", + response="ok", + call_type=LLMCallType.LLM_CALL, + usage={"prompt_tokens": 999, "completion_tokens": 999, "total_tokens": 1998}, + ) + ctx = contextvars.copy_context() + ctx.run(lambda: (current_flow_id.set(captured["match_id"]), stale_handler(object(), stale_event))) + + assert flow.usage_metrics.total_tokens == 0 + + def test_pause_detaches_listener_and_does_not_leak(self) -> None: + """When ``kickoff_async`` pauses for human feedback, the listener + must be detached from the singleton bus to avoid leaking handlers + across abandoned paused instances. Pre-pause LLM events still + count because the bus snapshots handlers at emit time. Late + events emitted after the pause returns do not count for this + instance — resume paths re-attach a fresh listener.""" + + from crewai.flow.async_feedback.types import HumanFeedbackPending + + captured: dict[str, Any] = {} + + class _PausingFlow(Flow): + @start() + def begin(self) -> None: + _emit_llm_call( + flow_id=self._flow_match_id, + prompt_tokens=10, + completion_tokens=20, + ) + captured["pre_pause_total"] = self.usage_metrics.total_tokens + raise HumanFeedbackPending( + context=PendingFeedbackContext( + flow_id=self.flow_id, + flow_class="_PausingFlow", + method_name="begin", + method_output="content", + message="Review:", + ) + ) + + with tempfile.TemporaryDirectory() as tmpdir: + persistence = SQLiteFlowPersistence(os.path.join(tmpdir, "f.db")) + flow = _PausingFlow(persistence=persistence) + result = flow.kickoff() + + assert isinstance(result, HumanFeedbackPending) + assert captured["pre_pause_total"] == 30 + assert flow._usage_aggregation_handler is None + + # A late event emitted after the pause does not reach the + # detached listener, so the running total is unchanged. + _emit_llm_call( + flow_id=flow._flow_match_id, + prompt_tokens=2, + completion_tokens=3, + ) + assert flow.usage_metrics.total_tokens == 30 + + def test_aggregates_resume_after_from_pending(self) -> None: + """A flow restored via ``from_pending`` is a fresh instance with no + ``_flow_match_id``; without seeding it, the listener attached in + ``resume_async`` either ignores its own LLM calls or absorbs unrelated + ones. ``from_pending`` must seed the match id so the resume-phase + aggregator counts our own calls and only our own calls.""" + + class _ResumeFlow(Flow): + @start() + def begin(self) -> str: + return "content" + + @listen(begin) + def on_begin(self, _feedback: Any) -> str: + _emit_llm_call( + flow_id=self._flow_match_id, + prompt_tokens=100, + completion_tokens=50, + ) + _emit_llm_call( + flow_id="some-other-flow", + prompt_tokens=9_999, + completion_tokens=9_999, + ) + return "done" + + with tempfile.TemporaryDirectory() as tmpdir: + persistence = SQLiteFlowPersistence(os.path.join(tmpdir, "f.db")) + flow_id = "usage-resume-test" + persistence.save_pending_feedback( + flow_uuid=flow_id, + context=PendingFeedbackContext( + flow_id=flow_id, + flow_class="_ResumeFlow", + method_name="begin", + method_output="content", + message="Review:", + ), + state_data={"id": flow_id}, + ) + + flow = _ResumeFlow.from_pending(flow_id, persistence) + assert flow._flow_match_id == flow.flow_id + + flow.resume("ok") + + assert flow.usage_metrics.total_tokens == 150 + assert flow.usage_metrics.prompt_tokens == 100 + assert flow.usage_metrics.completion_tokens == 50 + assert flow.usage_metrics.successful_requests == 1 + + def test_resume_aggregates_under_foreign_flow_context(self) -> None: + """Resume must override an already-set ``current_flow_id`` so its + own LLM events match the listener's filter even when invoked from + inside another flow's active context.""" + + class _ResumeFlow(Flow): + @start() + def begin(self) -> str: + return "content" + + @listen(begin) + def on_begin(self, _feedback: Any) -> str: + _emit_llm_call( + flow_id=self._flow_match_id, + prompt_tokens=42, + completion_tokens=8, + ) + return "done" + + with tempfile.TemporaryDirectory() as tmpdir: + persistence = SQLiteFlowPersistence(os.path.join(tmpdir, "f.db")) + flow_id = "resume-foreign-context" + persistence.save_pending_feedback( + flow_uuid=flow_id, + context=PendingFeedbackContext( + flow_id=flow_id, + flow_class="_ResumeFlow", + method_name="begin", + method_output="content", + message="Review:", + ), + state_data={"id": flow_id}, + ) + + foreign_token = current_flow_id.set("some-parent-flow") + try: + flow = _ResumeFlow.from_pending(flow_id, persistence) + flow.resume("ok") + finally: + current_flow_id.reset(foreign_token) + + assert flow.usage_metrics.total_tokens == 50 + assert flow.usage_metrics.successful_requests == 1