From da5a890eae4fb505e6a2039a7f1d21979157f4fa Mon Sep 17 00:00:00 2001 From: Greyson LaLonde Date: Sat, 4 Apr 2026 22:24:58 +0800 Subject: [PATCH] fix: share event metadata setup between emit and aemit Extract _prepare_event to set previous_event_id, triggered_by_event_id, emission_sequence, parent/child scoping, and event_record tracking. Both emit and aemit now call it, fixing aemit's missing metadata. --- lib/crewai/src/crewai/events/event_bus.py | 65 +++++++++++------------ 1 file changed, 31 insertions(+), 34 deletions(-) diff --git a/lib/crewai/src/crewai/events/event_bus.py b/lib/crewai/src/crewai/events/event_bus.py index 3e5bd8519..7faedb54d 100644 --- a/lib/crewai/src/crewai/events/event_bus.py +++ b/lib/crewai/src/crewai/events/event_bus.py @@ -435,32 +435,8 @@ class CrewAIEventsBus: if level_async: await self._acall_handlers(source, event, level_async) - def emit(self, source: Any, event: BaseEvent) -> Future[None] | None: - """Emit an event to all registered handlers. - - If handlers have dependencies (registered with depends_on), they execute - in dependency order. Otherwise, handlers execute as before (sync in thread - pool, async fire-and-forget). - - Stream chunk events always execute synchronously to preserve ordering. - - Args: - source: The emitting object - event: The event instance to emit - - Returns: - Future that completes when handlers finish. Returns: - - Future for sync-only handlers (ThreadPoolExecutor future) - - Future for async handlers or mixed handlers (asyncio future) - - Future for dependency-managed handlers (asyncio future) - - None if no handlers or sync stream chunk events - - Example: - >>> future = crewai_event_bus.emit(source, event) - >>> if future: - ... await asyncio.wrap_future(future) # In async test - ... # or future.result(timeout=5.0) in sync code - """ + def _prepare_event(self, source: Any, event: BaseEvent) -> None: + """Set event metadata, register the source entity, and record the event.""" if ( getattr(source, "entity_type", None) in ("flow", "crew", "agent") and id(source) not in self._registered_entity_ids @@ -494,6 +470,34 @@ class CrewAIEventsBus: if self._runtime_state is not None: self._runtime_state.event_record.add(event) + def emit(self, source: Any, event: BaseEvent) -> Future[None] | None: + """Emit an event to all registered handlers. + + If handlers have dependencies (registered with depends_on), they execute + in dependency order. Otherwise, handlers execute as before (sync in thread + pool, async fire-and-forget). + + Stream chunk events always execute synchronously to preserve ordering. + + Args: + source: The emitting object + event: The event instance to emit + + Returns: + Future that completes when handlers finish. Returns: + - Future for sync-only handlers (ThreadPoolExecutor future) + - Future for async handlers or mixed handlers (asyncio future) + - Future for dependency-managed handlers (asyncio future) + - None if no handlers or sync stream chunk events + + Example: + >>> future = crewai_event_bus.emit(source, event) + >>> if future: + ... await asyncio.wrap_future(future) # In async test + ... # or future.result(timeout=5.0) in sync code + """ + self._prepare_event(source, event) + event_type = type(event) with self._rwlock.r_locked(): @@ -592,14 +596,7 @@ class CrewAIEventsBus: source: The object emitting the event event: The event instance to emit """ - if ( - getattr(source, "entity_type", None) in ("flow", "crew", "agent") - and id(source) not in self._registered_entity_ids - ): - self.register_entity(source) - - if self._runtime_state is not None: - self._runtime_state.event_record.add(event) + self._prepare_event(source, event) event_type = type(event)