feat: set triggered_by context for listener execution

This commit is contained in:
Greyson LaLonde
2026-01-21 01:54:35 -05:00
parent 29397da144
commit e68ba52b80

View File

@@ -33,7 +33,11 @@ from rich.panel import Panel
from crewai.events.base_events import reset_emission_counter
from crewai.events.event_bus import crewai_event_bus
from crewai.events.event_context import get_current_parent_id
from crewai.events.event_context import (
get_current_parent_id,
reset_last_event_id,
triggered_by_scope,
)
from crewai.events.listeners.tracing.trace_listener import (
TraceCollectionListener,
)
@@ -755,6 +759,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
racing_listeners: frozenset[FlowMethodName],
other_listeners: list[FlowMethodName],
result: Any,
triggering_event_id: str | None = None,
) -> None:
"""Execute racing listeners with first-wins semantics.
@@ -766,10 +771,11 @@ class Flow(Generic[T], metaclass=FlowMeta):
racing_listeners: Set of listener names that race for an OR condition.
other_listeners: Other listeners to execute in parallel (not racing).
result: The result from the triggering method.
triggering_event_id: The event_id of the event that triggered these listeners.
"""
racing_tasks = [
asyncio.create_task(
self._execute_single_listener(name, result),
self._execute_single_listener(name, result, triggering_event_id),
name=str(name),
)
for name in racing_listeners
@@ -777,7 +783,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
other_tasks = [
asyncio.create_task(
self._execute_single_listener(name, result),
self._execute_single_listener(name, result, triggering_event_id),
name=str(name),
)
for name in other_listeners
@@ -1561,6 +1567,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
if get_current_parent_id() is None:
reset_emission_counter()
reset_last_event_id()
# Emit FlowStartedEvent and log the start of the flow.
if not self.suppress_flow_events:
@@ -1741,12 +1748,14 @@ class Flow(Generic[T], metaclass=FlowMeta):
method = self._methods[start_method_name]
enhanced_method = self._inject_trigger_payload_for_start_method(method)
result = await self._execute_method(start_method_name, enhanced_method)
result, finished_event_id = await self._execute_method(
start_method_name, enhanced_method
)
# If start method is a router, use its result as an additional trigger
if start_method_name in self._routers and result is not None:
# Execute listeners for the start method name first
await self._execute_listeners(start_method_name, result)
await self._execute_listeners(start_method_name, result, finished_event_id)
# Then execute listeners for the router result (e.g., "approved")
router_result_trigger = FlowMethodName(str(result))
listeners_for_result = self._find_triggered_methods(
@@ -1770,16 +1779,21 @@ class Flow(Generic[T], metaclass=FlowMeta):
if name not in racing_members
]
await self._execute_racing_listeners(
racing_members, other_listeners, listener_result
racing_members,
other_listeners,
listener_result,
finished_event_id,
)
else:
tasks = [
self._execute_single_listener(listener_name, listener_result)
self._execute_single_listener(
listener_name, listener_result, finished_event_id
)
for listener_name in listeners_for_result
]
await asyncio.gather(*tasks)
else:
await self._execute_listeners(start_method_name, result)
await self._execute_listeners(start_method_name, result, finished_event_id)
def _inject_trigger_payload_for_start_method(
self, original_method: Callable[..., Any]
@@ -1823,7 +1837,14 @@ class Flow(Generic[T], metaclass=FlowMeta):
method: Callable[..., Any],
*args: Any,
**kwargs: Any,
) -> Any:
) -> tuple[Any, str | None]:
"""Execute a method and emit events.
Returns:
A tuple of (result, finished_event_id) where finished_event_id is
the event_id of the MethodExecutionFinishedEvent, or None if events
are suppressed.
"""
try:
dumped_params = {f"_{i}": arg for i, arg in enumerate(args)} | (
kwargs or {}
@@ -1864,21 +1885,21 @@ class Flow(Generic[T], metaclass=FlowMeta):
self._completed_methods.add(method_name)
finished_event_id: str | None = None
if not self.suppress_flow_events:
future = crewai_event_bus.emit(
self,
MethodExecutionFinishedEvent(
type="method_execution_finished",
method_name=method_name,
flow_name=self.name or self.__class__.__name__,
state=self._copy_and_serialize_state(),
result=result,
),
finished_event = MethodExecutionFinishedEvent(
type="method_execution_finished",
method_name=method_name,
flow_name=self.name or self.__class__.__name__,
state=self._copy_and_serialize_state(),
result=result,
)
finished_event_id = finished_event.event_id
future = crewai_event_bus.emit(self, finished_event)
if future:
self._event_futures.append(future)
return result
return result, finished_event_id
except Exception as e:
# Check if this is a HumanFeedbackPending exception (paused, not failed)
from crewai.flow.async_feedback.types import HumanFeedbackPending
@@ -1932,7 +1953,10 @@ class Flow(Generic[T], metaclass=FlowMeta):
return state_copy
async def _execute_listeners(
self, trigger_method: FlowMethodName, result: Any
self,
trigger_method: FlowMethodName,
result: Any,
triggering_event_id: str | None = None,
) -> None:
"""Executes all listeners and routers triggered by a method completion.
@@ -1943,6 +1967,8 @@ class Flow(Generic[T], metaclass=FlowMeta):
Args:
trigger_method: The name of the method that triggered these listeners.
result: The result from the triggering method, passed to listeners that accept parameters.
triggering_event_id: The event_id of the MethodExecutionFinishedEvent that
triggered these listeners, used for causal chain tracking.
Note:
- Routers are executed sequentially to maintain flow control
@@ -1957,6 +1983,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
] = {} # Map outcome -> HumanFeedbackResult
current_trigger = trigger_method
current_result = result # Track the result to pass to each router
current_triggering_event_id = triggering_event_id
while True:
routers_triggered = self._find_triggered_methods(
@@ -1970,7 +1997,9 @@ class Flow(Generic[T], metaclass=FlowMeta):
router_input = router_result_to_feedback.get(
str(current_trigger), current_result
)
await self._execute_single_listener(router_name, router_input)
current_triggering_event_id = await self._execute_single_listener(
router_name, router_input, current_triggering_event_id
)
# After executing router, the router's result is the path
router_result = (
self._method_outputs[-1] if self._method_outputs else None
@@ -2013,12 +2042,15 @@ class Flow(Generic[T], metaclass=FlowMeta):
if name not in racing_members
]
await self._execute_racing_listeners(
racing_members, other_listeners, listener_result
racing_members,
other_listeners,
listener_result,
triggering_event_id,
)
else:
tasks = [
self._execute_single_listener(
listener_name, listener_result
listener_name, listener_result, triggering_event_id
)
for listener_name in listeners_triggered
]
@@ -2197,8 +2229,11 @@ class Flow(Generic[T], metaclass=FlowMeta):
return triggered
async def _execute_single_listener(
self, listener_name: FlowMethodName, result: Any
) -> None:
self,
listener_name: FlowMethodName,
result: Any,
triggering_event_id: str | None = None,
) -> str | None:
"""Executes a single listener method with proper event handling.
This internal method manages the execution of an individual listener,
@@ -2207,6 +2242,12 @@ class Flow(Generic[T], metaclass=FlowMeta):
Args:
listener_name: The name of the listener method to execute.
result: The result from the triggering method, which may be passed to the listener if it accepts parameters.
triggering_event_id: The event_id of the event that triggered this listener,
used for causal chain tracking.
Returns:
The event_id of the MethodExecutionFinishedEvent emitted by this listener,
or None if events are suppressed.
Note:
- Inspects method signature to determine if it accepts the trigger result
@@ -2232,7 +2273,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
):
# This conditional start was executed, continue its chain
await self._execute_start_method(start_method_name)
return
return None
# For cyclic flows, clear from completed to allow re-execution
self._completed_methods.discard(listener_name)
# Also clear from fired OR listeners for cyclic flows
@@ -2245,15 +2286,30 @@ class Flow(Generic[T], metaclass=FlowMeta):
params = list(sig.parameters.values())
method_params = [p for p in params if p.name != "self"]
if method_params:
listener_result = await self._execute_method(
listener_name, method, result
)
if triggering_event_id:
with triggered_by_scope(triggering_event_id):
if method_params:
listener_result, finished_event_id = await self._execute_method(
listener_name, method, result
)
else:
listener_result, finished_event_id = await self._execute_method(
listener_name, method
)
else:
listener_result = await self._execute_method(listener_name, method)
if method_params:
listener_result, finished_event_id = await self._execute_method(
listener_name, method, result
)
else:
listener_result, finished_event_id = await self._execute_method(
listener_name, method
)
# Execute listeners (and possibly routers) of this listener
await self._execute_listeners(listener_name, listener_result)
await self._execute_listeners(
listener_name, listener_result, finished_event_id
)
# If this listener is also a router (e.g., has @human_feedback with emit),
# we need to trigger listeners for the router result as well
@@ -2280,15 +2336,22 @@ class Flow(Generic[T], metaclass=FlowMeta):
if name not in racing_members
]
await self._execute_racing_listeners(
racing_members, other_listeners, feedback_result
racing_members,
other_listeners,
feedback_result,
finished_event_id,
)
else:
tasks = [
self._execute_single_listener(name, feedback_result)
self._execute_single_listener(
name, feedback_result, finished_event_id
)
for name in listeners_for_result
]
await asyncio.gather(*tasks)
return finished_event_id
except Exception as e:
# Don't log HumanFeedbackPending as an error - it's expected control flow
from crewai.flow.async_feedback.types import HumanFeedbackPending