Compare commits

...

5 Commits

Author SHA1 Message Date
Brandon Hancock (bhancock_ai)
30da1a1811 Merge branch 'main' into bugfix/flow-multiple-router-support 2025-02-26 13:37:38 -05:00
Brandon Hancock (bhancock_ai)
fbf8732784 Fix type issue (#2224)
Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
2025-02-26 13:27:41 -05:00
Brandon Hancock (bhancock_ai)
8fedbe49cb Add support for python 3.10 (#2230) 2025-02-26 13:24:31 -05:00
Lorenze Jay
3bba28c772 Merge branch 'main' into bugfix/flow-multiple-router-support 2025-02-26 09:26:12 -08:00
Brandon Hancock
7177f21d1e Support multiple router calls and address issue #2175 2025-02-25 16:30:09 -05:00
4 changed files with 160 additions and 32 deletions

View File

@@ -894,35 +894,45 @@ class Flow(Generic[T], metaclass=FlowMeta):
Notes
-----
- Routers are executed sequentially to maintain flow control
- Each router's result becomes the new trigger_method
- Each router's result becomes a new trigger_method
- Normal listeners are executed in parallel for efficiency
- Listeners can receive the trigger method's result as a parameter
"""
# First, handle routers repeatedly until no router triggers anymore
router_results = []
current_trigger = trigger_method
while True:
routers_triggered = self._find_triggered_methods(
trigger_method, router_only=True
current_trigger, router_only=True
)
if not routers_triggered:
break
for router_name in routers_triggered:
await self._execute_single_listener(router_name, result)
# After executing router, the router's result is the path
# The last router executed sets the trigger_method
# The router result is the last element in self._method_outputs
trigger_method = self._method_outputs[-1]
router_result = self._method_outputs[-1]
if router_result: # Only add non-None results
router_results.append(router_result)
current_trigger = (
router_result # Update for next iteration of router chain
)
# Now that no more routers are triggered by current trigger_method,
# execute normal listeners
listeners_triggered = self._find_triggered_methods(
trigger_method, router_only=False
)
if listeners_triggered:
tasks = [
self._execute_single_listener(listener_name, result)
for listener_name in listeners_triggered
]
await asyncio.gather(*tasks)
# Now execute normal listeners for all router results and the original trigger
all_triggers = [trigger_method] + router_results
for current_trigger in all_triggers:
if current_trigger: # Skip None results
listeners_triggered = self._find_triggered_methods(
current_trigger, router_only=False
)
if listeners_triggered:
tasks = [
self._execute_single_listener(listener_name, result)
for listener_name in listeners_triggered
]
await asyncio.gather(*tasks)
def _find_triggered_methods(
self, trigger_method: str, router_only: bool

View File

@@ -4,7 +4,7 @@ SQLite-based implementation of flow state persistence.
import json
import sqlite3
from datetime import datetime
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Optional, Union
@@ -34,6 +34,7 @@ class SQLiteFlowPersistence(FlowPersistence):
ValueError: If db_path is invalid
"""
from crewai.utilities.paths import db_storage_path
# Get path from argument or default location
path = db_path or str(Path(db_storage_path()) / "flow_states.db")
@@ -46,7 +47,8 @@ class SQLiteFlowPersistence(FlowPersistence):
def init_db(self) -> None:
"""Create the necessary tables if they don't exist."""
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
conn.execute(
"""
CREATE TABLE IF NOT EXISTS flow_states (
id INTEGER PRIMARY KEY AUTOINCREMENT,
flow_uuid TEXT NOT NULL,
@@ -54,12 +56,15 @@ class SQLiteFlowPersistence(FlowPersistence):
timestamp DATETIME NOT NULL,
state_json TEXT NOT NULL
)
""")
"""
)
# Add index for faster UUID lookups
conn.execute("""
conn.execute(
"""
CREATE INDEX IF NOT EXISTS idx_flow_states_uuid
ON flow_states(flow_uuid)
""")
"""
)
def save_state(
self,
@@ -85,19 +90,22 @@ class SQLiteFlowPersistence(FlowPersistence):
)
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
conn.execute(
"""
INSERT INTO flow_states (
flow_uuid,
method_name,
timestamp,
state_json
) VALUES (?, ?, ?, ?)
""", (
flow_uuid,
method_name,
datetime.utcnow().isoformat(),
json.dumps(state_dict),
))
""",
(
flow_uuid,
method_name,
datetime.now(timezone.utc).isoformat(),
json.dumps(state_dict),
),
)
def load_state(self, flow_uuid: str) -> Optional[Dict[str, Any]]:
"""Load the most recent state for a given flow UUID.
@@ -109,13 +117,16 @@ class SQLiteFlowPersistence(FlowPersistence):
The most recent state as a dictionary, or None if no state exists
"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute("""
cursor = conn.execute(
"""
SELECT state_json
FROM flow_states
WHERE flow_uuid = ?
ORDER BY id DESC
LIMIT 1
""", (flow_uuid,))
""",
(flow_uuid,),
)
row = cursor.fetchone()
if row:

View File

@@ -30,8 +30,14 @@ class TokenCalcHandler(CustomLogger):
if hasattr(usage, "prompt_tokens"):
self.token_cost_process.sum_prompt_tokens(usage.prompt_tokens)
if hasattr(usage, "completion_tokens"):
self.token_cost_process.sum_completion_tokens(usage.completion_tokens)
if hasattr(usage, "prompt_tokens_details") and usage.prompt_tokens_details:
self.token_cost_process.sum_completion_tokens(
usage.completion_tokens
)
if (
hasattr(usage, "prompt_tokens_details")
and usage.prompt_tokens_details
and usage.prompt_tokens_details.cached_tokens
):
self.token_cost_process.sum_cached_prompt_tokens(
usage.prompt_tokens_details.cached_tokens
)

View File

@@ -654,3 +654,104 @@ def test_flow_plotting():
assert isinstance(received_events[0], FlowPlotEvent)
assert received_events[0].flow_name == "StatelessFlow"
assert isinstance(received_events[0].timestamp, datetime)
def test_multiple_routers_from_same_trigger():
"""Test that multiple routers triggered by the same method all activate their listeners."""
execution_order = []
class MultiRouterFlow(Flow):
def __init__(self):
super().__init__()
# Set diagnosed conditions to trigger all routers
self.state["diagnosed_conditions"] = "DHA" # Contains D, H, and A
@start()
def scan_medical(self):
execution_order.append("scan_medical")
return "scan_complete"
@router(scan_medical)
def diagnose_conditions(self):
execution_order.append("diagnose_conditions")
return "diagnosis_complete"
@router(diagnose_conditions)
def diabetes_router(self):
execution_order.append("diabetes_router")
if "D" in self.state["diagnosed_conditions"]:
return "diabetes"
return None
@listen("diabetes")
def diabetes_analysis(self):
execution_order.append("diabetes_analysis")
return "diabetes_analysis_complete"
@router(diagnose_conditions)
def hypertension_router(self):
execution_order.append("hypertension_router")
if "H" in self.state["diagnosed_conditions"]:
return "hypertension"
return None
@listen("hypertension")
def hypertension_analysis(self):
execution_order.append("hypertension_analysis")
return "hypertension_analysis_complete"
@router(diagnose_conditions)
def anemia_router(self):
execution_order.append("anemia_router")
if "A" in self.state["diagnosed_conditions"]:
return "anemia"
return None
@listen("anemia")
def anemia_analysis(self):
execution_order.append("anemia_analysis")
return "anemia_analysis_complete"
flow = MultiRouterFlow()
flow.kickoff()
# Verify all methods were called
assert "scan_medical" in execution_order
assert "diagnose_conditions" in execution_order
# Verify all routers were called
assert "diabetes_router" in execution_order
assert "hypertension_router" in execution_order
assert "anemia_router" in execution_order
# Verify all listeners were called - this is the key test for the fix
assert "diabetes_analysis" in execution_order
assert "hypertension_analysis" in execution_order
assert "anemia_analysis" in execution_order
# Verify execution order constraints
assert execution_order.index("diagnose_conditions") > execution_order.index(
"scan_medical"
)
# All routers should execute after diagnose_conditions
assert execution_order.index("diabetes_router") > execution_order.index(
"diagnose_conditions"
)
assert execution_order.index("hypertension_router") > execution_order.index(
"diagnose_conditions"
)
assert execution_order.index("anemia_router") > execution_order.index(
"diagnose_conditions"
)
# All analyses should execute after their respective routers
assert execution_order.index("diabetes_analysis") > execution_order.index(
"diabetes_router"
)
assert execution_order.index("hypertension_analysis") > execution_order.index(
"hypertension_router"
)
assert execution_order.index("anemia_analysis") > execution_order.index(
"anemia_router"
)