diff --git a/lib/crewai/src/crewai/flow/flow.py b/lib/crewai/src/crewai/flow/flow.py index 9d11f2bc1..36146d009 100644 --- a/lib/crewai/src/crewai/flow/flow.py +++ b/lib/crewai/src/crewai/flow/flow.py @@ -33,7 +33,11 @@ from rich.panel import Panel from crewai.events.base_events import reset_emission_counter from crewai.events.event_bus import crewai_event_bus -from crewai.events.event_context import get_current_parent_id +from crewai.events.event_context import ( + get_current_parent_id, + reset_last_event_id, + triggered_by_scope, +) from crewai.events.listeners.tracing.trace_listener import ( TraceCollectionListener, ) @@ -755,6 +759,7 @@ class Flow(Generic[T], metaclass=FlowMeta): racing_listeners: frozenset[FlowMethodName], other_listeners: list[FlowMethodName], result: Any, + triggering_event_id: str | None = None, ) -> None: """Execute racing listeners with first-wins semantics. @@ -766,10 +771,11 @@ class Flow(Generic[T], metaclass=FlowMeta): racing_listeners: Set of listener names that race for an OR condition. other_listeners: Other listeners to execute in parallel (not racing). result: The result from the triggering method. + triggering_event_id: The event_id of the event that triggered these listeners. """ racing_tasks = [ asyncio.create_task( - self._execute_single_listener(name, result), + self._execute_single_listener(name, result, triggering_event_id), name=str(name), ) for name in racing_listeners @@ -777,7 +783,7 @@ class Flow(Generic[T], metaclass=FlowMeta): other_tasks = [ asyncio.create_task( - self._execute_single_listener(name, result), + self._execute_single_listener(name, result, triggering_event_id), name=str(name), ) for name in other_listeners @@ -1561,6 +1567,7 @@ class Flow(Generic[T], metaclass=FlowMeta): if get_current_parent_id() is None: reset_emission_counter() + reset_last_event_id() # Emit FlowStartedEvent and log the start of the flow. if not self.suppress_flow_events: @@ -1741,12 +1748,14 @@ class Flow(Generic[T], metaclass=FlowMeta): method = self._methods[start_method_name] enhanced_method = self._inject_trigger_payload_for_start_method(method) - result = await self._execute_method(start_method_name, enhanced_method) + result, finished_event_id = await self._execute_method( + start_method_name, enhanced_method + ) # If start method is a router, use its result as an additional trigger if start_method_name in self._routers and result is not None: # Execute listeners for the start method name first - await self._execute_listeners(start_method_name, result) + await self._execute_listeners(start_method_name, result, finished_event_id) # Then execute listeners for the router result (e.g., "approved") router_result_trigger = FlowMethodName(str(result)) listeners_for_result = self._find_triggered_methods( @@ -1770,16 +1779,21 @@ class Flow(Generic[T], metaclass=FlowMeta): if name not in racing_members ] await self._execute_racing_listeners( - racing_members, other_listeners, listener_result + racing_members, + other_listeners, + listener_result, + finished_event_id, ) else: tasks = [ - self._execute_single_listener(listener_name, listener_result) + self._execute_single_listener( + listener_name, listener_result, finished_event_id + ) for listener_name in listeners_for_result ] await asyncio.gather(*tasks) else: - await self._execute_listeners(start_method_name, result) + await self._execute_listeners(start_method_name, result, finished_event_id) def _inject_trigger_payload_for_start_method( self, original_method: Callable[..., Any] @@ -1823,7 +1837,14 @@ class Flow(Generic[T], metaclass=FlowMeta): method: Callable[..., Any], *args: Any, **kwargs: Any, - ) -> Any: + ) -> tuple[Any, str | None]: + """Execute a method and emit events. + + Returns: + A tuple of (result, finished_event_id) where finished_event_id is + the event_id of the MethodExecutionFinishedEvent, or None if events + are suppressed. + """ try: dumped_params = {f"_{i}": arg for i, arg in enumerate(args)} | ( kwargs or {} @@ -1864,21 +1885,21 @@ class Flow(Generic[T], metaclass=FlowMeta): self._completed_methods.add(method_name) + finished_event_id: str | None = None if not self.suppress_flow_events: - future = crewai_event_bus.emit( - self, - MethodExecutionFinishedEvent( - type="method_execution_finished", - method_name=method_name, - flow_name=self.name or self.__class__.__name__, - state=self._copy_and_serialize_state(), - result=result, - ), + finished_event = MethodExecutionFinishedEvent( + type="method_execution_finished", + method_name=method_name, + flow_name=self.name or self.__class__.__name__, + state=self._copy_and_serialize_state(), + result=result, ) + finished_event_id = finished_event.event_id + future = crewai_event_bus.emit(self, finished_event) if future: self._event_futures.append(future) - return result + return result, finished_event_id except Exception as e: # Check if this is a HumanFeedbackPending exception (paused, not failed) from crewai.flow.async_feedback.types import HumanFeedbackPending @@ -1932,7 +1953,10 @@ class Flow(Generic[T], metaclass=FlowMeta): return state_copy async def _execute_listeners( - self, trigger_method: FlowMethodName, result: Any + self, + trigger_method: FlowMethodName, + result: Any, + triggering_event_id: str | None = None, ) -> None: """Executes all listeners and routers triggered by a method completion. @@ -1943,6 +1967,8 @@ class Flow(Generic[T], metaclass=FlowMeta): Args: trigger_method: The name of the method that triggered these listeners. result: The result from the triggering method, passed to listeners that accept parameters. + triggering_event_id: The event_id of the MethodExecutionFinishedEvent that + triggered these listeners, used for causal chain tracking. Note: - Routers are executed sequentially to maintain flow control @@ -1957,6 +1983,7 @@ class Flow(Generic[T], metaclass=FlowMeta): ] = {} # Map outcome -> HumanFeedbackResult current_trigger = trigger_method current_result = result # Track the result to pass to each router + current_triggering_event_id = triggering_event_id while True: routers_triggered = self._find_triggered_methods( @@ -1970,7 +1997,9 @@ class Flow(Generic[T], metaclass=FlowMeta): router_input = router_result_to_feedback.get( str(current_trigger), current_result ) - await self._execute_single_listener(router_name, router_input) + current_triggering_event_id = await self._execute_single_listener( + router_name, router_input, current_triggering_event_id + ) # After executing router, the router's result is the path router_result = ( self._method_outputs[-1] if self._method_outputs else None @@ -2013,12 +2042,15 @@ class Flow(Generic[T], metaclass=FlowMeta): if name not in racing_members ] await self._execute_racing_listeners( - racing_members, other_listeners, listener_result + racing_members, + other_listeners, + listener_result, + triggering_event_id, ) else: tasks = [ self._execute_single_listener( - listener_name, listener_result + listener_name, listener_result, triggering_event_id ) for listener_name in listeners_triggered ] @@ -2197,8 +2229,11 @@ class Flow(Generic[T], metaclass=FlowMeta): return triggered async def _execute_single_listener( - self, listener_name: FlowMethodName, result: Any - ) -> None: + self, + listener_name: FlowMethodName, + result: Any, + triggering_event_id: str | None = None, + ) -> str | None: """Executes a single listener method with proper event handling. This internal method manages the execution of an individual listener, @@ -2207,6 +2242,12 @@ class Flow(Generic[T], metaclass=FlowMeta): Args: listener_name: The name of the listener method to execute. result: The result from the triggering method, which may be passed to the listener if it accepts parameters. + triggering_event_id: The event_id of the event that triggered this listener, + used for causal chain tracking. + + Returns: + The event_id of the MethodExecutionFinishedEvent emitted by this listener, + or None if events are suppressed. Note: - Inspects method signature to determine if it accepts the trigger result @@ -2232,7 +2273,7 @@ class Flow(Generic[T], metaclass=FlowMeta): ): # This conditional start was executed, continue its chain await self._execute_start_method(start_method_name) - return + return None # For cyclic flows, clear from completed to allow re-execution self._completed_methods.discard(listener_name) # Also clear from fired OR listeners for cyclic flows @@ -2245,15 +2286,30 @@ class Flow(Generic[T], metaclass=FlowMeta): params = list(sig.parameters.values()) method_params = [p for p in params if p.name != "self"] - if method_params: - listener_result = await self._execute_method( - listener_name, method, result - ) + if triggering_event_id: + with triggered_by_scope(triggering_event_id): + if method_params: + listener_result, finished_event_id = await self._execute_method( + listener_name, method, result + ) + else: + listener_result, finished_event_id = await self._execute_method( + listener_name, method + ) else: - listener_result = await self._execute_method(listener_name, method) + if method_params: + listener_result, finished_event_id = await self._execute_method( + listener_name, method, result + ) + else: + listener_result, finished_event_id = await self._execute_method( + listener_name, method + ) # Execute listeners (and possibly routers) of this listener - await self._execute_listeners(listener_name, listener_result) + await self._execute_listeners( + listener_name, listener_result, finished_event_id + ) # If this listener is also a router (e.g., has @human_feedback with emit), # we need to trigger listeners for the router result as well @@ -2280,15 +2336,22 @@ class Flow(Generic[T], metaclass=FlowMeta): if name not in racing_members ] await self._execute_racing_listeners( - racing_members, other_listeners, feedback_result + racing_members, + other_listeners, + feedback_result, + finished_event_id, ) else: tasks = [ - self._execute_single_listener(name, feedback_result) + self._execute_single_listener( + name, feedback_result, finished_event_id + ) for name in listeners_for_result ] await asyncio.gather(*tasks) + return finished_event_id + except Exception as e: # Don't log HumanFeedbackPending as an error - it's expected control flow from crewai.flow.async_feedback.types import HumanFeedbackPending