diff --git a/lib/crewai/src/crewai/events/base_events.py b/lib/crewai/src/crewai/events/base_events.py index 7148e5e1d..6eeaa06e8 100644 --- a/lib/crewai/src/crewai/events/base_events.py +++ b/lib/crewai/src/crewai/events/base_events.py @@ -63,6 +63,7 @@ class BaseEvent(BaseModel): parent_event_id: str | None = None previous_event_id: str | None = None triggered_by_event_id: str | None = None + started_event_id: str | None = None emission_sequence: int | None = None def to_json(self, exclude: set[str] | None = None) -> Serializable: diff --git a/lib/crewai/src/crewai/events/event_bus.py b/lib/crewai/src/crewai/events/event_bus.py index d0aaa4455..b30d469b9 100644 --- a/lib/crewai/src/crewai/events/event_bus.py +++ b/lib/crewai/src/crewai/events/event_bus.py @@ -407,7 +407,8 @@ class CrewAIEventsBus: if popped is None: handle_empty_pop(event_type_name) else: - _, popped_type = popped + popped_event_id, popped_type = popped + event.started_event_id = popped_event_id expected_start = VALID_EVENT_PAIRS.get(event_type_name) if expected_start and popped_type and popped_type != expected_start: handle_mismatch(event_type_name, popped_type, expected_start) @@ -569,24 +570,52 @@ class CrewAIEventsBus: ... # Do stuff... ... # Handlers are cleared after the context """ - with self._rwlock.w_locked(): - prev_sync = self._sync_handlers - prev_async = self._async_handlers - prev_deps = self._handler_dependencies - prev_cache = self._execution_plan_cache - self._sync_handlers = {} - self._async_handlers = {} - self._handler_dependencies = {} - self._execution_plan_cache = {} + with self._rwlock.r_locked(): + saved_sync: dict[type[BaseEvent], frozenset[SyncHandler]] = dict( + self._sync_handlers + ) + saved_async: dict[type[BaseEvent], frozenset[AsyncHandler]] = dict( + self._async_handlers + ) + saved_deps: dict[type[BaseEvent], dict[Handler, list[Depends[Any]]]] = { + event_type: dict(handlers) + for event_type, handlers in self._handler_dependencies.items() + } + + for event_type, sync_handlers in saved_sync.items(): + for sync_handler in sync_handlers: + self.off(event_type, sync_handler) + + for event_type, async_handlers in saved_async.items(): + for async_handler in async_handlers: + self.off(event_type, async_handler) try: yield finally: - with self._rwlock.w_locked(): - self._sync_handlers = prev_sync - self._async_handlers = prev_async - self._handler_dependencies = prev_deps - self._execution_plan_cache = prev_cache + with self._rwlock.r_locked(): + current_sync = dict(self._sync_handlers) + current_async = dict(self._async_handlers) + + for event_type, cur_sync in current_sync.items(): + orig_sync = saved_sync.get(event_type, frozenset()) + for new_handler in cur_sync - orig_sync: + self.off(event_type, new_handler) + + for event_type, cur_async in current_async.items(): + orig_async = saved_async.get(event_type, frozenset()) + for new_async_handler in cur_async - orig_async: + self.off(event_type, new_async_handler) + + for event_type, sync_handlers in saved_sync.items(): + for sync_handler in sync_handlers: + deps = saved_deps.get(event_type, {}).get(sync_handler) + self._register_handler(event_type, sync_handler, deps) + + for event_type, async_handlers in saved_async.items(): + for async_handler in async_handlers: + deps = saved_deps.get(event_type, {}).get(async_handler) + self._register_handler(event_type, async_handler, deps) def shutdown(self, wait: bool = True) -> None: """Gracefully shutdown the event loop and wait for all tasks to finish. diff --git a/lib/crewai/tests/memory/test_external_memory.py b/lib/crewai/tests/memory/test_external_memory.py index 8718c5aca..1872bc0af 100644 --- a/lib/crewai/tests/memory/test_external_memory.py +++ b/lib/crewai/tests/memory/test_external_memory.py @@ -308,6 +308,7 @@ def test_external_memory_search_events( "parent_event_id": None, "previous_event_id": ANY, "triggered_by_event_id": None, + "started_event_id": ANY, "emission_sequence": ANY, "query": "test value", "limit": 3, @@ -330,6 +331,7 @@ def test_external_memory_search_events( "parent_event_id": ANY, "previous_event_id": ANY, "triggered_by_event_id": None, + "started_event_id": ANY, "emission_sequence": ANY, "query": "test value", "results": [], @@ -390,6 +392,7 @@ def test_external_memory_save_events( "parent_event_id": None, "previous_event_id": ANY, "triggered_by_event_id": None, + "started_event_id": ANY, "emission_sequence": ANY, "value": "saving value", "metadata": {"task": "test_task"}, @@ -411,6 +414,7 @@ def test_external_memory_save_events( "parent_event_id": ANY, "previous_event_id": ANY, "triggered_by_event_id": None, + "started_event_id": ANY, "emission_sequence": ANY, "value": "saving value", "metadata": {"task": "test_task"}, diff --git a/lib/crewai/tests/memory/test_long_term_memory.py b/lib/crewai/tests/memory/test_long_term_memory.py index c33e4469b..500fab169 100644 --- a/lib/crewai/tests/memory/test_long_term_memory.py +++ b/lib/crewai/tests/memory/test_long_term_memory.py @@ -74,6 +74,7 @@ def test_long_term_memory_save_events(long_term_memory): "parent_event_id": None, "previous_event_id": ANY, "triggered_by_event_id": None, + "started_event_id": ANY, "emission_sequence": ANY, "value": "test_task", "metadata": {"task": "test_task", "quality": 0.5}, @@ -94,6 +95,7 @@ def test_long_term_memory_save_events(long_term_memory): "parent_event_id": None, "previous_event_id": ANY, "triggered_by_event_id": None, + "started_event_id": ANY, "emission_sequence": ANY, "value": "test_task", "metadata": { @@ -153,6 +155,7 @@ def test_long_term_memory_search_events(long_term_memory): "parent_event_id": None, "previous_event_id": ANY, "triggered_by_event_id": None, + "started_event_id": ANY, "emission_sequence": ANY, "query": "test query", "limit": 5, @@ -175,6 +178,7 @@ def test_long_term_memory_search_events(long_term_memory): "parent_event_id": ANY, "previous_event_id": ANY, "triggered_by_event_id": None, + "started_event_id": ANY, "emission_sequence": ANY, "query": "test query", "results": None, diff --git a/lib/crewai/tests/memory/test_short_term_memory.py b/lib/crewai/tests/memory/test_short_term_memory.py index 8ea64553a..5e74b688d 100644 --- a/lib/crewai/tests/memory/test_short_term_memory.py +++ b/lib/crewai/tests/memory/test_short_term_memory.py @@ -85,6 +85,7 @@ def test_short_term_memory_search_events(short_term_memory): "parent_event_id": None, "previous_event_id": ANY, "triggered_by_event_id": None, + "started_event_id": ANY, "emission_sequence": ANY, "query": "test value", "limit": 3, @@ -107,6 +108,7 @@ def test_short_term_memory_search_events(short_term_memory): "parent_event_id": None, "previous_event_id": ANY, "triggered_by_event_id": None, + "started_event_id": ANY, "emission_sequence": ANY, "query": "test value", "results": [], @@ -164,6 +166,7 @@ def test_short_term_memory_save_events(short_term_memory): "parent_event_id": None, "previous_event_id": ANY, "triggered_by_event_id": None, + "started_event_id": ANY, "emission_sequence": ANY, "value": "test value", "metadata": {"task": "test_task"}, @@ -185,6 +188,7 @@ def test_short_term_memory_save_events(short_term_memory): "parent_event_id": None, "previous_event_id": ANY, "triggered_by_event_id": None, + "started_event_id": ANY, "emission_sequence": ANY, "value": "test value", "metadata": {"task": "test_task"},