From f9301fad027776436d2d603056cf034a2dd6ab0e Mon Sep 17 00:00:00 2001 From: Joao Moura Date: Wed, 17 Jun 2026 23:39:14 -0700 Subject: [PATCH] feat: add memory save event handling to activity log Implemented event handlers for MemorySaveStartedEvent, MemorySaveCompletedEvent, and MemorySaveFailedEvent in the crew_run_tui module. This allows the application to log memory save operations, capturing their status and details in the activity log. Added corresponding tests to verify the correct logging behavior for successful and failed memory saves. --- lib/cli/src/crewai_cli/crew_run_tui.py | 105 ++++++++++++++++++ lib/cli/tests/test_crew_run_tui.py | 66 +++++++++++ .../src/crewai/memory/unified_memory.py | 24 +++- .../tests/memory/test_unified_memory.py | 33 ++++++ 4 files changed, 227 insertions(+), 1 deletion(-) diff --git a/lib/cli/src/crewai_cli/crew_run_tui.py b/lib/cli/src/crewai_cli/crew_run_tui.py index ce60c2e93..23c024055 100644 --- a/lib/cli/src/crewai_cli/crew_run_tui.py +++ b/lib/cli/src/crewai_cli/crew_run_tui.py @@ -1923,8 +1923,113 @@ FooterKey .footer-key--key { MemoryRetrievalCompletedEvent, MemoryRetrievalFailedEvent, MemoryRetrievalStartedEvent, + MemorySaveCompletedEvent, + MemorySaveFailedEvent, + MemorySaveStartedEvent, ) + @crewai_event_bus.on(MemorySaveStartedEvent) + def on_memory_save_started( + source: Any, event: MemorySaveStartedEvent + ) -> None: + with self._lock: + for entry in reversed(self._log_entries): + if ( + entry["tool_name"] == "memory_save" + and entry.get("started_event_id") == event.event_id + ): + entry["args"] = event.value + return + self._log_entries.append( + { + "tool_name": "memory_save", + "status": "running", + "args": event.value, + "result": None, + "error": None, + "start_time": time.time(), + "duration": None, + "task_idx": self._current_task_idx, + "event_id": event.event_id, + } + ) + + self._register_handler(MemorySaveStartedEvent, on_memory_save_started) + + @crewai_event_bus.on(MemorySaveCompletedEvent) + def on_memory_save_completed( + source: Any, event: MemorySaveCompletedEvent + ) -> None: + with self._lock: + for entry in reversed(self._log_entries): + if ( + entry["tool_name"] == "memory_save" + and entry["status"] == "running" + and ( + event.started_event_id is None + or entry.get("event_id") == event.started_event_id + ) + ): + entry["status"] = "success" + entry["duration"] = event.save_time_ms / 1000 + entry["result"] = event.value + entry["started_event_id"] = event.started_event_id + break + else: + self._log_entries.append( + { + "tool_name": "memory_save", + "status": "success", + "args": None, + "result": event.value, + "error": None, + "start_time": time.time(), + "duration": event.save_time_ms / 1000, + "task_idx": self._current_task_idx, + "started_event_id": event.started_event_id, + } + ) + + self._register_handler(MemorySaveCompletedEvent, on_memory_save_completed) + + @crewai_event_bus.on(MemorySaveFailedEvent) + def on_memory_save_failed( + source: Any, event: MemorySaveFailedEvent + ) -> None: + with self._lock: + for idx, entry in reversed(list(enumerate(self._log_entries))): + if ( + entry["tool_name"] == "memory_save" + and entry["status"] == "running" + and ( + event.started_event_id is None + or entry.get("event_id") == event.started_event_id + ) + ): + entry["status"] = "error" + entry["error"] = event.error + entry["duration"] = time.time() - entry["start_time"] + entry["started_event_id"] = event.started_event_id + self._log_expanded.add(idx) + break + else: + self._log_entries.append( + { + "tool_name": "memory_save", + "status": "error", + "args": event.value, + "result": None, + "error": event.error, + "start_time": time.time(), + "duration": 0, + "task_idx": self._current_task_idx, + "started_event_id": event.started_event_id, + } + ) + self._log_expanded.add(len(self._log_entries) - 1) + + self._register_handler(MemorySaveFailedEvent, on_memory_save_failed) + @crewai_event_bus.on(MemoryRetrievalStartedEvent) def on_memory_retrieval_started( source: Any, event: MemoryRetrievalStartedEvent diff --git a/lib/cli/tests/test_crew_run_tui.py b/lib/cli/tests/test_crew_run_tui.py index 7b018107a..15290e2d9 100644 --- a/lib/cli/tests/test_crew_run_tui.py +++ b/lib/cli/tests/test_crew_run_tui.py @@ -4,6 +4,11 @@ import time import pytest from crewai.events.event_bus import crewai_event_bus +from crewai.events.types.memory_events import ( + MemorySaveCompletedEvent, + MemorySaveFailedEvent, + MemorySaveStartedEvent, +) from crewai.events.types.observation_events import ( GoalAchievedEarlyEvent, PlanRefinementEvent, @@ -335,6 +340,67 @@ def test_internal_reasoning_function_call_is_hidden_from_activity_log() -> None: assert app._current_task_steps == [] +def test_memory_save_events_are_shown_in_activity_log() -> None: + app = _app_with_plan() + app._current_task_idx = 1 + app._subscribe() + try: + _emit_event( + MemorySaveStartedEvent( + value="2 memories (background)", + metadata={}, + source_type="unified_memory", + ) + ) + _emit_event( + MemorySaveCompletedEvent( + value="2 memories saved", + metadata={}, + save_time_ms=123, + source_type="unified_memory", + ) + ) + finally: + app._unsubscribe() + + assert len(app._log_entries) == 1 + assert app._log_entries[0]["tool_name"] == "memory_save" + assert app._log_entries[0]["status"] == "success" + assert app._log_entries[0]["args"] == "2 memories (background)" + assert app._log_entries[0]["result"] == "2 memories saved" + assert app._log_entries[0]["error"] is None + assert app._log_entries[0]["duration"] == 0.123 + assert app._log_entries[0]["task_idx"] == 1 + + +def test_memory_save_failure_is_shown_in_activity_log() -> None: + app = _app_with_plan() + app._subscribe() + try: + _emit_event( + MemorySaveStartedEvent( + value="background save", + metadata={}, + source_type="unified_memory", + ) + ) + _emit_event( + MemorySaveFailedEvent( + value="background save", + metadata={}, + error="embedding connection failed", + source_type="unified_memory", + ) + ) + finally: + app._unsubscribe() + + assert app._log_entries[0]["tool_name"] == "memory_save" + assert app._log_entries[0]["status"] == "error" + assert app._log_entries[0]["error"] == "embedding connection failed" + assert app._log_expanded == {0} + + def test_tool_failure_does_not_override_successful_plan_step_completion() -> None: app = _app_with_plan() app._subscribe() diff --git a/lib/crewai/src/crewai/memory/unified_memory.py b/lib/crewai/src/crewai/memory/unified_memory.py index 5a8718eda..dcd5383ce 100644 --- a/lib/crewai/src/crewai/memory/unified_memory.py +++ b/lib/crewai/src/crewai/memory/unified_memory.py @@ -3,7 +3,9 @@ from __future__ import annotations from concurrent.futures import Future, ThreadPoolExecutor +from contextlib import suppress import contextvars +import copy from datetime import datetime import threading import time @@ -53,6 +55,24 @@ def _default_embedder() -> OpenAIEmbeddingFunction: return build_embedder(spec) +def _non_streaming_analysis_llm(llm: Any) -> Any: + """Return an isolated non-streaming LLM for internal memory analysis.""" + if not isinstance(llm, BaseLLM): + return llm + + try: + analysis_llm = copy.copy(llm) + except Exception: + try: + analysis_llm = llm.model_copy(deep=False) + except Exception: + return llm + + with suppress(Exception): + analysis_llm.stream = False + return analysis_llm + + class Memory(BaseModel): """Unified memory: standalone, LLM-analyzed, with intelligent recall flow. @@ -200,7 +220,9 @@ class Memory(BaseModel): query_analysis_threshold=self.query_analysis_threshold, ) - self._llm_instance = None if isinstance(self.llm, str) else self.llm + self._llm_instance = ( + None if isinstance(self.llm, str) else _non_streaming_analysis_llm(self.llm) + ) self._embedder_instance = ( self.embedder if (self.embedder is not None and not isinstance(self.embedder, dict)) diff --git a/lib/crewai/tests/memory/test_unified_memory.py b/lib/crewai/tests/memory/test_unified_memory.py index cbd9cc3ee..8a3d52e7a 100644 --- a/lib/crewai/tests/memory/test_unified_memory.py +++ b/lib/crewai/tests/memory/test_unified_memory.py @@ -19,6 +19,39 @@ from crewai.memory.types import ( ) +def test_memory_analysis_llm_is_isolated_from_streaming_agent_llm( + tmp_path: Path, +) -> None: + """Memory analysis should not share a mutable streaming LLM with the agent UI.""" + from crewai.llms.base_llm import BaseLLM + from crewai.memory.unified_memory import Memory + from crewai.utilities.types import LLMMessage + + class FakeStreamingLLM(BaseLLM): + def call( + self, + messages: str | list[LLMMessage], + tools: list[dict] | None = None, + callbacks: list | None = None, + available_functions: dict | None = None, + from_task: object | None = None, + from_agent: object | None = None, + response_model: type | None = None, + ) -> str: + return "" + + agent_llm = FakeStreamingLLM(model="fake-model", stream=True) + mem = Memory( + storage=str(tmp_path / "db"), + llm=agent_llm, + embedder=lambda texts: [[0.1] for _ in texts], + ) + + assert mem._llm is not agent_llm + assert mem._llm.stream is False + + agent_llm.stream = True + assert mem._llm.stream is False def test_memory_record_defaults() -> None: