diff --git a/src/crewai/flow/flow.py b/src/crewai/flow/flow.py index 7a8b88ba0..3b6e81293 100644 --- a/src/crewai/flow/flow.py +++ b/src/crewai/flow/flow.py @@ -894,35 +894,45 @@ class Flow(Generic[T], metaclass=FlowMeta): Notes ----- - Routers are executed sequentially to maintain flow control - - Each router's result becomes the new trigger_method + - Each router's result becomes a new trigger_method - Normal listeners are executed in parallel for efficiency - Listeners can receive the trigger method's result as a parameter """ # First, handle routers repeatedly until no router triggers anymore + router_results = [] + current_trigger = trigger_method + while True: routers_triggered = self._find_triggered_methods( - trigger_method, router_only=True + current_trigger, router_only=True ) if not routers_triggered: break + for router_name in routers_triggered: await self._execute_single_listener(router_name, result) # After executing router, the router's result is the path - # The last router executed sets the trigger_method - # The router result is the last element in self._method_outputs - trigger_method = self._method_outputs[-1] + router_result = self._method_outputs[-1] + if router_result: # Only add non-None results + router_results.append(router_result) + current_trigger = ( + router_result # Update for next iteration of router chain + ) - # Now that no more routers are triggered by current trigger_method, - # execute normal listeners - listeners_triggered = self._find_triggered_methods( - trigger_method, router_only=False - ) - if listeners_triggered: - tasks = [ - self._execute_single_listener(listener_name, result) - for listener_name in listeners_triggered - ] - await asyncio.gather(*tasks) + # Now execute normal listeners for all router results and the original trigger + all_triggers = [trigger_method] + router_results + + for current_trigger in all_triggers: + if current_trigger: # Skip None results + listeners_triggered = self._find_triggered_methods( + current_trigger, router_only=False + ) + if listeners_triggered: + tasks = [ + self._execute_single_listener(listener_name, result) + for listener_name in listeners_triggered + ] + await asyncio.gather(*tasks) def _find_triggered_methods( self, trigger_method: str, router_only: bool diff --git a/tests/flow_test.py b/tests/flow_test.py index b2edcfa5a..c2640fffb 100644 --- a/tests/flow_test.py +++ b/tests/flow_test.py @@ -654,3 +654,104 @@ def test_flow_plotting(): assert isinstance(received_events[0], FlowPlotEvent) assert received_events[0].flow_name == "StatelessFlow" assert isinstance(received_events[0].timestamp, datetime) + + +def test_multiple_routers_from_same_trigger(): + """Test that multiple routers triggered by the same method all activate their listeners.""" + execution_order = [] + + class MultiRouterFlow(Flow): + def __init__(self): + super().__init__() + # Set diagnosed conditions to trigger all routers + self.state["diagnosed_conditions"] = "DHA" # Contains D, H, and A + + @start() + def scan_medical(self): + execution_order.append("scan_medical") + return "scan_complete" + + @router(scan_medical) + def diagnose_conditions(self): + execution_order.append("diagnose_conditions") + return "diagnosis_complete" + + @router(diagnose_conditions) + def diabetes_router(self): + execution_order.append("diabetes_router") + if "D" in self.state["diagnosed_conditions"]: + return "diabetes" + return None + + @listen("diabetes") + def diabetes_analysis(self): + execution_order.append("diabetes_analysis") + return "diabetes_analysis_complete" + + @router(diagnose_conditions) + def hypertension_router(self): + execution_order.append("hypertension_router") + if "H" in self.state["diagnosed_conditions"]: + return "hypertension" + return None + + @listen("hypertension") + def hypertension_analysis(self): + execution_order.append("hypertension_analysis") + return "hypertension_analysis_complete" + + @router(diagnose_conditions) + def anemia_router(self): + execution_order.append("anemia_router") + if "A" in self.state["diagnosed_conditions"]: + return "anemia" + return None + + @listen("anemia") + def anemia_analysis(self): + execution_order.append("anemia_analysis") + return "anemia_analysis_complete" + + flow = MultiRouterFlow() + flow.kickoff() + + # Verify all methods were called + assert "scan_medical" in execution_order + assert "diagnose_conditions" in execution_order + + # Verify all routers were called + assert "diabetes_router" in execution_order + assert "hypertension_router" in execution_order + assert "anemia_router" in execution_order + + # Verify all listeners were called - this is the key test for the fix + assert "diabetes_analysis" in execution_order + assert "hypertension_analysis" in execution_order + assert "anemia_analysis" in execution_order + + # Verify execution order constraints + assert execution_order.index("diagnose_conditions") > execution_order.index( + "scan_medical" + ) + + # All routers should execute after diagnose_conditions + assert execution_order.index("diabetes_router") > execution_order.index( + "diagnose_conditions" + ) + assert execution_order.index("hypertension_router") > execution_order.index( + "diagnose_conditions" + ) + assert execution_order.index("anemia_router") > execution_order.index( + "diagnose_conditions" + ) + + # All analyses should execute after their respective routers + assert execution_order.index("diabetes_analysis") > execution_order.index( + "diabetes_router" + ) + assert execution_order.index("hypertension_analysis") > execution_order.index( + "hypertension_router" + ) + assert execution_order.index("anemia_analysis") > execution_order.index( + "anemia_router" + )