mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-09 08:08:32 +00:00
- Add test_flow_with_rlock_in_state to verify Flow works with threading.RLock in state - Add test_flow_with_nested_unpickleable_objects to verify Flow works with unpickleable objects nested in containers - These tests ensure the issue from version 1.3.0 (TypeError: cannot pickle '_thread.RLock' object) doesn't get reintroduced - The issue was resolved in the current main branch by removing the _copy_state() method that used copy.deepcopy() - Tests verify that flows with memory components or other resources containing locks work correctly Co-Authored-By: João <joao@crewai.com>
415 lines
12 KiB
Python
415 lines
12 KiB
Python
"""Test Flow creation and execution basic functionality."""
|
|
|
|
import asyncio
|
|
import threading
|
|
|
|
import pytest
|
|
from pydantic import BaseModel
|
|
|
|
from crewai.flow.flow import Flow, and_, listen, or_, router, start
|
|
|
|
|
|
def test_simple_sequential_flow():
|
|
"""Test a simple flow with two steps called sequentially."""
|
|
execution_order = []
|
|
|
|
class SimpleFlow(Flow):
|
|
@start()
|
|
def step_1(self):
|
|
execution_order.append("step_1")
|
|
|
|
@listen(step_1)
|
|
def step_2(self):
|
|
execution_order.append("step_2")
|
|
|
|
flow = SimpleFlow()
|
|
flow.kickoff()
|
|
|
|
assert execution_order == ["step_1", "step_2"]
|
|
|
|
|
|
def test_flow_with_multiple_starts():
|
|
"""Test a flow with multiple start methods."""
|
|
execution_order = []
|
|
|
|
class MultiStartFlow(Flow):
|
|
@start()
|
|
def step_a(self):
|
|
execution_order.append("step_a")
|
|
|
|
@start()
|
|
def step_b(self):
|
|
execution_order.append("step_b")
|
|
|
|
@listen(step_a)
|
|
def step_c(self):
|
|
execution_order.append("step_c")
|
|
|
|
@listen(step_b)
|
|
def step_d(self):
|
|
execution_order.append("step_d")
|
|
|
|
flow = MultiStartFlow()
|
|
flow.kickoff()
|
|
|
|
assert "step_a" in execution_order
|
|
assert "step_b" in execution_order
|
|
assert "step_c" in execution_order
|
|
assert "step_d" in execution_order
|
|
assert execution_order.index("step_c") > execution_order.index("step_a")
|
|
assert execution_order.index("step_d") > execution_order.index("step_b")
|
|
|
|
|
|
def test_cyclic_flow():
|
|
"""Test a cyclic flow that runs a finite number of iterations."""
|
|
execution_order = []
|
|
|
|
class CyclicFlow(Flow):
|
|
iteration = 0
|
|
max_iterations = 3
|
|
|
|
@start("loop")
|
|
def step_1(self):
|
|
if self.iteration >= self.max_iterations:
|
|
return # Do not proceed further
|
|
execution_order.append(f"step_1_{self.iteration}")
|
|
|
|
@listen(step_1)
|
|
def step_2(self):
|
|
execution_order.append(f"step_2_{self.iteration}")
|
|
|
|
@router(step_2)
|
|
def step_3(self):
|
|
execution_order.append(f"step_3_{self.iteration}")
|
|
self.iteration += 1
|
|
if self.iteration < self.max_iterations:
|
|
return "loop"
|
|
|
|
return "exit"
|
|
|
|
flow = CyclicFlow()
|
|
flow.kickoff()
|
|
|
|
expected_order = []
|
|
for i in range(flow.max_iterations):
|
|
expected_order.extend([f"step_1_{i}", f"step_2_{i}", f"step_3_{i}"])
|
|
|
|
assert execution_order == expected_order
|
|
|
|
|
|
def test_flow_with_and_condition():
|
|
"""Test a flow where a step waits for multiple other steps to complete."""
|
|
execution_order = []
|
|
|
|
class AndConditionFlow(Flow):
|
|
@start()
|
|
def step_1(self):
|
|
execution_order.append("step_1")
|
|
|
|
@start()
|
|
def step_2(self):
|
|
execution_order.append("step_2")
|
|
|
|
@listen(and_(step_1, step_2))
|
|
def step_3(self):
|
|
execution_order.append("step_3")
|
|
|
|
flow = AndConditionFlow()
|
|
flow.kickoff()
|
|
|
|
assert "step_1" in execution_order
|
|
assert "step_2" in execution_order
|
|
assert execution_order[-1] == "step_3"
|
|
assert execution_order.index("step_3") > execution_order.index("step_1")
|
|
assert execution_order.index("step_3") > execution_order.index("step_2")
|
|
|
|
|
|
def test_flow_with_or_condition():
|
|
"""Test a flow where a step is triggered when any of multiple steps complete."""
|
|
execution_order = []
|
|
|
|
class OrConditionFlow(Flow):
|
|
@start()
|
|
def step_a(self):
|
|
execution_order.append("step_a")
|
|
|
|
@start()
|
|
def step_b(self):
|
|
execution_order.append("step_b")
|
|
|
|
@listen(or_(step_a, step_b))
|
|
def step_c(self):
|
|
execution_order.append("step_c")
|
|
|
|
flow = OrConditionFlow()
|
|
flow.kickoff()
|
|
|
|
assert "step_a" in execution_order or "step_b" in execution_order
|
|
assert "step_c" in execution_order
|
|
assert execution_order.index("step_c") > min(
|
|
execution_order.index("step_a"), execution_order.index("step_b")
|
|
)
|
|
|
|
|
|
def test_flow_with_router():
|
|
"""Test a flow that uses a router method to determine the next step."""
|
|
execution_order = []
|
|
|
|
class RouterFlow(Flow):
|
|
@start()
|
|
def start_method(self):
|
|
execution_order.append("start_method")
|
|
|
|
@router(start_method)
|
|
def router(self):
|
|
execution_order.append("router")
|
|
# Ensure the condition is set to True to follow the "step_if_true" path
|
|
condition = True
|
|
return "step_if_true" if condition else "step_if_false"
|
|
|
|
@listen("step_if_true")
|
|
def truthy(self):
|
|
execution_order.append("step_if_true")
|
|
|
|
@listen("step_if_false")
|
|
def falsy(self):
|
|
execution_order.append("step_if_false")
|
|
|
|
flow = RouterFlow()
|
|
flow.kickoff()
|
|
|
|
assert execution_order == ["start_method", "router", "step_if_true"]
|
|
|
|
|
|
def test_async_flow():
|
|
"""Test an asynchronous flow."""
|
|
execution_order = []
|
|
|
|
class AsyncFlow(Flow):
|
|
@start()
|
|
async def step_1(self):
|
|
execution_order.append("step_1")
|
|
await asyncio.sleep(0.1)
|
|
|
|
@listen(step_1)
|
|
async def step_2(self):
|
|
execution_order.append("step_2")
|
|
await asyncio.sleep(0.1)
|
|
|
|
flow = AsyncFlow()
|
|
asyncio.run(flow.kickoff_async())
|
|
|
|
assert execution_order == ["step_1", "step_2"]
|
|
|
|
|
|
def test_flow_with_exceptions():
|
|
"""Test flow behavior when exceptions occur in steps."""
|
|
execution_order = []
|
|
|
|
class ExceptionFlow(Flow):
|
|
@start()
|
|
def step_1(self):
|
|
execution_order.append("step_1")
|
|
raise ValueError("An error occurred in step_1")
|
|
|
|
@listen(step_1)
|
|
def step_2(self):
|
|
execution_order.append("step_2")
|
|
|
|
flow = ExceptionFlow()
|
|
|
|
with pytest.raises(ValueError):
|
|
flow.kickoff()
|
|
|
|
# Ensure step_2 did not execute
|
|
assert execution_order == ["step_1"]
|
|
|
|
|
|
def test_flow_restart():
|
|
"""Test restarting a flow after it has completed."""
|
|
execution_order = []
|
|
|
|
class RestartableFlow(Flow):
|
|
@start()
|
|
def step_1(self):
|
|
execution_order.append("step_1")
|
|
|
|
@listen(step_1)
|
|
def step_2(self):
|
|
execution_order.append("step_2")
|
|
|
|
flow = RestartableFlow()
|
|
flow.kickoff()
|
|
flow.kickoff() # Restart the flow
|
|
|
|
assert execution_order == ["step_1", "step_2", "step_1", "step_2"]
|
|
|
|
|
|
def test_flow_with_custom_state():
|
|
"""Test a flow that maintains and modifies internal state."""
|
|
|
|
class StateFlow(Flow):
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.counter = 0
|
|
|
|
@start()
|
|
def step_1(self):
|
|
self.counter += 1
|
|
|
|
@listen(step_1)
|
|
def step_2(self):
|
|
self.counter *= 2
|
|
assert self.counter == 2
|
|
|
|
flow = StateFlow()
|
|
flow.kickoff()
|
|
assert flow.counter == 2
|
|
|
|
|
|
def test_router_with_multiple_conditions():
|
|
"""Test a router that triggers when any of multiple steps complete (OR condition),
|
|
and another router that triggers only after all specified steps complete (AND condition).
|
|
"""
|
|
|
|
execution_order = []
|
|
|
|
class ComplexRouterFlow(Flow):
|
|
@start()
|
|
def step_a(self):
|
|
execution_order.append("step_a")
|
|
|
|
@start()
|
|
def step_b(self):
|
|
execution_order.append("step_b")
|
|
|
|
@router(or_("step_a", "step_b"))
|
|
def router_or(self):
|
|
execution_order.append("router_or")
|
|
return "next_step_or"
|
|
|
|
@listen("next_step_or")
|
|
def handle_next_step_or_event(self):
|
|
execution_order.append("handle_next_step_or_event")
|
|
|
|
@listen(handle_next_step_or_event)
|
|
def branch_2_step(self):
|
|
execution_order.append("branch_2_step")
|
|
|
|
@router(and_(handle_next_step_or_event, branch_2_step))
|
|
def router_and(self):
|
|
execution_order.append("router_and")
|
|
return "final_step"
|
|
|
|
@listen("final_step")
|
|
def log_final_step(self):
|
|
execution_order.append("log_final_step")
|
|
|
|
flow = ComplexRouterFlow()
|
|
flow.kickoff()
|
|
|
|
assert "step_a" in execution_order
|
|
assert "step_b" in execution_order
|
|
assert "router_or" in execution_order
|
|
assert "handle_next_step_or_event" in execution_order
|
|
assert "branch_2_step" in execution_order
|
|
assert "router_and" in execution_order
|
|
assert "log_final_step" in execution_order
|
|
|
|
# Check that the AND router triggered after both relevant steps:
|
|
assert execution_order.index("router_and") > execution_order.index(
|
|
"handle_next_step_or_event"
|
|
)
|
|
assert execution_order.index("router_and") > execution_order.index("branch_2_step")
|
|
|
|
# final_step should run after router_and
|
|
assert execution_order.index("log_final_step") > execution_order.index("router_and")
|
|
|
|
|
|
def test_flow_with_rlock_in_state():
|
|
"""Test that Flow can handle unpickleable objects like RLock in state.
|
|
|
|
Regression test for issue #3828: Flow should not crash when state contains
|
|
objects that cannot be deep copied (like threading.RLock).
|
|
|
|
In version 1.3.0, Flow._copy_state() used copy.deepcopy() which would fail
|
|
with "TypeError: cannot pickle '_thread.RLock' object" when state contained
|
|
threading locks (e.g., from memory components or LLM instances).
|
|
|
|
The current implementation no longer deep copies state, so this test verifies
|
|
that flows with unpickleable objects in state work correctly.
|
|
"""
|
|
execution_order = []
|
|
|
|
class StateWithRLock(BaseModel):
|
|
class Config:
|
|
arbitrary_types_allowed = True
|
|
|
|
counter: int = 0
|
|
lock: threading.RLock = None
|
|
|
|
class FlowWithRLock(Flow[StateWithRLock]):
|
|
@start()
|
|
def step_1(self):
|
|
execution_order.append("step_1")
|
|
self.state.counter += 1
|
|
|
|
@listen(step_1)
|
|
def step_2(self):
|
|
execution_order.append("step_2")
|
|
self.state.counter += 1
|
|
|
|
flow = FlowWithRLock()
|
|
flow._state.lock = threading.RLock()
|
|
|
|
flow.kickoff()
|
|
|
|
assert execution_order == ["step_1", "step_2"]
|
|
assert flow.state.counter == 2
|
|
|
|
|
|
def test_flow_with_nested_unpickleable_objects():
|
|
"""Test that Flow can handle unpickleable objects nested in containers.
|
|
|
|
Regression test for issue #3828: Verifies that unpickleable objects
|
|
nested inside dicts/lists in state don't cause crashes.
|
|
|
|
This simulates real-world scenarios where memory components or other
|
|
resources with locks might be stored in nested data structures.
|
|
"""
|
|
execution_order = []
|
|
|
|
class NestedState(BaseModel):
|
|
class Config:
|
|
arbitrary_types_allowed = True
|
|
|
|
data: dict = {}
|
|
items: list = []
|
|
|
|
class FlowWithNestedUnpickleable(Flow[NestedState]):
|
|
@start()
|
|
def step_1(self):
|
|
execution_order.append("step_1")
|
|
self.state.data["lock"] = threading.RLock()
|
|
self.state.data["value"] = 42
|
|
|
|
@listen(step_1)
|
|
def step_2(self):
|
|
execution_order.append("step_2")
|
|
self.state.items.append(threading.Lock())
|
|
self.state.items.append("normal_value")
|
|
|
|
@listen(step_2)
|
|
def step_3(self):
|
|
execution_order.append("step_3")
|
|
assert self.state.data["value"] == 42
|
|
assert len(self.state.items) == 2
|
|
|
|
flow = FlowWithNestedUnpickleable()
|
|
|
|
flow.kickoff()
|
|
|
|
assert execution_order == ["step_1", "step_2", "step_3"]
|
|
assert flow.state.data["value"] == 42
|
|
assert len(flow.state.items) == 2
|