refactor: improve flow handling, typing, and logging; update UI and tests

fix: refine nested flow conditionals and ensure router methods and routes are fully parsed
fix: improve docstrings, typing, and logging coverage across all events
feat: update flow.plot feature with new UI enhancements
chore: apply Ruff linting, reorganize imports, and remove deprecated utilities/files
chore: split constants and utils, clean JS comments, and add typing for linters
tests: strengthen test coverage for flow execution paths and router logic
This commit is contained in:
Greyson LaLonde
2025-11-01 02:15:06 +01:00
committed by GitHub
parent 2e9eb8c32d
commit e229ef4e19
27 changed files with 4976 additions and 1629 deletions

View File

@@ -850,31 +850,6 @@ def test_flow_plotting():
assert isinstance(received_events[0].timestamp, datetime)
def test_method_calls_crew_detection():
"""Test that method_calls_crew() detects .crew(), .kickoff(), and .kickoff_async() calls."""
from crewai.flow.visualization_utils import method_calls_crew
from crewai import Agent
# Test with a real Flow that uses agent.kickoff()
class FlowWithAgentKickoff(Flow):
@start()
def run_agent(self):
agent = Agent(role="test", goal="test", backstory="test")
return agent.kickoff("query")
flow = FlowWithAgentKickoff()
assert method_calls_crew(flow.run_agent) is True
# Test with a Flow that has no crew/agent calls
class FlowWithoutCrewCalls(Flow):
@start()
def simple_method(self):
return "Just a regular method"
flow2 = FlowWithoutCrewCalls()
assert method_calls_crew(flow2.simple_method) is False
def test_multiple_routers_from_same_trigger():
"""Test that multiple routers triggered by the same method all activate their listeners."""
execution_order = []
@@ -1058,3 +1033,354 @@ def test_nested_and_or_conditions():
# method_8 should execute after method_7
assert execution_order.index("method_8") > execution_order.index("method_7")
def test_diamond_dependency_pattern():
"""Test diamond pattern where two parallel paths converge at a final step."""
execution_order = []
class DiamondFlow(Flow):
@start()
def start(self):
execution_order.append("start")
return "started"
@listen(start)
def path_a(self):
execution_order.append("path_a")
return "a_done"
@listen(start)
def path_b(self):
execution_order.append("path_b")
return "b_done"
@listen(and_(path_a, path_b))
def converge(self):
execution_order.append("converge")
return "converged"
flow = DiamondFlow()
flow.kickoff()
# Start should execute first
assert execution_order[0] == "start"
# Both paths should execute after start
assert "path_a" in execution_order
assert "path_b" in execution_order
assert execution_order.index("path_a") > execution_order.index("start")
assert execution_order.index("path_b") > execution_order.index("start")
# Converge should be last and after both paths
assert execution_order[-1] == "converge"
assert execution_order.index("converge") > execution_order.index("path_a")
assert execution_order.index("converge") > execution_order.index("path_b")
def test_router_cascade_chain():
"""Test a chain of routers where each router triggers the next."""
execution_order = []
class RouterCascadeFlow(Flow):
def __init__(self):
super().__init__()
self.state["level"] = 1
@start()
def begin(self):
execution_order.append("begin")
return "started"
@router(begin)
def router_level_1(self):
execution_order.append("router_level_1")
return "level_1_path"
@listen("level_1_path")
def process_level_1(self):
execution_order.append("process_level_1")
self.state["level"] = 2
return "level_1_done"
@router(process_level_1)
def router_level_2(self):
execution_order.append("router_level_2")
return "level_2_path"
@listen("level_2_path")
def process_level_2(self):
execution_order.append("process_level_2")
self.state["level"] = 3
return "level_2_done"
@router(process_level_2)
def router_level_3(self):
execution_order.append("router_level_3")
return "final_path"
@listen("final_path")
def finalize(self):
execution_order.append("finalize")
return "complete"
flow = RouterCascadeFlow()
flow.kickoff()
expected_order = [
"begin",
"router_level_1",
"process_level_1",
"router_level_2",
"process_level_2",
"router_level_3",
"finalize",
]
assert execution_order == expected_order
assert flow.state["level"] == 3
def test_complex_and_or_branching():
"""Test complex branching with multiple AND and OR conditions."""
execution_order = []
class ComplexBranchingFlow(Flow):
@start()
def init(self):
execution_order.append("init")
@listen(init)
def branch_1a(self):
execution_order.append("branch_1a")
@listen(init)
def branch_1b(self):
execution_order.append("branch_1b")
@listen(init)
def branch_1c(self):
execution_order.append("branch_1c")
# Requires 1a AND 1b (ignoring 1c)
@listen(and_(branch_1a, branch_1b))
def branch_2a(self):
execution_order.append("branch_2a")
# Requires any of 1a, 1b, or 1c
@listen(or_(branch_1a, branch_1b, branch_1c))
def branch_2b(self):
execution_order.append("branch_2b")
# Final step requires 2a AND 2b
@listen(and_(branch_2a, branch_2b))
def final(self):
execution_order.append("final")
flow = ComplexBranchingFlow()
flow.kickoff()
# Verify all branches executed
assert "init" in execution_order
assert "branch_1a" in execution_order
assert "branch_1b" in execution_order
assert "branch_1c" in execution_order
assert "branch_2a" in execution_order
assert "branch_2b" in execution_order
assert "final" in execution_order
# Verify order constraints
assert execution_order.index("branch_2a") > execution_order.index("branch_1a")
assert execution_order.index("branch_2a") > execution_order.index("branch_1b")
# branch_2b should trigger after at least one of 1a, 1b, or 1c
min_branch_1_index = min(
execution_order.index("branch_1a"),
execution_order.index("branch_1b"),
execution_order.index("branch_1c"),
)
assert execution_order.index("branch_2b") > min_branch_1_index
# Final should be last and after both 2a and 2b
assert execution_order[-1] == "final"
assert execution_order.index("final") > execution_order.index("branch_2a")
assert execution_order.index("final") > execution_order.index("branch_2b")
def test_conditional_router_paths_exclusivity():
"""Test that only the returned router path activates, not all paths."""
execution_order = []
class ConditionalRouterFlow(Flow):
def __init__(self):
super().__init__()
self.state["condition"] = "take_path_b"
@start()
def begin(self):
execution_order.append("begin")
@router(begin)
def decision_point(self):
execution_order.append("decision_point")
if self.state["condition"] == "take_path_a":
return "path_a"
elif self.state["condition"] == "take_path_b":
return "path_b"
else:
return "path_c"
@listen("path_a")
def handle_path_a(self):
execution_order.append("handle_path_a")
@listen("path_b")
def handle_path_b(self):
execution_order.append("handle_path_b")
@listen("path_c")
def handle_path_c(self):
execution_order.append("handle_path_c")
flow = ConditionalRouterFlow()
flow.kickoff()
# Should only execute path_b, not path_a or path_c
assert "begin" in execution_order
assert "decision_point" in execution_order
assert "handle_path_b" in execution_order
assert "handle_path_a" not in execution_order
assert "handle_path_c" not in execution_order
def test_state_consistency_across_parallel_branches():
"""Test that state remains consistent when branches execute sequentially.
Note: Branches triggered by the same parent execute sequentially, not in parallel.
This ensures predictable state mutations and prevents race conditions.
"""
execution_order = []
class StateConsistencyFlow(Flow):
def __init__(self):
super().__init__()
self.state["counter"] = 0
self.state["branch_a_value"] = None
self.state["branch_b_value"] = None
@start()
def init(self):
execution_order.append("init")
self.state["counter"] = 10
@listen(init)
def branch_a(self):
execution_order.append("branch_a")
# Read counter value
self.state["branch_a_value"] = self.state["counter"]
self.state["counter"] += 1
@listen(init)
def branch_b(self):
execution_order.append("branch_b")
# Read counter value
self.state["branch_b_value"] = self.state["counter"]
self.state["counter"] += 5
@listen(and_(branch_a, branch_b))
def verify_state(self):
execution_order.append("verify_state")
flow = StateConsistencyFlow()
flow.kickoff()
# Branches execute sequentially, so branch_a runs first, then branch_b
assert flow.state["branch_a_value"] == 10 # Sees initial value
assert flow.state["branch_b_value"] == 11 # Sees value after branch_a increment
# Final counter should reflect both increments sequentially
assert flow.state["counter"] == 16 # 10 + 1 + 5
def test_deeply_nested_conditions():
"""Test deeply nested AND/OR conditions to ensure proper evaluation."""
execution_order = []
class DeeplyNestedFlow(Flow):
@start()
def a(self):
execution_order.append("a")
@start()
def b(self):
execution_order.append("b")
@start()
def c(self):
execution_order.append("c")
@start()
def d(self):
execution_order.append("d")
# Nested: (a AND b) OR (c AND d)
@listen(or_(and_(a, b), and_(c, d)))
def result(self):
execution_order.append("result")
flow = DeeplyNestedFlow()
flow.kickoff()
# All start methods should execute
assert "a" in execution_order
assert "b" in execution_order
assert "c" in execution_order
assert "d" in execution_order
# Result should execute after all starts
assert "result" in execution_order
assert execution_order.index("result") > execution_order.index("a")
assert execution_order.index("result") > execution_order.index("b")
assert execution_order.index("result") > execution_order.index("c")
assert execution_order.index("result") > execution_order.index("d")
def test_mixed_sync_async_execution_order():
"""Test that execution order is preserved with mixed sync/async methods."""
execution_order = []
class MixedSyncAsyncFlow(Flow):
@start()
def sync_start(self):
execution_order.append("sync_start")
@listen(sync_start)
async def async_step_1(self):
execution_order.append("async_step_1")
await asyncio.sleep(0.01)
@listen(async_step_1)
def sync_step_2(self):
execution_order.append("sync_step_2")
@listen(sync_step_2)
async def async_step_3(self):
execution_order.append("async_step_3")
await asyncio.sleep(0.01)
@listen(async_step_3)
def sync_final(self):
execution_order.append("sync_final")
flow = MixedSyncAsyncFlow()
asyncio.run(flow.kickoff_async())
expected_order = [
"sync_start",
"async_step_1",
"sync_step_2",
"async_step_3",
"sync_final",
]
assert execution_order == expected_order