mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-10 08:38:30 +00:00
feat: adding additional parameter to Flow' start methods (#3356)
* feat: adding additional parameter to Flow' start methods When the `crewai_trigger_payload` parameter exists in the input Flow, we will add it in the start Flow methods as parameter * fix: support crewai_trigger_payload in async Flow start methods
This commit is contained in:
@@ -913,17 +913,52 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
|||||||
- Triggers execution of any listeners waiting on this start method
|
- Triggers execution of any listeners waiting on this start method
|
||||||
- Part of the flow's initialization sequence
|
- Part of the flow's initialization sequence
|
||||||
- Skips execution if method was already completed (e.g., after reload)
|
- Skips execution if method was already completed (e.g., after reload)
|
||||||
|
- Automatically injects crewai_trigger_payload if available in flow inputs
|
||||||
"""
|
"""
|
||||||
if start_method_name in self._completed_methods:
|
if start_method_name in self._completed_methods:
|
||||||
last_output = self._method_outputs[-1] if self._method_outputs else None
|
last_output = self._method_outputs[-1] if self._method_outputs else None
|
||||||
await self._execute_listeners(start_method_name, last_output)
|
await self._execute_listeners(start_method_name, last_output)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
method = self._methods[start_method_name]
|
||||||
|
enhanced_method = self._inject_trigger_payload_for_start_method(method)
|
||||||
|
|
||||||
result = await self._execute_method(
|
result = await self._execute_method(
|
||||||
start_method_name, self._methods[start_method_name]
|
start_method_name, enhanced_method
|
||||||
)
|
)
|
||||||
await self._execute_listeners(start_method_name, result)
|
await self._execute_listeners(start_method_name, result)
|
||||||
|
|
||||||
|
def _inject_trigger_payload_for_start_method(self, original_method: Callable) -> Callable:
|
||||||
|
def prepare_kwargs(*args, **kwargs):
|
||||||
|
inputs = baggage.get_baggage("flow_inputs") or {}
|
||||||
|
trigger_payload = inputs.get("crewai_trigger_payload")
|
||||||
|
|
||||||
|
sig = inspect.signature(original_method)
|
||||||
|
accepts_trigger_payload = "crewai_trigger_payload" in sig.parameters
|
||||||
|
|
||||||
|
if trigger_payload is not None and accepts_trigger_payload:
|
||||||
|
kwargs["crewai_trigger_payload"] = trigger_payload
|
||||||
|
elif trigger_payload is not None:
|
||||||
|
self._log_flow_event(
|
||||||
|
f"Trigger payload available but {original_method.__name__} doesn't accept crewai_trigger_payload parameter",
|
||||||
|
color="yellow"
|
||||||
|
)
|
||||||
|
return args, kwargs
|
||||||
|
|
||||||
|
if asyncio.iscoroutinefunction(original_method):
|
||||||
|
async def enhanced_method(*args, **kwargs):
|
||||||
|
args, kwargs = prepare_kwargs(*args, **kwargs)
|
||||||
|
return await original_method(*args, **kwargs)
|
||||||
|
else:
|
||||||
|
def enhanced_method(*args, **kwargs):
|
||||||
|
args, kwargs = prepare_kwargs(*args, **kwargs)
|
||||||
|
return original_method(*args, **kwargs)
|
||||||
|
|
||||||
|
enhanced_method.__name__ = original_method.__name__
|
||||||
|
enhanced_method.__doc__ = original_method.__doc__
|
||||||
|
|
||||||
|
return enhanced_method
|
||||||
|
|
||||||
async def _execute_method(
|
async def _execute_method(
|
||||||
self, method_name: str, method: Callable, *args: Any, **kwargs: Any
|
self, method_name: str, method: Callable, *args: Any, **kwargs: Any
|
||||||
) -> Any:
|
) -> Any:
|
||||||
|
|||||||
@@ -497,6 +497,131 @@ def test_unstructured_flow_event_emission():
|
|||||||
assert isinstance(received_events[6].timestamp, datetime)
|
assert isinstance(received_events[6].timestamp, datetime)
|
||||||
|
|
||||||
|
|
||||||
|
def test_flow_trigger_payload_injection():
|
||||||
|
captured_payload = []
|
||||||
|
|
||||||
|
class TriggerFlow(Flow):
|
||||||
|
@start()
|
||||||
|
def start_method(self, crewai_trigger_payload=None):
|
||||||
|
captured_payload.append(crewai_trigger_payload)
|
||||||
|
return "started"
|
||||||
|
|
||||||
|
@listen(start_method)
|
||||||
|
def second_method(self):
|
||||||
|
captured_payload.append("no_parameter")
|
||||||
|
return "finished"
|
||||||
|
|
||||||
|
flow = TriggerFlow()
|
||||||
|
|
||||||
|
test_payload = "This is important trigger data"
|
||||||
|
flow.kickoff(inputs={"crewai_trigger_payload": test_payload})
|
||||||
|
|
||||||
|
assert captured_payload == [test_payload, "no_parameter"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_flow_trigger_payload_injection_multiple_starts():
|
||||||
|
captured_payloads = []
|
||||||
|
|
||||||
|
class MultiStartFlow(Flow):
|
||||||
|
@start()
|
||||||
|
def start_method_1(self, crewai_trigger_payload=None):
|
||||||
|
captured_payloads.append(("start_1", crewai_trigger_payload))
|
||||||
|
return "start_1_done"
|
||||||
|
|
||||||
|
@start()
|
||||||
|
def start_method_2(self, crewai_trigger_payload=None):
|
||||||
|
captured_payloads.append(("start_2", crewai_trigger_payload))
|
||||||
|
return "start_2_done"
|
||||||
|
|
||||||
|
flow = MultiStartFlow()
|
||||||
|
|
||||||
|
test_payload = "Multiple start trigger data"
|
||||||
|
flow.kickoff(inputs={"crewai_trigger_payload": test_payload})
|
||||||
|
|
||||||
|
assert captured_payloads == [("start_1", test_payload), ("start_2", test_payload)]
|
||||||
|
|
||||||
|
|
||||||
|
def test_flow_without_trigger_payload():
|
||||||
|
captured_payload = []
|
||||||
|
|
||||||
|
class NormalFlow(Flow):
|
||||||
|
@start()
|
||||||
|
def start_method(self, crewai_trigger_payload=None):
|
||||||
|
captured_payload.append(crewai_trigger_payload)
|
||||||
|
return "no_trigger"
|
||||||
|
|
||||||
|
flow = NormalFlow()
|
||||||
|
|
||||||
|
flow.kickoff(inputs={"other_data": "some value"})
|
||||||
|
|
||||||
|
assert captured_payload[0] is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_flow_trigger_payload_with_structured_state():
|
||||||
|
class TriggerState(BaseModel):
|
||||||
|
id: str = "test"
|
||||||
|
message: str = ""
|
||||||
|
|
||||||
|
class StructuredFlow(Flow[TriggerState]):
|
||||||
|
@start()
|
||||||
|
def start_method(self, crewai_trigger_payload=None):
|
||||||
|
return crewai_trigger_payload
|
||||||
|
|
||||||
|
flow = StructuredFlow()
|
||||||
|
|
||||||
|
test_payload = "Structured state trigger data"
|
||||||
|
result = flow.kickoff(inputs={"crewai_trigger_payload": test_payload})
|
||||||
|
|
||||||
|
assert result == test_payload
|
||||||
|
|
||||||
|
|
||||||
|
def test_flow_start_method_without_trigger_parameter():
|
||||||
|
execution_order = []
|
||||||
|
|
||||||
|
class FlowWithoutParameter(Flow):
|
||||||
|
@start()
|
||||||
|
def start_without_param(self):
|
||||||
|
execution_order.append("start_executed")
|
||||||
|
return "started"
|
||||||
|
|
||||||
|
@listen(start_without_param)
|
||||||
|
def second_method(self):
|
||||||
|
execution_order.append("second_executed")
|
||||||
|
return "finished"
|
||||||
|
|
||||||
|
flow = FlowWithoutParameter()
|
||||||
|
|
||||||
|
result = flow.kickoff(inputs={"crewai_trigger_payload": "some data"})
|
||||||
|
|
||||||
|
assert execution_order == ["start_executed", "second_executed"]
|
||||||
|
assert result == "finished"
|
||||||
|
|
||||||
|
|
||||||
|
def test_async_flow_with_trigger_payload():
|
||||||
|
captured_payload = []
|
||||||
|
|
||||||
|
class AsyncTriggerFlow(Flow):
|
||||||
|
@start()
|
||||||
|
async def async_start_method(self, crewai_trigger_payload=None):
|
||||||
|
captured_payload.append(crewai_trigger_payload)
|
||||||
|
await asyncio.sleep(0.01)
|
||||||
|
return "async_started"
|
||||||
|
|
||||||
|
@listen(async_start_method)
|
||||||
|
async def async_second_method(self, result):
|
||||||
|
captured_payload.append(result)
|
||||||
|
await asyncio.sleep(0.01)
|
||||||
|
return "async_finished"
|
||||||
|
|
||||||
|
flow = AsyncTriggerFlow()
|
||||||
|
|
||||||
|
test_payload = "Async trigger data"
|
||||||
|
result = asyncio.run(flow.kickoff_async(inputs={"crewai_trigger_payload": test_payload}))
|
||||||
|
|
||||||
|
assert captured_payload == [test_payload, "async_started"]
|
||||||
|
assert result == "async_finished"
|
||||||
|
|
||||||
|
|
||||||
def test_structured_flow_event_emission():
|
def test_structured_flow_event_emission():
|
||||||
"""Test that the correct events are emitted during structured flow
|
"""Test that the correct events are emitted during structured flow
|
||||||
execution with all fields validated."""
|
execution with all fields validated."""
|
||||||
|
|||||||
Reference in New Issue
Block a user