mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-05-01 07:13:00 +00:00
fix: share event metadata setup between emit and aemit
Extract _prepare_event to set previous_event_id, triggered_by_event_id, emission_sequence, parent/child scoping, and event_record tracking. Both emit and aemit now call it, fixing aemit's missing metadata.
This commit is contained in:
@@ -435,32 +435,8 @@ class CrewAIEventsBus:
|
|||||||
if level_async:
|
if level_async:
|
||||||
await self._acall_handlers(source, event, level_async)
|
await self._acall_handlers(source, event, level_async)
|
||||||
|
|
||||||
def emit(self, source: Any, event: BaseEvent) -> Future[None] | None:
|
def _prepare_event(self, source: Any, event: BaseEvent) -> None:
|
||||||
"""Emit an event to all registered handlers.
|
"""Set event metadata, register the source entity, and record the event."""
|
||||||
|
|
||||||
If handlers have dependencies (registered with depends_on), they execute
|
|
||||||
in dependency order. Otherwise, handlers execute as before (sync in thread
|
|
||||||
pool, async fire-and-forget).
|
|
||||||
|
|
||||||
Stream chunk events always execute synchronously to preserve ordering.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
source: The emitting object
|
|
||||||
event: The event instance to emit
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Future that completes when handlers finish. Returns:
|
|
||||||
- Future for sync-only handlers (ThreadPoolExecutor future)
|
|
||||||
- Future for async handlers or mixed handlers (asyncio future)
|
|
||||||
- Future for dependency-managed handlers (asyncio future)
|
|
||||||
- None if no handlers or sync stream chunk events
|
|
||||||
|
|
||||||
Example:
|
|
||||||
>>> future = crewai_event_bus.emit(source, event)
|
|
||||||
>>> if future:
|
|
||||||
... await asyncio.wrap_future(future) # In async test
|
|
||||||
... # or future.result(timeout=5.0) in sync code
|
|
||||||
"""
|
|
||||||
if (
|
if (
|
||||||
getattr(source, "entity_type", None) in ("flow", "crew", "agent")
|
getattr(source, "entity_type", None) in ("flow", "crew", "agent")
|
||||||
and id(source) not in self._registered_entity_ids
|
and id(source) not in self._registered_entity_ids
|
||||||
@@ -494,6 +470,34 @@ class CrewAIEventsBus:
|
|||||||
if self._runtime_state is not None:
|
if self._runtime_state is not None:
|
||||||
self._runtime_state.event_record.add(event)
|
self._runtime_state.event_record.add(event)
|
||||||
|
|
||||||
|
def emit(self, source: Any, event: BaseEvent) -> Future[None] | None:
|
||||||
|
"""Emit an event to all registered handlers.
|
||||||
|
|
||||||
|
If handlers have dependencies (registered with depends_on), they execute
|
||||||
|
in dependency order. Otherwise, handlers execute as before (sync in thread
|
||||||
|
pool, async fire-and-forget).
|
||||||
|
|
||||||
|
Stream chunk events always execute synchronously to preserve ordering.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
source: The emitting object
|
||||||
|
event: The event instance to emit
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Future that completes when handlers finish. Returns:
|
||||||
|
- Future for sync-only handlers (ThreadPoolExecutor future)
|
||||||
|
- Future for async handlers or mixed handlers (asyncio future)
|
||||||
|
- Future for dependency-managed handlers (asyncio future)
|
||||||
|
- None if no handlers or sync stream chunk events
|
||||||
|
|
||||||
|
Example:
|
||||||
|
>>> future = crewai_event_bus.emit(source, event)
|
||||||
|
>>> if future:
|
||||||
|
... await asyncio.wrap_future(future) # In async test
|
||||||
|
... # or future.result(timeout=5.0) in sync code
|
||||||
|
"""
|
||||||
|
self._prepare_event(source, event)
|
||||||
|
|
||||||
event_type = type(event)
|
event_type = type(event)
|
||||||
|
|
||||||
with self._rwlock.r_locked():
|
with self._rwlock.r_locked():
|
||||||
@@ -592,14 +596,7 @@ class CrewAIEventsBus:
|
|||||||
source: The object emitting the event
|
source: The object emitting the event
|
||||||
event: The event instance to emit
|
event: The event instance to emit
|
||||||
"""
|
"""
|
||||||
if (
|
self._prepare_event(source, event)
|
||||||
getattr(source, "entity_type", None) in ("flow", "crew", "agent")
|
|
||||||
and id(source) not in self._registered_entity_ids
|
|
||||||
):
|
|
||||||
self.register_entity(source)
|
|
||||||
|
|
||||||
if self._runtime_state is not None:
|
|
||||||
self._runtime_state.event_record.add(event)
|
|
||||||
|
|
||||||
event_type = type(event)
|
event_type = type(event)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user