feat: add memory save event handling to activity log

Implemented event handlers for MemorySaveStartedEvent, MemorySaveCompletedEvent, and MemorySaveFailedEvent in the crew_run_tui module. This allows the application to log memory save operations, capturing their status and details in the activity log. Added corresponding tests to verify the correct logging behavior for successful and failed memory saves.
This commit is contained in:
Joao Moura
2026-06-17 23:39:14 -07:00
parent 6c82b0c735
commit f9301fad02
4 changed files with 227 additions and 1 deletions

View File

@@ -1923,8 +1923,113 @@ FooterKey .footer-key--key {
MemoryRetrievalCompletedEvent,
MemoryRetrievalFailedEvent,
MemoryRetrievalStartedEvent,
MemorySaveCompletedEvent,
MemorySaveFailedEvent,
MemorySaveStartedEvent,
)
@crewai_event_bus.on(MemorySaveStartedEvent)
def on_memory_save_started(
source: Any, event: MemorySaveStartedEvent
) -> None:
with self._lock:
for entry in reversed(self._log_entries):
if (
entry["tool_name"] == "memory_save"
and entry.get("started_event_id") == event.event_id
):
entry["args"] = event.value
return
self._log_entries.append(
{
"tool_name": "memory_save",
"status": "running",
"args": event.value,
"result": None,
"error": None,
"start_time": time.time(),
"duration": None,
"task_idx": self._current_task_idx,
"event_id": event.event_id,
}
)
self._register_handler(MemorySaveStartedEvent, on_memory_save_started)
@crewai_event_bus.on(MemorySaveCompletedEvent)
def on_memory_save_completed(
source: Any, event: MemorySaveCompletedEvent
) -> None:
with self._lock:
for entry in reversed(self._log_entries):
if (
entry["tool_name"] == "memory_save"
and entry["status"] == "running"
and (
event.started_event_id is None
or entry.get("event_id") == event.started_event_id
)
):
entry["status"] = "success"
entry["duration"] = event.save_time_ms / 1000
entry["result"] = event.value
entry["started_event_id"] = event.started_event_id
break
else:
self._log_entries.append(
{
"tool_name": "memory_save",
"status": "success",
"args": None,
"result": event.value,
"error": None,
"start_time": time.time(),
"duration": event.save_time_ms / 1000,
"task_idx": self._current_task_idx,
"started_event_id": event.started_event_id,
}
)
self._register_handler(MemorySaveCompletedEvent, on_memory_save_completed)
@crewai_event_bus.on(MemorySaveFailedEvent)
def on_memory_save_failed(
source: Any, event: MemorySaveFailedEvent
) -> None:
with self._lock:
for idx, entry in reversed(list(enumerate(self._log_entries))):
if (
entry["tool_name"] == "memory_save"
and entry["status"] == "running"
and (
event.started_event_id is None
or entry.get("event_id") == event.started_event_id
)
):
entry["status"] = "error"
entry["error"] = event.error
entry["duration"] = time.time() - entry["start_time"]
entry["started_event_id"] = event.started_event_id
self._log_expanded.add(idx)
break
else:
self._log_entries.append(
{
"tool_name": "memory_save",
"status": "error",
"args": event.value,
"result": None,
"error": event.error,
"start_time": time.time(),
"duration": 0,
"task_idx": self._current_task_idx,
"started_event_id": event.started_event_id,
}
)
self._log_expanded.add(len(self._log_entries) - 1)
self._register_handler(MemorySaveFailedEvent, on_memory_save_failed)
@crewai_event_bus.on(MemoryRetrievalStartedEvent)
def on_memory_retrieval_started(
source: Any, event: MemoryRetrievalStartedEvent

View File

@@ -4,6 +4,11 @@ import time
import pytest
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.memory_events import (
MemorySaveCompletedEvent,
MemorySaveFailedEvent,
MemorySaveStartedEvent,
)
from crewai.events.types.observation_events import (
GoalAchievedEarlyEvent,
PlanRefinementEvent,
@@ -335,6 +340,67 @@ def test_internal_reasoning_function_call_is_hidden_from_activity_log() -> None:
assert app._current_task_steps == []
def test_memory_save_events_are_shown_in_activity_log() -> None:
app = _app_with_plan()
app._current_task_idx = 1
app._subscribe()
try:
_emit_event(
MemorySaveStartedEvent(
value="2 memories (background)",
metadata={},
source_type="unified_memory",
)
)
_emit_event(
MemorySaveCompletedEvent(
value="2 memories saved",
metadata={},
save_time_ms=123,
source_type="unified_memory",
)
)
finally:
app._unsubscribe()
assert len(app._log_entries) == 1
assert app._log_entries[0]["tool_name"] == "memory_save"
assert app._log_entries[0]["status"] == "success"
assert app._log_entries[0]["args"] == "2 memories (background)"
assert app._log_entries[0]["result"] == "2 memories saved"
assert app._log_entries[0]["error"] is None
assert app._log_entries[0]["duration"] == 0.123
assert app._log_entries[0]["task_idx"] == 1
def test_memory_save_failure_is_shown_in_activity_log() -> None:
app = _app_with_plan()
app._subscribe()
try:
_emit_event(
MemorySaveStartedEvent(
value="background save",
metadata={},
source_type="unified_memory",
)
)
_emit_event(
MemorySaveFailedEvent(
value="background save",
metadata={},
error="embedding connection failed",
source_type="unified_memory",
)
)
finally:
app._unsubscribe()
assert app._log_entries[0]["tool_name"] == "memory_save"
assert app._log_entries[0]["status"] == "error"
assert app._log_entries[0]["error"] == "embedding connection failed"
assert app._log_expanded == {0}
def test_tool_failure_does_not_override_successful_plan_step_completion() -> None:
app = _app_with_plan()
app._subscribe()

View File

@@ -3,7 +3,9 @@
from __future__ import annotations
from concurrent.futures import Future, ThreadPoolExecutor
from contextlib import suppress
import contextvars
import copy
from datetime import datetime
import threading
import time
@@ -53,6 +55,24 @@ def _default_embedder() -> OpenAIEmbeddingFunction:
return build_embedder(spec)
def _non_streaming_analysis_llm(llm: Any) -> Any:
"""Return an isolated non-streaming LLM for internal memory analysis."""
if not isinstance(llm, BaseLLM):
return llm
try:
analysis_llm = copy.copy(llm)
except Exception:
try:
analysis_llm = llm.model_copy(deep=False)
except Exception:
return llm
with suppress(Exception):
analysis_llm.stream = False
return analysis_llm
class Memory(BaseModel):
"""Unified memory: standalone, LLM-analyzed, with intelligent recall flow.
@@ -200,7 +220,9 @@ class Memory(BaseModel):
query_analysis_threshold=self.query_analysis_threshold,
)
self._llm_instance = None if isinstance(self.llm, str) else self.llm
self._llm_instance = (
None if isinstance(self.llm, str) else _non_streaming_analysis_llm(self.llm)
)
self._embedder_instance = (
self.embedder
if (self.embedder is not None and not isinstance(self.embedder, dict))

View File

@@ -19,6 +19,39 @@ from crewai.memory.types import (
)
def test_memory_analysis_llm_is_isolated_from_streaming_agent_llm(
tmp_path: Path,
) -> None:
"""Memory analysis should not share a mutable streaming LLM with the agent UI."""
from crewai.llms.base_llm import BaseLLM
from crewai.memory.unified_memory import Memory
from crewai.utilities.types import LLMMessage
class FakeStreamingLLM(BaseLLM):
def call(
self,
messages: str | list[LLMMessage],
tools: list[dict] | None = None,
callbacks: list | None = None,
available_functions: dict | None = None,
from_task: object | None = None,
from_agent: object | None = None,
response_model: type | None = None,
) -> str:
return ""
agent_llm = FakeStreamingLLM(model="fake-model", stream=True)
mem = Memory(
storage=str(tmp_path / "db"),
llm=agent_llm,
embedder=lambda texts: [[0.1] for _ in texts],
)
assert mem._llm is not agent_llm
assert mem._llm.stream is False
agent_llm.stream = True
assert mem._llm.stream is False
def test_memory_record_defaults() -> None: