Compare commits

...

2 Commits

Author SHA1 Message Date
Devin AI
9453b2aaad fix: Sort imports in flow_test.py to fix linting
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-20 15:36:09 +00:00
Devin AI
30636c7b39 fix: Allow multiple routers to trigger events simultaneously
- Modified _execute_listeners to collect and process router paths independently
- Added test case for multiple concurrent routers
- Fixed router result handling to support multiple paths
- Added debug logging for better troubleshooting

Fixes #2175

Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-20 15:34:19 +00:00
2 changed files with 130 additions and 19 deletions

View File

@@ -873,8 +873,9 @@ class Flow(Generic[T], metaclass=FlowMeta):
Executes all listeners and routers triggered by a method completion.
This internal method manages the execution flow by:
1. First executing all triggered routers sequentially
2. Then executing all triggered listeners in parallel
1. First executing all triggered routers sequentially and collecting their paths
2. Then executing listeners for each router path in parallel
3. Finally executing listeners for the original trigger method
Parameters
----------
@@ -887,26 +888,45 @@ class Flow(Generic[T], metaclass=FlowMeta):
Notes
-----
- Routers are executed sequentially to maintain flow control
- Each router's result becomes the new trigger_method
- Each router's result is collected and processed independently
- Normal listeners are executed in parallel for efficiency
- Listeners can receive the trigger method's result as a parameter
"""
# First, handle routers repeatedly until no router triggers anymore
while True:
routers_triggered = self._find_triggered_methods(
trigger_method, router_only=True
)
if not routers_triggered:
break
for router_name in routers_triggered:
await self._execute_single_listener(router_name, result)
# After executing router, the router's result is the path
# The last router executed sets the trigger_method
# The router result is the last element in self._method_outputs
trigger_method = self._method_outputs[-1]
# First, execute all routers for the trigger method
router_paths = [] # Store all router paths
routers_triggered = self._find_triggered_methods(
trigger_method, router_only=True
)
print(f"Found routers for {trigger_method}: {routers_triggered}")
# Execute all routers and collect their results
for router_name in routers_triggered:
print(f"Executing router: {router_name}")
await self._execute_single_listener(router_name, result)
# After executing router, add its result to paths
router_result = self._method_outputs[-1]
print(f"Router {router_name} result: {router_result}")
if router_result: # Only add non-None results
router_paths.append(router_result)
# Now that no more routers are triggered by current trigger_method,
# execute normal listeners
# Process all router paths
print(f"Processing router paths: {router_paths}")
for path in router_paths:
# Execute normal listeners for each router path
listeners_triggered = self._find_triggered_methods(
path, router_only=False
)
print(f"Found listeners for path {path}: {listeners_triggered}")
if listeners_triggered:
print(f"Executing listeners for path {path}")
tasks = [
self._execute_single_listener(listener_name, result)
for listener_name in listeners_triggered
]
await asyncio.gather(*tasks)
print(f"Finished executing listeners for path {path}")
# Now execute normal listeners for the original trigger method
listeners_triggered = self._find_triggered_methods(
trigger_method, router_only=False
)

View File

@@ -2,6 +2,7 @@
import asyncio
from datetime import datetime
from typing import Any
import pytest
from pydantic import BaseModel
@@ -9,12 +10,12 @@ from pydantic import BaseModel
from crewai.flow.flow import Flow, and_, listen, or_, router, start
from crewai.utilities.events import (
FlowFinishedEvent,
FlowPlotEvent,
FlowStartedEvent,
MethodExecutionFinishedEvent,
MethodExecutionStartedEvent,
crewai_event_bus,
)
from crewai.utilities.events.flow_events import FlowPlotEvent
def test_simple_sequential_flow():
@@ -409,6 +410,96 @@ def test_router_with_multiple_conditions():
assert execution_order.index("log_final_step") > execution_order.index("router_and")
@pytest.fixture(autouse=True)
def cleanup_event_bus():
"""Clean up event bus after each test."""
crewai_event_bus._handlers = {}
yield
crewai_event_bus._handlers = {}
@pytest.mark.asyncio
async def test_multiple_concurrent_routers():
"""Test that multiple routers triggered by the same method all execute their events."""
execution_order = []
# Setup event handlers
@crewai_event_bus.on(FlowStartedEvent)
def handle_flow_start(source: Any, event: FlowStartedEvent):
pass
@crewai_event_bus.on(MethodExecutionStartedEvent)
def handle_method_start(source: Any, event: MethodExecutionStartedEvent):
pass
@crewai_event_bus.on(MethodExecutionFinishedEvent)
def handle_method_end(source: Any, event: MethodExecutionFinishedEvent):
pass
@crewai_event_bus.on(FlowFinishedEvent)
def handle_flow_end(source: Any, event: FlowFinishedEvent):
pass
class MultiRouterFlow(Flow):
def __init__(self):
print("Initializing MultiRouterFlow")
super().__init__(diagnosed_conditions="ABCDH")
print(f"State after init: {self._state}")
@start()
async def diagnose_conditions(self):
print("Running diagnose_conditions")
execution_order.append("diagnose_conditions")
print(f"State in diagnose_conditions: {self._state}")
return self._state.get("diagnosed_conditions", "")
@router(diagnose_conditions)
async def diabetes_router(self):
execution_order.append("diabetes_router")
conditions = self._state.get("diagnosed_conditions", "")
print(f"Checking diabetes condition in: {conditions}")
if "D" in conditions:
return "diabetes"
return None
@router(diagnose_conditions)
async def hypertension_router(self):
execution_order.append("hypertension_router")
conditions = self._state.get("diagnosed_conditions", "")
print(f"Checking hypertension condition in: {conditions}")
if "H" in conditions:
return "hypertension"
return None
@listen("diabetes")
async def diabetes_analysis(self):
execution_order.append("diabetes_analysis")
@listen("hypertension")
async def hypertension_analysis(self):
execution_order.append("hypertension_analysis")
flow = MultiRouterFlow()
try:
await asyncio.wait_for(flow.kickoff_async(), timeout=10.0)
except asyncio.TimeoutError:
print("Flow execution timed out")
pytest.fail("Flow execution timed out")
print("Execution order:", execution_order)
# Verify both routers and their listeners executed
assert "diabetes_router" in execution_order
assert "hypertension_router" in execution_order
assert "diabetes_analysis" in execution_order
assert "hypertension_analysis" in execution_order
# Verify execution order is correct
assert execution_order.index("diagnose_conditions") < execution_order.index("diabetes_router")
assert execution_order.index("diagnose_conditions") < execution_order.index("hypertension_router")
assert execution_order.index("diabetes_router") < execution_order.index("diabetes_analysis")
assert execution_order.index("hypertension_router") < execution_order.index("hypertension_analysis")
def test_unstructured_flow_event_emission():
"""Test that the correct events are emitted during unstructured flow
execution with all fields validated."""