mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-04-30 14:52:36 +00:00
feat: add started_event_id and set in eventbus
Some checks failed
Some checks failed
* feat: add started_event_id and set in eventbus * chore: update additional test assumption * fix: restore event bus handlers on context exit fix rollback in crewai events bus so that exiting the context restores the previous _sync_handlers, _async_handlers, _handler_dependencies, and _execution_plan_cache by assigning shallow copies of the saved dicts. previously these were set to empty dicts on exit, which caused registered handlers and cached execution plans to be lost.
This commit is contained in:
@@ -63,6 +63,7 @@ class BaseEvent(BaseModel):
|
||||
parent_event_id: str | None = None
|
||||
previous_event_id: str | None = None
|
||||
triggered_by_event_id: str | None = None
|
||||
started_event_id: str | None = None
|
||||
emission_sequence: int | None = None
|
||||
|
||||
def to_json(self, exclude: set[str] | None = None) -> Serializable:
|
||||
|
||||
@@ -407,7 +407,8 @@ class CrewAIEventsBus:
|
||||
if popped is None:
|
||||
handle_empty_pop(event_type_name)
|
||||
else:
|
||||
_, popped_type = popped
|
||||
popped_event_id, popped_type = popped
|
||||
event.started_event_id = popped_event_id
|
||||
expected_start = VALID_EVENT_PAIRS.get(event_type_name)
|
||||
if expected_start and popped_type and popped_type != expected_start:
|
||||
handle_mismatch(event_type_name, popped_type, expected_start)
|
||||
@@ -569,24 +570,52 @@ class CrewAIEventsBus:
|
||||
... # Do stuff...
|
||||
... # Handlers are cleared after the context
|
||||
"""
|
||||
with self._rwlock.w_locked():
|
||||
prev_sync = self._sync_handlers
|
||||
prev_async = self._async_handlers
|
||||
prev_deps = self._handler_dependencies
|
||||
prev_cache = self._execution_plan_cache
|
||||
self._sync_handlers = {}
|
||||
self._async_handlers = {}
|
||||
self._handler_dependencies = {}
|
||||
self._execution_plan_cache = {}
|
||||
with self._rwlock.r_locked():
|
||||
saved_sync: dict[type[BaseEvent], frozenset[SyncHandler]] = dict(
|
||||
self._sync_handlers
|
||||
)
|
||||
saved_async: dict[type[BaseEvent], frozenset[AsyncHandler]] = dict(
|
||||
self._async_handlers
|
||||
)
|
||||
saved_deps: dict[type[BaseEvent], dict[Handler, list[Depends[Any]]]] = {
|
||||
event_type: dict(handlers)
|
||||
for event_type, handlers in self._handler_dependencies.items()
|
||||
}
|
||||
|
||||
for event_type, sync_handlers in saved_sync.items():
|
||||
for sync_handler in sync_handlers:
|
||||
self.off(event_type, sync_handler)
|
||||
|
||||
for event_type, async_handlers in saved_async.items():
|
||||
for async_handler in async_handlers:
|
||||
self.off(event_type, async_handler)
|
||||
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
with self._rwlock.w_locked():
|
||||
self._sync_handlers = prev_sync
|
||||
self._async_handlers = prev_async
|
||||
self._handler_dependencies = prev_deps
|
||||
self._execution_plan_cache = prev_cache
|
||||
with self._rwlock.r_locked():
|
||||
current_sync = dict(self._sync_handlers)
|
||||
current_async = dict(self._async_handlers)
|
||||
|
||||
for event_type, cur_sync in current_sync.items():
|
||||
orig_sync = saved_sync.get(event_type, frozenset())
|
||||
for new_handler in cur_sync - orig_sync:
|
||||
self.off(event_type, new_handler)
|
||||
|
||||
for event_type, cur_async in current_async.items():
|
||||
orig_async = saved_async.get(event_type, frozenset())
|
||||
for new_async_handler in cur_async - orig_async:
|
||||
self.off(event_type, new_async_handler)
|
||||
|
||||
for event_type, sync_handlers in saved_sync.items():
|
||||
for sync_handler in sync_handlers:
|
||||
deps = saved_deps.get(event_type, {}).get(sync_handler)
|
||||
self._register_handler(event_type, sync_handler, deps)
|
||||
|
||||
for event_type, async_handlers in saved_async.items():
|
||||
for async_handler in async_handlers:
|
||||
deps = saved_deps.get(event_type, {}).get(async_handler)
|
||||
self._register_handler(event_type, async_handler, deps)
|
||||
|
||||
def shutdown(self, wait: bool = True) -> None:
|
||||
"""Gracefully shutdown the event loop and wait for all tasks to finish.
|
||||
|
||||
@@ -308,6 +308,7 @@ def test_external_memory_search_events(
|
||||
"parent_event_id": None,
|
||||
"previous_event_id": ANY,
|
||||
"triggered_by_event_id": None,
|
||||
"started_event_id": ANY,
|
||||
"emission_sequence": ANY,
|
||||
"query": "test value",
|
||||
"limit": 3,
|
||||
@@ -330,6 +331,7 @@ def test_external_memory_search_events(
|
||||
"parent_event_id": ANY,
|
||||
"previous_event_id": ANY,
|
||||
"triggered_by_event_id": None,
|
||||
"started_event_id": ANY,
|
||||
"emission_sequence": ANY,
|
||||
"query": "test value",
|
||||
"results": [],
|
||||
@@ -390,6 +392,7 @@ def test_external_memory_save_events(
|
||||
"parent_event_id": None,
|
||||
"previous_event_id": ANY,
|
||||
"triggered_by_event_id": None,
|
||||
"started_event_id": ANY,
|
||||
"emission_sequence": ANY,
|
||||
"value": "saving value",
|
||||
"metadata": {"task": "test_task"},
|
||||
@@ -411,6 +414,7 @@ def test_external_memory_save_events(
|
||||
"parent_event_id": ANY,
|
||||
"previous_event_id": ANY,
|
||||
"triggered_by_event_id": None,
|
||||
"started_event_id": ANY,
|
||||
"emission_sequence": ANY,
|
||||
"value": "saving value",
|
||||
"metadata": {"task": "test_task"},
|
||||
|
||||
@@ -74,6 +74,7 @@ def test_long_term_memory_save_events(long_term_memory):
|
||||
"parent_event_id": None,
|
||||
"previous_event_id": ANY,
|
||||
"triggered_by_event_id": None,
|
||||
"started_event_id": ANY,
|
||||
"emission_sequence": ANY,
|
||||
"value": "test_task",
|
||||
"metadata": {"task": "test_task", "quality": 0.5},
|
||||
@@ -94,6 +95,7 @@ def test_long_term_memory_save_events(long_term_memory):
|
||||
"parent_event_id": None,
|
||||
"previous_event_id": ANY,
|
||||
"triggered_by_event_id": None,
|
||||
"started_event_id": ANY,
|
||||
"emission_sequence": ANY,
|
||||
"value": "test_task",
|
||||
"metadata": {
|
||||
@@ -153,6 +155,7 @@ def test_long_term_memory_search_events(long_term_memory):
|
||||
"parent_event_id": None,
|
||||
"previous_event_id": ANY,
|
||||
"triggered_by_event_id": None,
|
||||
"started_event_id": ANY,
|
||||
"emission_sequence": ANY,
|
||||
"query": "test query",
|
||||
"limit": 5,
|
||||
@@ -175,6 +178,7 @@ def test_long_term_memory_search_events(long_term_memory):
|
||||
"parent_event_id": ANY,
|
||||
"previous_event_id": ANY,
|
||||
"triggered_by_event_id": None,
|
||||
"started_event_id": ANY,
|
||||
"emission_sequence": ANY,
|
||||
"query": "test query",
|
||||
"results": None,
|
||||
|
||||
@@ -85,6 +85,7 @@ def test_short_term_memory_search_events(short_term_memory):
|
||||
"parent_event_id": None,
|
||||
"previous_event_id": ANY,
|
||||
"triggered_by_event_id": None,
|
||||
"started_event_id": ANY,
|
||||
"emission_sequence": ANY,
|
||||
"query": "test value",
|
||||
"limit": 3,
|
||||
@@ -107,6 +108,7 @@ def test_short_term_memory_search_events(short_term_memory):
|
||||
"parent_event_id": None,
|
||||
"previous_event_id": ANY,
|
||||
"triggered_by_event_id": None,
|
||||
"started_event_id": ANY,
|
||||
"emission_sequence": ANY,
|
||||
"query": "test value",
|
||||
"results": [],
|
||||
@@ -164,6 +166,7 @@ def test_short_term_memory_save_events(short_term_memory):
|
||||
"parent_event_id": None,
|
||||
"previous_event_id": ANY,
|
||||
"triggered_by_event_id": None,
|
||||
"started_event_id": ANY,
|
||||
"emission_sequence": ANY,
|
||||
"value": "test value",
|
||||
"metadata": {"task": "test_task"},
|
||||
@@ -185,6 +188,7 @@ def test_short_term_memory_save_events(short_term_memory):
|
||||
"parent_event_id": None,
|
||||
"previous_event_id": ANY,
|
||||
"triggered_by_event_id": None,
|
||||
"started_event_id": ANY,
|
||||
"emission_sequence": ANY,
|
||||
"value": "test value",
|
||||
"metadata": {"task": "test_task"},
|
||||
|
||||
Reference in New Issue
Block a user