Compare commits

...

5 Commits

Author SHA1 Message Date
Brandon Hancock (bhancock_ai)
27396a2fe1 Merge branch 'main' into improvement/speed-up-calculate-node-levels 2025-02-25 15:27:10 -05:00
Lorenze Jay
62d0479fad Merge branch 'main' into improvement/speed-up-calculate-node-levels 2025-02-25 09:41:00 -08:00
Brandon Hancock (bhancock_ai)
32f2f16251 Merge branch 'main' into improvement/speed-up-calculate-node-levels 2025-02-24 16:43:01 -05:00
Brandon Hancock (bhancock_ai)
771cce027c Merge branch 'main' into improvement/speed-up-calculate-node-levels 2025-02-24 15:24:39 -05:00
Brandon Hancock
476396c5d9 incorporating fix from @misrasaurabh1 with additional type fix 2025-02-24 12:31:10 -05:00

View File

@@ -16,7 +16,8 @@ Example
import ast import ast
import inspect import inspect
import textwrap import textwrap
from typing import Any, Dict, List, Optional, Set, Union from collections import defaultdict, deque
from typing import Any, Deque, Dict, List, Optional, Set, Union
def get_possible_return_constants(function: Any) -> Optional[List[str]]: def get_possible_return_constants(function: Any) -> Optional[List[str]]:
@@ -118,7 +119,7 @@ def calculate_node_levels(flow: Any) -> Dict[str, int]:
- Processes router paths separately - Processes router paths separately
""" """
levels: Dict[str, int] = {} levels: Dict[str, int] = {}
queue: List[str] = [] queue: Deque[str] = deque()
visited: Set[str] = set() visited: Set[str] = set()
pending_and_listeners: Dict[str, Set[str]] = {} pending_and_listeners: Dict[str, Set[str]] = {}
@@ -128,28 +129,35 @@ def calculate_node_levels(flow: Any) -> Dict[str, int]:
levels[method_name] = 0 levels[method_name] = 0
queue.append(method_name) queue.append(method_name)
# Precompute listener dependencies
or_listeners = defaultdict(list)
and_listeners = defaultdict(set)
for listener_name, (condition_type, trigger_methods) in flow._listeners.items():
if condition_type == "OR":
for method in trigger_methods:
or_listeners[method].append(listener_name)
elif condition_type == "AND":
and_listeners[listener_name] = set(trigger_methods)
# Breadth-first traversal to assign levels # Breadth-first traversal to assign levels
while queue: while queue:
current = queue.pop(0) current = queue.popleft()
current_level = levels[current] current_level = levels[current]
visited.add(current) visited.add(current)
for listener_name, (condition_type, trigger_methods) in flow._listeners.items(): for listener_name in or_listeners[current]:
if condition_type == "OR": if listener_name not in levels or levels[listener_name] > current_level + 1:
if current in trigger_methods: levels[listener_name] = current_level + 1
if ( if listener_name not in visited:
listener_name not in levels queue.append(listener_name)
or levels[listener_name] > current_level + 1
): for listener_name, required_methods in and_listeners.items():
levels[listener_name] = current_level + 1 if current in required_methods:
if listener_name not in visited:
queue.append(listener_name)
elif condition_type == "AND":
if listener_name not in pending_and_listeners: if listener_name not in pending_and_listeners:
pending_and_listeners[listener_name] = set() pending_and_listeners[listener_name] = set()
if current in trigger_methods: pending_and_listeners[listener_name].add(current)
pending_and_listeners[listener_name].add(current)
if set(trigger_methods) == pending_and_listeners[listener_name]: if required_methods == pending_and_listeners[listener_name]:
if ( if (
listener_name not in levels listener_name not in levels
or levels[listener_name] > current_level + 1 or levels[listener_name] > current_level + 1
@@ -159,22 +167,7 @@ def calculate_node_levels(flow: Any) -> Dict[str, int]:
queue.append(listener_name) queue.append(listener_name)
# Handle router connections # Handle router connections
if current in flow._routers: process_router_paths(flow, current, current_level, levels, queue)
router_method_name = current
paths = flow._router_paths.get(router_method_name, [])
for path in paths:
for listener_name, (
condition_type,
trigger_methods,
) in flow._listeners.items():
if path in trigger_methods:
if (
listener_name not in levels
or levels[listener_name] > current_level + 1
):
levels[listener_name] = current_level + 1
if listener_name not in visited:
queue.append(listener_name)
return levels return levels
@@ -227,10 +220,7 @@ def build_ancestor_dict(flow: Any) -> Dict[str, Set[str]]:
def dfs_ancestors( def dfs_ancestors(
node: str, node: str, ancestors: Dict[str, Set[str]], visited: Set[str], flow: Any
ancestors: Dict[str, Set[str]],
visited: Set[str],
flow: Any
) -> None: ) -> None:
""" """
Perform depth-first search to build ancestor relationships. Perform depth-first search to build ancestor relationships.
@@ -274,7 +264,9 @@ def dfs_ancestors(
dfs_ancestors(listener_name, ancestors, visited, flow) dfs_ancestors(listener_name, ancestors, visited, flow)
def is_ancestor(node: str, ancestor_candidate: str, ancestors: Dict[str, Set[str]]) -> bool: def is_ancestor(
node: str, ancestor_candidate: str, ancestors: Dict[str, Set[str]]
) -> bool:
""" """
Check if one node is an ancestor of another. Check if one node is an ancestor of another.
@@ -339,7 +331,9 @@ def build_parent_children_dict(flow: Any) -> Dict[str, List[str]]:
return parent_children return parent_children
def get_child_index(parent: str, child: str, parent_children: Dict[str, List[str]]) -> int: def get_child_index(
parent: str, child: str, parent_children: Dict[str, List[str]]
) -> int:
""" """
Get the index of a child node in its parent's sorted children list. Get the index of a child node in its parent's sorted children list.
@@ -360,3 +354,23 @@ def get_child_index(parent: str, child: str, parent_children: Dict[str, List[str
children = parent_children.get(parent, []) children = parent_children.get(parent, [])
children.sort() children.sort()
return children.index(child) return children.index(child)
def process_router_paths(flow, current, current_level, levels, queue):
"""
Handle the router connections for the current node.
"""
if current in flow._routers:
paths = flow._router_paths.get(current, [])
for path in paths:
for listener_name, (
condition_type,
trigger_methods,
) in flow._listeners.items():
if path in trigger_methods:
if (
listener_name not in levels
or levels[listener_name] > current_level + 1
):
levels[listener_name] = current_level + 1
queue.append(listener_name)