From 8565713a1a85aa822c8eebe3ad03039fe15b2f4c Mon Sep 17 00:00:00 2001 From: Lucas Gomide Date: Thu, 11 Jun 2026 14:15:23 -0300 Subject: [PATCH] fix: seed _flow_match_id in from_pending for resume aggregation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A flow restored via Flow.from_pending() is a fresh instance whose _flow_match_id starts at None. When resume_async then attaches the LLMCallCompletedEvent listener, the handler filter (current_flow_id.get() != flow._flow_match_id) either absorbs unrelated events (when the contextvar is also None) or skips the flow's own LLM calls (when set to a different id). Seed instance._flow_match_id = instance.flow_id at the end of from_pending so the resume-phase aggregator has a real id to match against. The accumulator itself stays at zero on restore — any usage from before the pause was only observable on the original kickoff instance. Add an end-to-end test that pauses a flow, restores it via from_pending, emits one of its own LLM events and one belonging to a sibling flow during resume, and asserts only its own is counted. --- lib/crewai/src/crewai/flow/runtime.py | 8 +++ lib/crewai/tests/test_flow_usage_metrics.py | 57 ++++++++++++++++++++- 2 files changed, 64 insertions(+), 1 deletion(-) diff --git a/lib/crewai/src/crewai/flow/runtime.py b/lib/crewai/src/crewai/flow/runtime.py index 559484a1d..3a8733662 100644 --- a/lib/crewai/src/crewai/flow/runtime.py +++ b/lib/crewai/src/crewai/flow/runtime.py @@ -1330,6 +1330,14 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): instance._initialize_state(state_data) instance._pending_feedback_context = pending_context instance._is_execution_resuming = True + # Seed the usage-aggregation match id so `resume_async` can wire its + # listener and restore `current_flow_id` correctly. Without this, + # a restored flow has a None match id and the handler would either + # ignore its own LLM calls or absorb unrelated ones from sibling + # flows. The accumulator itself starts at zero — any usage from + # before the pause was only observable on the original kickoff + # instance. + instance._flow_match_id = instance.flow_id return instance diff --git a/lib/crewai/tests/test_flow_usage_metrics.py b/lib/crewai/tests/test_flow_usage_metrics.py index 30b412398..069af6e46 100644 --- a/lib/crewai/tests/test_flow_usage_metrics.py +++ b/lib/crewai/tests/test_flow_usage_metrics.py @@ -10,6 +10,8 @@ explicit contextvar control; no live LLM provider is required. from __future__ import annotations import contextvars +import os +import tempfile from typing import Any, Callable from uuid import uuid4 @@ -17,8 +19,10 @@ import pytest from crewai.events.event_bus import crewai_event_bus from crewai.events.types.llm_events import LLMCallCompletedEvent, LLMCallType -from crewai.flow.flow import Flow, start +from crewai.flow.async_feedback.types import PendingFeedbackContext +from crewai.flow.flow import Flow, listen, start from crewai.flow.flow_context import current_flow_id +from crewai.flow.persistence.sqlite import SQLiteFlowPersistence from crewai.flow.runtime import _usage_dict_to_metrics from crewai.types.usage_metrics import UsageMetrics @@ -250,3 +254,54 @@ class TestFlowUsageAggregation: failing.kickoff() assert handler_count() == before + + def test_aggregates_resume_after_from_pending(self) -> None: + """A flow restored via ``from_pending`` is a fresh instance with no + ``_flow_match_id``; without seeding it, the listener attached in + ``resume_async`` either ignores its own LLM calls or absorbs unrelated + ones. ``from_pending`` must seed the match id so the resume-phase + aggregator counts our own calls and only our own calls.""" + + class _ResumeFlow(Flow): + @start() + def begin(self) -> str: + return "content" + + @listen(begin) + def on_begin(self, _feedback: Any) -> str: + _emit_llm_call( + flow_id=self._flow_match_id, + prompt_tokens=100, + completion_tokens=50, + ) + _emit_llm_call( + flow_id="some-other-flow", + prompt_tokens=9_999, + completion_tokens=9_999, + ) + return "done" + + with tempfile.TemporaryDirectory() as tmpdir: + persistence = SQLiteFlowPersistence(os.path.join(tmpdir, "f.db")) + flow_id = "usage-resume-test" + persistence.save_pending_feedback( + flow_uuid=flow_id, + context=PendingFeedbackContext( + flow_id=flow_id, + flow_class="_ResumeFlow", + method_name="begin", + method_output="content", + message="Review:", + ), + state_data={"id": flow_id}, + ) + + flow = _ResumeFlow.from_pending(flow_id, persistence) + assert flow._flow_match_id == flow.flow_id + + flow.resume("ok") + + assert flow.usage_metrics.total_tokens == 150 + assert flow.usage_metrics.prompt_tokens == 100 + assert flow.usage_metrics.completion_tokens == 50 + assert flow.usage_metrics.successful_requests == 1