diff --git a/lib/crewai/src/crewai/flow/runtime.py b/lib/crewai/src/crewai/flow/runtime.py index 559484a1d..3a8733662 100644 --- a/lib/crewai/src/crewai/flow/runtime.py +++ b/lib/crewai/src/crewai/flow/runtime.py @@ -1330,6 +1330,14 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): instance._initialize_state(state_data) instance._pending_feedback_context = pending_context instance._is_execution_resuming = True + # Seed the usage-aggregation match id so `resume_async` can wire its + # listener and restore `current_flow_id` correctly. Without this, + # a restored flow has a None match id and the handler would either + # ignore its own LLM calls or absorb unrelated ones from sibling + # flows. The accumulator itself starts at zero — any usage from + # before the pause was only observable on the original kickoff + # instance. + instance._flow_match_id = instance.flow_id return instance diff --git a/lib/crewai/tests/test_flow_usage_metrics.py b/lib/crewai/tests/test_flow_usage_metrics.py index 30b412398..069af6e46 100644 --- a/lib/crewai/tests/test_flow_usage_metrics.py +++ b/lib/crewai/tests/test_flow_usage_metrics.py @@ -10,6 +10,8 @@ explicit contextvar control; no live LLM provider is required. from __future__ import annotations import contextvars +import os +import tempfile from typing import Any, Callable from uuid import uuid4 @@ -17,8 +19,10 @@ import pytest from crewai.events.event_bus import crewai_event_bus from crewai.events.types.llm_events import LLMCallCompletedEvent, LLMCallType -from crewai.flow.flow import Flow, start +from crewai.flow.async_feedback.types import PendingFeedbackContext +from crewai.flow.flow import Flow, listen, start from crewai.flow.flow_context import current_flow_id +from crewai.flow.persistence.sqlite import SQLiteFlowPersistence from crewai.flow.runtime import _usage_dict_to_metrics from crewai.types.usage_metrics import UsageMetrics @@ -250,3 +254,54 @@ class TestFlowUsageAggregation: failing.kickoff() assert handler_count() == before + + def test_aggregates_resume_after_from_pending(self) -> None: + """A flow restored via ``from_pending`` is a fresh instance with no + ``_flow_match_id``; without seeding it, the listener attached in + ``resume_async`` either ignores its own LLM calls or absorbs unrelated + ones. ``from_pending`` must seed the match id so the resume-phase + aggregator counts our own calls and only our own calls.""" + + class _ResumeFlow(Flow): + @start() + def begin(self) -> str: + return "content" + + @listen(begin) + def on_begin(self, _feedback: Any) -> str: + _emit_llm_call( + flow_id=self._flow_match_id, + prompt_tokens=100, + completion_tokens=50, + ) + _emit_llm_call( + flow_id="some-other-flow", + prompt_tokens=9_999, + completion_tokens=9_999, + ) + return "done" + + with tempfile.TemporaryDirectory() as tmpdir: + persistence = SQLiteFlowPersistence(os.path.join(tmpdir, "f.db")) + flow_id = "usage-resume-test" + persistence.save_pending_feedback( + flow_uuid=flow_id, + context=PendingFeedbackContext( + flow_id=flow_id, + flow_class="_ResumeFlow", + method_name="begin", + method_output="content", + message="Review:", + ), + state_data={"id": flow_id}, + ) + + flow = _ResumeFlow.from_pending(flow_id, persistence) + assert flow._flow_match_id == flow.flow_id + + flow.resume("ok") + + assert flow.usage_metrics.total_tokens == 150 + assert flow.usage_metrics.prompt_tokens == 100 + assert flow.usage_metrics.completion_tokens == 50 + assert flow.usage_metrics.successful_requests == 1