From 30636c7b39bf9e90e3c1ae881ccfdb91a24d677c Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 20 Feb 2025 15:34:19 +0000 Subject: [PATCH] fix: Allow multiple routers to trigger events simultaneously - Modified _execute_listeners to collect and process router paths independently - Added test case for multiple concurrent routers - Fixed router result handling to support multiple paths - Added debug logging for better troubleshooting Fixes #2175 Co-Authored-By: Joe Moura --- src/crewai/flow/flow.py | 56 ++++++++++++++++-------- tests/flow_test.py | 94 ++++++++++++++++++++++++++++++++++++++++- 2 files changed, 131 insertions(+), 19 deletions(-) diff --git a/src/crewai/flow/flow.py b/src/crewai/flow/flow.py index 2babbe57c..39eb99186 100644 --- a/src/crewai/flow/flow.py +++ b/src/crewai/flow/flow.py @@ -873,8 +873,9 @@ class Flow(Generic[T], metaclass=FlowMeta): Executes all listeners and routers triggered by a method completion. This internal method manages the execution flow by: - 1. First executing all triggered routers sequentially - 2. Then executing all triggered listeners in parallel + 1. First executing all triggered routers sequentially and collecting their paths + 2. Then executing listeners for each router path in parallel + 3. Finally executing listeners for the original trigger method Parameters ---------- @@ -887,26 +888,45 @@ class Flow(Generic[T], metaclass=FlowMeta): Notes ----- - Routers are executed sequentially to maintain flow control - - Each router's result becomes the new trigger_method + - Each router's result is collected and processed independently - Normal listeners are executed in parallel for efficiency - Listeners can receive the trigger method's result as a parameter """ - # First, handle routers repeatedly until no router triggers anymore - while True: - routers_triggered = self._find_triggered_methods( - trigger_method, router_only=True - ) - if not routers_triggered: - break - for router_name in routers_triggered: - await self._execute_single_listener(router_name, result) - # After executing router, the router's result is the path - # The last router executed sets the trigger_method - # The router result is the last element in self._method_outputs - trigger_method = self._method_outputs[-1] + # First, execute all routers for the trigger method + router_paths = [] # Store all router paths + routers_triggered = self._find_triggered_methods( + trigger_method, router_only=True + ) + print(f"Found routers for {trigger_method}: {routers_triggered}") + + # Execute all routers and collect their results + for router_name in routers_triggered: + print(f"Executing router: {router_name}") + await self._execute_single_listener(router_name, result) + # After executing router, add its result to paths + router_result = self._method_outputs[-1] + print(f"Router {router_name} result: {router_result}") + if router_result: # Only add non-None results + router_paths.append(router_result) - # Now that no more routers are triggered by current trigger_method, - # execute normal listeners + # Process all router paths + print(f"Processing router paths: {router_paths}") + for path in router_paths: + # Execute normal listeners for each router path + listeners_triggered = self._find_triggered_methods( + path, router_only=False + ) + print(f"Found listeners for path {path}: {listeners_triggered}") + if listeners_triggered: + print(f"Executing listeners for path {path}") + tasks = [ + self._execute_single_listener(listener_name, result) + for listener_name in listeners_triggered + ] + await asyncio.gather(*tasks) + print(f"Finished executing listeners for path {path}") + + # Now execute normal listeners for the original trigger method listeners_triggered = self._find_triggered_methods( trigger_method, router_only=False ) diff --git a/tests/flow_test.py b/tests/flow_test.py index b2edcfa5a..4a9176d74 100644 --- a/tests/flow_test.py +++ b/tests/flow_test.py @@ -3,18 +3,20 @@ import asyncio from datetime import datetime +from typing import Any + import pytest from pydantic import BaseModel from crewai.flow.flow import Flow, and_, listen, or_, router, start from crewai.utilities.events import ( FlowFinishedEvent, + FlowPlotEvent, FlowStartedEvent, MethodExecutionFinishedEvent, MethodExecutionStartedEvent, crewai_event_bus, ) -from crewai.utilities.events.flow_events import FlowPlotEvent def test_simple_sequential_flow(): @@ -409,6 +411,96 @@ def test_router_with_multiple_conditions(): assert execution_order.index("log_final_step") > execution_order.index("router_and") +@pytest.fixture(autouse=True) +def cleanup_event_bus(): + """Clean up event bus after each test.""" + crewai_event_bus._handlers = {} + yield + crewai_event_bus._handlers = {} + +@pytest.mark.asyncio +async def test_multiple_concurrent_routers(): + """Test that multiple routers triggered by the same method all execute their events.""" + execution_order = [] + + # Setup event handlers + @crewai_event_bus.on(FlowStartedEvent) + def handle_flow_start(source: Any, event: FlowStartedEvent): + pass + + @crewai_event_bus.on(MethodExecutionStartedEvent) + def handle_method_start(source: Any, event: MethodExecutionStartedEvent): + pass + + @crewai_event_bus.on(MethodExecutionFinishedEvent) + def handle_method_end(source: Any, event: MethodExecutionFinishedEvent): + pass + + @crewai_event_bus.on(FlowFinishedEvent) + def handle_flow_end(source: Any, event: FlowFinishedEvent): + pass + + class MultiRouterFlow(Flow): + def __init__(self): + print("Initializing MultiRouterFlow") + super().__init__(diagnosed_conditions="ABCDH") + print(f"State after init: {self._state}") + + @start() + async def diagnose_conditions(self): + print("Running diagnose_conditions") + execution_order.append("diagnose_conditions") + print(f"State in diagnose_conditions: {self._state}") + return self._state.get("diagnosed_conditions", "") + + @router(diagnose_conditions) + async def diabetes_router(self): + execution_order.append("diabetes_router") + conditions = self._state.get("diagnosed_conditions", "") + print(f"Checking diabetes condition in: {conditions}") + if "D" in conditions: + return "diabetes" + return None + + @router(diagnose_conditions) + async def hypertension_router(self): + execution_order.append("hypertension_router") + conditions = self._state.get("diagnosed_conditions", "") + print(f"Checking hypertension condition in: {conditions}") + if "H" in conditions: + return "hypertension" + return None + + @listen("diabetes") + async def diabetes_analysis(self): + execution_order.append("diabetes_analysis") + + @listen("hypertension") + async def hypertension_analysis(self): + execution_order.append("hypertension_analysis") + + flow = MultiRouterFlow() + try: + await asyncio.wait_for(flow.kickoff_async(), timeout=10.0) + except asyncio.TimeoutError: + print("Flow execution timed out") + pytest.fail("Flow execution timed out") + + print("Execution order:", execution_order) + + # Verify both routers and their listeners executed + assert "diabetes_router" in execution_order + assert "hypertension_router" in execution_order + assert "diabetes_analysis" in execution_order + assert "hypertension_analysis" in execution_order + + # Verify execution order is correct + assert execution_order.index("diagnose_conditions") < execution_order.index("diabetes_router") + assert execution_order.index("diagnose_conditions") < execution_order.index("hypertension_router") + assert execution_order.index("diabetes_router") < execution_order.index("diabetes_analysis") + assert execution_order.index("hypertension_router") < execution_order.index("hypertension_analysis") + + def test_unstructured_flow_event_emission(): """Test that the correct events are emitted during unstructured flow execution with all fields validated."""