Compare commits

...

1 Commits

Author SHA1 Message Date
Greyson LaLonde
1308bdee63 feat: add started_event_id and set in eventbus
Some checks are pending
CodeQL Advanced / Analyze (actions) (push) Waiting to run
CodeQL Advanced / Analyze (python) (push) Waiting to run
Notify Downstream / notify-downstream (push) Waiting to run
* feat: add started_event_id and set in eventbus

* chore: update additional test assumption

* fix: restore event bus handlers on context exit

fix rollback in crewai events bus so that exiting the context restores
the previous _sync_handlers, _async_handlers, _handler_dependencies, and _execution_plan_cache by assigning shallow copies of the saved dicts. previously these
were set to empty dicts on exit, which caused registered handlers and cached execution plans to be lost.
2026-02-05 21:28:23 -05:00
5 changed files with 57 additions and 15 deletions

View File

@@ -63,6 +63,7 @@ class BaseEvent(BaseModel):
parent_event_id: str | None = None
previous_event_id: str | None = None
triggered_by_event_id: str | None = None
started_event_id: str | None = None
emission_sequence: int | None = None
def to_json(self, exclude: set[str] | None = None) -> Serializable:

View File

@@ -407,7 +407,8 @@ class CrewAIEventsBus:
if popped is None:
handle_empty_pop(event_type_name)
else:
_, popped_type = popped
popped_event_id, popped_type = popped
event.started_event_id = popped_event_id
expected_start = VALID_EVENT_PAIRS.get(event_type_name)
if expected_start and popped_type and popped_type != expected_start:
handle_mismatch(event_type_name, popped_type, expected_start)
@@ -569,24 +570,52 @@ class CrewAIEventsBus:
... # Do stuff...
... # Handlers are cleared after the context
"""
with self._rwlock.w_locked():
prev_sync = self._sync_handlers
prev_async = self._async_handlers
prev_deps = self._handler_dependencies
prev_cache = self._execution_plan_cache
self._sync_handlers = {}
self._async_handlers = {}
self._handler_dependencies = {}
self._execution_plan_cache = {}
with self._rwlock.r_locked():
saved_sync: dict[type[BaseEvent], frozenset[SyncHandler]] = dict(
self._sync_handlers
)
saved_async: dict[type[BaseEvent], frozenset[AsyncHandler]] = dict(
self._async_handlers
)
saved_deps: dict[type[BaseEvent], dict[Handler, list[Depends[Any]]]] = {
event_type: dict(handlers)
for event_type, handlers in self._handler_dependencies.items()
}
for event_type, sync_handlers in saved_sync.items():
for sync_handler in sync_handlers:
self.off(event_type, sync_handler)
for event_type, async_handlers in saved_async.items():
for async_handler in async_handlers:
self.off(event_type, async_handler)
try:
yield
finally:
with self._rwlock.w_locked():
self._sync_handlers = prev_sync
self._async_handlers = prev_async
self._handler_dependencies = prev_deps
self._execution_plan_cache = prev_cache
with self._rwlock.r_locked():
current_sync = dict(self._sync_handlers)
current_async = dict(self._async_handlers)
for event_type, cur_sync in current_sync.items():
orig_sync = saved_sync.get(event_type, frozenset())
for new_handler in cur_sync - orig_sync:
self.off(event_type, new_handler)
for event_type, cur_async in current_async.items():
orig_async = saved_async.get(event_type, frozenset())
for new_async_handler in cur_async - orig_async:
self.off(event_type, new_async_handler)
for event_type, sync_handlers in saved_sync.items():
for sync_handler in sync_handlers:
deps = saved_deps.get(event_type, {}).get(sync_handler)
self._register_handler(event_type, sync_handler, deps)
for event_type, async_handlers in saved_async.items():
for async_handler in async_handlers:
deps = saved_deps.get(event_type, {}).get(async_handler)
self._register_handler(event_type, async_handler, deps)
def shutdown(self, wait: bool = True) -> None:
"""Gracefully shutdown the event loop and wait for all tasks to finish.

View File

@@ -308,6 +308,7 @@ def test_external_memory_search_events(
"parent_event_id": None,
"previous_event_id": ANY,
"triggered_by_event_id": None,
"started_event_id": ANY,
"emission_sequence": ANY,
"query": "test value",
"limit": 3,
@@ -330,6 +331,7 @@ def test_external_memory_search_events(
"parent_event_id": ANY,
"previous_event_id": ANY,
"triggered_by_event_id": None,
"started_event_id": ANY,
"emission_sequence": ANY,
"query": "test value",
"results": [],
@@ -390,6 +392,7 @@ def test_external_memory_save_events(
"parent_event_id": None,
"previous_event_id": ANY,
"triggered_by_event_id": None,
"started_event_id": ANY,
"emission_sequence": ANY,
"value": "saving value",
"metadata": {"task": "test_task"},
@@ -411,6 +414,7 @@ def test_external_memory_save_events(
"parent_event_id": ANY,
"previous_event_id": ANY,
"triggered_by_event_id": None,
"started_event_id": ANY,
"emission_sequence": ANY,
"value": "saving value",
"metadata": {"task": "test_task"},

View File

@@ -74,6 +74,7 @@ def test_long_term_memory_save_events(long_term_memory):
"parent_event_id": None,
"previous_event_id": ANY,
"triggered_by_event_id": None,
"started_event_id": ANY,
"emission_sequence": ANY,
"value": "test_task",
"metadata": {"task": "test_task", "quality": 0.5},
@@ -94,6 +95,7 @@ def test_long_term_memory_save_events(long_term_memory):
"parent_event_id": None,
"previous_event_id": ANY,
"triggered_by_event_id": None,
"started_event_id": ANY,
"emission_sequence": ANY,
"value": "test_task",
"metadata": {
@@ -153,6 +155,7 @@ def test_long_term_memory_search_events(long_term_memory):
"parent_event_id": None,
"previous_event_id": ANY,
"triggered_by_event_id": None,
"started_event_id": ANY,
"emission_sequence": ANY,
"query": "test query",
"limit": 5,
@@ -175,6 +178,7 @@ def test_long_term_memory_search_events(long_term_memory):
"parent_event_id": ANY,
"previous_event_id": ANY,
"triggered_by_event_id": None,
"started_event_id": ANY,
"emission_sequence": ANY,
"query": "test query",
"results": None,

View File

@@ -85,6 +85,7 @@ def test_short_term_memory_search_events(short_term_memory):
"parent_event_id": None,
"previous_event_id": ANY,
"triggered_by_event_id": None,
"started_event_id": ANY,
"emission_sequence": ANY,
"query": "test value",
"limit": 3,
@@ -107,6 +108,7 @@ def test_short_term_memory_search_events(short_term_memory):
"parent_event_id": None,
"previous_event_id": ANY,
"triggered_by_event_id": None,
"started_event_id": ANY,
"emission_sequence": ANY,
"query": "test value",
"results": [],
@@ -164,6 +166,7 @@ def test_short_term_memory_save_events(short_term_memory):
"parent_event_id": None,
"previous_event_id": ANY,
"triggered_by_event_id": None,
"started_event_id": ANY,
"emission_sequence": ANY,
"value": "test value",
"metadata": {"task": "test_task"},
@@ -185,6 +188,7 @@ def test_short_term_memory_save_events(short_term_memory):
"parent_event_id": None,
"previous_event_id": ANY,
"triggered_by_event_id": None,
"started_event_id": ANY,
"emission_sequence": ANY,
"value": "test value",
"metadata": {"task": "test_task"},