feat: integrate ordering and hierarchy into event bus

This commit is contained in:
Greyson LaLonde
2026-01-20 01:07:13 -05:00
parent 1707df8785
commit f7e1bdb64e

View File

@@ -16,8 +16,17 @@ from typing import Any, Final, ParamSpec, TypeVar
from typing_extensions import Self
from crewai.events.base_events import BaseEvent
from crewai.events.base_events import BaseEvent, get_next_emission_sequence
from crewai.events.depends import Depends
from crewai.events.event_context import (
SCOPE_ENDING_EVENTS,
SCOPE_STARTING_EVENTS,
VALID_EVENT_PAIRS,
get_current_parent_id,
get_enclosing_parent_id,
pop_event_scope,
push_event_scope,
)
from crewai.events.handler_graph import build_execution_plan
from crewai.events.types.event_bus_types import (
AsyncHandler,
@@ -326,6 +335,32 @@ class CrewAIEventsBus:
... await asyncio.wrap_future(future) # In async test
... # or future.result(timeout=5.0) in sync code
"""
event.emission_sequence = get_next_emission_sequence()
if event.parent_event_id is None:
event_type_name = event.type
if event_type_name in SCOPE_ENDING_EVENTS:
event.parent_event_id = get_enclosing_parent_id()
popped = pop_event_scope()
if popped is None:
self._console.print(
f"[CrewAIEventsBus] Warning: Ending event '{event_type_name}' "
"emitted with empty scope stack. Missing starting event?"
)
else:
_, popped_type = popped
expected_start = VALID_EVENT_PAIRS.get(event_type_name)
if expected_start and popped_type and popped_type != expected_start:
self._console.print(
f"[CrewAIEventsBus] Warning: Event pairing mismatch. "
f"'{event_type_name}' closed '{popped_type}' "
f"(expected '{expected_start}')"
)
elif event_type_name in SCOPE_STARTING_EVENTS:
event.parent_event_id = get_current_parent_id()
push_event_scope(event.event_id, event_type_name)
else:
event.parent_event_id = get_current_parent_id()
event_type = type(event)
with self._rwlock.r_locked():