mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-15 02:58:30 +00:00
Router working now
This commit is contained in:
@@ -77,6 +77,18 @@ def listen(condition):
|
||||
return decorator
|
||||
|
||||
|
||||
def router(method):
|
||||
def decorator(func):
|
||||
print(
|
||||
f"[router decorator] Decorating router: {func.__name__} for method: {method.__name__}"
|
||||
)
|
||||
func.__is_router__ = True
|
||||
func.__router_for__ = method.__name__
|
||||
return func
|
||||
|
||||
return decorator
|
||||
|
||||
|
||||
def or_(*conditions):
|
||||
methods = []
|
||||
for condition in conditions:
|
||||
@@ -111,6 +123,7 @@ class FlowMeta(type):
|
||||
|
||||
start_methods = []
|
||||
listeners = {}
|
||||
routers = {}
|
||||
|
||||
print(f"[FlowMeta] Processing class: {name}")
|
||||
for attr_name, attr_value in dct.items():
|
||||
@@ -130,12 +143,19 @@ class FlowMeta(type):
|
||||
condition_type = getattr(attr_value, "__condition_type__", "OR")
|
||||
print(f"[FlowMeta] Conditions for listener {attr_name}:", methods)
|
||||
listeners[attr_name] = (condition_type, methods)
|
||||
elif hasattr(attr_value, "__is_router__"):
|
||||
print(
|
||||
f"[FlowMeta] Found router: {attr_name} for method: {attr_value.__router_for__}"
|
||||
)
|
||||
routers[attr_value.__router_for__] = attr_name
|
||||
|
||||
setattr(cls, "_start_methods", start_methods)
|
||||
setattr(cls, "_listeners", listeners)
|
||||
setattr(cls, "_routers", routers)
|
||||
|
||||
print("[FlowMeta] ALL LISTENERS:", listeners)
|
||||
print("[FlowMeta] START METHODS:", start_methods)
|
||||
print("[FlowMeta] ROUTERS:", routers)
|
||||
|
||||
return cls
|
||||
|
||||
@@ -143,6 +163,7 @@ class FlowMeta(type):
|
||||
class Flow(Generic[T], metaclass=FlowMeta):
|
||||
_start_methods: List[str] = []
|
||||
_listeners: Dict[str, tuple[str, List[str]]] = {}
|
||||
_routers: Dict[str, str] = {}
|
||||
initial_state: Union[Type[T], T, None] = None
|
||||
|
||||
def __init__(self):
|
||||
@@ -200,6 +221,15 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
f"[Flow._execute_listeners] Executing listeners for trigger method: {trigger_method}"
|
||||
)
|
||||
listener_tasks = []
|
||||
|
||||
if trigger_method in self._routers:
|
||||
router_method = self._methods[self._routers[trigger_method]]
|
||||
print(f"[Flow._execute_listeners] Executing router for {trigger_method}")
|
||||
path = await self._execute_method(router_method)
|
||||
print(f"[Flow._execute_listeners] Router returned path: {path}")
|
||||
# Use the path as the new trigger method
|
||||
trigger_method = path
|
||||
|
||||
for listener, (condition_type, methods) in self._listeners.items():
|
||||
print(
|
||||
f"[Flow._execute_listeners] Checking listener: {listener}, condition: {condition_type}, methods: {methods}"
|
||||
|
||||
42
src/crewai/flow/structured_test_flow_router.py
Normal file
42
src/crewai/flow/structured_test_flow_router.py
Normal file
@@ -0,0 +1,42 @@
|
||||
import asyncio
|
||||
import random
|
||||
|
||||
from crewai.flow.flow import Flow, listen, router, start
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class ExampleState(BaseModel):
|
||||
success_flag: bool = False
|
||||
|
||||
|
||||
class StructuredExampleFlow(Flow):
|
||||
initial_state = ExampleState
|
||||
|
||||
@start()
|
||||
async def start_method(self):
|
||||
print("Starting the structured flow")
|
||||
random_boolean = random.choice([True, False])
|
||||
self.state.success_flag = random_boolean
|
||||
|
||||
@router(start_method)
|
||||
async def second_method(self):
|
||||
if self.state.success_flag:
|
||||
return "success"
|
||||
else:
|
||||
return "failed"
|
||||
|
||||
@listen("success")
|
||||
async def third_method(self):
|
||||
print("Third method running")
|
||||
|
||||
@listen("failed")
|
||||
async def fourth_method(self):
|
||||
print("Fourth method running")
|
||||
|
||||
|
||||
async def main():
|
||||
flow = StructuredExampleFlow()
|
||||
await flow.kickoff()
|
||||
|
||||
|
||||
asyncio.run(main())
|
||||
Reference in New Issue
Block a user