diff --git a/src/crewai/flow/flow.py b/src/crewai/flow/flow.py index ba50b21eb..ed7b933d3 100644 --- a/src/crewai/flow/flow.py +++ b/src/crewai/flow/flow.py @@ -1,12 +1,85 @@ import asyncio import inspect -from typing import Any, Callable, Dict, Generic, List, Type, TypeVar, Union +from typing import Any, Callable, Dict, Generic, List, Set, Type, TypeVar, Union from pydantic import BaseModel T = TypeVar("T", bound=Union[BaseModel, Dict[str, Any]]) +def start(): + def decorator(func): + print(f"[start decorator] Decorating start method: {func.__name__}") + func.__is_start_method__ = True + return func + + return decorator + + +def listen(condition): + def decorator(func): + print( + f"[listen decorator] Decorating listener: {func.__name__} with condition: {condition}" + ) + if isinstance(condition, str): + func.__trigger_methods__ = [condition] + func.__condition_type__ = "OR" + print( + f"[listen decorator] Set __trigger_methods__ for {func.__name__}: [{condition}] with mode: OR" + ) + elif ( + isinstance(condition, dict) + and "type" in condition + and "methods" in condition + ): + func.__trigger_methods__ = condition["methods"] + func.__condition_type__ = condition["type"] + print( + f"[listen decorator] Set __trigger_methods__ for {func.__name__}: {func.__trigger_methods__} with mode: {func.__condition_type__}" + ) + elif callable(condition) and hasattr(condition, "__name__"): + func.__trigger_methods__ = [condition.__name__] + func.__condition_type__ = "OR" + print( + f"[listen decorator] Set __trigger_methods__ for {func.__name__}: [{condition.__name__}] with mode: OR" + ) + else: + raise ValueError( + "Condition must be a method, string, or a result of or_() or and_()" + ) + return func + + return decorator + + +def or_(*conditions): + methods = [] + for condition in conditions: + if isinstance(condition, dict) and "methods" in condition: + methods.extend(condition["methods"]) + elif callable(condition) and hasattr(condition, "__name__"): + methods.append(condition.__name__) + elif isinstance(condition, str): + methods.append(condition) + else: + raise ValueError("Invalid condition in or_()") + return {"type": "OR", "methods": methods} + + +def and_(*conditions): + methods = [] + for condition in conditions: + if isinstance(condition, dict) and "methods" in condition: + methods.extend(condition["methods"]) + elif callable(condition) and hasattr(condition, "__name__"): + methods.append(condition.__name__) + elif isinstance(condition, str): + methods.append(condition) + else: + raise ValueError("Invalid condition in and_()") + return {"type": "AND", "methods": methods} + + class FlowMeta(type): def __new__(mcs, name, bases, dct): cls = super().__new__(mcs, name, bases, dct) @@ -14,69 +87,51 @@ class FlowMeta(type): start_methods = [] listeners = {} + print(f"[FlowMeta] Processing class: {name}") for attr_name, attr_value in dct.items(): + print(f"[FlowMeta] Checking attribute: {attr_name}") if hasattr(attr_value, "__is_start_method__"): + print(f"[FlowMeta] Found start method: {attr_name}") start_methods.append(attr_name) if hasattr(attr_value, "__trigger_methods__"): - condition = attr_value.__trigger_methods__ - if callable(condition): - # Single method reference - method_name = condition.__name__ - if method_name not in listeners: - listeners[method_name] = [] - listeners[method_name].append((attr_name, "SINGLE", [method_name])) - elif isinstance(condition, str): - # Single method name - if condition not in listeners: - listeners[condition] = [] - listeners[condition].append((attr_name, "SINGLE", [condition])) - elif isinstance(condition, tuple): - # AND or OR condition - condition_type = ( - "AND" if any(item == "and" for item in condition) else "OR" - ) - methods = [ - m.__name__ if callable(m) else m - for m in condition - if m != "and" and m != "or" - ] - for method in methods: - if method not in listeners: - listeners[method] = [] - listeners[method].append((attr_name, condition_type, methods)) - else: - raise ValueError(f"Invalid listener format for {attr_name}") + methods = attr_value.__trigger_methods__ + condition_type = getattr(attr_value, "__condition_type__", "OR") + print(f"[FlowMeta] Conditions for {attr_name}:", methods) + listeners[attr_name] = (condition_type, methods) setattr(cls, "_start_methods", start_methods) setattr(cls, "_listeners", listeners) - if "initial_state" in dct: - initial_state = dct["initial_state"] - if isinstance(initial_state, type) and issubclass(initial_state, BaseModel): - cls.__annotations__["state"] = initial_state - elif isinstance(initial_state, dict): - cls.__annotations__["state"] = Dict[str, Any] + print("[FlowMeta] ALL LISTENERS:", listeners) + print("[FlowMeta] START METHODS:", start_methods) return cls class Flow(Generic[T], metaclass=FlowMeta): _start_methods: List[str] = [] - _listeners: Dict[str, List[tuple[str, str, List[str]]]] = {} + _listeners: Dict[str, tuple[str, List[str]]] = {} initial_state: Union[Type[T], T, None] = None def __init__(self): + print("[Flow.__init__] Initializing Flow") self._methods: Dict[str, Callable] = {} self._state = self._create_initial_state() - self._completed_methods: set[str] = set() + self._completed_methods: Set[str] = set() + self._pending_and_listeners: Dict[str, Set[str]] = {} for method_name in dir(self): if callable(getattr(self, method_name)) and not method_name.startswith( "__" ): + print(f"[Flow.__init__] Adding method: {method_name}") self._methods[method_name] = getattr(self, method_name) + print("[Flow.__init__] All methods:", self._methods.keys()) + print("[Flow.__init__] Listeners:", self._listeners) + def _create_initial_state(self) -> T: + print("[Flow._create_initial_state] Creating initial state") if self.initial_state is None: return {} # type: ignore elif isinstance(self.initial_state, type): @@ -89,67 +144,81 @@ class Flow(Generic[T], metaclass=FlowMeta): return self._state async def kickoff(self): + print("[Flow.kickoff] Starting kickoff") if not self._start_methods: raise ValueError("No start method defined") for start_method in self._start_methods: + print(f"[Flow.kickoff] Executing start method: {start_method}") result = await self._execute_method(self._methods[start_method]) + print( + f"[Flow.kickoff] Start method {start_method} completed. Executing listeners." + ) await self._execute_listeners(start_method, result) async def _execute_method(self, method: Callable, *args, **kwargs): + print(f"[Flow._execute_method] Executing method: {method.__name__}") if inspect.iscoroutinefunction(method): return await method(*args, **kwargs) else: return method(*args, **kwargs) async def _execute_listeners(self, trigger_method: str, result: Any): - self._completed_methods.add(trigger_method) - - if trigger_method in self._listeners: - listener_tasks = [] - for listener, condition_type, methods in self._listeners[trigger_method]: - if condition_type == "OR": - if trigger_method in methods: - listener_tasks.append( - self._execute_single_listener(listener, result) - ) - elif condition_type == "AND": - if all(method in self._completed_methods for method in methods): - listener_tasks.append( - self._execute_single_listener(listener, result) - ) - elif condition_type == "SINGLE": + print( + f"[Flow._execute_listeners] Executing listeners for trigger method: {trigger_method}" + ) + listener_tasks = [] + for listener, (condition_type, methods) in self._listeners.items(): + print( + f"[Flow._execute_listeners] Checking listener: {listener}, condition: {condition_type}, methods: {methods}" + ) + if condition_type == "OR": + if trigger_method in methods: + print( + f"[Flow._execute_listeners] TRIGGERING METHOD: {listener} due to trigger: {trigger_method}" + ) listener_tasks.append( self._execute_single_listener(listener, result) ) + elif condition_type == "AND": + if listener not in self._pending_and_listeners: + self._pending_and_listeners[listener] = set() + self._pending_and_listeners[listener].add(trigger_method) + if set(methods) == self._pending_and_listeners[listener]: + print( + f"[Flow._execute_listeners] All conditions met for listener: {listener}. Executing." + ) + listener_tasks.append( + self._execute_single_listener(listener, result) + ) + del self._pending_and_listeners[listener] - # Run all listener tasks concurrently and wait for them to complete - await asyncio.gather(*listener_tasks) + # Run all listener tasks concurrently and wait for them to complete + print( + f"[Flow._execute_listeners] Executing {len(listener_tasks)} listener tasks" + ) + await asyncio.gather(*listener_tasks) async def _execute_single_listener(self, listener: str, result: Any): + print(f"[Flow._execute_single_listener] Executing listener: {listener}") try: method = self._methods[listener] sig = inspect.signature(method) if len(sig.parameters) > 1: # More than just 'self' + print( + f"[Flow._execute_single_listener] Executing {listener} with result" + ) listener_result = await self._execute_method(method, result) else: + print( + f"[Flow._execute_single_listener] Executing {listener} without result" + ) listener_result = await self._execute_method(method) + print( + f"[Flow._execute_single_listener] {listener} completed, executing its listeners" + ) await self._execute_listeners(listener, listener_result) except Exception as e: - print(f"Error in method {listener}: {str(e)}") - - -def start(): - def decorator(func): - func.__is_start_method__ = True - return func - - return decorator - - -def listen(condition): - def decorator(func): - func.__trigger_methods__ = condition - return func - - return decorator + print( + f"[Flow._execute_single_listener] Error in method {listener}: {str(e)}" + ) diff --git a/src/crewai/flow/structured_test_flow.py b/src/crewai/flow/structured_test_flow.py index c31ba81d9..20b687225 100644 --- a/src/crewai/flow/structured_test_flow.py +++ b/src/crewai/flow/structured_test_flow.py @@ -9,7 +9,7 @@ class ExampleState(BaseModel): message: str = "" -class StructuredExampleFlow(Flow[ExampleState]): +class StructuredExampleFlow(Flow): initial_state = ExampleState @start() @@ -41,7 +41,7 @@ class StructuredExampleFlow(Flow[ExampleState]): async def main(): flow = StructuredExampleFlow() - await flow.run() + await flow.kickoff() asyncio.run(main()) diff --git a/src/crewai/flow/structured_test_flow_and.py b/src/crewai/flow/structured_test_flow_and.py index c31ba81d9..910849cc0 100644 --- a/src/crewai/flow/structured_test_flow_and.py +++ b/src/crewai/flow/structured_test_flow_and.py @@ -9,7 +9,7 @@ class ExampleState(BaseModel): message: str = "" -class StructuredExampleFlow(Flow[ExampleState]): +class StructuredExampleFlow(Flow): initial_state = ExampleState @start() @@ -21,27 +21,22 @@ class StructuredExampleFlow(Flow[ExampleState]): return "Start result" @listen(start_method) - async def second_method(self, result): - print(f"Second method, received: {result}") + async def second_method(self): print(f"State before increment: {self.state}") self.state.counter += 1 self.state.message += " - updated" print(f"State after second_method: {self.state}") return "Second result" - @listen(start_method) - async def third_method(self, result): - print(f"Third method, received: {result}") - print(f"State before increment: {self.state}") - self.state.counter += 1 - self.state.message += " - updated" - print(f"State after third_method: {self.state}") - return "Third result" + @listen(start_method and second_method) + async def logger(self): + print("AND METHOD RUNNING") + print("CURRENT STATE FROM OR: ", self.state) async def main(): flow = StructuredExampleFlow() - await flow.run() + await flow.kickoff() asyncio.run(main()) diff --git a/src/crewai/flow/structured_test_flow_or.py b/src/crewai/flow/structured_test_flow_or.py index a0e62d1eb..a7e58ac35 100644 --- a/src/crewai/flow/structured_test_flow_or.py +++ b/src/crewai/flow/structured_test_flow_or.py @@ -1,6 +1,6 @@ import asyncio -from crewai.flow.flow import Flow, listen, start +from crewai.flow.flow import Flow, and_, listen, or_, start from pydantic import BaseModel @@ -21,23 +21,27 @@ class StructuredExampleFlow(Flow[ExampleState]): return "Start result" @listen(start_method) - async def second_method(self, result): - print(f"Second method, received: {result}") + async def second_method(self): print(f"State before increment: {self.state}") self.state.counter += 1 self.state.message += " - updated" print(f"State after second_method: {self.state}") return "Second result" - @listen(start_method or second_method) + @listen(or_(start_method, second_method)) async def logger(self): - print("OR METHOD RUNNING") - print("CURRENT STATE FROM OR: ", self.state) + print("LOGGER METHOD RUNNING") + print("CURRENT STATE FROM LOGGER: ", self.state) + + @listen(and_(start_method, second_method)) + async def and_logger(self): + print("AND LOGGER METHOD RUNNING") + print("CURRENT STATE FROM AND LOGGER: ", self.state) async def main(): flow = StructuredExampleFlow() - await flow.run() + await flow.kickoff() asyncio.run(main()) diff --git a/src/crewai/flow/unstructured_test_flow.py b/src/crewai/flow/unstructured_test_flow.py index b6cc5e573..cff142e4f 100644 --- a/src/crewai/flow/unstructured_test_flow.py +++ b/src/crewai/flow/unstructured_test_flow.py @@ -1,3 +1,5 @@ +import asyncio + from crewai.flow.flow import Flow, listen, start @@ -23,6 +25,9 @@ class FlexibleExampleFlow(Flow): return "Third result" -# Run the flows -flexible_flow = FlexibleExampleFlow() -flexible_flow.run() +async def main(): + flow = FlexibleExampleFlow() + await flow.kickoff() + + +asyncio.run(main())