mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-10 00:28:31 +00:00
fix: Allow multiple routers to trigger events simultaneously
- Modified _execute_listeners to collect and process router paths independently - Added test case for multiple concurrent routers - Fixed router result handling to support multiple paths - Added debug logging for better troubleshooting Fixes #2175 Co-Authored-By: Joe Moura <joao@crewai.com>
This commit is contained in:
@@ -873,8 +873,9 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
|||||||
Executes all listeners and routers triggered by a method completion.
|
Executes all listeners and routers triggered by a method completion.
|
||||||
|
|
||||||
This internal method manages the execution flow by:
|
This internal method manages the execution flow by:
|
||||||
1. First executing all triggered routers sequentially
|
1. First executing all triggered routers sequentially and collecting their paths
|
||||||
2. Then executing all triggered listeners in parallel
|
2. Then executing listeners for each router path in parallel
|
||||||
|
3. Finally executing listeners for the original trigger method
|
||||||
|
|
||||||
Parameters
|
Parameters
|
||||||
----------
|
----------
|
||||||
@@ -887,26 +888,45 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
|||||||
Notes
|
Notes
|
||||||
-----
|
-----
|
||||||
- Routers are executed sequentially to maintain flow control
|
- Routers are executed sequentially to maintain flow control
|
||||||
- Each router's result becomes the new trigger_method
|
- Each router's result is collected and processed independently
|
||||||
- Normal listeners are executed in parallel for efficiency
|
- Normal listeners are executed in parallel for efficiency
|
||||||
- Listeners can receive the trigger method's result as a parameter
|
- Listeners can receive the trigger method's result as a parameter
|
||||||
"""
|
"""
|
||||||
# First, handle routers repeatedly until no router triggers anymore
|
# First, execute all routers for the trigger method
|
||||||
while True:
|
router_paths = [] # Store all router paths
|
||||||
routers_triggered = self._find_triggered_methods(
|
routers_triggered = self._find_triggered_methods(
|
||||||
trigger_method, router_only=True
|
trigger_method, router_only=True
|
||||||
)
|
)
|
||||||
if not routers_triggered:
|
print(f"Found routers for {trigger_method}: {routers_triggered}")
|
||||||
break
|
|
||||||
for router_name in routers_triggered:
|
# Execute all routers and collect their results
|
||||||
await self._execute_single_listener(router_name, result)
|
for router_name in routers_triggered:
|
||||||
# After executing router, the router's result is the path
|
print(f"Executing router: {router_name}")
|
||||||
# The last router executed sets the trigger_method
|
await self._execute_single_listener(router_name, result)
|
||||||
# The router result is the last element in self._method_outputs
|
# After executing router, add its result to paths
|
||||||
trigger_method = self._method_outputs[-1]
|
router_result = self._method_outputs[-1]
|
||||||
|
print(f"Router {router_name} result: {router_result}")
|
||||||
|
if router_result: # Only add non-None results
|
||||||
|
router_paths.append(router_result)
|
||||||
|
|
||||||
# Now that no more routers are triggered by current trigger_method,
|
# Process all router paths
|
||||||
# execute normal listeners
|
print(f"Processing router paths: {router_paths}")
|
||||||
|
for path in router_paths:
|
||||||
|
# Execute normal listeners for each router path
|
||||||
|
listeners_triggered = self._find_triggered_methods(
|
||||||
|
path, router_only=False
|
||||||
|
)
|
||||||
|
print(f"Found listeners for path {path}: {listeners_triggered}")
|
||||||
|
if listeners_triggered:
|
||||||
|
print(f"Executing listeners for path {path}")
|
||||||
|
tasks = [
|
||||||
|
self._execute_single_listener(listener_name, result)
|
||||||
|
for listener_name in listeners_triggered
|
||||||
|
]
|
||||||
|
await asyncio.gather(*tasks)
|
||||||
|
print(f"Finished executing listeners for path {path}")
|
||||||
|
|
||||||
|
# Now execute normal listeners for the original trigger method
|
||||||
listeners_triggered = self._find_triggered_methods(
|
listeners_triggered = self._find_triggered_methods(
|
||||||
trigger_method, router_only=False
|
trigger_method, router_only=False
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -3,18 +3,20 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
|
|
||||||
from crewai.flow.flow import Flow, and_, listen, or_, router, start
|
from crewai.flow.flow import Flow, and_, listen, or_, router, start
|
||||||
from crewai.utilities.events import (
|
from crewai.utilities.events import (
|
||||||
FlowFinishedEvent,
|
FlowFinishedEvent,
|
||||||
|
FlowPlotEvent,
|
||||||
FlowStartedEvent,
|
FlowStartedEvent,
|
||||||
MethodExecutionFinishedEvent,
|
MethodExecutionFinishedEvent,
|
||||||
MethodExecutionStartedEvent,
|
MethodExecutionStartedEvent,
|
||||||
crewai_event_bus,
|
crewai_event_bus,
|
||||||
)
|
)
|
||||||
from crewai.utilities.events.flow_events import FlowPlotEvent
|
|
||||||
|
|
||||||
|
|
||||||
def test_simple_sequential_flow():
|
def test_simple_sequential_flow():
|
||||||
@@ -409,6 +411,96 @@ def test_router_with_multiple_conditions():
|
|||||||
assert execution_order.index("log_final_step") > execution_order.index("router_and")
|
assert execution_order.index("log_final_step") > execution_order.index("router_and")
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def cleanup_event_bus():
|
||||||
|
"""Clean up event bus after each test."""
|
||||||
|
crewai_event_bus._handlers = {}
|
||||||
|
yield
|
||||||
|
crewai_event_bus._handlers = {}
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_multiple_concurrent_routers():
|
||||||
|
"""Test that multiple routers triggered by the same method all execute their events."""
|
||||||
|
execution_order = []
|
||||||
|
|
||||||
|
# Setup event handlers
|
||||||
|
@crewai_event_bus.on(FlowStartedEvent)
|
||||||
|
def handle_flow_start(source: Any, event: FlowStartedEvent):
|
||||||
|
pass
|
||||||
|
|
||||||
|
@crewai_event_bus.on(MethodExecutionStartedEvent)
|
||||||
|
def handle_method_start(source: Any, event: MethodExecutionStartedEvent):
|
||||||
|
pass
|
||||||
|
|
||||||
|
@crewai_event_bus.on(MethodExecutionFinishedEvent)
|
||||||
|
def handle_method_end(source: Any, event: MethodExecutionFinishedEvent):
|
||||||
|
pass
|
||||||
|
|
||||||
|
@crewai_event_bus.on(FlowFinishedEvent)
|
||||||
|
def handle_flow_end(source: Any, event: FlowFinishedEvent):
|
||||||
|
pass
|
||||||
|
|
||||||
|
class MultiRouterFlow(Flow):
|
||||||
|
def __init__(self):
|
||||||
|
print("Initializing MultiRouterFlow")
|
||||||
|
super().__init__(diagnosed_conditions="ABCDH")
|
||||||
|
print(f"State after init: {self._state}")
|
||||||
|
|
||||||
|
@start()
|
||||||
|
async def diagnose_conditions(self):
|
||||||
|
print("Running diagnose_conditions")
|
||||||
|
execution_order.append("diagnose_conditions")
|
||||||
|
print(f"State in diagnose_conditions: {self._state}")
|
||||||
|
return self._state.get("diagnosed_conditions", "")
|
||||||
|
|
||||||
|
@router(diagnose_conditions)
|
||||||
|
async def diabetes_router(self):
|
||||||
|
execution_order.append("diabetes_router")
|
||||||
|
conditions = self._state.get("diagnosed_conditions", "")
|
||||||
|
print(f"Checking diabetes condition in: {conditions}")
|
||||||
|
if "D" in conditions:
|
||||||
|
return "diabetes"
|
||||||
|
return None
|
||||||
|
|
||||||
|
@router(diagnose_conditions)
|
||||||
|
async def hypertension_router(self):
|
||||||
|
execution_order.append("hypertension_router")
|
||||||
|
conditions = self._state.get("diagnosed_conditions", "")
|
||||||
|
print(f"Checking hypertension condition in: {conditions}")
|
||||||
|
if "H" in conditions:
|
||||||
|
return "hypertension"
|
||||||
|
return None
|
||||||
|
|
||||||
|
@listen("diabetes")
|
||||||
|
async def diabetes_analysis(self):
|
||||||
|
execution_order.append("diabetes_analysis")
|
||||||
|
|
||||||
|
@listen("hypertension")
|
||||||
|
async def hypertension_analysis(self):
|
||||||
|
execution_order.append("hypertension_analysis")
|
||||||
|
|
||||||
|
flow = MultiRouterFlow()
|
||||||
|
try:
|
||||||
|
await asyncio.wait_for(flow.kickoff_async(), timeout=10.0)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
print("Flow execution timed out")
|
||||||
|
pytest.fail("Flow execution timed out")
|
||||||
|
|
||||||
|
print("Execution order:", execution_order)
|
||||||
|
|
||||||
|
# Verify both routers and their listeners executed
|
||||||
|
assert "diabetes_router" in execution_order
|
||||||
|
assert "hypertension_router" in execution_order
|
||||||
|
assert "diabetes_analysis" in execution_order
|
||||||
|
assert "hypertension_analysis" in execution_order
|
||||||
|
|
||||||
|
# Verify execution order is correct
|
||||||
|
assert execution_order.index("diagnose_conditions") < execution_order.index("diabetes_router")
|
||||||
|
assert execution_order.index("diagnose_conditions") < execution_order.index("hypertension_router")
|
||||||
|
assert execution_order.index("diabetes_router") < execution_order.index("diabetes_analysis")
|
||||||
|
assert execution_order.index("hypertension_router") < execution_order.index("hypertension_analysis")
|
||||||
|
|
||||||
|
|
||||||
def test_unstructured_flow_event_emission():
|
def test_unstructured_flow_event_emission():
|
||||||
"""Test that the correct events are emitted during unstructured flow
|
"""Test that the correct events are emitted during unstructured flow
|
||||||
execution with all fields validated."""
|
execution with all fields validated."""
|
||||||
|
|||||||
Reference in New Issue
Block a user