Compare commits

..

1 Commits

Author SHA1 Message Date
Vini Brasil
373dca3d04 Run flows from a definition without a Python subclass (#6104)
Some checks are pending
CodeQL Advanced / Analyze (actions) (push) Waiting to run
CodeQL Advanced / Analyze (python) (push) Waiting to run
Vulnerability Scan / pip-audit (push) Waiting to run
* Read flow dispatch from FlowDefinition

Store the definition in a `_definition` PrivateAttr at post-init and
convert the dispatch helpers (`_start_method_names`, `_listener_methods`,
`_start_condition`, `_listen_condition`, `_is_router`) from classmethods
to instance methods that read it. Event names now fall back to
`self._definition.name` instead of `self.__class__.__name__`.

Behavior is identical for decorator subclasses, but the engine no longer
assumes the definition comes from the class. This is the seam for
`Flow.from_definition`, where an instance runs a definition that was
loaded rather than built from a Python subclass.

* Add Flow.from_definition to run flows without a subclass

A FlowDefinition (e.g. loaded from YAML) was only usable for dispatch on
decorator-authored subclasses. Now each method definition records an
importable `module:qualname` handler ref, and `Flow.from_definition`
resolves and binds those handlers to build a runnable flow directly.

* Build flow state from FlowDefinition

Definition-driven flows previously always started with a bare dict
state.

* Replace handler string with structured FlowActionDefinition

`handler: str | None` was optional and opaque — missing handlers only
surfaced at kickoff time. `do: FlowActionDefinition` is required, so
Pydantic rejects invalid definitions at parse time.

The `call: "code"` discriminator prepares the schema for future
non-Python action types (e.g. MCP tool, crew) without touching
`FlowMethodDefinition`. Resolution logic is extracted to
`runtime/_action_resolvers.py` to keep the dispatch point isolated.

* Fix conversational start router missing required do field

FlowMethodDefinition.do became required when the handler string was
replaced with FlowActionDefinition, but _conversation_start_router still
built its fragment without it, breaking crewai import entirely.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

* Add event scoping to flow test

* Change lib/crewai/tests/test_flow_from_definition.py

---------

Co-authored-by: Claude Fable 5 <noreply@anthropic.com>
2026-06-11 14:18:49 -07:00
18 changed files with 1048 additions and 972 deletions

View File

@@ -226,48 +226,6 @@ counter=2 message='Hello from first_method - updated by second_method'
من خلال ضمان إعادة مخرجات الدالة الأخيرة وتوفير الوصول إلى الحالة، تجعل تدفقات CrewAI من السهل دمج نتائج سير عمل الذكاء الاصطناعي في التطبيقات أو الأنظمة الأكبر،
مع الحفاظ على الوصول إلى الحالة طوال تنفيذ التدفق.
## مقاييس استخدام التدفق
بعد اكتمال تنفيذ التدفق، يمكنك الوصول إلى الخاصية `usage_metrics` لعرض إجمالي استخدام التوكنات عبر **كل استدعاء لنموذج اللغة** يتم خلال التشغيل — بما في ذلك الاستدعاءات من كل فريق (Crew) ينظمه التدفق، والاستدعاءات داخل أدوات الـ Agents، والاستدعاءات المباشرة لـ `LLM.call(...)` من دوال التدفق. هذا هو المكافئ على جانب الـ SDK للإجماليات المعروضة في واجهة CrewAI Enterprise.
```python Code
from crewai import LLM
from crewai.flow.flow import Flow, listen, start
class UsageMetricsFlow(Flow):
@start()
def run_first_crew(self):
self.state.first_result = FirstCrew().crew().kickoff()
@listen(run_first_crew)
def call_llm_directly(self):
# استدعاء مباشر لنموذج اللغة — يُحسب أيضًا ضمن flow.usage_metrics
llm = LLM(model="openai/gpt-4o-mini")
self.state.summary = llm.call("لخّص النقاط الرئيسية.")
@listen(call_llm_directly)
def run_second_crew(self):
self.state.second_result = SecondCrew().crew().kickoff()
flow = UsageMetricsFlow()
flow.kickoff()
print(flow.usage_metrics)
# UsageMetrics(total_tokens=8579, prompt_tokens=6210, completion_tokens=2369,
# cached_prompt_tokens=0, reasoning_tokens=0,
# cache_creation_tokens=0, successful_requests=5)
```
<Note>
`flow.usage_metrics` **ليست** نفس `flow.kickoff().token_usage`. هذه الأخيرة
ترجع فقط `CrewOutput.token_usage` لـ **آخر** دالة `@listen` أعادت
`CrewOutput`، مما يعني أنها تعكس فقط الفريق الأخير وتتجاهل الفرق السابقة
وكذلك أي استدعاءات مباشرة لـ `LLM.call(...)`. استخدم `flow.usage_metrics`
كلما احتجت إلى الإجمالي **الكامل** للتوكنات لتنفيذ التدفق.
</Note>
كل حقل في [`UsageMetrics`](https://github.com/crewAIInc/crewAI/blob/main/lib/crewai/src/crewai/types/usage_metrics.py) المُعاد هو مجموع جميع استدعاءات نموذج اللغة التي حدثت خلال استدعاء واحد لـ `flow.kickoff()`. تتم إعادة تعيين العدادات عند الاستدعاء التالي لـ `kickoff()` (وفي كل تكرار من `kickoff_for_each`)، لذلك لن تتكرر العدّات عبر التشغيلات المتتالية. يمكن قراءة هذه الخاصية بأمان في أي وقت بعد اكتمال `kickoff()`؛ قراءتها أثناء التنفيذ تُرجع المجموع الجزئي المتراكم حتى تلك اللحظة.
## إدارة حالة التدفق
إدارة الحالة بفعالية أمر بالغ الأهمية لبناء سير عمل ذكاء اصطناعي موثوق وقابل للصيانة. توفر تدفقات CrewAI آليات قوية لإدارة الحالة غير المهيكلة والمهيكلة،

View File

@@ -226,49 +226,6 @@ After the Flow has run, you can access the final state to see the updates made b
By ensuring that the final method's output is returned and providing access to the state, CrewAI Flows make it easy to integrate the results of your AI workflows into larger applications or systems,
while also maintaining and accessing the state throughout the Flow's execution.
## Flow Usage Metrics
After a Flow execution completes, you can access the `usage_metrics` property to view aggregated token usage across **every LLM call** made during the run — including calls from every Crew the Flow orchestrated, calls inside Agent tools, and bare `LLM.call(...)` invocations from Flow methods. This is the SDK-side equivalent of the totals shown in the CrewAI Enterprise UI.
```python Code
from crewai import LLM
from crewai.flow.flow import Flow, listen, start
class UsageMetricsFlow(Flow):
@start()
def run_first_crew(self):
self.state.first_result = FirstCrew().crew().kickoff()
@listen(run_first_crew)
def call_llm_directly(self):
# Bare LLM call — still counted by flow.usage_metrics
llm = LLM(model="openai/gpt-4o-mini")
self.state.summary = llm.call("Summarize the key takeaways.")
@listen(call_llm_directly)
def run_second_crew(self):
self.state.second_result = SecondCrew().crew().kickoff()
flow = UsageMetricsFlow()
flow.kickoff()
print(flow.usage_metrics)
# UsageMetrics(total_tokens=8579, prompt_tokens=6210, completion_tokens=2369,
# cached_prompt_tokens=0, reasoning_tokens=0,
# cache_creation_tokens=0, successful_requests=5)
```
<Note>
`flow.usage_metrics` is **not** the same as `flow.kickoff().token_usage`. The
latter returns the `CrewOutput.token_usage` of the **last** `@listen` method
that returned a `CrewOutput`, which means it only reflects the final Crew and
ignores prior Crews and bare `LLM.call(...)` invocations entirely. Use
`flow.usage_metrics` whenever you need the **full** token rollup for the Flow
execution.
</Note>
Each entry in the returned [`UsageMetrics`](https://github.com/crewAIInc/crewAI/blob/main/lib/crewai/src/crewai/types/usage_metrics.py) is the sum across all LLM calls made within a single `flow.kickoff()` invocation. Counters reset on the next `kickoff()` call (or on each iteration of `kickoff_for_each`), so successive runs don't double-count. The property is safe to read at any point after `kickoff()` completes; reading it during execution returns the partial total accumulated so far.
## Flow State Management
Managing state effectively is crucial for building reliable and maintainable AI workflows. CrewAI Flows provides robust mechanisms for both unstructured and structured state management,

View File

@@ -221,48 +221,6 @@ Flow가 실행된 후, 이러한 메소드들에 의해 수행된 업데이트
최종 메소드의 출력이 반환되고 상태에 접근할 수 있도록 함으로써, CrewAI Flow는 AI 워크플로우의 결과를 더 큰 애플리케이션이나 시스템에 쉽게 통합할 수 있게 하며,
Flow 실행 과정 전반에 걸쳐 상태를 유지하고 접근하면서도 이를 용이하게 만듭니다.
## 플로우 사용 메트릭
Flow 실행이 완료된 후, `usage_metrics` 속성에 접근하여 실행 동안 발생한 **모든 LLM 호출**의 토큰 사용량 집계를 확인할 수 있습니다. 여기에는 Flow가 오케스트레이션한 모든 Crew의 호출, Agent의 도구 내부에서 발생한 호출, 그리고 Flow 메서드에서 직접 호출한 `LLM.call(...)`이 모두 포함됩니다. 이는 CrewAI Enterprise UI에 표시되는 총량과 동등한 SDK 측 값입니다.
```python Code
from crewai import LLM
from crewai.flow.flow import Flow, listen, start
class UsageMetricsFlow(Flow):
@start()
def run_first_crew(self):
self.state.first_result = FirstCrew().crew().kickoff()
@listen(run_first_crew)
def call_llm_directly(self):
# 직접 LLM 호출 — flow.usage_metrics에서도 집계됩니다
llm = LLM(model="openai/gpt-4o-mini")
self.state.summary = llm.call("핵심 내용을 요약해 주세요.")
@listen(call_llm_directly)
def run_second_crew(self):
self.state.second_result = SecondCrew().crew().kickoff()
flow = UsageMetricsFlow()
flow.kickoff()
print(flow.usage_metrics)
# UsageMetrics(total_tokens=8579, prompt_tokens=6210, completion_tokens=2369,
# cached_prompt_tokens=0, reasoning_tokens=0,
# cache_creation_tokens=0, successful_requests=5)
```
<Note>
`flow.usage_metrics`는 `flow.kickoff().token_usage`와 **동일하지 않습니다**.
후자는 `CrewOutput`을 반환한 **마지막** `@listen` 메서드의
`CrewOutput.token_usage`만 반환하므로, 이전에 실행된 Crew들과 Flow 메서드에서
직접 호출한 `LLM.call(...)`은 전혀 포함되지 않습니다. Flow 실행에 대한
**전체** 토큰 집계가 필요할 때는 항상 `flow.usage_metrics`를 사용하십시오.
</Note>
반환되는 [`UsageMetrics`](https://github.com/crewAIInc/crewAI/blob/main/lib/crewai/src/crewai/types/usage_metrics.py)의 각 항목은 단일 `flow.kickoff()` 실행 동안 발생한 모든 LLM 호출의 합계입니다. 다음 `kickoff()` 호출(및 `kickoff_for_each`의 각 반복)에서 카운터가 초기화되므로 연속 실행이 이중으로 집계되지 않습니다. 이 속성은 `kickoff()` 완료 후 언제든지 안전하게 읽을 수 있으며, 실행 중에 읽으면 그 시점까지 누적된 부분 합계를 반환합니다.
## 플로우 상태 관리
상태를 효과적으로 관리하는 것은 신뢰할 수 있고 유지 보수가 용이한 AI 워크플로를 구축하는 데 매우 중요합니다. CrewAI 플로우는 비정형 및 정형 상태 관리를 위한 강력한 메커니즘을 제공하여, 개발자가 자신의 애플리케이션에 가장 적합한 접근 방식을 선택할 수 있도록 합니다.

View File

@@ -219,49 +219,6 @@ Após o término da execução, é possível acessar o estado final e observar a
Ao garantir que a saída do método final seja retornada e oferecer acesso ao estado, o CrewAI Flows facilita a integração dos resultados dos seus workflows de IA em aplicações maiores,
além de permitir o gerenciamento e o acesso ao estado durante toda a execução do Flow.
## Métricas de Uso do Flow
Após a execução de um Flow, você pode acessar a propriedade `usage_metrics` para visualizar o consumo agregado de tokens em **todas as chamadas de LLM** realizadas durante a execução — incluindo chamadas das Crews orquestradas pelo Flow, chamadas dentro de tools de Agents, e invocações diretas de `LLM.call(...)` feitas a partir de métodos do Flow. Esse é o equivalente, do lado do SDK, ao total exibido na interface do CrewAI Enterprise.
```python Code
from crewai import LLM
from crewai.flow.flow import Flow, listen, start
class UsageMetricsFlow(Flow):
@start()
def run_first_crew(self):
self.state.first_result = FirstCrew().crew().kickoff()
@listen(run_first_crew)
def call_llm_directly(self):
# Chamada direta de LLM — também contabilizada por flow.usage_metrics
llm = LLM(model="openai/gpt-4o-mini")
self.state.summary = llm.call("Resuma os principais pontos.")
@listen(call_llm_directly)
def run_second_crew(self):
self.state.second_result = SecondCrew().crew().kickoff()
flow = UsageMetricsFlow()
flow.kickoff()
print(flow.usage_metrics)
# UsageMetrics(total_tokens=8579, prompt_tokens=6210, completion_tokens=2369,
# cached_prompt_tokens=0, reasoning_tokens=0,
# cache_creation_tokens=0, successful_requests=5)
```
<Note>
`flow.usage_metrics` **não** é o mesmo que `flow.kickoff().token_usage`. Este
último retorna apenas o `CrewOutput.token_usage` do **último** método
`@listen` que retornou um `CrewOutput`, ou seja, reflete somente a Crew
final e ignora completamente as Crews anteriores e quaisquer chamadas
diretas de `LLM.call(...)`. Use `flow.usage_metrics` sempre que precisar do
rollup **completo** de tokens da execução do Flow.
</Note>
Cada campo do [`UsageMetrics`](https://github.com/crewAIInc/crewAI/blob/main/lib/crewai/src/crewai/types/usage_metrics.py) retornado representa a soma de todas as chamadas de LLM feitas em uma única invocação de `flow.kickoff()`. Os contadores são resetados a cada novo `kickoff()` (e em cada iteração de `kickoff_for_each`), de modo que execuções sucessivas não duplicam o total. A propriedade é segura para ser lida em qualquer momento após o `kickoff()`; lê-la durante a execução retorna o total parcial acumulado até aquele instante.
## Gerenciamento de Estado em Flows
Gerenciar o estado de forma eficaz é fundamental para construir fluxos de trabalho de IA confiáveis e de fácil manutenção. O CrewAI Flows oferece mecanismos robustos para o gerenciamento de estado tanto não estruturado quanto estruturado,

View File

@@ -47,7 +47,7 @@ from crewai.flow.conversation import (
receive_user_message as _receive_user_message,
)
from crewai.flow.dsl import listen, start
from crewai.flow.dsl._utils import _set_flow_method_definition
from crewai.flow.dsl._utils import _method_action, _set_flow_method_definition
from crewai.flow.flow_definition import FlowMethodDefinition
from crewai.utilities.types import LLMMessage
@@ -78,7 +78,7 @@ def _conversation_start_router(func: Callable[..., Any]) -> Any:
wrapper = start()(func)
_set_flow_method_definition(
cast(Any, wrapper),
FlowMethodDefinition(start=True, router=True),
FlowMethodDefinition(do=_method_action(func), start=True, router=True),
)
return wrapper

View File

@@ -8,6 +8,7 @@ from crewai.flow.dsl._types import FlowMethodDecorator, FlowTrigger
from crewai.flow.dsl._utils import (
P,
R,
_method_action,
_set_flow_method_definition,
)
from crewai.flow.flow_definition import FlowMethodDefinition
@@ -45,7 +46,11 @@ def listen(condition: FlowTrigger) -> FlowMethodDecorator:
wrapper = ListenMethod(func)
_set_flow_method_definition(
wrapper, FlowMethodDefinition(listen=_to_definition_condition(condition))
wrapper,
FlowMethodDefinition(
do=_method_action(func),
listen=_to_definition_condition(condition),
),
)
return wrapper

View File

@@ -19,6 +19,7 @@ from crewai.flow.dsl._types import FlowMethodDecorator, FlowTrigger
from crewai.flow.dsl._utils import (
P,
R,
_method_action,
_set_flow_method_definition,
)
from crewai.flow.flow_definition import FlowMethodDefinition
@@ -148,6 +149,7 @@ def router(
_set_flow_method_definition(
wrapper,
FlowMethodDefinition(
do=_method_action(func),
listen=_to_definition_condition(condition),
router=True,
emit=router_events or None,

View File

@@ -8,6 +8,7 @@ from crewai.flow.dsl._types import FlowMethodDecorator, FlowTrigger
from crewai.flow.dsl._utils import (
P,
R,
_method_action,
_set_flow_method_definition,
)
from crewai.flow.flow_definition import FlowMethodDefinition
@@ -53,13 +54,17 @@ def start(
def decorator(func: Callable[P, R]) -> StartMethod[P, R]:
wrapper = StartMethod(func)
if condition is not None:
_set_flow_method_definition(
wrapper,
FlowMethodDefinition(start=_to_definition_condition(condition)),
)
else:
_set_flow_method_definition(wrapper, FlowMethodDefinition(start=True))
_set_flow_method_definition(
wrapper,
FlowMethodDefinition(
do=_method_action(func),
start=(
_to_definition_condition(condition)
if condition is not None
else True
),
),
)
return wrapper
return cast(FlowMethodDecorator, decorator)

View File

@@ -8,6 +8,7 @@ from pydantic import BaseModel
from typing_extensions import TypeIs
from crewai.flow.flow_definition import (
FlowActionDefinition,
FlowConfigDefinition,
FlowConversationalDefinition,
FlowConversationalRouterDefinition,
@@ -84,6 +85,10 @@ def _stamp_inherited_conversational_metadata(
return method
def _method_action(method: Any) -> FlowActionDefinition:
return FlowActionDefinition(ref=f"{method.__module__}:{method.__qualname__}")
def _set_flow_method_definition(
wrapper: FlowMethod[P, R],
definition: FlowMethodDefinition,
@@ -373,9 +378,11 @@ def _build_method_definition(
) -> FlowMethodDefinition:
fragment = _get_flow_method_definition(method)
if fragment is None:
method_definition = FlowMethodDefinition()
method_definition = FlowMethodDefinition(do=_method_action(method))
else:
method_definition = fragment.model_copy(deep=True)
method_definition = fragment.model_copy(
deep=True, update={"do": _method_action(method)}
)
human_feedback = _build_human_feedback_definition(
method, diagnostics, f"{path}.human_feedback"

View File

@@ -27,6 +27,7 @@ logger = logging.getLogger(__name__)
FlowDefinitionCondition = str | dict[str, Any]
__all__ = [
"FlowActionDefinition",
"FlowConfigDefinition",
"FlowConversationalDefinition",
"FlowConversationalRouterDefinition",
@@ -52,8 +53,9 @@ class FlowDefinitionDiagnostic(BaseModel):
class FlowStateDefinition(BaseModel):
"""Static description of a Flow state contract."""
type: TypingLiteral["dict", "pydantic", "unknown"] = "dict"
type: TypingLiteral["dict", "pydantic", "json_schema", "unknown"] = "dict"
ref: str | None = None
json_schema: dict[str, Any] | None = None
default: Any = None
@@ -90,9 +92,17 @@ class FlowHumanFeedbackDefinition(BaseModel):
learn_strict: bool = False
class FlowActionDefinition(BaseModel):
"""What a Flow method node executes, independent of when it fires."""
call: TypingLiteral["code"] = "code"
ref: str
class FlowMethodDefinition(BaseModel):
"""Static definition of one Flow method and its execution roles."""
do: FlowActionDefinition
start: bool | FlowDefinitionCondition | None = None
listen: FlowDefinitionCondition | None = None
router: bool = False

View File

@@ -0,0 +1,43 @@
from __future__ import annotations
from collections.abc import Callable
import importlib
from operator import attrgetter
from typing import TYPE_CHECKING, Any, cast
from crewai.flow.flow_definition import FlowActionDefinition
if TYPE_CHECKING:
from crewai.flow.runtime import Flow
class InvalidActionRefError(ValueError):
def __init__(self, ref: str) -> None:
super().__init__(f"invalid callable {ref!r}; expected 'module:qualname'")
def _resolve_code_action(
flow: Flow[Any], action: FlowActionDefinition
) -> Callable[..., Any]:
ref = action.ref
module_name, _, qualname = ref.partition(":")
if "<" in ref or not module_name or not qualname:
raise InvalidActionRefError(ref)
try:
target = attrgetter(qualname)(importlib.import_module(module_name))
except (ImportError, AttributeError) as e:
raise InvalidActionRefError(ref) from e
if not callable(target):
raise InvalidActionRefError(ref)
handler = cast(Callable[..., Any], target)
if getattr(handler, "__self__", None) is None:
handler = handler.__get__(flow, type(flow))
return handler
def resolve_action(flow: Flow[Any], action: FlowActionDefinition) -> Callable[..., Any]:
"""Turn one `do:` action into the callable the flow runs for that node."""
if action.call == "code":
return _resolve_code_action(flow, action)
raise ValueError(f"unknown call type {action.call!r}")

View File

@@ -999,7 +999,11 @@ def _json_schema_to_pydantic_field(
if examples:
schema_extra["examples"] = examples
default = ... if is_required else None
default = (
json_schema["default"]
if "default" in json_schema
else (... if is_required else None)
)
if isinstance(type_, type) and issubclass(type_, (int, float)):
if "minimum" in json_schema:

View File

@@ -1157,6 +1157,26 @@ def test_flow_name():
assert flow.name == "MyFlow"
def test_flow_custom_name_overrides_class_name_in_events():
class InternalFlowClass(Flow):
name = "PublicName"
@start()
def begin(self):
return "done"
received = []
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(FlowStartedEvent)
def handle(source, event):
received.append(event)
InternalFlowClass().kickoff()
assert received[0].flow_name == "PublicName"
def test_nested_and_or_conditions():
"""Test nested conditions like or_(and_(A, B), and_(C, D)).

View File

@@ -36,6 +36,7 @@ def test_flow_public_exports_are_explicit():
"start",
}
assert set(flow_definition.__all__) == {
"FlowActionDefinition",
"FlowConfigDefinition",
"FlowConversationalDefinition",
"FlowConversationalRouterDefinition",
@@ -629,6 +630,7 @@ def test_flow_definition_preserves_diagnostics_loaded_from_contract():
"name": "LoadedDiagnosticsFlow",
"methods": {
"decision": {
"do": {"ref": "loaded_flows:LoadedDiagnosticsFlow.decision"},
"router": True,
"emit": ["continue"],
}
@@ -662,6 +664,7 @@ def test_router_start_false_without_listen_reports_missing_trigger():
"name": "LoadedFlow",
"methods": {
"decision": {
"do": {"ref": "loaded_flows:LoadedFlow.decision"},
"router": True,
"start": False,
"emit": ["continue"],
@@ -740,8 +743,14 @@ def test_static_string_listener_is_allowed_by_contract():
"schema": "crewai.flow/v1",
"name": "TypoFlow",
"methods": {
"begin": {"start": True},
"handle": {"listen": "begni"},
"begin": {
"do": {"ref": "loaded_flows:TypoFlow.begin"},
"start": True,
},
"handle": {
"do": {"ref": "loaded_flows:TypoFlow.handle"},
"listen": "begni",
},
},
}
)
@@ -754,8 +763,15 @@ def test_start_false_not_classified_as_start_method():
"schema": "crewai.flow/v1",
"name": "ExplicitNonStartFlow",
"methods": {
"begin": {"start": True},
"handle": {"start": False, "listen": "begin"},
"begin": {
"do": {"ref": "loaded_flows:ExplicitNonStartFlow.begin"},
"start": True,
},
"handle": {
"do": {"ref": "loaded_flows:ExplicitNonStartFlow.handle"},
"start": False,
"listen": "begin",
},
},
}
)
@@ -812,6 +828,7 @@ def test_flow_definition_logs_diagnostics_when_loaded_from_contract(caplog):
"name": "LoadedFlow",
"methods": {
"decision": {
"do": {"ref": "loaded_flows:LoadedFlow.decision"},
"router": True,
"emit": ["continue"],
}

View File

@@ -0,0 +1,552 @@
from __future__ import annotations
import pytest
from pydantic import ValidationError
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.flow_events import (
MethodExecutionFinishedEvent,
MethodExecutionStartedEvent,
)
from crewai.flow import Flow, and_, listen, or_, router, start
from crewai.flow.flow import FlowState
from crewai.flow.flow_definition import FlowDefinition
class ChainFlow(Flow):
@start()
def begin(self):
self.state["begin_ran"] = True
return "hello"
@listen(begin)
def shout(self, result):
return result.upper()
@listen(shout)
def confirm(self):
self.state["confirmed"] = True
return f"confirmed:{self.state['confirmed']}"
CHAIN_YAML = f"""
schema: crewai.flow/v1
name: ChainFlow
methods:
begin:
do:
call: code
ref: {__name__}:ChainFlow.begin
start: true
shout:
do:
ref: {__name__}:ChainFlow.shout
listen: begin
confirm:
do:
ref: {__name__}:ChainFlow.confirm
listen: shout
"""
class MergeFlow(Flow):
@start()
def begin(self):
return "go"
@listen(begin)
def left(self):
return "left"
@listen(begin)
def right(self):
return "right"
@listen(or_(left, right))
def either(self):
self.state["either_ran"] = True
return "either"
@listen(and_(left, right, either))
def join(self):
self.state["joined"] = True
return "joined"
MERGE_YAML = f"""
schema: crewai.flow/v1
name: MergeFlow
methods:
begin:
do:
ref: {__name__}:MergeFlow.begin
start: true
left:
do:
ref: {__name__}:MergeFlow.left
listen: begin
right:
do:
ref: {__name__}:MergeFlow.right
listen: begin
either:
do:
ref: {__name__}:MergeFlow.either
listen:
or: [left, right]
join:
do:
ref: {__name__}:MergeFlow.join
listen:
and: [left, right, either]
"""
class RouteFlow(Flow):
@start()
def begin(self):
return "go"
@router(begin)
def decide(self):
return "left" if self.state.get("direction") == "left" else "right"
@listen("left")
def take_left(self):
return "took-left"
@listen("right")
def take_right(self):
return "took-right"
ROUTE_YAML = f"""
schema: crewai.flow/v1
name: RouteFlow
methods:
begin:
do:
ref: {__name__}:RouteFlow.begin
start: true
decide:
do:
ref: {__name__}:RouteFlow.decide
listen: begin
router: true
take_left:
do:
ref: {__name__}:RouteFlow.take_left
listen: left
take_right:
do:
ref: {__name__}:RouteFlow.take_right
listen: right
"""
class LoopFlow(Flow):
@start("retry")
def step(self):
self.state["count"] = self.state.get("count", 0) + 1
return self.state["count"]
@router(step)
def decide(self):
if self.state["count"] < 3:
return "retry"
return "done"
@listen("done")
def finish(self):
return f"finished:{self.state['count']}"
LOOP_YAML = f"""
schema: crewai.flow/v1
name: LoopFlow
methods:
step:
do:
ref: {__name__}:LoopFlow.step
start: retry
decide:
do:
ref: {__name__}:LoopFlow.decide
listen: step
router: true
finish:
do:
ref: {__name__}:LoopFlow.finish
listen: done
"""
class CounterState(FlowState):
count: int = 0
label: str = "none"
class PydanticStateFlow(Flow[CounterState]):
@start()
def begin(self):
self.state.count += 1
return self.state.count
@listen(begin)
def finish(self):
self.state.label = f"count={self.state.count}"
return self.state.label
PYDANTIC_STATE_YAML = f"""
schema: crewai.flow/v1
name: PydanticStateFlow
state:
type: pydantic
ref: {__name__}:CounterState
methods:
begin:
do:
ref: {__name__}:PydanticStateFlow.begin
start: true
finish:
do:
ref: {__name__}:PydanticStateFlow.finish
listen: begin
"""
PYDANTIC_STATE_OVERLAY_YAML = f"""
schema: crewai.flow/v1
name: PydanticStateFlow
state:
type: pydantic
ref: {__name__}:CounterState
default:
count: 5
methods:
begin:
do:
ref: {__name__}:PydanticStateFlow.begin
start: true
finish:
do:
ref: {__name__}:PydanticStateFlow.finish
listen: begin
"""
JSON_SCHEMA_STATE_YAML = f"""
schema: crewai.flow/v1
name: JsonSchemaStateFlow
state:
type: json_schema
json_schema:
title: CounterState
type: object
properties:
count:
type: integer
default: 0
label:
type: string
default: none
methods:
begin:
do:
ref: {__name__}:PydanticStateFlow.begin
start: true
finish:
do:
ref: {__name__}:PydanticStateFlow.finish
listen: begin
"""
PYDANTIC_REF_WITH_SCHEMA_FALLBACK_YAML = f"""
schema: crewai.flow/v1
name: SchemaFallbackFlow
state:
type: pydantic
ref: definitely_not_a_module_xyz:MissingState
json_schema:
title: CounterState
type: object
properties:
count:
type: integer
default: 0
label:
type: string
default: none
methods:
begin:
do:
ref: {__name__}:PydanticStateFlow.begin
start: true
finish:
do:
ref: {__name__}:PydanticStateFlow.finish
listen: begin
"""
UNRESOLVABLE_STATE_YAML = f"""
schema: crewai.flow/v1
name: UnresolvableStateFlow
state:
type: pydantic
ref: definitely_not_a_module_xyz:MissingState
methods:
begin:
do:
ref: {__name__}:ChainFlow.begin
start: true
"""
DICT_STATE_YAML = f"""
schema: crewai.flow/v1
name: DictStateFlow
state:
type: dict
default:
count: 5
methods:
begin:
do:
ref: {__name__}:ChainFlow.begin
start: true
"""
UNKNOWN_STATE_YAML = f"""
schema: crewai.flow/v1
name: UnknownStateFlow
state:
type: unknown
ref: somewhere:Something
methods:
begin:
do:
ref: {__name__}:ChainFlow.begin
start: true
"""
def _run_with_events(flow, inputs=None):
events = []
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(MethodExecutionStartedEvent)
def on_started(source, event):
events.append(event)
@crewai_event_bus.on(MethodExecutionFinishedEvent)
def on_finished(source, event):
events.append(event)
result = flow.kickoff(inputs=inputs)
events.sort(key=lambda e: e.timestamp)
return result, [
(type(e).__name__, str(e.method_name), e.flow_name) for e in events
]
def _state_without_id(flow):
snapshot = dict(flow.state.model_dump())
snapshot.pop("id", None)
return snapshot
def assert_parity(flow_cls, yaml_str, inputs=None, ordered=True):
class_flow = flow_cls()
class_result, class_events = _run_with_events(class_flow, inputs)
definition = FlowDefinition.from_yaml(yaml_str)
definition_flow = Flow.from_definition(definition)
definition_result, definition_events = _run_with_events(definition_flow, inputs)
assert definition_result == class_result
assert _state_without_id(definition_flow) == _state_without_id(class_flow)
if ordered:
assert definition_flow.method_outputs == class_flow.method_outputs
assert definition_events == class_events
else:
assert sorted(map(repr, definition_flow.method_outputs)) == sorted(
map(repr, class_flow.method_outputs)
)
assert sorted(definition_events) == sorted(class_events)
return definition_flow, definition_result
def test_simple_chain_parity():
flow, result = assert_parity(ChainFlow, CHAIN_YAML)
assert result == "confirmed:True"
assert flow.method_outputs == ["hello", "HELLO", "confirmed:True"]
def test_and_or_merge_parity():
flow, _ = assert_parity(MergeFlow, MERGE_YAML, ordered=False)
assert flow.state["joined"] is True
assert flow.state["either_ran"] is True
def test_router_label_parity_for_each_branch():
left_flow, _ = assert_parity(RouteFlow, ROUTE_YAML, inputs={"direction": "left"})
assert "took-left" in left_flow.method_outputs
assert "took-right" not in left_flow.method_outputs
right_flow, _ = assert_parity(RouteFlow, ROUTE_YAML, inputs={"direction": "right"})
assert "took-right" in right_flow.method_outputs
def test_cyclic_flow_parity():
flow, result = assert_parity(LoopFlow, LOOP_YAML)
assert result == "finished:3"
assert flow.state["count"] == 3
def test_definition_flow_events_use_definition_name():
definition = FlowDefinition.from_yaml(CHAIN_YAML)
flow = Flow.from_definition(definition)
_, events = _run_with_events(flow)
assert events
assert all(flow_name == "ChainFlow" for _, _, flow_name in events)
def test_definition_method_without_action_is_invalid():
with pytest.raises(ValidationError, match="do"):
FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "NoActions",
"methods": {"begin": {"start": True}},
}
)
def test_from_definition_unresolvable_ref_raises():
definition = FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "BadRefs",
"methods": {
"begin": {
"start": True,
"do": {"ref": "definitely_not_a_module_xyz:nope"},
}
},
}
)
with pytest.raises(ValueError, match="unresolvable actions.*begin"):
Flow.from_definition(definition)
def test_from_definition_malformed_ref_raises():
definition = FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "MalformedRefs",
"methods": {"begin": {"start": True, "do": {"ref": "no-colon-here"}}},
}
)
with pytest.raises(ValueError, match="expected 'module:qualname'"):
Flow.from_definition(definition)
def test_from_definition_local_scope_ref_raises():
definition = FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "LocalRefs",
"methods": {
"begin": {
"start": True,
"do": {"ref": f"{__name__}:make.<locals>.LocalFlow.begin"},
}
},
}
)
with pytest.raises(ValueError, match="expected 'module:qualname'"):
Flow.from_definition(definition)
def test_flow_definition_stamps_refs():
definition = ChainFlow.flow_definition()
assert definition.methods["begin"].do.ref == f"{__name__}:ChainFlow.begin"
assert definition.methods["shout"].do.ref == f"{__name__}:ChainFlow.shout"
def test_pydantic_state_from_ref_parity():
flow, result = assert_parity(PydanticStateFlow, PYDANTIC_STATE_YAML)
assert result == "count=1"
assert flow.state.count == 1
assert flow.state.label == "count=1"
assert flow.state.id
def test_pydantic_state_default_overlay():
flow = Flow.from_definition(FlowDefinition.from_yaml(PYDANTIC_STATE_OVERLAY_YAML))
result = flow.kickoff()
assert result == "count=6"
assert flow.state.count == 6
def test_json_schema_state():
flow = Flow.from_definition(FlowDefinition.from_yaml(JSON_SCHEMA_STATE_YAML))
result = flow.kickoff()
assert result == "count=1"
assert flow.state.count == 1
assert flow.state.label == "count=1"
assert flow.state.id
def test_json_schema_state_validates_inputs():
flow = Flow.from_definition(FlowDefinition.from_yaml(JSON_SCHEMA_STATE_YAML))
with pytest.raises(ValueError, match="Invalid inputs"):
flow.kickoff(inputs={"count": "not-a-number"})
def test_pydantic_state_falls_back_to_json_schema_when_ref_unimportable():
flow = Flow.from_definition(
FlowDefinition.from_yaml(PYDANTIC_REF_WITH_SCHEMA_FALLBACK_YAML)
)
result = flow.kickoff()
assert result == "count=1"
assert flow.state.count == 1
def test_pydantic_state_without_ref_or_schema_falls_back_to_dict(caplog):
with caplog.at_level("ERROR"):
flow = Flow.from_definition(FlowDefinition.from_yaml(UNRESOLVABLE_STATE_YAML))
assert "falling back to dict state" in caplog.text
result = flow.kickoff()
assert result == "hello"
assert flow.state["begin_ran"] is True
assert flow.state["id"]
def test_dict_state_is_a_copy_of_default_plus_id():
definition = FlowDefinition.from_yaml(DICT_STATE_YAML)
flow = Flow.from_definition(definition)
assert flow.state["count"] == 5
assert flow.state["id"]
flow.kickoff()
assert flow.state["begin_ran"] is True
second = Flow.from_definition(definition)
assert second.state["count"] == 5
assert "begin_ran" not in second.state
assert second.state["id"] != flow.state["id"]
assert definition.state.default == {"count": 5}
def test_unknown_state_type_falls_back_to_dict(caplog):
with caplog.at_level("WARNING"):
flow = Flow.from_definition(FlowDefinition.from_yaml(UNKNOWN_STATE_YAML))
assert "falling back to dict state" in caplog.text
result = flow.kickoff()
assert result == "hello"
assert flow.state["begin_ran"] is True

View File

@@ -1,394 +0,0 @@
"""Tests for flow-level token usage aggregation
``flow.usage_metrics`` listens to ``LLMCallCompletedEvent`` for the duration
of ``kickoff_async`` so it covers every LLM call inside the flow — crew-led,
tool-led, AND bare ``LLM.call(...)`` from a flow method. We exercise the
aggregator end-to-end through the real event bus with fabricated events and
explicit contextvar control; no live LLM provider is required.
"""
from __future__ import annotations
import contextvars
import os
import tempfile
from typing import Any, Callable
from uuid import uuid4
import pytest
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.llm_events import LLMCallCompletedEvent, LLMCallType
from crewai.flow.async_feedback.types import PendingFeedbackContext
from crewai.flow.flow import Flow, listen, start
from crewai.flow.flow_context import current_flow_id
from crewai.flow.persistence.sqlite import SQLiteFlowPersistence
from crewai.flow.runtime import _usage_dict_to_metrics
from crewai.types.usage_metrics import UsageMetrics
def _emit_llm_call(
*,
flow_id: str | None,
prompt_tokens: int = 0,
completion_tokens: int = 0,
cached_prompt_tokens: int = 0,
reasoning_tokens: int = 0,
cache_creation_tokens: int = 0,
) -> None:
"""Emit one fake ``LLMCallCompletedEvent`` with ``current_flow_id`` pinned
to ``flow_id``.
Runs in a freshly-copied context so the value the bus snapshots at emit
time is exactly ``flow_id`` — independent of the calling thread's outer
context. Mirrors how the real ``LLM.call`` emits events at runtime.
"""
usage: dict[str, Any] = {
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
"total_tokens": prompt_tokens + completion_tokens,
}
for key, value in (
("cached_prompt_tokens", cached_prompt_tokens),
("reasoning_tokens", reasoning_tokens),
("cache_creation_tokens", cache_creation_tokens),
):
if value:
usage[key] = value
event = LLMCallCompletedEvent(
call_id=str(uuid4()),
model="gpt-4o-mini",
response="ok",
call_type=LLMCallType.LLM_CALL,
usage=usage,
)
ctx = contextvars.copy_context()
def _emit() -> None:
current_flow_id.set(flow_id)
future = crewai_event_bus.emit(object(), event)
if future is not None:
future.result(timeout=5.0)
ctx.run(_emit)
class _ScriptedFlow(Flow):
"""A Flow whose ``@start`` delegates to a per-instance ``_script`` closure.
Each test attaches a script with ``flow._script = lambda f: ...`` so we
don't redefine a Flow subclass for every scenario.
"""
@start()
def run(self) -> None:
script: Callable[[Flow], None] = getattr(self, "_script", lambda _f: None)
script(self)
def _run(script: Callable[[Flow], None] = lambda _f: None) -> Flow:
"""Build a ``_ScriptedFlow``, attach ``script``, kickoff. Returns the flow."""
flow = _ScriptedFlow()
flow._script = script
flow.kickoff()
return flow
class TestUsageDictToMetrics:
"""Unit tests for the dict-to-UsageMetrics normalizer."""
@pytest.mark.parametrize(
"usage, expected",
[
(None, None),
({}, None),
(
{"prompt_tokens": 10, "completion_tokens": 20, "total_tokens": 30},
UsageMetrics(
prompt_tokens=10,
completion_tokens=20,
total_tokens=30,
successful_requests=1,
),
),
# total_tokens missing → derived from prompt + completion
(
{"prompt_tokens": 4, "completion_tokens": 6},
UsageMetrics(
prompt_tokens=4,
completion_tokens=6,
total_tokens=10,
successful_requests=1,
),
),
# Extended provider-specific keys flow through normalization
(
{
"prompt_tokens": 100,
"completion_tokens": 80,
"total_tokens": 180,
"cached_prompt_tokens": 40,
"reasoning_tokens": 25,
"cache_creation_tokens": 10,
},
UsageMetrics(
prompt_tokens=100,
completion_tokens=80,
total_tokens=180,
cached_prompt_tokens=40,
reasoning_tokens=25,
cache_creation_tokens=10,
successful_requests=1,
),
),
# Garbage / non-int values coerce to 0 instead of crashing
(
{"prompt_tokens": "n/a", "completion_tokens": None, "total_tokens": 7},
UsageMetrics(
prompt_tokens=0,
completion_tokens=0,
total_tokens=7,
successful_requests=1,
),
),
],
ids=["none", "empty", "all_keys", "no_total", "extended_keys", "garbage"],
)
def test_normalization(
self, usage: dict[str, Any] | None, expected: UsageMetrics | None
) -> None:
assert _usage_dict_to_metrics(usage) == expected
class TestFlowUsageAggregation:
"""End-to-end tests driving the listener through the real event bus."""
def test_sums_every_llm_call_in_the_flow(self) -> None:
"""Multiple LLM calls — including bare ``LLM.call(...)`` made outside
any crew — accumulate; ``successful_requests`` tracks the call count."""
def script(flow: Flow) -> None:
_emit_llm_call(flow_id=flow._flow_match_id, prompt_tokens=300, completion_tokens=300)
_emit_llm_call(flow_id=flow._flow_match_id, prompt_tokens=200, completion_tokens=100)
_emit_llm_call(flow_id=flow._flow_match_id, prompt_tokens=20, completion_tokens=20)
flow = _run(script)
assert flow.usage_metrics.total_tokens == 940
assert flow.usage_metrics.prompt_tokens == 520
assert flow.usage_metrics.completion_tokens == 420
assert flow.usage_metrics.successful_requests == 3
def test_returns_zero_when_no_calls_happen(self) -> None:
flow = _run()
assert flow.usage_metrics == UsageMetrics()
def test_ignores_events_from_other_flows(self) -> None:
"""Concurrent flow runs share the singleton bus, so the listener must
scope itself to its own flow via the contextvar match."""
def script(flow: Flow) -> None:
_emit_llm_call(flow_id=flow._flow_match_id, prompt_tokens=50, completion_tokens=50)
_emit_llm_call(flow_id="some-other-flow", prompt_tokens=49_000, completion_tokens=50_999)
flow = _run(script)
assert flow.usage_metrics.total_tokens == 100
assert flow.usage_metrics.successful_requests == 1
def test_resets_between_kickoffs(self) -> None:
flow = _ScriptedFlow()
flow._script = lambda f: _emit_llm_call(
flow_id=f._flow_match_id, prompt_tokens=250, completion_tokens=250
)
flow.kickoff()
flow.kickoff()
assert flow.usage_metrics.total_tokens == 500
assert flow.usage_metrics.successful_requests == 1
def test_usage_metrics_returns_independent_copy(self) -> None:
"""``usage_metrics`` must return a copy, not the internal instance —
otherwise callers can clobber the in-flight accumulator."""
flow = _run(
lambda f: _emit_llm_call(
flow_id=f._flow_match_id, prompt_tokens=50, completion_tokens=50
)
)
snapshot = flow.usage_metrics
snapshot.total_tokens = 999_999
assert flow.usage_metrics.total_tokens == 100
def test_handler_is_unregistered_after_kickoff(self) -> None:
"""Long-lived workers (Celery, devkit) must not leak one handler per
kickoff on the singleton bus, on either the success or failure path."""
def handler_count() -> int:
return len(
crewai_event_bus._sync_handlers.get(LLMCallCompletedEvent, frozenset())
)
before = handler_count()
flow = _ScriptedFlow()
flow._script = lambda f: _emit_llm_call(
flow_id=f._flow_match_id, prompt_tokens=5, completion_tokens=5
)
for _ in range(3):
flow.kickoff()
assert handler_count() == before
def boom(_f: Flow) -> None:
raise RuntimeError("boom")
failing = _ScriptedFlow()
failing._script = boom
with pytest.raises(RuntimeError, match="boom"):
failing.kickoff()
assert handler_count() == before
def test_stale_handler_from_prior_kickoff_does_not_contaminate(self) -> None:
"""The bus dispatches sync handlers on a thread pool that ``emit``
does not wait on. A handler still queued from a prior kickoff
must not write into a later kickoff's accumulator — the epoch
snapshot in the handler closure bails out on mismatch."""
captured: dict[str, Any] = {}
def script(flow: Flow) -> None:
_emit_llm_call(flow_id=flow._flow_match_id, prompt_tokens=10, completion_tokens=10)
captured["handler"] = flow._usage_aggregation_handler
captured["match_id"] = flow._flow_match_id
flow = _run(script)
first_total = flow.usage_metrics.total_tokens
assert first_total == 20
# A second kickoff bumps the epoch and resets the accumulator.
flow._script = lambda f: None
flow.kickoff()
assert flow.usage_metrics.total_tokens == 0
stale_handler = captured["handler"]
assert stale_handler is not None
stale_event = LLMCallCompletedEvent(
call_id=str(uuid4()),
model="gpt-4o-mini",
response="ok",
call_type=LLMCallType.LLM_CALL,
usage={"prompt_tokens": 999, "completion_tokens": 999, "total_tokens": 1998},
)
ctx = contextvars.copy_context()
ctx.run(lambda: (current_flow_id.set(captured["match_id"]), stale_handler(object(), stale_event)))
# Stale handler bailed: second kickoff's accumulator is still zero.
assert flow.usage_metrics.total_tokens == 0
def test_pause_detaches_listener_and_does_not_leak(self) -> None:
"""When ``kickoff_async`` pauses for human feedback, the listener
must be detached from the singleton bus to avoid leaking handlers
across abandoned paused instances. Pre-pause LLM events still
count because the bus snapshots handlers at emit time. Late
events emitted after the pause returns do not count for this
instance — resume paths re-attach a fresh listener."""
from crewai.flow.async_feedback.types import HumanFeedbackPending
captured: dict[str, Any] = {}
class _PausingFlow(Flow):
@start()
def begin(self) -> None:
_emit_llm_call(
flow_id=self._flow_match_id,
prompt_tokens=10,
completion_tokens=20,
)
captured["pre_pause_total"] = self.usage_metrics.total_tokens
raise HumanFeedbackPending(
context=PendingFeedbackContext(
flow_id=self.flow_id,
flow_class="_PausingFlow",
method_name="begin",
method_output="content",
message="Review:",
)
)
with tempfile.TemporaryDirectory() as tmpdir:
persistence = SQLiteFlowPersistence(os.path.join(tmpdir, "f.db"))
flow = _PausingFlow(persistence=persistence)
result = flow.kickoff()
assert isinstance(result, HumanFeedbackPending)
assert captured["pre_pause_total"] == 30
assert flow._usage_aggregation_handler is None
# A late event emitted after the pause does not reach the
# detached listener, so the running total is unchanged.
_emit_llm_call(
flow_id=flow._flow_match_id,
prompt_tokens=2,
completion_tokens=3,
)
assert flow.usage_metrics.total_tokens == 30
def test_aggregates_resume_after_from_pending(self) -> None:
"""A flow restored via ``from_pending`` is a fresh instance with no
``_flow_match_id``; without seeding it, the listener attached in
``resume_async`` either ignores its own LLM calls or absorbs unrelated
ones. ``from_pending`` must seed the match id so the resume-phase
aggregator counts our own calls and only our own calls."""
class _ResumeFlow(Flow):
@start()
def begin(self) -> str:
return "content"
@listen(begin)
def on_begin(self, _feedback: Any) -> str:
_emit_llm_call(
flow_id=self._flow_match_id,
prompt_tokens=100,
completion_tokens=50,
)
_emit_llm_call(
flow_id="some-other-flow",
prompt_tokens=9_999,
completion_tokens=9_999,
)
return "done"
with tempfile.TemporaryDirectory() as tmpdir:
persistence = SQLiteFlowPersistence(os.path.join(tmpdir, "f.db"))
flow_id = "usage-resume-test"
persistence.save_pending_feedback(
flow_uuid=flow_id,
context=PendingFeedbackContext(
flow_id=flow_id,
flow_class="_ResumeFlow",
method_name="begin",
method_output="content",
message="Review:",
),
state_data={"id": flow_id},
)
flow = _ResumeFlow.from_pending(flow_id, persistence)
assert flow._flow_match_id == flow.flow_id
flow.resume("ok")
assert flow.usage_metrics.total_tokens == 150
assert flow.usage_metrics.prompt_tokens == 100
assert flow.usage_metrics.completion_tokens == 50
assert flow.usage_metrics.successful_requests == 1

View File

@@ -77,12 +77,22 @@ class ComplexFlow(Flow):
return "complete"
def _attach_flow_definition(flow_class: type[Flow], methods: dict[str, object]) -> None:
def _attach_flow_definition(
flow_class: type[Flow], methods: dict[str, dict[str, object]]
) -> None:
flow_class._flow_definition = FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": flow_class.__name__,
"methods": methods,
"methods": {
name: {
"do": {
"ref": f"{flow_class.__module__}:{flow_class.__name__}.{name}"
},
**spec,
}
for name, spec in methods.items()
},
}
)
@@ -125,13 +135,20 @@ def test_build_flow_structure_from_flow_definition():
"schema": "crewai.flow/v1",
"name": "DefinedFlow",
"methods": {
"begin": {"start": True},
"begin": {
"do": {"ref": "defined_flows:DefinedFlow.begin"},
"start": True,
},
"decide": {
"do": {"ref": "defined_flows:DefinedFlow.decide"},
"listen": "begin",
"router": True,
"emit": ["done"],
},
"finish": {"listen": "done"},
"finish": {
"do": {"ref": "defined_flows:DefinedFlow.finish"},
"listen": "done",
},
},
}
)