mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-11 09:08:31 +00:00
Fix flow not terminating on CTRL+C (#2611)
Co-Authored-By: Joe Moura <joao@crewai.com>
This commit is contained in:
@@ -755,3 +755,29 @@ def test_multiple_routers_from_same_trigger():
|
||||
assert execution_order.index("anemia_analysis") > execution_order.index(
|
||||
"anemia_router"
|
||||
)
|
||||
|
||||
|
||||
def test_flow_keyboard_interrupt_handling():
|
||||
"""Test that a flow properly terminates when a keyboard interrupt is received."""
|
||||
execution_order = []
|
||||
|
||||
class KeyboardInterruptFlow(Flow):
|
||||
@start()
|
||||
def step_1(self):
|
||||
execution_order.append("step_1")
|
||||
|
||||
@listen(step_1)
|
||||
def step_2(self):
|
||||
execution_order.append("step_2")
|
||||
raise KeyboardInterrupt()
|
||||
|
||||
@listen(step_2)
|
||||
def step_3(self):
|
||||
execution_order.append("step_3")
|
||||
|
||||
flow = KeyboardInterruptFlow()
|
||||
|
||||
with pytest.raises(KeyboardInterrupt):
|
||||
flow.kickoff()
|
||||
|
||||
assert execution_order == ["step_1", "step_2"]
|
||||
|
||||
Reference in New Issue
Block a user