mirror of
https://github.com/crewAIInc/crewAI.git
synced 2025-12-17 04:48:30 +00:00
Compare commits
3 Commits
lg-memory-
...
bugfix/flo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
30da1a1811 | ||
|
|
3bba28c772 | ||
|
|
7177f21d1e |
@@ -894,28 +894,38 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
|||||||
Notes
|
Notes
|
||||||
-----
|
-----
|
||||||
- Routers are executed sequentially to maintain flow control
|
- Routers are executed sequentially to maintain flow control
|
||||||
- Each router's result becomes the new trigger_method
|
- Each router's result becomes a new trigger_method
|
||||||
- Normal listeners are executed in parallel for efficiency
|
- Normal listeners are executed in parallel for efficiency
|
||||||
- Listeners can receive the trigger method's result as a parameter
|
- Listeners can receive the trigger method's result as a parameter
|
||||||
"""
|
"""
|
||||||
# First, handle routers repeatedly until no router triggers anymore
|
# First, handle routers repeatedly until no router triggers anymore
|
||||||
|
router_results = []
|
||||||
|
current_trigger = trigger_method
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
routers_triggered = self._find_triggered_methods(
|
routers_triggered = self._find_triggered_methods(
|
||||||
trigger_method, router_only=True
|
current_trigger, router_only=True
|
||||||
)
|
)
|
||||||
if not routers_triggered:
|
if not routers_triggered:
|
||||||
break
|
break
|
||||||
|
|
||||||
for router_name in routers_triggered:
|
for router_name in routers_triggered:
|
||||||
await self._execute_single_listener(router_name, result)
|
await self._execute_single_listener(router_name, result)
|
||||||
# After executing router, the router's result is the path
|
# After executing router, the router's result is the path
|
||||||
# The last router executed sets the trigger_method
|
router_result = self._method_outputs[-1]
|
||||||
# The router result is the last element in self._method_outputs
|
if router_result: # Only add non-None results
|
||||||
trigger_method = self._method_outputs[-1]
|
router_results.append(router_result)
|
||||||
|
current_trigger = (
|
||||||
|
router_result # Update for next iteration of router chain
|
||||||
|
)
|
||||||
|
|
||||||
# Now that no more routers are triggered by current trigger_method,
|
# Now execute normal listeners for all router results and the original trigger
|
||||||
# execute normal listeners
|
all_triggers = [trigger_method] + router_results
|
||||||
|
|
||||||
|
for current_trigger in all_triggers:
|
||||||
|
if current_trigger: # Skip None results
|
||||||
listeners_triggered = self._find_triggered_methods(
|
listeners_triggered = self._find_triggered_methods(
|
||||||
trigger_method, router_only=False
|
current_trigger, router_only=False
|
||||||
)
|
)
|
||||||
if listeners_triggered:
|
if listeners_triggered:
|
||||||
tasks = [
|
tasks = [
|
||||||
|
|||||||
@@ -654,3 +654,104 @@ def test_flow_plotting():
|
|||||||
assert isinstance(received_events[0], FlowPlotEvent)
|
assert isinstance(received_events[0], FlowPlotEvent)
|
||||||
assert received_events[0].flow_name == "StatelessFlow"
|
assert received_events[0].flow_name == "StatelessFlow"
|
||||||
assert isinstance(received_events[0].timestamp, datetime)
|
assert isinstance(received_events[0].timestamp, datetime)
|
||||||
|
|
||||||
|
|
||||||
|
def test_multiple_routers_from_same_trigger():
|
||||||
|
"""Test that multiple routers triggered by the same method all activate their listeners."""
|
||||||
|
execution_order = []
|
||||||
|
|
||||||
|
class MultiRouterFlow(Flow):
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__()
|
||||||
|
# Set diagnosed conditions to trigger all routers
|
||||||
|
self.state["diagnosed_conditions"] = "DHA" # Contains D, H, and A
|
||||||
|
|
||||||
|
@start()
|
||||||
|
def scan_medical(self):
|
||||||
|
execution_order.append("scan_medical")
|
||||||
|
return "scan_complete"
|
||||||
|
|
||||||
|
@router(scan_medical)
|
||||||
|
def diagnose_conditions(self):
|
||||||
|
execution_order.append("diagnose_conditions")
|
||||||
|
return "diagnosis_complete"
|
||||||
|
|
||||||
|
@router(diagnose_conditions)
|
||||||
|
def diabetes_router(self):
|
||||||
|
execution_order.append("diabetes_router")
|
||||||
|
if "D" in self.state["diagnosed_conditions"]:
|
||||||
|
return "diabetes"
|
||||||
|
return None
|
||||||
|
|
||||||
|
@listen("diabetes")
|
||||||
|
def diabetes_analysis(self):
|
||||||
|
execution_order.append("diabetes_analysis")
|
||||||
|
return "diabetes_analysis_complete"
|
||||||
|
|
||||||
|
@router(diagnose_conditions)
|
||||||
|
def hypertension_router(self):
|
||||||
|
execution_order.append("hypertension_router")
|
||||||
|
if "H" in self.state["diagnosed_conditions"]:
|
||||||
|
return "hypertension"
|
||||||
|
return None
|
||||||
|
|
||||||
|
@listen("hypertension")
|
||||||
|
def hypertension_analysis(self):
|
||||||
|
execution_order.append("hypertension_analysis")
|
||||||
|
return "hypertension_analysis_complete"
|
||||||
|
|
||||||
|
@router(diagnose_conditions)
|
||||||
|
def anemia_router(self):
|
||||||
|
execution_order.append("anemia_router")
|
||||||
|
if "A" in self.state["diagnosed_conditions"]:
|
||||||
|
return "anemia"
|
||||||
|
return None
|
||||||
|
|
||||||
|
@listen("anemia")
|
||||||
|
def anemia_analysis(self):
|
||||||
|
execution_order.append("anemia_analysis")
|
||||||
|
return "anemia_analysis_complete"
|
||||||
|
|
||||||
|
flow = MultiRouterFlow()
|
||||||
|
flow.kickoff()
|
||||||
|
|
||||||
|
# Verify all methods were called
|
||||||
|
assert "scan_medical" in execution_order
|
||||||
|
assert "diagnose_conditions" in execution_order
|
||||||
|
|
||||||
|
# Verify all routers were called
|
||||||
|
assert "diabetes_router" in execution_order
|
||||||
|
assert "hypertension_router" in execution_order
|
||||||
|
assert "anemia_router" in execution_order
|
||||||
|
|
||||||
|
# Verify all listeners were called - this is the key test for the fix
|
||||||
|
assert "diabetes_analysis" in execution_order
|
||||||
|
assert "hypertension_analysis" in execution_order
|
||||||
|
assert "anemia_analysis" in execution_order
|
||||||
|
|
||||||
|
# Verify execution order constraints
|
||||||
|
assert execution_order.index("diagnose_conditions") > execution_order.index(
|
||||||
|
"scan_medical"
|
||||||
|
)
|
||||||
|
|
||||||
|
# All routers should execute after diagnose_conditions
|
||||||
|
assert execution_order.index("diabetes_router") > execution_order.index(
|
||||||
|
"diagnose_conditions"
|
||||||
|
)
|
||||||
|
assert execution_order.index("hypertension_router") > execution_order.index(
|
||||||
|
"diagnose_conditions"
|
||||||
|
)
|
||||||
|
assert execution_order.index("anemia_router") > execution_order.index(
|
||||||
|
"diagnose_conditions"
|
||||||
|
)
|
||||||
|
|
||||||
|
# All analyses should execute after their respective routers
|
||||||
|
assert execution_order.index("diabetes_analysis") > execution_order.index(
|
||||||
|
"diabetes_router"
|
||||||
|
)
|
||||||
|
assert execution_order.index("hypertension_analysis") > execution_order.index(
|
||||||
|
"hypertension_router"
|
||||||
|
)
|
||||||
|
assert execution_order.index("anemia_analysis") > execution_order.index(
|
||||||
|
"anemia_router"
|
||||||
|
)
|
||||||
|
|||||||
Reference in New Issue
Block a user