diff --git a/lib/crewai/src/crewai/crews/utils.py b/lib/crewai/src/crewai/crews/utils.py index 70e8828f0..4e78a30e5 100644 --- a/lib/crewai/src/crewai/crews/utils.py +++ b/lib/crewai/src/crewai/crews/utils.py @@ -191,11 +191,12 @@ def prepare_kickoff(crew: Crew, inputs: dict[str, Any] | None) -> dict[str, Any] """ from crewai.events.base_events import reset_emission_counter from crewai.events.event_bus import crewai_event_bus - from crewai.events.event_context import get_current_parent_id + from crewai.events.event_context import get_current_parent_id, reset_last_event_id from crewai.events.types.crew_events import CrewKickoffStartedEvent if get_current_parent_id() is None: reset_emission_counter() + reset_last_event_id() for before_callback in crew.before_kickoff_callbacks: if inputs is None: diff --git a/lib/crewai/tests/events/test_event_ordering.py b/lib/crewai/tests/events/test_event_ordering.py index 82f585f91..b9970bf77 100644 --- a/lib/crewai/tests/events/test_event_ordering.py +++ b/lib/crewai/tests/events/test_event_ordering.py @@ -516,3 +516,1134 @@ class TestFlowWithMultipleCrewsEventOrdering: assert task1_parent == crew1_id assert task2_parent == crew2_id + + +class TestPreviousEventIdChain: + """Tests for previous_event_id linear chain tracking.""" + + @pytest.mark.asyncio + async def test_previous_event_id_chain(self) -> None: + """Events should have previous_event_id pointing to the prior event.""" + from crewai.events.base_events import reset_emission_counter + from crewai.events.event_context import reset_last_event_id + + reset_emission_counter() + reset_last_event_id() + + events: list[BaseEvent] = [] + + class SimpleFlow(Flow): + @start() + async def step_one(self): + return "step_one_done" + + @listen(step_one) + async def step_two(self, result): + return "step_two_done" + + with crewai_event_bus.scoped_handlers(): + + @crewai_event_bus.on(FlowStartedEvent) + def h1(source, event): + events.append(event) + + @crewai_event_bus.on(FlowFinishedEvent) + def h2(source, event): + events.append(event) + + @crewai_event_bus.on(MethodExecutionStartedEvent) + def h3(source, event): + events.append(event) + + @crewai_event_bus.on(MethodExecutionFinishedEvent) + def h4(source, event): + events.append(event) + + flow = SimpleFlow() + await flow.akickoff() + crewai_event_bus.flush() + + assert len(events) >= 4 + + all_events = sorted(events, key=lambda e: e.emission_sequence or 0) + all_event_ids = {e.event_id for e in all_events} + + for event in all_events[1:]: + assert event.previous_event_id is not None, ( + f"Event {event.type} (seq {event.emission_sequence}) has no previous_event_id" + ) + if event.previous_event_id in all_event_ids: + prev = next(e for e in all_events if e.event_id == event.previous_event_id) + assert (prev.emission_sequence or 0) < (event.emission_sequence or 0), ( + f"Event {event.type} (seq {event.emission_sequence}) has previous pointing " + f"to {prev.type} (seq {prev.emission_sequence}) which is not earlier" + ) + + @pytest.mark.asyncio + async def test_first_event_has_previous_pointing_back(self) -> None: + """Non-first events should have previous_event_id set.""" + from crewai.events.base_events import reset_emission_counter + from crewai.events.event_context import reset_last_event_id + + events: list[BaseEvent] = [] + + class MinimalFlow(Flow): + @start() + async def do_nothing(self): + return "done" + + reset_emission_counter() + reset_last_event_id() + + with crewai_event_bus.scoped_handlers(): + + @crewai_event_bus.on(FlowStartedEvent) + def capture1(source, event): + events.append(event) + + @crewai_event_bus.on(FlowFinishedEvent) + def capture2(source, event): + events.append(event) + + @crewai_event_bus.on(MethodExecutionStartedEvent) + def capture3(source, event): + events.append(event) + + @crewai_event_bus.on(MethodExecutionFinishedEvent) + def capture4(source, event): + events.append(event) + + flow = MinimalFlow() + await flow.akickoff() + crewai_event_bus.flush() + + assert len(events) >= 2 + + sorted_events = sorted(events, key=lambda e: e.emission_sequence or 0) + for event in sorted_events[1:]: + assert event.previous_event_id is not None, ( + f"Event {event.type} (seq {event.emission_sequence}) should have previous_event_id set" + ) + + +class TestTriggeredByEventId: + """Tests for triggered_by_event_id causal chain tracking.""" + + @pytest.mark.asyncio + async def test_triggered_by_event_id_for_listeners(self) -> None: + """Listener events should have triggered_by_event_id pointing to the triggering method_execution_finished event.""" + from crewai.events.base_events import reset_emission_counter + from crewai.events.event_context import reset_last_event_id + + reset_emission_counter() + reset_last_event_id() + + events: list[BaseEvent] = [] + + class ListenerFlow(Flow): + @start() + async def start_method(self): + return "started" + + @listen(start_method) + async def listener_method(self, result): + return "listened" + + with crewai_event_bus.scoped_handlers(): + + @crewai_event_bus.on(MethodExecutionStartedEvent) + def capture_started(source, event): + events.append(event) + + @crewai_event_bus.on(MethodExecutionFinishedEvent) + def capture_finished(source, event): + events.append(event) + + flow = ListenerFlow() + await flow.akickoff() + crewai_event_bus.flush() + + started_events = [e for e in events if isinstance(e, MethodExecutionStartedEvent)] + finished_events = [e for e in events if isinstance(e, MethodExecutionFinishedEvent)] + + assert len(started_events) >= 2 + assert len(finished_events) >= 2 + + start_method_finished = next( + (e for e in finished_events if e.method_name == "start_method"), None + ) + listener_started = next( + (e for e in started_events if e.method_name == "listener_method"), None + ) + + assert start_method_finished is not None + assert listener_started is not None + assert listener_started.triggered_by_event_id == start_method_finished.event_id + + @pytest.mark.asyncio + async def test_start_method_has_no_triggered_by(self) -> None: + """Start method events should have triggered_by_event_id=None.""" + from crewai.events.base_events import reset_emission_counter + from crewai.events.event_context import reset_last_event_id + + reset_emission_counter() + reset_last_event_id() + + events: list[BaseEvent] = [] + + class StartOnlyFlow(Flow): + @start() + async def my_start(self): + return "started" + + with crewai_event_bus.scoped_handlers(): + + @crewai_event_bus.on(MethodExecutionStartedEvent) + def capture_started(source, event): + events.append(event) + + flow = StartOnlyFlow() + await flow.akickoff() + crewai_event_bus.flush() + + start_event = next( + (e for e in events if e.method_name == "my_start"), None + ) + assert start_event is not None + assert start_event.triggered_by_event_id is None + + @pytest.mark.asyncio + async def test_chained_listeners_triggered_by(self) -> None: + """Chained listeners should have triggered_by_event_id pointing to their triggering method.""" + from crewai.events.base_events import reset_emission_counter + from crewai.events.event_context import reset_last_event_id + + reset_emission_counter() + reset_last_event_id() + + events: list[BaseEvent] = [] + + class ChainedFlow(Flow): + @start() + async def first(self): + return "first" + + @listen(first) + async def second(self, result): + return "second" + + @listen(second) + async def third(self, result): + return "third" + + with crewai_event_bus.scoped_handlers(): + + @crewai_event_bus.on(MethodExecutionStartedEvent) + def capture_started(source, event): + events.append(event) + + @crewai_event_bus.on(MethodExecutionFinishedEvent) + def capture_finished(source, event): + events.append(event) + + flow = ChainedFlow() + await flow.akickoff() + crewai_event_bus.flush() + + started_events = [e for e in events if isinstance(e, MethodExecutionStartedEvent)] + finished_events = [e for e in events if isinstance(e, MethodExecutionFinishedEvent)] + + first_finished = next( + (e for e in finished_events if e.method_name == "first"), None + ) + second_started = next( + (e for e in started_events if e.method_name == "second"), None + ) + second_finished = next( + (e for e in finished_events if e.method_name == "second"), None + ) + third_started = next( + (e for e in started_events if e.method_name == "third"), None + ) + + assert first_finished is not None + assert second_started is not None + assert second_finished is not None + assert third_started is not None + + assert second_started.triggered_by_event_id == first_finished.event_id + assert third_started.triggered_by_event_id == second_finished.event_id + + @pytest.mark.asyncio + async def test_parallel_listeners_same_trigger(self) -> None: + """Parallel listeners should all have triggered_by_event_id pointing to the same triggering event.""" + from crewai.events.base_events import reset_emission_counter + from crewai.events.event_context import reset_last_event_id + + reset_emission_counter() + reset_last_event_id() + + events: list[BaseEvent] = [] + + class ParallelFlow(Flow): + @start() + async def trigger(self): + return "trigger" + + @listen(trigger) + async def listener_a(self, result): + return "a" + + @listen(trigger) + async def listener_b(self, result): + return "b" + + @listen(trigger) + async def listener_c(self, result): + return "c" + + with crewai_event_bus.scoped_handlers(): + + @crewai_event_bus.on(MethodExecutionStartedEvent) + def capture_started(source, event): + events.append(event) + + @crewai_event_bus.on(MethodExecutionFinishedEvent) + def capture_finished(source, event): + events.append(event) + + flow = ParallelFlow() + await flow.akickoff() + crewai_event_bus.flush() + + started_events = [e for e in events if isinstance(e, MethodExecutionStartedEvent)] + finished_events = [e for e in events if isinstance(e, MethodExecutionFinishedEvent)] + + trigger_finished = next( + (e for e in finished_events if e.method_name == "trigger"), None + ) + listener_a_started = next( + (e for e in started_events if e.method_name == "listener_a"), None + ) + listener_b_started = next( + (e for e in started_events if e.method_name == "listener_b"), None + ) + listener_c_started = next( + (e for e in started_events if e.method_name == "listener_c"), None + ) + + assert trigger_finished is not None + assert listener_a_started is not None + assert listener_b_started is not None + assert listener_c_started is not None + + # All parallel listeners should point to the same triggering event + assert listener_a_started.triggered_by_event_id == trigger_finished.event_id + assert listener_b_started.triggered_by_event_id == trigger_finished.event_id + assert listener_c_started.triggered_by_event_id == trigger_finished.event_id + + @pytest.mark.asyncio + async def test_or_condition_triggered_by(self) -> None: + """Listener with OR condition should have triggered_by_event_id pointing to whichever method triggered it.""" + from crewai.events.base_events import reset_emission_counter + from crewai.events.event_context import reset_last_event_id + from crewai.flow.flow import or_ + + reset_emission_counter() + reset_last_event_id() + + events: list[BaseEvent] = [] + + class OrConditionFlow(Flow): + @start() + async def path_a(self): + return "a" + + @listen(or_(path_a, "path_b")) + async def after_either(self, result): + return "done" + + with crewai_event_bus.scoped_handlers(): + + @crewai_event_bus.on(MethodExecutionStartedEvent) + def capture_started(source, event): + events.append(event) + + @crewai_event_bus.on(MethodExecutionFinishedEvent) + def capture_finished(source, event): + events.append(event) + + flow = OrConditionFlow() + await flow.akickoff() + crewai_event_bus.flush() + + started_events = [e for e in events if isinstance(e, MethodExecutionStartedEvent)] + finished_events = [e for e in events if isinstance(e, MethodExecutionFinishedEvent)] + + path_a_finished = next( + (e for e in finished_events if e.method_name == "path_a"), None + ) + after_either_started = next( + (e for e in started_events if e.method_name == "after_either"), None + ) + + assert path_a_finished is not None + assert after_either_started is not None + + # The OR listener should be triggered by path_a since that's what ran + assert after_either_started.triggered_by_event_id == path_a_finished.event_id + + @pytest.mark.asyncio + async def test_router_triggered_by(self) -> None: + """Events from router-triggered paths should have correct triggered_by_event_id.""" + from crewai.events.base_events import reset_emission_counter + from crewai.events.event_context import reset_last_event_id + from crewai.flow.flow import router + + reset_emission_counter() + reset_last_event_id() + + events: list[BaseEvent] = [] + + class RouterFlow(Flow): + @start() + async def begin(self): + return "begin" + + @router(begin) + async def route_decision(self, result): + return "approved" + + @listen("approved") + async def handle_approved(self): + return "handled" + + with crewai_event_bus.scoped_handlers(): + + @crewai_event_bus.on(MethodExecutionStartedEvent) + def capture_started(source, event): + events.append(event) + + @crewai_event_bus.on(MethodExecutionFinishedEvent) + def capture_finished(source, event): + events.append(event) + + flow = RouterFlow() + await flow.akickoff() + crewai_event_bus.flush() + + started_events = [e for e in events if isinstance(e, MethodExecutionStartedEvent)] + finished_events = [e for e in events if isinstance(e, MethodExecutionFinishedEvent)] + + begin_finished = next( + (e for e in finished_events if e.method_name == "begin"), None + ) + route_decision_started = next( + (e for e in started_events if e.method_name == "route_decision"), None + ) + route_decision_finished = next( + (e for e in finished_events if e.method_name == "route_decision"), None + ) + handle_approved_started = next( + (e for e in started_events if e.method_name == "handle_approved"), None + ) + + assert begin_finished is not None + assert route_decision_started is not None + assert route_decision_finished is not None + assert handle_approved_started is not None + + # Router should be triggered by begin + assert route_decision_started.triggered_by_event_id == begin_finished.event_id + # Handler should be triggered by router's finished event + assert handle_approved_started.triggered_by_event_id == route_decision_finished.event_id + + @pytest.mark.asyncio + async def test_multiple_kickoffs_maintain_chains(self) -> None: + """Multiple akickoff() calls should maintain correct triggered_by chains for each execution.""" + from crewai.events.base_events import reset_emission_counter + from crewai.events.event_context import reset_last_event_id + + reset_emission_counter() + reset_last_event_id() + + first_run_events: list[BaseEvent] = [] + second_run_events: list[BaseEvent] = [] + + class ReusableFlow(Flow): + @start() + async def begin(self): + return "begin" + + @listen(begin) + async def process(self, result): + return "processed" + + with crewai_event_bus.scoped_handlers(): + + @crewai_event_bus.on(MethodExecutionStartedEvent) + def capture_started(source, event): + if len(second_run_events) == 0 and not capturing_second: + first_run_events.append(event) + else: + second_run_events.append(event) + + @crewai_event_bus.on(MethodExecutionFinishedEvent) + def capture_finished(source, event): + if len(second_run_events) == 0 and not capturing_second: + first_run_events.append(event) + else: + second_run_events.append(event) + + # First kickoff + capturing_second = False + flow1 = ReusableFlow() + await flow1.akickoff() + crewai_event_bus.flush() + + # Second kickoff + capturing_second = True + flow2 = ReusableFlow() + await flow2.akickoff() + crewai_event_bus.flush() + + # Should have events from both runs + assert len(first_run_events) >= 4 # 2 started + 2 finished + assert len(second_run_events) >= 4 + + # Check first run's triggered_by chain + first_started = [e for e in first_run_events if isinstance(e, MethodExecutionStartedEvent)] + first_finished = [e for e in first_run_events if isinstance(e, MethodExecutionFinishedEvent)] + + first_begin_finished = next( + (e for e in first_finished if e.method_name == "begin"), None + ) + first_process_started = next( + (e for e in first_started if e.method_name == "process"), None + ) + assert first_begin_finished is not None + assert first_process_started is not None + assert first_process_started.triggered_by_event_id == first_begin_finished.event_id + + # Check second run's triggered_by chain + second_started = [e for e in second_run_events if isinstance(e, MethodExecutionStartedEvent)] + second_finished = [e for e in second_run_events if isinstance(e, MethodExecutionFinishedEvent)] + + second_begin_finished = next( + (e for e in second_finished if e.method_name == "begin"), None + ) + second_process_started = next( + (e for e in second_started if e.method_name == "process"), None + ) + assert second_begin_finished is not None + assert second_process_started is not None + assert second_process_started.triggered_by_event_id == second_begin_finished.event_id + + # Verify the two runs have different event_ids (not reusing) + assert first_begin_finished.event_id != second_begin_finished.event_id + + # Verify each run has its own independent previous_event_id chain + # (chains reset at each top-level execution) + first_sorted = sorted(first_run_events, key=lambda e: e.emission_sequence or 0) + for event in first_sorted[1:]: + assert event.previous_event_id is not None, ( + f"First run event {event.type} (seq {event.emission_sequence}) should have previous_event_id" + ) + + second_sorted = sorted(second_run_events, key=lambda e: e.emission_sequence or 0) + for event in second_sorted[1:]: + assert event.previous_event_id is not None, ( + f"Second run event {event.type} (seq {event.emission_sequence}) should have previous_event_id" + ) + + @pytest.mark.asyncio + async def test_parallel_flows_maintain_separate_triggered_by_chains(self) -> None: + """Parallel flow executions should maintain correct triggered_by chains independently.""" + import asyncio + + from crewai.events.base_events import reset_emission_counter + from crewai.events.event_context import reset_last_event_id + + reset_emission_counter() + reset_last_event_id() + + events: list[BaseEvent] = [] + + class ParallelTestFlow(Flow): + def __init__(self, name: str): + super().__init__() + self.flow_name = name + + @start() + async def begin(self): + await asyncio.sleep(0.01) # Small delay to interleave + return self.flow_name + + @listen(begin) + async def process(self, result): + await asyncio.sleep(0.01) + return f"{result}_processed" + + with crewai_event_bus.scoped_handlers(): + + @crewai_event_bus.on(MethodExecutionStartedEvent) + def capture_started(source, event): + events.append(event) + + @crewai_event_bus.on(MethodExecutionFinishedEvent) + def capture_finished(source, event): + events.append(event) + + # Run two flows in parallel + flow_a = ParallelTestFlow("flow_a") + flow_b = ParallelTestFlow("flow_b") + await asyncio.gather(flow_a.akickoff(), flow_b.akickoff()) + crewai_event_bus.flush() + + # Should have events from both flows (4 events each = 8 total) + assert len(events) >= 8 + + started_events = [e for e in events if isinstance(e, MethodExecutionStartedEvent)] + finished_events = [e for e in events if isinstance(e, MethodExecutionFinishedEvent)] + + # Find flow_a's events by checking the result contains "flow_a" + flow_a_begin_finished = [ + e for e in finished_events + if e.method_name == "begin" and "flow_a" in str(e.result) + ] + flow_a_process_started = [ + e for e in started_events + if e.method_name == "process" + ] + + flow_b_begin_finished = [ + e for e in finished_events + if e.method_name == "begin" and "flow_b" in str(e.result) + ] + + assert len(flow_a_begin_finished) >= 1 + assert len(flow_b_begin_finished) >= 1 + + # Each flow's process should be triggered by its own begin + # Find which process events were triggered by which begin events + for process_event in flow_a_process_started: + trigger_id = process_event.triggered_by_event_id + assert trigger_id is not None + + # The triggering event should be a begin finished event + triggering_event = next( + (e for e in finished_events if e.event_id == trigger_id), None + ) + assert triggering_event is not None + assert triggering_event.method_name == "begin" + + # Verify previous_event_id forms a valid chain across all events + all_sorted = sorted(events, key=lambda e: e.emission_sequence or 0) + for event in all_sorted[1:]: + assert event.previous_event_id is not None + + @pytest.mark.asyncio + async def test_and_condition_triggered_by_last_method(self) -> None: + """AND condition listener should have triggered_by_event_id pointing to the last completing method.""" + from crewai.events.base_events import reset_emission_counter + from crewai.events.event_context import reset_last_event_id + from crewai.flow.flow import and_ + + reset_emission_counter() + reset_last_event_id() + + events: list[BaseEvent] = [] + + class AndConditionFlow(Flow): + @start() + async def method_a(self): + return "a" + + @listen(method_a) + async def method_b(self, result): + return "b" + + @listen(and_(method_a, method_b)) + async def after_both(self, result): + return "both_done" + + with crewai_event_bus.scoped_handlers(): + + @crewai_event_bus.on(MethodExecutionStartedEvent) + def capture_started(source, event): + events.append(event) + + @crewai_event_bus.on(MethodExecutionFinishedEvent) + def capture_finished(source, event): + events.append(event) + + flow = AndConditionFlow() + await flow.akickoff() + crewai_event_bus.flush() + + started_events = [e for e in events if isinstance(e, MethodExecutionStartedEvent)] + finished_events = [e for e in events if isinstance(e, MethodExecutionFinishedEvent)] + + method_b_finished = next( + (e for e in finished_events if e.method_name == "method_b"), None + ) + after_both_started = next( + (e for e in started_events if e.method_name == "after_both"), None + ) + + assert method_b_finished is not None + assert after_both_started is not None + + # The AND listener should be triggered by method_b (the last one to complete) + assert after_both_started.triggered_by_event_id == method_b_finished.event_id + + @pytest.mark.asyncio + async def test_exception_handling_triggered_by(self) -> None: + """Events emitted after exception should still have correct triggered_by.""" + from crewai.events.base_events import reset_emission_counter + from crewai.events.event_context import reset_last_event_id + from crewai.events.types.flow_events import MethodExecutionFailedEvent + + reset_emission_counter() + reset_last_event_id() + + events: list[BaseEvent] = [] + + class ExceptionFlow(Flow): + @start() + async def will_fail(self): + raise ValueError("intentional error") + + with crewai_event_bus.scoped_handlers(): + + @crewai_event_bus.on(MethodExecutionStartedEvent) + def capture_started(source, event): + events.append(event) + + @crewai_event_bus.on(MethodExecutionFinishedEvent) + def capture_finished(source, event): + events.append(event) + + @crewai_event_bus.on(MethodExecutionFailedEvent) + def capture_failed(source, event): + events.append(event) + + @crewai_event_bus.on(FlowStartedEvent) + def capture_flow_started(source, event): + events.append(event) + + flow = ExceptionFlow() + try: + await flow.akickoff() + except ValueError: + pass # Expected + crewai_event_bus.flush() + + # Even with exception, events should have proper previous_event_id chain + all_sorted = sorted(events, key=lambda e: e.emission_sequence or 0) + for event in all_sorted[1:]: + assert event.previous_event_id is not None, ( + f"Event {event.type} (seq {event.emission_sequence}) should have previous_event_id" + ) + + @pytest.mark.asyncio + async def test_sync_method_in_flow_triggered_by(self) -> None: + """Synchronous methods should still have correct triggered_by.""" + from crewai.events.base_events import reset_emission_counter + from crewai.events.event_context import reset_last_event_id + + reset_emission_counter() + reset_last_event_id() + + events: list[BaseEvent] = [] + + class SyncFlow(Flow): + @start() + def sync_start(self): # Synchronous method + return "sync_done" + + @listen(sync_start) + async def async_listener(self, result): + return "async_done" + + with crewai_event_bus.scoped_handlers(): + + @crewai_event_bus.on(MethodExecutionStartedEvent) + def capture_started(source, event): + events.append(event) + + @crewai_event_bus.on(MethodExecutionFinishedEvent) + def capture_finished(source, event): + events.append(event) + + flow = SyncFlow() + await flow.akickoff() + crewai_event_bus.flush() + + started_events = [e for e in events if isinstance(e, MethodExecutionStartedEvent)] + finished_events = [e for e in events if isinstance(e, MethodExecutionFinishedEvent)] + + sync_start_finished = next( + (e for e in finished_events if e.method_name == "sync_start"), None + ) + async_listener_started = next( + (e for e in started_events if e.method_name == "async_listener"), None + ) + + assert sync_start_finished is not None + assert async_listener_started is not None + assert async_listener_started.triggered_by_event_id == sync_start_finished.event_id + + @pytest.mark.asyncio + async def test_multiple_start_methods_triggered_by(self) -> None: + """Multiple start methods should each have triggered_by_event_id=None.""" + from crewai.events.base_events import reset_emission_counter + from crewai.events.event_context import reset_last_event_id + + reset_emission_counter() + reset_last_event_id() + + events: list[BaseEvent] = [] + + class MultiStartFlow(Flow): + @start() + async def start_one(self): + return "one" + + @start() + async def start_two(self): + return "two" + + with crewai_event_bus.scoped_handlers(): + + @crewai_event_bus.on(MethodExecutionStartedEvent) + def capture_started(source, event): + events.append(event) + + flow = MultiStartFlow() + await flow.akickoff() + crewai_event_bus.flush() + + started_events = [e for e in events if isinstance(e, MethodExecutionStartedEvent)] + + start_one = next( + (e for e in started_events if e.method_name == "start_one"), None + ) + start_two = next( + (e for e in started_events if e.method_name == "start_two"), None + ) + + assert start_one is not None + assert start_two is not None + + # Both start methods should have no triggered_by (they're entry points) + assert start_one.triggered_by_event_id is None + assert start_two.triggered_by_event_id is None + + @pytest.mark.asyncio + async def test_none_return_triggered_by(self) -> None: + """Methods returning None should still have correct triggered_by chain.""" + from crewai.events.base_events import reset_emission_counter + from crewai.events.event_context import reset_last_event_id + + reset_emission_counter() + reset_last_event_id() + + events: list[BaseEvent] = [] + + class NoneReturnFlow(Flow): + @start() + async def returns_none(self): + return None + + @listen(returns_none) + async def after_none(self, result): + return "got_none" + + with crewai_event_bus.scoped_handlers(): + + @crewai_event_bus.on(MethodExecutionStartedEvent) + def capture_started(source, event): + events.append(event) + + @crewai_event_bus.on(MethodExecutionFinishedEvent) + def capture_finished(source, event): + events.append(event) + + flow = NoneReturnFlow() + await flow.akickoff() + crewai_event_bus.flush() + + started_events = [e for e in events if isinstance(e, MethodExecutionStartedEvent)] + finished_events = [e for e in events if isinstance(e, MethodExecutionFinishedEvent)] + + returns_none_finished = next( + (e for e in finished_events if e.method_name == "returns_none"), None + ) + after_none_started = next( + (e for e in started_events if e.method_name == "after_none"), None + ) + + assert returns_none_finished is not None + assert after_none_started is not None + assert after_none_started.triggered_by_event_id == returns_none_finished.event_id + + @pytest.mark.asyncio + async def test_deeply_nested_chain_triggered_by(self) -> None: + """Deeply nested listener chains (5+) should maintain correct triggered_by.""" + from crewai.events.base_events import reset_emission_counter + from crewai.events.event_context import reset_last_event_id + + reset_emission_counter() + reset_last_event_id() + + events: list[BaseEvent] = [] + + class DeepChainFlow(Flow): + @start() + async def level_0(self): + return "0" + + @listen(level_0) + async def level_1(self, result): + return "1" + + @listen(level_1) + async def level_2(self, result): + return "2" + + @listen(level_2) + async def level_3(self, result): + return "3" + + @listen(level_3) + async def level_4(self, result): + return "4" + + @listen(level_4) + async def level_5(self, result): + return "5" + + with crewai_event_bus.scoped_handlers(): + + @crewai_event_bus.on(MethodExecutionStartedEvent) + def capture_started(source, event): + events.append(event) + + @crewai_event_bus.on(MethodExecutionFinishedEvent) + def capture_finished(source, event): + events.append(event) + + flow = DeepChainFlow() + await flow.akickoff() + crewai_event_bus.flush() + + started_events = [e for e in events if isinstance(e, MethodExecutionStartedEvent)] + finished_events = [e for e in events if isinstance(e, MethodExecutionFinishedEvent)] + + # Verify each level triggers the next + for i in range(5): + prev_finished = next( + (e for e in finished_events if e.method_name == f"level_{i}"), None + ) + next_started = next( + (e for e in started_events if e.method_name == f"level_{i+1}"), None + ) + + assert prev_finished is not None, f"level_{i} finished event not found" + assert next_started is not None, f"level_{i+1} started event not found" + assert next_started.triggered_by_event_id == prev_finished.event_id, ( + f"level_{i+1} should be triggered by level_{i}" + ) + + @pytest.mark.asyncio + async def test_router_conditional_path_triggered_by(self) -> None: + """Router with conditional paths should have correct triggered_by for the selected path.""" + from crewai.events.base_events import reset_emission_counter + from crewai.events.event_context import reset_last_event_id + from crewai.flow.flow import router + + reset_emission_counter() + reset_last_event_id() + + events: list[BaseEvent] = [] + + class ConditionalRouterFlow(Flow): + @start() + async def begin(self): + return "begin" + + @router(begin) + async def conditional_router(self, result): + # Conditionally return one route + return "path_a" + + @listen("path_a") + async def handle_path_a(self): + return "a_done" + + @listen("path_b") + async def handle_path_b(self): + return "b_done" + + with crewai_event_bus.scoped_handlers(): + + @crewai_event_bus.on(MethodExecutionStartedEvent) + def capture_started(source, event): + events.append(event) + + @crewai_event_bus.on(MethodExecutionFinishedEvent) + def capture_finished(source, event): + events.append(event) + + flow = ConditionalRouterFlow() + await flow.akickoff() + crewai_event_bus.flush() + + started_events = [e for e in events if isinstance(e, MethodExecutionStartedEvent)] + finished_events = [e for e in events if isinstance(e, MethodExecutionFinishedEvent)] + + router_finished = next( + (e for e in finished_events if e.method_name == "conditional_router"), None + ) + handle_path_a_started = next( + (e for e in started_events if e.method_name == "handle_path_a"), None + ) + handle_path_b_started = next( + (e for e in started_events if e.method_name == "handle_path_b"), None + ) + + assert router_finished is not None + assert handle_path_a_started is not None + # path_b should NOT be executed since router returned "path_a" + assert handle_path_b_started is None + + # The selected path should be triggered by the router + assert handle_path_a_started.triggered_by_event_id == router_finished.event_id + + +class TestCrewEventsInFlowTriggeredBy: + """Tests for triggered_by in crew events running inside flows.""" + + @pytest.mark.asyncio + async def test_flow_listener_triggered_by_in_nested_context(self) -> None: + """Nested listener contexts should maintain correct triggered_by chains.""" + from crewai.events.base_events import reset_emission_counter + from crewai.events.event_context import reset_last_event_id + + reset_emission_counter() + reset_last_event_id() + + events: list[BaseEvent] = [] + + class NestedFlow(Flow): + @start() + async def trigger_method(self): + return "trigger" + + @listen(trigger_method) + async def middle_method(self, result): + return "middle" + + @listen(middle_method) + async def final_method(self, result): + return "final" + + with crewai_event_bus.scoped_handlers(): + + @crewai_event_bus.on(MethodExecutionStartedEvent) + def capture_method_started(source, event): + events.append(event) + + @crewai_event_bus.on(MethodExecutionFinishedEvent) + def capture_method_finished(source, event): + events.append(event) + + flow = NestedFlow() + await flow.akickoff() + crewai_event_bus.flush() + + method_started_events = [e for e in events if isinstance(e, MethodExecutionStartedEvent)] + method_finished_events = [e for e in events if isinstance(e, MethodExecutionFinishedEvent)] + + trigger_finished = next( + (e for e in method_finished_events if e.method_name == "trigger_method"), None + ) + middle_started = next( + (e for e in method_started_events if e.method_name == "middle_method"), None + ) + middle_finished = next( + (e for e in method_finished_events if e.method_name == "middle_method"), None + ) + final_started = next( + (e for e in method_started_events if e.method_name == "final_method"), None + ) + + assert trigger_finished is not None + assert middle_started is not None + assert middle_finished is not None + assert final_started is not None + + # middle should be triggered by trigger_method + assert middle_started.triggered_by_event_id == trigger_finished.event_id + # final should be triggered by middle_method + assert final_started.triggered_by_event_id == middle_finished.event_id + + # All events should have proper previous_event_id chain + all_sorted = sorted(events, key=lambda e: e.emission_sequence or 0) + for event in all_sorted[1:]: + assert event.previous_event_id is not None + + def test_sync_kickoff_triggered_by(self) -> None: + """Synchronous kickoff() should maintain correct triggered_by chains.""" + from crewai.events.base_events import reset_emission_counter + from crewai.events.event_context import reset_last_event_id + + reset_emission_counter() + reset_last_event_id() + + events: list[BaseEvent] = [] + + class SyncKickoffFlow(Flow): + @start() + def start_method(self): + return "started" + + @listen(start_method) + def listener_method(self, result): + return "listened" + + with crewai_event_bus.scoped_handlers(): + + @crewai_event_bus.on(MethodExecutionStartedEvent) + def capture_started(source, event): + events.append(event) + + @crewai_event_bus.on(MethodExecutionFinishedEvent) + def capture_finished(source, event): + events.append(event) + + flow = SyncKickoffFlow() + flow.kickoff() # Synchronous kickoff + crewai_event_bus.flush() + + started_events = [e for e in events if isinstance(e, MethodExecutionStartedEvent)] + finished_events = [e for e in events if isinstance(e, MethodExecutionFinishedEvent)] + + start_finished = next( + (e for e in finished_events if e.method_name == "start_method"), None + ) + listener_started = next( + (e for e in started_events if e.method_name == "listener_method"), None + ) + + assert start_finished is not None + assert listener_started is not None + + # Listener should be triggered by start_method + assert listener_started.triggered_by_event_id == start_finished.event_id + + # Verify previous_event_id chain + all_sorted = sorted(events, key=lambda e: e.emission_sequence or 0) + for event in all_sorted[1:]: + assert event.previous_event_id is not None