Files
crewAI/lib/crewai/tests/events/test_event_ordering.py
2026-01-21 01:54:52 -05:00

1650 lines
56 KiB
Python

"""Tests for event ordering and parent-child relationships."""
import pytest
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.events.base_events import BaseEvent
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.agent_events import (
AgentExecutionCompletedEvent,
AgentExecutionStartedEvent,
)
from crewai.events.types.crew_events import (
CrewKickoffCompletedEvent,
CrewKickoffStartedEvent,
)
from crewai.events.types.flow_events import (
FlowFinishedEvent,
FlowStartedEvent,
MethodExecutionFinishedEvent,
MethodExecutionStartedEvent,
)
from crewai.events.types.llm_events import (
LLMCallCompletedEvent,
LLMCallStartedEvent,
)
from crewai.events.types.task_events import (
TaskCompletedEvent,
TaskStartedEvent,
)
from crewai.flow.flow import Flow, listen, start
from crewai.task import Task
class EventCollector:
"""Collects events and provides helpers to find related events."""
def __init__(self) -> None:
self.events: list[BaseEvent] = []
def add(self, event: BaseEvent) -> None:
self.events.append(event)
def first(self, event_type: type[BaseEvent]) -> BaseEvent | None:
for e in self.events:
if isinstance(e, event_type):
return e
return None
def all_of(self, event_type: type[BaseEvent]) -> list[BaseEvent]:
return [e for e in self.events if isinstance(e, event_type)]
def with_parent(self, parent_id: str) -> list[BaseEvent]:
return [e for e in self.events if e.parent_event_id == parent_id]
@pytest.fixture
def collector() -> EventCollector:
"""Fixture that collects events during test execution."""
c = EventCollector()
@crewai_event_bus.on(CrewKickoffStartedEvent)
def h1(source, event):
c.add(event)
@crewai_event_bus.on(CrewKickoffCompletedEvent)
def h2(source, event):
c.add(event)
@crewai_event_bus.on(TaskStartedEvent)
def h3(source, event):
c.add(event)
@crewai_event_bus.on(TaskCompletedEvent)
def h4(source, event):
c.add(event)
@crewai_event_bus.on(AgentExecutionStartedEvent)
def h5(source, event):
c.add(event)
@crewai_event_bus.on(AgentExecutionCompletedEvent)
def h6(source, event):
c.add(event)
@crewai_event_bus.on(LLMCallStartedEvent)
def h7(source, event):
c.add(event)
@crewai_event_bus.on(LLMCallCompletedEvent)
def h8(source, event):
c.add(event)
@crewai_event_bus.on(FlowStartedEvent)
def h9(source, event):
c.add(event)
@crewai_event_bus.on(FlowFinishedEvent)
def h10(source, event):
c.add(event)
@crewai_event_bus.on(MethodExecutionStartedEvent)
def h11(source, event):
c.add(event)
@crewai_event_bus.on(MethodExecutionFinishedEvent)
def h12(source, event):
c.add(event)
return c
class TestCrewEventOrdering:
"""Tests for event ordering in crew execution."""
@pytest.mark.vcr()
def test_crew_events_have_event_ids(self, collector: EventCollector) -> None:
"""Every crew event should have a unique event_id."""
agent = Agent(
role="Responder",
goal="Respond briefly",
backstory="You give short answers.",
verbose=False,
)
task = Task(
description="Say 'hello' and nothing else.",
expected_output="The word hello.",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task], verbose=False)
crew.kickoff()
crewai_event_bus.flush()
started = collector.first(CrewKickoffStartedEvent)
completed = collector.first(CrewKickoffCompletedEvent)
assert started is not None
assert started.event_id is not None
assert len(started.event_id) > 0
assert completed is not None
assert completed.event_id is not None
assert completed.event_id != started.event_id
@pytest.mark.vcr()
def test_crew_completed_after_started(self, collector: EventCollector) -> None:
"""Crew completed event should have higher sequence than started."""
agent = Agent(
role="Responder",
goal="Respond briefly",
backstory="You give short answers.",
verbose=False,
)
task = Task(
description="Say 'yes' and nothing else.",
expected_output="The word yes.",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task], verbose=False)
crew.kickoff()
crewai_event_bus.flush()
started = collector.first(CrewKickoffStartedEvent)
completed = collector.first(CrewKickoffCompletedEvent)
assert started is not None
assert completed is not None
assert started.emission_sequence is not None
assert completed.emission_sequence is not None
assert completed.emission_sequence > started.emission_sequence
@pytest.mark.vcr()
def test_task_parent_is_crew(self, collector: EventCollector) -> None:
"""Task events should have crew event as parent."""
agent = Agent(
role="Responder",
goal="Respond briefly",
backstory="You give short answers.",
verbose=False,
)
task = Task(
description="Say 'ok' and nothing else.",
expected_output="The word ok.",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task], verbose=False)
crew.kickoff()
crewai_event_bus.flush()
crew_started = collector.first(CrewKickoffStartedEvent)
task_started = collector.first(TaskStartedEvent)
assert crew_started is not None
assert task_started is not None
assert task_started.parent_event_id == crew_started.event_id
class TestAgentEventOrdering:
"""Tests for event ordering in agent execution."""
@pytest.mark.vcr()
def test_agent_events_have_event_ids(self, collector: EventCollector) -> None:
"""Agent execution events should have event_ids."""
agent = Agent(
role="Helper",
goal="Help with tasks",
backstory="You help.",
verbose=False,
)
task = Task(
description="Say 'done' and nothing else.",
expected_output="The word done.",
agent=agent,
)
agent.execute_task(task)
crewai_event_bus.flush()
started = collector.first(AgentExecutionStartedEvent)
completed = collector.first(AgentExecutionCompletedEvent)
if started:
assert started.event_id is not None
if completed:
assert completed.event_id is not None
@pytest.mark.vcr()
def test_llm_events_have_parent(self, collector: EventCollector) -> None:
"""LLM call events should have a parent event."""
agent = Agent(
role="Helper",
goal="Help with tasks",
backstory="You help.",
verbose=False,
)
task = Task(
description="Say 'hi' and nothing else.",
expected_output="The word hi.",
agent=agent,
)
agent.execute_task(task)
crewai_event_bus.flush()
llm_started = collector.first(LLMCallStartedEvent)
if llm_started:
assert llm_started.event_id is not None
# LLM events should have some parent in the hierarchy
assert llm_started.parent_event_id is not None
class TestFlowWithCrewEventOrdering:
"""Tests for event ordering in flows containing crews."""
@pytest.mark.asyncio
@pytest.mark.vcr()
async def test_flow_events_have_ids(self, collector: EventCollector) -> None:
"""Flow events should have event_ids."""
agent = Agent(
role="Worker",
goal="Do work",
backstory="You work.",
verbose=False,
)
task = Task(
description="Say 'complete' and nothing else.",
expected_output="The word complete.",
agent=agent,
)
class SimpleFlow(Flow):
@start()
async def run_crew(self):
c = Crew(agents=[agent], tasks=[task], verbose=False)
return await c.akickoff()
flow = SimpleFlow()
await flow.akickoff()
crewai_event_bus.flush()
flow_started = collector.first(FlowStartedEvent)
flow_finished = collector.first(FlowFinishedEvent)
assert flow_started is not None
assert flow_started.event_id is not None
assert flow_finished is not None
assert flow_finished.event_id is not None
@pytest.mark.asyncio
@pytest.mark.vcr()
async def test_method_parent_is_flow(self, collector: EventCollector) -> None:
"""Method execution events should have flow as parent."""
agent = Agent(
role="Worker",
goal="Do work",
backstory="You work.",
verbose=False,
)
task = Task(
description="Say 'ready' and nothing else.",
expected_output="The word ready.",
agent=agent,
)
class FlowWithMethod(Flow):
@start()
async def my_method(self):
c = Crew(agents=[agent], tasks=[task], verbose=False)
return await c.akickoff()
flow = FlowWithMethod()
await flow.akickoff()
crewai_event_bus.flush()
flow_started = collector.first(FlowStartedEvent)
method_started = collector.first(MethodExecutionStartedEvent)
assert flow_started is not None
assert method_started is not None
assert method_started.parent_event_id == flow_started.event_id
@pytest.mark.asyncio
@pytest.mark.vcr()
async def test_crew_parent_is_method(self, collector: EventCollector) -> None:
"""Crew inside flow method should have method as parent."""
agent = Agent(
role="Worker",
goal="Do work",
backstory="You work.",
verbose=False,
)
task = Task(
description="Say 'go' and nothing else.",
expected_output="The word go.",
agent=agent,
)
class FlowWithCrew(Flow):
@start()
async def run_it(self):
c = Crew(agents=[agent], tasks=[task], verbose=False)
return await c.akickoff()
flow = FlowWithCrew()
await flow.akickoff()
crewai_event_bus.flush()
method_started = collector.first(MethodExecutionStartedEvent)
crew_started = collector.first(CrewKickoffStartedEvent)
assert method_started is not None
assert crew_started is not None
assert crew_started.parent_event_id == method_started.event_id
class TestFlowWithMultipleCrewsEventOrdering:
"""Tests for event ordering in flows with multiple crews."""
@pytest.mark.asyncio
@pytest.mark.vcr()
async def test_two_crews_have_different_ids(
self, collector: EventCollector
) -> None:
"""Two crews in a flow should have different event_ids."""
agent1 = Agent(
role="First",
goal="Be first",
backstory="You go first.",
verbose=False,
)
agent2 = Agent(
role="Second",
goal="Be second",
backstory="You go second.",
verbose=False,
)
task1 = Task(
description="Say '1' and nothing else.",
expected_output="The number 1.",
agent=agent1,
)
task2 = Task(
description="Say '2' and nothing else.",
expected_output="The number 2.",
agent=agent2,
)
class TwoCrewFlow(Flow):
@start()
async def first(self):
c = Crew(agents=[agent1], tasks=[task1], verbose=False)
return await c.akickoff()
@listen(first)
async def second(self, _):
c = Crew(agents=[agent2], tasks=[task2], verbose=False)
return await c.akickoff()
flow = TwoCrewFlow()
await flow.akickoff()
crewai_event_bus.flush()
crew_started_events = collector.all_of(CrewKickoffStartedEvent)
assert len(crew_started_events) >= 2
assert crew_started_events[0].event_id != crew_started_events[1].event_id
@pytest.mark.asyncio
@pytest.mark.vcr()
async def test_second_crew_after_first(self, collector: EventCollector) -> None:
"""Second crew should have higher sequence than first."""
agent1 = Agent(
role="First",
goal="Be first",
backstory="You go first.",
verbose=False,
)
agent2 = Agent(
role="Second",
goal="Be second",
backstory="You go second.",
verbose=False,
)
task1 = Task(
description="Say 'a' and nothing else.",
expected_output="The letter a.",
agent=agent1,
)
task2 = Task(
description="Say 'b' and nothing else.",
expected_output="The letter b.",
agent=agent2,
)
class SequentialCrewFlow(Flow):
@start()
async def crew_a(self):
c = Crew(agents=[agent1], tasks=[task1], verbose=False)
return await c.akickoff()
@listen(crew_a)
async def crew_b(self, _):
c = Crew(agents=[agent2], tasks=[task2], verbose=False)
return await c.akickoff()
flow = SequentialCrewFlow()
await flow.akickoff()
crewai_event_bus.flush()
crew_started_events = collector.all_of(CrewKickoffStartedEvent)
assert len(crew_started_events) >= 2
first = crew_started_events[0]
second = crew_started_events[1]
assert first.emission_sequence is not None
assert second.emission_sequence is not None
assert second.emission_sequence > first.emission_sequence
@pytest.mark.asyncio
@pytest.mark.vcr()
async def test_tasks_have_correct_crew_parents(
self, collector: EventCollector
) -> None:
"""Tasks in different crews should have their own crew as parent."""
agent1 = Agent(
role="Alpha",
goal="Do alpha work",
backstory="You are alpha.",
verbose=False,
)
agent2 = Agent(
role="Beta",
goal="Do beta work",
backstory="You are beta.",
verbose=False,
)
task1 = Task(
description="Say 'alpha' and nothing else.",
expected_output="The word alpha.",
agent=agent1,
)
task2 = Task(
description="Say 'beta' and nothing else.",
expected_output="The word beta.",
agent=agent2,
)
class ParentTestFlow(Flow):
@start()
async def alpha_crew(self):
c = Crew(agents=[agent1], tasks=[task1], verbose=False)
return await c.akickoff()
@listen(alpha_crew)
async def beta_crew(self, _):
c = Crew(agents=[agent2], tasks=[task2], verbose=False)
return await c.akickoff()
flow = ParentTestFlow()
await flow.akickoff()
crewai_event_bus.flush()
crew_started_events = collector.all_of(CrewKickoffStartedEvent)
task_started_events = collector.all_of(TaskStartedEvent)
assert len(crew_started_events) >= 2
assert len(task_started_events) >= 2
crew1_id = crew_started_events[0].event_id
crew2_id = crew_started_events[1].event_id
task1_parent = task_started_events[0].parent_event_id
task2_parent = task_started_events[1].parent_event_id
assert task1_parent == crew1_id
assert task2_parent == crew2_id
class TestPreviousEventIdChain:
"""Tests for previous_event_id linear chain tracking."""
@pytest.mark.asyncio
async def test_previous_event_id_chain(self) -> None:
"""Events should have previous_event_id pointing to the prior event."""
from crewai.events.base_events import reset_emission_counter
from crewai.events.event_context import reset_last_event_id
reset_emission_counter()
reset_last_event_id()
events: list[BaseEvent] = []
class SimpleFlow(Flow):
@start()
async def step_one(self):
return "step_one_done"
@listen(step_one)
async def step_two(self, result):
return "step_two_done"
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(FlowStartedEvent)
def h1(source, event):
events.append(event)
@crewai_event_bus.on(FlowFinishedEvent)
def h2(source, event):
events.append(event)
@crewai_event_bus.on(MethodExecutionStartedEvent)
def h3(source, event):
events.append(event)
@crewai_event_bus.on(MethodExecutionFinishedEvent)
def h4(source, event):
events.append(event)
flow = SimpleFlow()
await flow.akickoff()
crewai_event_bus.flush()
assert len(events) >= 4
all_events = sorted(events, key=lambda e: e.emission_sequence or 0)
all_event_ids = {e.event_id for e in all_events}
for event in all_events[1:]:
assert event.previous_event_id is not None, (
f"Event {event.type} (seq {event.emission_sequence}) has no previous_event_id"
)
if event.previous_event_id in all_event_ids:
prev = next(e for e in all_events if e.event_id == event.previous_event_id)
assert (prev.emission_sequence or 0) < (event.emission_sequence or 0), (
f"Event {event.type} (seq {event.emission_sequence}) has previous pointing "
f"to {prev.type} (seq {prev.emission_sequence}) which is not earlier"
)
@pytest.mark.asyncio
async def test_first_event_has_previous_pointing_back(self) -> None:
"""Non-first events should have previous_event_id set."""
from crewai.events.base_events import reset_emission_counter
from crewai.events.event_context import reset_last_event_id
events: list[BaseEvent] = []
class MinimalFlow(Flow):
@start()
async def do_nothing(self):
return "done"
reset_emission_counter()
reset_last_event_id()
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(FlowStartedEvent)
def capture1(source, event):
events.append(event)
@crewai_event_bus.on(FlowFinishedEvent)
def capture2(source, event):
events.append(event)
@crewai_event_bus.on(MethodExecutionStartedEvent)
def capture3(source, event):
events.append(event)
@crewai_event_bus.on(MethodExecutionFinishedEvent)
def capture4(source, event):
events.append(event)
flow = MinimalFlow()
await flow.akickoff()
crewai_event_bus.flush()
assert len(events) >= 2
sorted_events = sorted(events, key=lambda e: e.emission_sequence or 0)
for event in sorted_events[1:]:
assert event.previous_event_id is not None, (
f"Event {event.type} (seq {event.emission_sequence}) should have previous_event_id set"
)
class TestTriggeredByEventId:
"""Tests for triggered_by_event_id causal chain tracking."""
@pytest.mark.asyncio
async def test_triggered_by_event_id_for_listeners(self) -> None:
"""Listener events should have triggered_by_event_id pointing to the triggering method_execution_finished event."""
from crewai.events.base_events import reset_emission_counter
from crewai.events.event_context import reset_last_event_id
reset_emission_counter()
reset_last_event_id()
events: list[BaseEvent] = []
class ListenerFlow(Flow):
@start()
async def start_method(self):
return "started"
@listen(start_method)
async def listener_method(self, result):
return "listened"
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(MethodExecutionStartedEvent)
def capture_started(source, event):
events.append(event)
@crewai_event_bus.on(MethodExecutionFinishedEvent)
def capture_finished(source, event):
events.append(event)
flow = ListenerFlow()
await flow.akickoff()
crewai_event_bus.flush()
started_events = [e for e in events if isinstance(e, MethodExecutionStartedEvent)]
finished_events = [e for e in events if isinstance(e, MethodExecutionFinishedEvent)]
assert len(started_events) >= 2
assert len(finished_events) >= 2
start_method_finished = next(
(e for e in finished_events if e.method_name == "start_method"), None
)
listener_started = next(
(e for e in started_events if e.method_name == "listener_method"), None
)
assert start_method_finished is not None
assert listener_started is not None
assert listener_started.triggered_by_event_id == start_method_finished.event_id
@pytest.mark.asyncio
async def test_start_method_has_no_triggered_by(self) -> None:
"""Start method events should have triggered_by_event_id=None."""
from crewai.events.base_events import reset_emission_counter
from crewai.events.event_context import reset_last_event_id
reset_emission_counter()
reset_last_event_id()
events: list[BaseEvent] = []
class StartOnlyFlow(Flow):
@start()
async def my_start(self):
return "started"
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(MethodExecutionStartedEvent)
def capture_started(source, event):
events.append(event)
flow = StartOnlyFlow()
await flow.akickoff()
crewai_event_bus.flush()
start_event = next(
(e for e in events if e.method_name == "my_start"), None
)
assert start_event is not None
assert start_event.triggered_by_event_id is None
@pytest.mark.asyncio
async def test_chained_listeners_triggered_by(self) -> None:
"""Chained listeners should have triggered_by_event_id pointing to their triggering method."""
from crewai.events.base_events import reset_emission_counter
from crewai.events.event_context import reset_last_event_id
reset_emission_counter()
reset_last_event_id()
events: list[BaseEvent] = []
class ChainedFlow(Flow):
@start()
async def first(self):
return "first"
@listen(first)
async def second(self, result):
return "second"
@listen(second)
async def third(self, result):
return "third"
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(MethodExecutionStartedEvent)
def capture_started(source, event):
events.append(event)
@crewai_event_bus.on(MethodExecutionFinishedEvent)
def capture_finished(source, event):
events.append(event)
flow = ChainedFlow()
await flow.akickoff()
crewai_event_bus.flush()
started_events = [e for e in events if isinstance(e, MethodExecutionStartedEvent)]
finished_events = [e for e in events if isinstance(e, MethodExecutionFinishedEvent)]
first_finished = next(
(e for e in finished_events if e.method_name == "first"), None
)
second_started = next(
(e for e in started_events if e.method_name == "second"), None
)
second_finished = next(
(e for e in finished_events if e.method_name == "second"), None
)
third_started = next(
(e for e in started_events if e.method_name == "third"), None
)
assert first_finished is not None
assert second_started is not None
assert second_finished is not None
assert third_started is not None
assert second_started.triggered_by_event_id == first_finished.event_id
assert third_started.triggered_by_event_id == second_finished.event_id
@pytest.mark.asyncio
async def test_parallel_listeners_same_trigger(self) -> None:
"""Parallel listeners should all have triggered_by_event_id pointing to the same triggering event."""
from crewai.events.base_events import reset_emission_counter
from crewai.events.event_context import reset_last_event_id
reset_emission_counter()
reset_last_event_id()
events: list[BaseEvent] = []
class ParallelFlow(Flow):
@start()
async def trigger(self):
return "trigger"
@listen(trigger)
async def listener_a(self, result):
return "a"
@listen(trigger)
async def listener_b(self, result):
return "b"
@listen(trigger)
async def listener_c(self, result):
return "c"
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(MethodExecutionStartedEvent)
def capture_started(source, event):
events.append(event)
@crewai_event_bus.on(MethodExecutionFinishedEvent)
def capture_finished(source, event):
events.append(event)
flow = ParallelFlow()
await flow.akickoff()
crewai_event_bus.flush()
started_events = [e for e in events if isinstance(e, MethodExecutionStartedEvent)]
finished_events = [e for e in events if isinstance(e, MethodExecutionFinishedEvent)]
trigger_finished = next(
(e for e in finished_events if e.method_name == "trigger"), None
)
listener_a_started = next(
(e for e in started_events if e.method_name == "listener_a"), None
)
listener_b_started = next(
(e for e in started_events if e.method_name == "listener_b"), None
)
listener_c_started = next(
(e for e in started_events if e.method_name == "listener_c"), None
)
assert trigger_finished is not None
assert listener_a_started is not None
assert listener_b_started is not None
assert listener_c_started is not None
# All parallel listeners should point to the same triggering event
assert listener_a_started.triggered_by_event_id == trigger_finished.event_id
assert listener_b_started.triggered_by_event_id == trigger_finished.event_id
assert listener_c_started.triggered_by_event_id == trigger_finished.event_id
@pytest.mark.asyncio
async def test_or_condition_triggered_by(self) -> None:
"""Listener with OR condition should have triggered_by_event_id pointing to whichever method triggered it."""
from crewai.events.base_events import reset_emission_counter
from crewai.events.event_context import reset_last_event_id
from crewai.flow.flow import or_
reset_emission_counter()
reset_last_event_id()
events: list[BaseEvent] = []
class OrConditionFlow(Flow):
@start()
async def path_a(self):
return "a"
@listen(or_(path_a, "path_b"))
async def after_either(self, result):
return "done"
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(MethodExecutionStartedEvent)
def capture_started(source, event):
events.append(event)
@crewai_event_bus.on(MethodExecutionFinishedEvent)
def capture_finished(source, event):
events.append(event)
flow = OrConditionFlow()
await flow.akickoff()
crewai_event_bus.flush()
started_events = [e for e in events if isinstance(e, MethodExecutionStartedEvent)]
finished_events = [e for e in events if isinstance(e, MethodExecutionFinishedEvent)]
path_a_finished = next(
(e for e in finished_events if e.method_name == "path_a"), None
)
after_either_started = next(
(e for e in started_events if e.method_name == "after_either"), None
)
assert path_a_finished is not None
assert after_either_started is not None
# The OR listener should be triggered by path_a since that's what ran
assert after_either_started.triggered_by_event_id == path_a_finished.event_id
@pytest.mark.asyncio
async def test_router_triggered_by(self) -> None:
"""Events from router-triggered paths should have correct triggered_by_event_id."""
from crewai.events.base_events import reset_emission_counter
from crewai.events.event_context import reset_last_event_id
from crewai.flow.flow import router
reset_emission_counter()
reset_last_event_id()
events: list[BaseEvent] = []
class RouterFlow(Flow):
@start()
async def begin(self):
return "begin"
@router(begin)
async def route_decision(self, result):
return "approved"
@listen("approved")
async def handle_approved(self):
return "handled"
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(MethodExecutionStartedEvent)
def capture_started(source, event):
events.append(event)
@crewai_event_bus.on(MethodExecutionFinishedEvent)
def capture_finished(source, event):
events.append(event)
flow = RouterFlow()
await flow.akickoff()
crewai_event_bus.flush()
started_events = [e for e in events if isinstance(e, MethodExecutionStartedEvent)]
finished_events = [e for e in events if isinstance(e, MethodExecutionFinishedEvent)]
begin_finished = next(
(e for e in finished_events if e.method_name == "begin"), None
)
route_decision_started = next(
(e for e in started_events if e.method_name == "route_decision"), None
)
route_decision_finished = next(
(e for e in finished_events if e.method_name == "route_decision"), None
)
handle_approved_started = next(
(e for e in started_events if e.method_name == "handle_approved"), None
)
assert begin_finished is not None
assert route_decision_started is not None
assert route_decision_finished is not None
assert handle_approved_started is not None
# Router should be triggered by begin
assert route_decision_started.triggered_by_event_id == begin_finished.event_id
# Handler should be triggered by router's finished event
assert handle_approved_started.triggered_by_event_id == route_decision_finished.event_id
@pytest.mark.asyncio
async def test_multiple_kickoffs_maintain_chains(self) -> None:
"""Multiple akickoff() calls should maintain correct triggered_by chains for each execution."""
from crewai.events.base_events import reset_emission_counter
from crewai.events.event_context import reset_last_event_id
reset_emission_counter()
reset_last_event_id()
first_run_events: list[BaseEvent] = []
second_run_events: list[BaseEvent] = []
class ReusableFlow(Flow):
@start()
async def begin(self):
return "begin"
@listen(begin)
async def process(self, result):
return "processed"
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(MethodExecutionStartedEvent)
def capture_started(source, event):
if len(second_run_events) == 0 and not capturing_second:
first_run_events.append(event)
else:
second_run_events.append(event)
@crewai_event_bus.on(MethodExecutionFinishedEvent)
def capture_finished(source, event):
if len(second_run_events) == 0 and not capturing_second:
first_run_events.append(event)
else:
second_run_events.append(event)
# First kickoff
capturing_second = False
flow1 = ReusableFlow()
await flow1.akickoff()
crewai_event_bus.flush()
# Second kickoff
capturing_second = True
flow2 = ReusableFlow()
await flow2.akickoff()
crewai_event_bus.flush()
# Should have events from both runs
assert len(first_run_events) >= 4 # 2 started + 2 finished
assert len(second_run_events) >= 4
# Check first run's triggered_by chain
first_started = [e for e in first_run_events if isinstance(e, MethodExecutionStartedEvent)]
first_finished = [e for e in first_run_events if isinstance(e, MethodExecutionFinishedEvent)]
first_begin_finished = next(
(e for e in first_finished if e.method_name == "begin"), None
)
first_process_started = next(
(e for e in first_started if e.method_name == "process"), None
)
assert first_begin_finished is not None
assert first_process_started is not None
assert first_process_started.triggered_by_event_id == first_begin_finished.event_id
# Check second run's triggered_by chain
second_started = [e for e in second_run_events if isinstance(e, MethodExecutionStartedEvent)]
second_finished = [e for e in second_run_events if isinstance(e, MethodExecutionFinishedEvent)]
second_begin_finished = next(
(e for e in second_finished if e.method_name == "begin"), None
)
second_process_started = next(
(e for e in second_started if e.method_name == "process"), None
)
assert second_begin_finished is not None
assert second_process_started is not None
assert second_process_started.triggered_by_event_id == second_begin_finished.event_id
# Verify the two runs have different event_ids (not reusing)
assert first_begin_finished.event_id != second_begin_finished.event_id
# Verify each run has its own independent previous_event_id chain
# (chains reset at each top-level execution)
first_sorted = sorted(first_run_events, key=lambda e: e.emission_sequence or 0)
for event in first_sorted[1:]:
assert event.previous_event_id is not None, (
f"First run event {event.type} (seq {event.emission_sequence}) should have previous_event_id"
)
second_sorted = sorted(second_run_events, key=lambda e: e.emission_sequence or 0)
for event in second_sorted[1:]:
assert event.previous_event_id is not None, (
f"Second run event {event.type} (seq {event.emission_sequence}) should have previous_event_id"
)
@pytest.mark.asyncio
async def test_parallel_flows_maintain_separate_triggered_by_chains(self) -> None:
"""Parallel flow executions should maintain correct triggered_by chains independently."""
import asyncio
from crewai.events.base_events import reset_emission_counter
from crewai.events.event_context import reset_last_event_id
reset_emission_counter()
reset_last_event_id()
events: list[BaseEvent] = []
class ParallelTestFlow(Flow):
def __init__(self, name: str):
super().__init__()
self.flow_name = name
@start()
async def begin(self):
await asyncio.sleep(0.01) # Small delay to interleave
return self.flow_name
@listen(begin)
async def process(self, result):
await asyncio.sleep(0.01)
return f"{result}_processed"
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(MethodExecutionStartedEvent)
def capture_started(source, event):
events.append(event)
@crewai_event_bus.on(MethodExecutionFinishedEvent)
def capture_finished(source, event):
events.append(event)
# Run two flows in parallel
flow_a = ParallelTestFlow("flow_a")
flow_b = ParallelTestFlow("flow_b")
await asyncio.gather(flow_a.akickoff(), flow_b.akickoff())
crewai_event_bus.flush()
# Should have events from both flows (4 events each = 8 total)
assert len(events) >= 8
started_events = [e for e in events if isinstance(e, MethodExecutionStartedEvent)]
finished_events = [e for e in events if isinstance(e, MethodExecutionFinishedEvent)]
# Find flow_a's events by checking the result contains "flow_a"
flow_a_begin_finished = [
e for e in finished_events
if e.method_name == "begin" and "flow_a" in str(e.result)
]
flow_a_process_started = [
e for e in started_events
if e.method_name == "process"
]
flow_b_begin_finished = [
e for e in finished_events
if e.method_name == "begin" and "flow_b" in str(e.result)
]
assert len(flow_a_begin_finished) >= 1
assert len(flow_b_begin_finished) >= 1
# Each flow's process should be triggered by its own begin
# Find which process events were triggered by which begin events
for process_event in flow_a_process_started:
trigger_id = process_event.triggered_by_event_id
assert trigger_id is not None
# The triggering event should be a begin finished event
triggering_event = next(
(e for e in finished_events if e.event_id == trigger_id), None
)
assert triggering_event is not None
assert triggering_event.method_name == "begin"
# Verify previous_event_id forms a valid chain across all events
all_sorted = sorted(events, key=lambda e: e.emission_sequence or 0)
for event in all_sorted[1:]:
assert event.previous_event_id is not None
@pytest.mark.asyncio
async def test_and_condition_triggered_by_last_method(self) -> None:
"""AND condition listener should have triggered_by_event_id pointing to the last completing method."""
from crewai.events.base_events import reset_emission_counter
from crewai.events.event_context import reset_last_event_id
from crewai.flow.flow import and_
reset_emission_counter()
reset_last_event_id()
events: list[BaseEvent] = []
class AndConditionFlow(Flow):
@start()
async def method_a(self):
return "a"
@listen(method_a)
async def method_b(self, result):
return "b"
@listen(and_(method_a, method_b))
async def after_both(self, result):
return "both_done"
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(MethodExecutionStartedEvent)
def capture_started(source, event):
events.append(event)
@crewai_event_bus.on(MethodExecutionFinishedEvent)
def capture_finished(source, event):
events.append(event)
flow = AndConditionFlow()
await flow.akickoff()
crewai_event_bus.flush()
started_events = [e for e in events if isinstance(e, MethodExecutionStartedEvent)]
finished_events = [e for e in events if isinstance(e, MethodExecutionFinishedEvent)]
method_b_finished = next(
(e for e in finished_events if e.method_name == "method_b"), None
)
after_both_started = next(
(e for e in started_events if e.method_name == "after_both"), None
)
assert method_b_finished is not None
assert after_both_started is not None
# The AND listener should be triggered by method_b (the last one to complete)
assert after_both_started.triggered_by_event_id == method_b_finished.event_id
@pytest.mark.asyncio
async def test_exception_handling_triggered_by(self) -> None:
"""Events emitted after exception should still have correct triggered_by."""
from crewai.events.base_events import reset_emission_counter
from crewai.events.event_context import reset_last_event_id
from crewai.events.types.flow_events import MethodExecutionFailedEvent
reset_emission_counter()
reset_last_event_id()
events: list[BaseEvent] = []
class ExceptionFlow(Flow):
@start()
async def will_fail(self):
raise ValueError("intentional error")
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(MethodExecutionStartedEvent)
def capture_started(source, event):
events.append(event)
@crewai_event_bus.on(MethodExecutionFinishedEvent)
def capture_finished(source, event):
events.append(event)
@crewai_event_bus.on(MethodExecutionFailedEvent)
def capture_failed(source, event):
events.append(event)
@crewai_event_bus.on(FlowStartedEvent)
def capture_flow_started(source, event):
events.append(event)
flow = ExceptionFlow()
try:
await flow.akickoff()
except ValueError:
pass # Expected
crewai_event_bus.flush()
# Even with exception, events should have proper previous_event_id chain
all_sorted = sorted(events, key=lambda e: e.emission_sequence or 0)
for event in all_sorted[1:]:
assert event.previous_event_id is not None, (
f"Event {event.type} (seq {event.emission_sequence}) should have previous_event_id"
)
@pytest.mark.asyncio
async def test_sync_method_in_flow_triggered_by(self) -> None:
"""Synchronous methods should still have correct triggered_by."""
from crewai.events.base_events import reset_emission_counter
from crewai.events.event_context import reset_last_event_id
reset_emission_counter()
reset_last_event_id()
events: list[BaseEvent] = []
class SyncFlow(Flow):
@start()
def sync_start(self): # Synchronous method
return "sync_done"
@listen(sync_start)
async def async_listener(self, result):
return "async_done"
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(MethodExecutionStartedEvent)
def capture_started(source, event):
events.append(event)
@crewai_event_bus.on(MethodExecutionFinishedEvent)
def capture_finished(source, event):
events.append(event)
flow = SyncFlow()
await flow.akickoff()
crewai_event_bus.flush()
started_events = [e for e in events if isinstance(e, MethodExecutionStartedEvent)]
finished_events = [e for e in events if isinstance(e, MethodExecutionFinishedEvent)]
sync_start_finished = next(
(e for e in finished_events if e.method_name == "sync_start"), None
)
async_listener_started = next(
(e for e in started_events if e.method_name == "async_listener"), None
)
assert sync_start_finished is not None
assert async_listener_started is not None
assert async_listener_started.triggered_by_event_id == sync_start_finished.event_id
@pytest.mark.asyncio
async def test_multiple_start_methods_triggered_by(self) -> None:
"""Multiple start methods should each have triggered_by_event_id=None."""
from crewai.events.base_events import reset_emission_counter
from crewai.events.event_context import reset_last_event_id
reset_emission_counter()
reset_last_event_id()
events: list[BaseEvent] = []
class MultiStartFlow(Flow):
@start()
async def start_one(self):
return "one"
@start()
async def start_two(self):
return "two"
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(MethodExecutionStartedEvent)
def capture_started(source, event):
events.append(event)
flow = MultiStartFlow()
await flow.akickoff()
crewai_event_bus.flush()
started_events = [e for e in events if isinstance(e, MethodExecutionStartedEvent)]
start_one = next(
(e for e in started_events if e.method_name == "start_one"), None
)
start_two = next(
(e for e in started_events if e.method_name == "start_two"), None
)
assert start_one is not None
assert start_two is not None
# Both start methods should have no triggered_by (they're entry points)
assert start_one.triggered_by_event_id is None
assert start_two.triggered_by_event_id is None
@pytest.mark.asyncio
async def test_none_return_triggered_by(self) -> None:
"""Methods returning None should still have correct triggered_by chain."""
from crewai.events.base_events import reset_emission_counter
from crewai.events.event_context import reset_last_event_id
reset_emission_counter()
reset_last_event_id()
events: list[BaseEvent] = []
class NoneReturnFlow(Flow):
@start()
async def returns_none(self):
return None
@listen(returns_none)
async def after_none(self, result):
return "got_none"
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(MethodExecutionStartedEvent)
def capture_started(source, event):
events.append(event)
@crewai_event_bus.on(MethodExecutionFinishedEvent)
def capture_finished(source, event):
events.append(event)
flow = NoneReturnFlow()
await flow.akickoff()
crewai_event_bus.flush()
started_events = [e for e in events if isinstance(e, MethodExecutionStartedEvent)]
finished_events = [e for e in events if isinstance(e, MethodExecutionFinishedEvent)]
returns_none_finished = next(
(e for e in finished_events if e.method_name == "returns_none"), None
)
after_none_started = next(
(e for e in started_events if e.method_name == "after_none"), None
)
assert returns_none_finished is not None
assert after_none_started is not None
assert after_none_started.triggered_by_event_id == returns_none_finished.event_id
@pytest.mark.asyncio
async def test_deeply_nested_chain_triggered_by(self) -> None:
"""Deeply nested listener chains (5+) should maintain correct triggered_by."""
from crewai.events.base_events import reset_emission_counter
from crewai.events.event_context import reset_last_event_id
reset_emission_counter()
reset_last_event_id()
events: list[BaseEvent] = []
class DeepChainFlow(Flow):
@start()
async def level_0(self):
return "0"
@listen(level_0)
async def level_1(self, result):
return "1"
@listen(level_1)
async def level_2(self, result):
return "2"
@listen(level_2)
async def level_3(self, result):
return "3"
@listen(level_3)
async def level_4(self, result):
return "4"
@listen(level_4)
async def level_5(self, result):
return "5"
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(MethodExecutionStartedEvent)
def capture_started(source, event):
events.append(event)
@crewai_event_bus.on(MethodExecutionFinishedEvent)
def capture_finished(source, event):
events.append(event)
flow = DeepChainFlow()
await flow.akickoff()
crewai_event_bus.flush()
started_events = [e for e in events if isinstance(e, MethodExecutionStartedEvent)]
finished_events = [e for e in events if isinstance(e, MethodExecutionFinishedEvent)]
# Verify each level triggers the next
for i in range(5):
prev_finished = next(
(e for e in finished_events if e.method_name == f"level_{i}"), None
)
next_started = next(
(e for e in started_events if e.method_name == f"level_{i+1}"), None
)
assert prev_finished is not None, f"level_{i} finished event not found"
assert next_started is not None, f"level_{i+1} started event not found"
assert next_started.triggered_by_event_id == prev_finished.event_id, (
f"level_{i+1} should be triggered by level_{i}"
)
@pytest.mark.asyncio
async def test_router_conditional_path_triggered_by(self) -> None:
"""Router with conditional paths should have correct triggered_by for the selected path."""
from crewai.events.base_events import reset_emission_counter
from crewai.events.event_context import reset_last_event_id
from crewai.flow.flow import router
reset_emission_counter()
reset_last_event_id()
events: list[BaseEvent] = []
class ConditionalRouterFlow(Flow):
@start()
async def begin(self):
return "begin"
@router(begin)
async def conditional_router(self, result):
# Conditionally return one route
return "path_a"
@listen("path_a")
async def handle_path_a(self):
return "a_done"
@listen("path_b")
async def handle_path_b(self):
return "b_done"
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(MethodExecutionStartedEvent)
def capture_started(source, event):
events.append(event)
@crewai_event_bus.on(MethodExecutionFinishedEvent)
def capture_finished(source, event):
events.append(event)
flow = ConditionalRouterFlow()
await flow.akickoff()
crewai_event_bus.flush()
started_events = [e for e in events if isinstance(e, MethodExecutionStartedEvent)]
finished_events = [e for e in events if isinstance(e, MethodExecutionFinishedEvent)]
router_finished = next(
(e for e in finished_events if e.method_name == "conditional_router"), None
)
handle_path_a_started = next(
(e for e in started_events if e.method_name == "handle_path_a"), None
)
handle_path_b_started = next(
(e for e in started_events if e.method_name == "handle_path_b"), None
)
assert router_finished is not None
assert handle_path_a_started is not None
# path_b should NOT be executed since router returned "path_a"
assert handle_path_b_started is None
# The selected path should be triggered by the router
assert handle_path_a_started.triggered_by_event_id == router_finished.event_id
class TestCrewEventsInFlowTriggeredBy:
"""Tests for triggered_by in crew events running inside flows."""
@pytest.mark.asyncio
async def test_flow_listener_triggered_by_in_nested_context(self) -> None:
"""Nested listener contexts should maintain correct triggered_by chains."""
from crewai.events.base_events import reset_emission_counter
from crewai.events.event_context import reset_last_event_id
reset_emission_counter()
reset_last_event_id()
events: list[BaseEvent] = []
class NestedFlow(Flow):
@start()
async def trigger_method(self):
return "trigger"
@listen(trigger_method)
async def middle_method(self, result):
return "middle"
@listen(middle_method)
async def final_method(self, result):
return "final"
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(MethodExecutionStartedEvent)
def capture_method_started(source, event):
events.append(event)
@crewai_event_bus.on(MethodExecutionFinishedEvent)
def capture_method_finished(source, event):
events.append(event)
flow = NestedFlow()
await flow.akickoff()
crewai_event_bus.flush()
method_started_events = [e for e in events if isinstance(e, MethodExecutionStartedEvent)]
method_finished_events = [e for e in events if isinstance(e, MethodExecutionFinishedEvent)]
trigger_finished = next(
(e for e in method_finished_events if e.method_name == "trigger_method"), None
)
middle_started = next(
(e for e in method_started_events if e.method_name == "middle_method"), None
)
middle_finished = next(
(e for e in method_finished_events if e.method_name == "middle_method"), None
)
final_started = next(
(e for e in method_started_events if e.method_name == "final_method"), None
)
assert trigger_finished is not None
assert middle_started is not None
assert middle_finished is not None
assert final_started is not None
# middle should be triggered by trigger_method
assert middle_started.triggered_by_event_id == trigger_finished.event_id
# final should be triggered by middle_method
assert final_started.triggered_by_event_id == middle_finished.event_id
# All events should have proper previous_event_id chain
all_sorted = sorted(events, key=lambda e: e.emission_sequence or 0)
for event in all_sorted[1:]:
assert event.previous_event_id is not None
def test_sync_kickoff_triggered_by(self) -> None:
"""Synchronous kickoff() should maintain correct triggered_by chains."""
from crewai.events.base_events import reset_emission_counter
from crewai.events.event_context import reset_last_event_id
reset_emission_counter()
reset_last_event_id()
events: list[BaseEvent] = []
class SyncKickoffFlow(Flow):
@start()
def start_method(self):
return "started"
@listen(start_method)
def listener_method(self, result):
return "listened"
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(MethodExecutionStartedEvent)
def capture_started(source, event):
events.append(event)
@crewai_event_bus.on(MethodExecutionFinishedEvent)
def capture_finished(source, event):
events.append(event)
flow = SyncKickoffFlow()
flow.kickoff() # Synchronous kickoff
crewai_event_bus.flush()
started_events = [e for e in events if isinstance(e, MethodExecutionStartedEvent)]
finished_events = [e for e in events if isinstance(e, MethodExecutionFinishedEvent)]
start_finished = next(
(e for e in finished_events if e.method_name == "start_method"), None
)
listener_started = next(
(e for e in started_events if e.method_name == "listener_method"), None
)
assert start_finished is not None
assert listener_started is not None
# Listener should be triggered by start_method
assert listener_started.triggered_by_event_id == start_finished.event_id
# Verify previous_event_id chain
all_sorted = sorted(events, key=lambda e: e.emission_sequence or 0)
for event in all_sorted[1:]:
assert event.previous_event_id is not None