mirror of
https://github.com/crewAIInc/crewAI.git
synced 2025-12-16 12:28:30 +00:00
Compare commits
2 Commits
devin/1765
...
devin/1740
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9453b2aaad | ||
|
|
30636c7b39 |
@@ -873,8 +873,9 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
Executes all listeners and routers triggered by a method completion.
|
||||
|
||||
This internal method manages the execution flow by:
|
||||
1. First executing all triggered routers sequentially
|
||||
2. Then executing all triggered listeners in parallel
|
||||
1. First executing all triggered routers sequentially and collecting their paths
|
||||
2. Then executing listeners for each router path in parallel
|
||||
3. Finally executing listeners for the original trigger method
|
||||
|
||||
Parameters
|
||||
----------
|
||||
@@ -887,26 +888,45 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
Notes
|
||||
-----
|
||||
- Routers are executed sequentially to maintain flow control
|
||||
- Each router's result becomes the new trigger_method
|
||||
- Each router's result is collected and processed independently
|
||||
- Normal listeners are executed in parallel for efficiency
|
||||
- Listeners can receive the trigger method's result as a parameter
|
||||
"""
|
||||
# First, handle routers repeatedly until no router triggers anymore
|
||||
while True:
|
||||
routers_triggered = self._find_triggered_methods(
|
||||
trigger_method, router_only=True
|
||||
)
|
||||
if not routers_triggered:
|
||||
break
|
||||
for router_name in routers_triggered:
|
||||
await self._execute_single_listener(router_name, result)
|
||||
# After executing router, the router's result is the path
|
||||
# The last router executed sets the trigger_method
|
||||
# The router result is the last element in self._method_outputs
|
||||
trigger_method = self._method_outputs[-1]
|
||||
# First, execute all routers for the trigger method
|
||||
router_paths = [] # Store all router paths
|
||||
routers_triggered = self._find_triggered_methods(
|
||||
trigger_method, router_only=True
|
||||
)
|
||||
print(f"Found routers for {trigger_method}: {routers_triggered}")
|
||||
|
||||
# Execute all routers and collect their results
|
||||
for router_name in routers_triggered:
|
||||
print(f"Executing router: {router_name}")
|
||||
await self._execute_single_listener(router_name, result)
|
||||
# After executing router, add its result to paths
|
||||
router_result = self._method_outputs[-1]
|
||||
print(f"Router {router_name} result: {router_result}")
|
||||
if router_result: # Only add non-None results
|
||||
router_paths.append(router_result)
|
||||
|
||||
# Now that no more routers are triggered by current trigger_method,
|
||||
# execute normal listeners
|
||||
# Process all router paths
|
||||
print(f"Processing router paths: {router_paths}")
|
||||
for path in router_paths:
|
||||
# Execute normal listeners for each router path
|
||||
listeners_triggered = self._find_triggered_methods(
|
||||
path, router_only=False
|
||||
)
|
||||
print(f"Found listeners for path {path}: {listeners_triggered}")
|
||||
if listeners_triggered:
|
||||
print(f"Executing listeners for path {path}")
|
||||
tasks = [
|
||||
self._execute_single_listener(listener_name, result)
|
||||
for listener_name in listeners_triggered
|
||||
]
|
||||
await asyncio.gather(*tasks)
|
||||
print(f"Finished executing listeners for path {path}")
|
||||
|
||||
# Now execute normal listeners for the original trigger method
|
||||
listeners_triggered = self._find_triggered_methods(
|
||||
trigger_method, router_only=False
|
||||
)
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
import asyncio
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
from pydantic import BaseModel
|
||||
@@ -9,12 +10,12 @@ from pydantic import BaseModel
|
||||
from crewai.flow.flow import Flow, and_, listen, or_, router, start
|
||||
from crewai.utilities.events import (
|
||||
FlowFinishedEvent,
|
||||
FlowPlotEvent,
|
||||
FlowStartedEvent,
|
||||
MethodExecutionFinishedEvent,
|
||||
MethodExecutionStartedEvent,
|
||||
crewai_event_bus,
|
||||
)
|
||||
from crewai.utilities.events.flow_events import FlowPlotEvent
|
||||
|
||||
|
||||
def test_simple_sequential_flow():
|
||||
@@ -409,6 +410,96 @@ def test_router_with_multiple_conditions():
|
||||
assert execution_order.index("log_final_step") > execution_order.index("router_and")
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def cleanup_event_bus():
|
||||
"""Clean up event bus after each test."""
|
||||
crewai_event_bus._handlers = {}
|
||||
yield
|
||||
crewai_event_bus._handlers = {}
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_multiple_concurrent_routers():
|
||||
"""Test that multiple routers triggered by the same method all execute their events."""
|
||||
execution_order = []
|
||||
|
||||
# Setup event handlers
|
||||
@crewai_event_bus.on(FlowStartedEvent)
|
||||
def handle_flow_start(source: Any, event: FlowStartedEvent):
|
||||
pass
|
||||
|
||||
@crewai_event_bus.on(MethodExecutionStartedEvent)
|
||||
def handle_method_start(source: Any, event: MethodExecutionStartedEvent):
|
||||
pass
|
||||
|
||||
@crewai_event_bus.on(MethodExecutionFinishedEvent)
|
||||
def handle_method_end(source: Any, event: MethodExecutionFinishedEvent):
|
||||
pass
|
||||
|
||||
@crewai_event_bus.on(FlowFinishedEvent)
|
||||
def handle_flow_end(source: Any, event: FlowFinishedEvent):
|
||||
pass
|
||||
|
||||
class MultiRouterFlow(Flow):
|
||||
def __init__(self):
|
||||
print("Initializing MultiRouterFlow")
|
||||
super().__init__(diagnosed_conditions="ABCDH")
|
||||
print(f"State after init: {self._state}")
|
||||
|
||||
@start()
|
||||
async def diagnose_conditions(self):
|
||||
print("Running diagnose_conditions")
|
||||
execution_order.append("diagnose_conditions")
|
||||
print(f"State in diagnose_conditions: {self._state}")
|
||||
return self._state.get("diagnosed_conditions", "")
|
||||
|
||||
@router(diagnose_conditions)
|
||||
async def diabetes_router(self):
|
||||
execution_order.append("diabetes_router")
|
||||
conditions = self._state.get("diagnosed_conditions", "")
|
||||
print(f"Checking diabetes condition in: {conditions}")
|
||||
if "D" in conditions:
|
||||
return "diabetes"
|
||||
return None
|
||||
|
||||
@router(diagnose_conditions)
|
||||
async def hypertension_router(self):
|
||||
execution_order.append("hypertension_router")
|
||||
conditions = self._state.get("diagnosed_conditions", "")
|
||||
print(f"Checking hypertension condition in: {conditions}")
|
||||
if "H" in conditions:
|
||||
return "hypertension"
|
||||
return None
|
||||
|
||||
@listen("diabetes")
|
||||
async def diabetes_analysis(self):
|
||||
execution_order.append("diabetes_analysis")
|
||||
|
||||
@listen("hypertension")
|
||||
async def hypertension_analysis(self):
|
||||
execution_order.append("hypertension_analysis")
|
||||
|
||||
flow = MultiRouterFlow()
|
||||
try:
|
||||
await asyncio.wait_for(flow.kickoff_async(), timeout=10.0)
|
||||
except asyncio.TimeoutError:
|
||||
print("Flow execution timed out")
|
||||
pytest.fail("Flow execution timed out")
|
||||
|
||||
print("Execution order:", execution_order)
|
||||
|
||||
# Verify both routers and their listeners executed
|
||||
assert "diabetes_router" in execution_order
|
||||
assert "hypertension_router" in execution_order
|
||||
assert "diabetes_analysis" in execution_order
|
||||
assert "hypertension_analysis" in execution_order
|
||||
|
||||
# Verify execution order is correct
|
||||
assert execution_order.index("diagnose_conditions") < execution_order.index("diabetes_router")
|
||||
assert execution_order.index("diagnose_conditions") < execution_order.index("hypertension_router")
|
||||
assert execution_order.index("diabetes_router") < execution_order.index("diabetes_analysis")
|
||||
assert execution_order.index("hypertension_router") < execution_order.index("hypertension_analysis")
|
||||
|
||||
|
||||
def test_unstructured_flow_event_emission():
|
||||
"""Test that the correct events are emitted during unstructured flow
|
||||
execution with all fields validated."""
|
||||
|
||||
Reference in New Issue
Block a user