feat: async flow kickoff
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Notify Downstream / notify-downstream (push) Has been cancelled
Build uv cache / build-cache (3.10) (push) Has been cancelled
Build uv cache / build-cache (3.11) (push) Has been cancelled
Build uv cache / build-cache (3.12) (push) Has been cancelled
Build uv cache / build-cache (3.13) (push) Has been cancelled

Introduces akickoff alias to flows, improves tool decorator typing, ensures _run backward compatibility, updates docs and docstrings, adds tests, and removes duplicated logic.
This commit is contained in:
Greyson LaLonde
2025-12-04 17:08:08 -05:00
committed by GitHub
parent 24d1fad7ab
commit 34e09162ba
2 changed files with 155 additions and 0 deletions

View File

@@ -1032,6 +1032,20 @@ class Flow(Generic[T], metaclass=FlowMeta):
finally:
detach(flow_token)
async def akickoff(
self, inputs: dict[str, Any] | None = None
) -> Any | FlowStreamingOutput:
"""Native async method to start the flow execution. Alias for kickoff_async.
Args:
inputs: Optional dictionary containing input values and/or a state ID for restoration.
Returns:
The final output from the flow, which is the result of the last executed method.
"""
return await self.kickoff_async(inputs)
async def _execute_start_method(self, start_method_name: FlowMethodName) -> None:
"""Executes a flow's start method and its triggered listeners.

View File

@@ -1492,3 +1492,144 @@ def test_flow_copy_state_with_dict_state():
flow.state["test"] = "modified"
assert copied_state["test"] == "value"
class TestFlowAkickoff:
"""Tests for the native async akickoff method."""
@pytest.mark.asyncio
async def test_akickoff_basic(self):
"""Test basic akickoff execution."""
execution_order = []
class SimpleFlow(Flow):
@start()
def step_1(self):
execution_order.append("step_1")
return "step_1_result"
@listen(step_1)
def step_2(self, result):
execution_order.append("step_2")
return "final_result"
flow = SimpleFlow()
result = await flow.akickoff()
assert execution_order == ["step_1", "step_2"]
assert result == "final_result"
@pytest.mark.asyncio
async def test_akickoff_with_inputs(self):
"""Test akickoff with inputs."""
class InputFlow(Flow):
@start()
def process_input(self):
return self.state.get("value", "default")
flow = InputFlow()
result = await flow.akickoff(inputs={"value": "custom_value"})
assert result == "custom_value"
@pytest.mark.asyncio
async def test_akickoff_with_async_methods(self):
"""Test akickoff with async flow methods."""
execution_order = []
class AsyncMethodFlow(Flow):
@start()
async def async_step_1(self):
execution_order.append("async_step_1")
await asyncio.sleep(0.01)
return "async_result"
@listen(async_step_1)
async def async_step_2(self, result):
execution_order.append("async_step_2")
await asyncio.sleep(0.01)
return f"final_{result}"
flow = AsyncMethodFlow()
result = await flow.akickoff()
assert execution_order == ["async_step_1", "async_step_2"]
assert result == "final_async_result"
@pytest.mark.asyncio
async def test_akickoff_equivalent_to_kickoff_async(self):
"""Test that akickoff produces the same results as kickoff_async."""
execution_order_akickoff = []
execution_order_kickoff_async = []
class TestFlow(Flow):
def __init__(self, execution_list):
super().__init__()
self._execution_list = execution_list
@start()
def step_1(self):
self._execution_list.append("step_1")
return "result_1"
@listen(step_1)
def step_2(self, result):
self._execution_list.append("step_2")
return "result_2"
flow1 = TestFlow(execution_order_akickoff)
result1 = await flow1.akickoff()
flow2 = TestFlow(execution_order_kickoff_async)
result2 = await flow2.kickoff_async()
assert execution_order_akickoff == execution_order_kickoff_async
assert result1 == result2
@pytest.mark.asyncio
async def test_akickoff_with_multiple_starts(self):
"""Test akickoff with multiple start methods."""
execution_order = []
class MultiStartFlow(Flow):
@start()
def start_a(self):
execution_order.append("start_a")
@start()
def start_b(self):
execution_order.append("start_b")
flow = MultiStartFlow()
await flow.akickoff()
assert "start_a" in execution_order
assert "start_b" in execution_order
@pytest.mark.asyncio
async def test_akickoff_with_router(self):
"""Test akickoff with router method."""
execution_order = []
class RouterFlow(Flow):
@start()
def begin(self):
execution_order.append("begin")
return "data"
@router(begin)
def route(self, data):
execution_order.append("route")
return "PATH_A"
@listen("PATH_A")
def handle_path_a(self):
execution_order.append("path_a")
return "path_a_result"
flow = RouterFlow()
result = await flow.akickoff()
assert execution_order == ["begin", "route", "path_a"]
assert result == "path_a_result"