From 3e48a402ee9d8f94f58c64118a3da5582e536657 Mon Sep 17 00:00:00 2001 From: Brandon Hancock Date: Mon, 16 Sep 2024 11:08:30 -0400 Subject: [PATCH] Router working now --- src/crewai/flow/flow.py | 30 +++++++++++++ .../flow/structured_test_flow_router.py | 42 +++++++++++++++++++ 2 files changed, 72 insertions(+) create mode 100644 src/crewai/flow/structured_test_flow_router.py diff --git a/src/crewai/flow/flow.py b/src/crewai/flow/flow.py index f282046a0..44adf0532 100644 --- a/src/crewai/flow/flow.py +++ b/src/crewai/flow/flow.py @@ -77,6 +77,18 @@ def listen(condition): return decorator +def router(method): + def decorator(func): + print( + f"[router decorator] Decorating router: {func.__name__} for method: {method.__name__}" + ) + func.__is_router__ = True + func.__router_for__ = method.__name__ + return func + + return decorator + + def or_(*conditions): methods = [] for condition in conditions: @@ -111,6 +123,7 @@ class FlowMeta(type): start_methods = [] listeners = {} + routers = {} print(f"[FlowMeta] Processing class: {name}") for attr_name, attr_value in dct.items(): @@ -130,12 +143,19 @@ class FlowMeta(type): condition_type = getattr(attr_value, "__condition_type__", "OR") print(f"[FlowMeta] Conditions for listener {attr_name}:", methods) listeners[attr_name] = (condition_type, methods) + elif hasattr(attr_value, "__is_router__"): + print( + f"[FlowMeta] Found router: {attr_name} for method: {attr_value.__router_for__}" + ) + routers[attr_value.__router_for__] = attr_name setattr(cls, "_start_methods", start_methods) setattr(cls, "_listeners", listeners) + setattr(cls, "_routers", routers) print("[FlowMeta] ALL LISTENERS:", listeners) print("[FlowMeta] START METHODS:", start_methods) + print("[FlowMeta] ROUTERS:", routers) return cls @@ -143,6 +163,7 @@ class FlowMeta(type): class Flow(Generic[T], metaclass=FlowMeta): _start_methods: List[str] = [] _listeners: Dict[str, tuple[str, List[str]]] = {} + _routers: Dict[str, str] = {} initial_state: Union[Type[T], T, None] = None def __init__(self): @@ -200,6 +221,15 @@ class Flow(Generic[T], metaclass=FlowMeta): f"[Flow._execute_listeners] Executing listeners for trigger method: {trigger_method}" ) listener_tasks = [] + + if trigger_method in self._routers: + router_method = self._methods[self._routers[trigger_method]] + print(f"[Flow._execute_listeners] Executing router for {trigger_method}") + path = await self._execute_method(router_method) + print(f"[Flow._execute_listeners] Router returned path: {path}") + # Use the path as the new trigger method + trigger_method = path + for listener, (condition_type, methods) in self._listeners.items(): print( f"[Flow._execute_listeners] Checking listener: {listener}, condition: {condition_type}, methods: {methods}" diff --git a/src/crewai/flow/structured_test_flow_router.py b/src/crewai/flow/structured_test_flow_router.py new file mode 100644 index 000000000..91fff2d00 --- /dev/null +++ b/src/crewai/flow/structured_test_flow_router.py @@ -0,0 +1,42 @@ +import asyncio +import random + +from crewai.flow.flow import Flow, listen, router, start +from pydantic import BaseModel + + +class ExampleState(BaseModel): + success_flag: bool = False + + +class StructuredExampleFlow(Flow): + initial_state = ExampleState + + @start() + async def start_method(self): + print("Starting the structured flow") + random_boolean = random.choice([True, False]) + self.state.success_flag = random_boolean + + @router(start_method) + async def second_method(self): + if self.state.success_flag: + return "success" + else: + return "failed" + + @listen("success") + async def third_method(self): + print("Third method running") + + @listen("failed") + async def fourth_method(self): + print("Fourth method running") + + +async def main(): + flow = StructuredExampleFlow() + await flow.kickoff() + + +asyncio.run(main())