diff --git a/src/crewai/flow/flow.py b/src/crewai/flow/flow.py index c32265a37..4e06d85d8 100644 --- a/src/crewai/flow/flow.py +++ b/src/crewai/flow/flow.py @@ -913,17 +913,52 @@ class Flow(Generic[T], metaclass=FlowMeta): - Triggers execution of any listeners waiting on this start method - Part of the flow's initialization sequence - Skips execution if method was already completed (e.g., after reload) + - Automatically injects crewai_trigger_payload if available in flow inputs """ if start_method_name in self._completed_methods: last_output = self._method_outputs[-1] if self._method_outputs else None await self._execute_listeners(start_method_name, last_output) return + method = self._methods[start_method_name] + enhanced_method = self._inject_trigger_payload_for_start_method(method) + result = await self._execute_method( - start_method_name, self._methods[start_method_name] + start_method_name, enhanced_method ) await self._execute_listeners(start_method_name, result) + def _inject_trigger_payload_for_start_method(self, original_method: Callable) -> Callable: + def prepare_kwargs(*args, **kwargs): + inputs = baggage.get_baggage("flow_inputs") or {} + trigger_payload = inputs.get("crewai_trigger_payload") + + sig = inspect.signature(original_method) + accepts_trigger_payload = "crewai_trigger_payload" in sig.parameters + + if trigger_payload is not None and accepts_trigger_payload: + kwargs["crewai_trigger_payload"] = trigger_payload + elif trigger_payload is not None: + self._log_flow_event( + f"Trigger payload available but {original_method.__name__} doesn't accept crewai_trigger_payload parameter", + color="yellow" + ) + return args, kwargs + + if asyncio.iscoroutinefunction(original_method): + async def enhanced_method(*args, **kwargs): + args, kwargs = prepare_kwargs(*args, **kwargs) + return await original_method(*args, **kwargs) + else: + def enhanced_method(*args, **kwargs): + args, kwargs = prepare_kwargs(*args, **kwargs) + return original_method(*args, **kwargs) + + enhanced_method.__name__ = original_method.__name__ + enhanced_method.__doc__ = original_method.__doc__ + + return enhanced_method + async def _execute_method( self, method_name: str, method: Callable, *args: Any, **kwargs: Any ) -> Any: diff --git a/tests/flow_test.py b/tests/flow_test.py index 32b93cd05..b0da39d52 100644 --- a/tests/flow_test.py +++ b/tests/flow_test.py @@ -497,6 +497,131 @@ def test_unstructured_flow_event_emission(): assert isinstance(received_events[6].timestamp, datetime) +def test_flow_trigger_payload_injection(): + captured_payload = [] + + class TriggerFlow(Flow): + @start() + def start_method(self, crewai_trigger_payload=None): + captured_payload.append(crewai_trigger_payload) + return "started" + + @listen(start_method) + def second_method(self): + captured_payload.append("no_parameter") + return "finished" + + flow = TriggerFlow() + + test_payload = "This is important trigger data" + flow.kickoff(inputs={"crewai_trigger_payload": test_payload}) + + assert captured_payload == [test_payload, "no_parameter"] + + +def test_flow_trigger_payload_injection_multiple_starts(): + captured_payloads = [] + + class MultiStartFlow(Flow): + @start() + def start_method_1(self, crewai_trigger_payload=None): + captured_payloads.append(("start_1", crewai_trigger_payload)) + return "start_1_done" + + @start() + def start_method_2(self, crewai_trigger_payload=None): + captured_payloads.append(("start_2", crewai_trigger_payload)) + return "start_2_done" + + flow = MultiStartFlow() + + test_payload = "Multiple start trigger data" + flow.kickoff(inputs={"crewai_trigger_payload": test_payload}) + + assert captured_payloads == [("start_1", test_payload), ("start_2", test_payload)] + + +def test_flow_without_trigger_payload(): + captured_payload = [] + + class NormalFlow(Flow): + @start() + def start_method(self, crewai_trigger_payload=None): + captured_payload.append(crewai_trigger_payload) + return "no_trigger" + + flow = NormalFlow() + + flow.kickoff(inputs={"other_data": "some value"}) + + assert captured_payload[0] is None + + +def test_flow_trigger_payload_with_structured_state(): + class TriggerState(BaseModel): + id: str = "test" + message: str = "" + + class StructuredFlow(Flow[TriggerState]): + @start() + def start_method(self, crewai_trigger_payload=None): + return crewai_trigger_payload + + flow = StructuredFlow() + + test_payload = "Structured state trigger data" + result = flow.kickoff(inputs={"crewai_trigger_payload": test_payload}) + + assert result == test_payload + + +def test_flow_start_method_without_trigger_parameter(): + execution_order = [] + + class FlowWithoutParameter(Flow): + @start() + def start_without_param(self): + execution_order.append("start_executed") + return "started" + + @listen(start_without_param) + def second_method(self): + execution_order.append("second_executed") + return "finished" + + flow = FlowWithoutParameter() + + result = flow.kickoff(inputs={"crewai_trigger_payload": "some data"}) + + assert execution_order == ["start_executed", "second_executed"] + assert result == "finished" + + +def test_async_flow_with_trigger_payload(): + captured_payload = [] + + class AsyncTriggerFlow(Flow): + @start() + async def async_start_method(self, crewai_trigger_payload=None): + captured_payload.append(crewai_trigger_payload) + await asyncio.sleep(0.01) + return "async_started" + + @listen(async_start_method) + async def async_second_method(self, result): + captured_payload.append(result) + await asyncio.sleep(0.01) + return "async_finished" + + flow = AsyncTriggerFlow() + + test_payload = "Async trigger data" + result = asyncio.run(flow.kickoff_async(inputs={"crewai_trigger_payload": test_payload})) + + assert captured_payload == [test_payload, "async_started"] + assert result == "async_finished" + + def test_structured_flow_event_emission(): """Test that the correct events are emitted during structured flow execution with all fields validated."""