Compare commits

...

3 Commits

Author SHA1 Message Date
Brandon Hancock
cbe3e7cc5f remove prints 2024-10-29 17:49:56 -04:00
Brandon Hancock
876f71ade1 fix user found issue 2024-10-29 17:47:12 -04:00
Brandon Hancock
60d43c95b9 bugfix/flows-with-multiple-starts-plus-ands-breaking 2024-10-29 17:44:07 -04:00

View File

@@ -1,5 +1,3 @@
# flow.py
import asyncio import asyncio
import inspect import inspect
from typing import Any, Callable, Dict, Generic, List, Set, Type, TypeVar, Union from typing import Any, Callable, Dict, Generic, List, Set, Type, TypeVar, Union
@@ -120,6 +118,8 @@ class FlowMeta(type):
methods = attr_value.__trigger_methods__ methods = attr_value.__trigger_methods__
condition_type = getattr(attr_value, "__condition_type__", "OR") condition_type = getattr(attr_value, "__condition_type__", "OR")
listeners[attr_name] = (condition_type, methods) listeners[attr_name] = (condition_type, methods)
# TODO: should we add a check for __condition_type__ 'AND'?
elif hasattr(attr_value, "__is_router__"): elif hasattr(attr_value, "__is_router__"):
routers[attr_value.__router_for__] = attr_name routers[attr_value.__router_for__] = attr_name
possible_returns = get_possible_return_constants(attr_value) possible_returns = get_possible_return_constants(attr_value)
@@ -159,7 +159,8 @@ class Flow(Generic[T], metaclass=FlowMeta):
def __init__(self) -> None: def __init__(self) -> None:
self._methods: Dict[str, Callable] = {} self._methods: Dict[str, Callable] = {}
self._state: T = self._create_initial_state() self._state: T = self._create_initial_state()
self._completed_methods: Set[str] = set() self._executed_methods: Set[str] = set()
self._scheduled_tasks: Set[str] = set()
self._pending_and_listeners: Dict[str, Set[str]] = {} self._pending_and_listeners: Dict[str, Set[str]] = {}
self._method_outputs: List[Any] = [] # List to store all method outputs self._method_outputs: List[Any] = [] # List to store all method outputs
@@ -216,17 +217,24 @@ class Flow(Generic[T], metaclass=FlowMeta):
else: else:
return None # Or raise an exception if no methods were executed return None # Or raise an exception if no methods were executed
async def _execute_start_method(self, start_method: str) -> None: async def _execute_start_method(self, start_method_name: str) -> None:
result = await self._execute_method(self._methods[start_method]) result = await self._execute_method(
await self._execute_listeners(start_method, result) start_method_name, self._methods[start_method_name]
)
await self._execute_listeners(start_method_name, result)
async def _execute_method(self, method: Callable, *args: Any, **kwargs: Any) -> Any: async def _execute_method(
self, method_name: str, method: Callable, *args: Any, **kwargs: Any
) -> Any:
result = ( result = (
await method(*args, **kwargs) await method(*args, **kwargs)
if asyncio.iscoroutinefunction(method) if asyncio.iscoroutinefunction(method)
else method(*args, **kwargs) else method(*args, **kwargs)
) )
self._method_outputs.append(result) # Store the output self._method_outputs.append(result) # Store the output
self._executed_methods.add(method_name)
return result return result
async def _execute_listeners(self, trigger_method: str, result: Any) -> None: async def _execute_listeners(self, trigger_method: str, result: Any) -> None:
@@ -234,32 +242,40 @@ class Flow(Generic[T], metaclass=FlowMeta):
if trigger_method in self._routers: if trigger_method in self._routers:
router_method = self._methods[self._routers[trigger_method]] router_method = self._methods[self._routers[trigger_method]]
path = await self._execute_method(router_method) path = await self._execute_method(
trigger_method, router_method
) # TODO: Change or not?
# Use the path as the new trigger method # Use the path as the new trigger method
trigger_method = path trigger_method = path
for listener, (condition_type, methods) in self._listeners.items(): for listener_name, (condition_type, methods) in self._listeners.items():
if condition_type == "OR": if condition_type == "OR":
if trigger_method in methods: if trigger_method in methods:
if (
listener_name not in self._executed_methods
and listener_name not in self._scheduled_tasks
):
self._scheduled_tasks.add(listener_name)
listener_tasks.append( listener_tasks.append(
self._execute_single_listener(listener, result) self._execute_single_listener(listener_name, result)
) )
elif condition_type == "AND": elif condition_type == "AND":
if listener not in self._pending_and_listeners: if all(method in self._executed_methods for method in methods):
self._pending_and_listeners[listener] = set() if (
self._pending_and_listeners[listener].add(trigger_method) listener_name not in self._executed_methods
if set(methods) == self._pending_and_listeners[listener]: and listener_name not in self._scheduled_tasks
):
self._scheduled_tasks.add(listener_name)
listener_tasks.append( listener_tasks.append(
self._execute_single_listener(listener, result) self._execute_single_listener(listener_name, result)
) )
del self._pending_and_listeners[listener]
# Run all listener tasks concurrently and wait for them to complete # Run all listener tasks concurrently and wait for them to complete
await asyncio.gather(*listener_tasks) await asyncio.gather(*listener_tasks)
async def _execute_single_listener(self, listener: str, result: Any) -> None: async def _execute_single_listener(self, listener_name: str, result: Any) -> None:
try: try:
method = self._methods[listener] method = self._methods[listener_name]
sig = inspect.signature(method) sig = inspect.signature(method)
params = list(sig.parameters.values()) params = list(sig.parameters.values())
@@ -268,15 +284,22 @@ class Flow(Generic[T], metaclass=FlowMeta):
if method_params: if method_params:
# If listener expects parameters, pass the result # If listener expects parameters, pass the result
listener_result = await self._execute_method(method, result) listener_result = await self._execute_method(
listener_name, method, result
)
else: else:
# If listener does not expect parameters, call without arguments # If listener does not expect parameters, call without arguments
listener_result = await self._execute_method(method) listener_result = await self._execute_method(listener_name, method)
# Remove from scheduled tasks after execution
self._scheduled_tasks.discard(listener_name)
# Execute listeners of this listener # Execute listeners of this listener
await self._execute_listeners(listener, listener_result) await self._execute_listeners(listener_name, listener_result)
except Exception as e: except Exception as e:
print(f"[Flow._execute_single_listener] Error in method {listener}: {e}") print(
f"[Flow._execute_single_listener] Error in method {listener_name}: {e}"
)
import traceback import traceback
traceback.print_exc() traceback.print_exc()