diff --git a/lib/cli/src/crewai_cli/crew_run_tui.py b/lib/cli/src/crewai_cli/crew_run_tui.py index ce60c2e93..23c024055 100644 --- a/lib/cli/src/crewai_cli/crew_run_tui.py +++ b/lib/cli/src/crewai_cli/crew_run_tui.py @@ -1923,8 +1923,113 @@ FooterKey .footer-key--key { MemoryRetrievalCompletedEvent, MemoryRetrievalFailedEvent, MemoryRetrievalStartedEvent, + MemorySaveCompletedEvent, + MemorySaveFailedEvent, + MemorySaveStartedEvent, ) + @crewai_event_bus.on(MemorySaveStartedEvent) + def on_memory_save_started( + source: Any, event: MemorySaveStartedEvent + ) -> None: + with self._lock: + for entry in reversed(self._log_entries): + if ( + entry["tool_name"] == "memory_save" + and entry.get("started_event_id") == event.event_id + ): + entry["args"] = event.value + return + self._log_entries.append( + { + "tool_name": "memory_save", + "status": "running", + "args": event.value, + "result": None, + "error": None, + "start_time": time.time(), + "duration": None, + "task_idx": self._current_task_idx, + "event_id": event.event_id, + } + ) + + self._register_handler(MemorySaveStartedEvent, on_memory_save_started) + + @crewai_event_bus.on(MemorySaveCompletedEvent) + def on_memory_save_completed( + source: Any, event: MemorySaveCompletedEvent + ) -> None: + with self._lock: + for entry in reversed(self._log_entries): + if ( + entry["tool_name"] == "memory_save" + and entry["status"] == "running" + and ( + event.started_event_id is None + or entry.get("event_id") == event.started_event_id + ) + ): + entry["status"] = "success" + entry["duration"] = event.save_time_ms / 1000 + entry["result"] = event.value + entry["started_event_id"] = event.started_event_id + break + else: + self._log_entries.append( + { + "tool_name": "memory_save", + "status": "success", + "args": None, + "result": event.value, + "error": None, + "start_time": time.time(), + "duration": event.save_time_ms / 1000, + "task_idx": self._current_task_idx, + "started_event_id": event.started_event_id, + } + ) + + self._register_handler(MemorySaveCompletedEvent, on_memory_save_completed) + + @crewai_event_bus.on(MemorySaveFailedEvent) + def on_memory_save_failed( + source: Any, event: MemorySaveFailedEvent + ) -> None: + with self._lock: + for idx, entry in reversed(list(enumerate(self._log_entries))): + if ( + entry["tool_name"] == "memory_save" + and entry["status"] == "running" + and ( + event.started_event_id is None + or entry.get("event_id") == event.started_event_id + ) + ): + entry["status"] = "error" + entry["error"] = event.error + entry["duration"] = time.time() - entry["start_time"] + entry["started_event_id"] = event.started_event_id + self._log_expanded.add(idx) + break + else: + self._log_entries.append( + { + "tool_name": "memory_save", + "status": "error", + "args": event.value, + "result": None, + "error": event.error, + "start_time": time.time(), + "duration": 0, + "task_idx": self._current_task_idx, + "started_event_id": event.started_event_id, + } + ) + self._log_expanded.add(len(self._log_entries) - 1) + + self._register_handler(MemorySaveFailedEvent, on_memory_save_failed) + @crewai_event_bus.on(MemoryRetrievalStartedEvent) def on_memory_retrieval_started( source: Any, event: MemoryRetrievalStartedEvent diff --git a/lib/cli/tests/test_crew_run_tui.py b/lib/cli/tests/test_crew_run_tui.py index 7b018107a..15290e2d9 100644 --- a/lib/cli/tests/test_crew_run_tui.py +++ b/lib/cli/tests/test_crew_run_tui.py @@ -4,6 +4,11 @@ import time import pytest from crewai.events.event_bus import crewai_event_bus +from crewai.events.types.memory_events import ( + MemorySaveCompletedEvent, + MemorySaveFailedEvent, + MemorySaveStartedEvent, +) from crewai.events.types.observation_events import ( GoalAchievedEarlyEvent, PlanRefinementEvent, @@ -335,6 +340,67 @@ def test_internal_reasoning_function_call_is_hidden_from_activity_log() -> None: assert app._current_task_steps == [] +def test_memory_save_events_are_shown_in_activity_log() -> None: + app = _app_with_plan() + app._current_task_idx = 1 + app._subscribe() + try: + _emit_event( + MemorySaveStartedEvent( + value="2 memories (background)", + metadata={}, + source_type="unified_memory", + ) + ) + _emit_event( + MemorySaveCompletedEvent( + value="2 memories saved", + metadata={}, + save_time_ms=123, + source_type="unified_memory", + ) + ) + finally: + app._unsubscribe() + + assert len(app._log_entries) == 1 + assert app._log_entries[0]["tool_name"] == "memory_save" + assert app._log_entries[0]["status"] == "success" + assert app._log_entries[0]["args"] == "2 memories (background)" + assert app._log_entries[0]["result"] == "2 memories saved" + assert app._log_entries[0]["error"] is None + assert app._log_entries[0]["duration"] == 0.123 + assert app._log_entries[0]["task_idx"] == 1 + + +def test_memory_save_failure_is_shown_in_activity_log() -> None: + app = _app_with_plan() + app._subscribe() + try: + _emit_event( + MemorySaveStartedEvent( + value="background save", + metadata={}, + source_type="unified_memory", + ) + ) + _emit_event( + MemorySaveFailedEvent( + value="background save", + metadata={}, + error="embedding connection failed", + source_type="unified_memory", + ) + ) + finally: + app._unsubscribe() + + assert app._log_entries[0]["tool_name"] == "memory_save" + assert app._log_entries[0]["status"] == "error" + assert app._log_entries[0]["error"] == "embedding connection failed" + assert app._log_expanded == {0} + + def test_tool_failure_does_not_override_successful_plan_step_completion() -> None: app = _app_with_plan() app._subscribe() diff --git a/lib/crewai/src/crewai/memory/unified_memory.py b/lib/crewai/src/crewai/memory/unified_memory.py index 5a8718eda..dcd5383ce 100644 --- a/lib/crewai/src/crewai/memory/unified_memory.py +++ b/lib/crewai/src/crewai/memory/unified_memory.py @@ -3,7 +3,9 @@ from __future__ import annotations from concurrent.futures import Future, ThreadPoolExecutor +from contextlib import suppress import contextvars +import copy from datetime import datetime import threading import time @@ -53,6 +55,24 @@ def _default_embedder() -> OpenAIEmbeddingFunction: return build_embedder(spec) +def _non_streaming_analysis_llm(llm: Any) -> Any: + """Return an isolated non-streaming LLM for internal memory analysis.""" + if not isinstance(llm, BaseLLM): + return llm + + try: + analysis_llm = copy.copy(llm) + except Exception: + try: + analysis_llm = llm.model_copy(deep=False) + except Exception: + return llm + + with suppress(Exception): + analysis_llm.stream = False + return analysis_llm + + class Memory(BaseModel): """Unified memory: standalone, LLM-analyzed, with intelligent recall flow. @@ -200,7 +220,9 @@ class Memory(BaseModel): query_analysis_threshold=self.query_analysis_threshold, ) - self._llm_instance = None if isinstance(self.llm, str) else self.llm + self._llm_instance = ( + None if isinstance(self.llm, str) else _non_streaming_analysis_llm(self.llm) + ) self._embedder_instance = ( self.embedder if (self.embedder is not None and not isinstance(self.embedder, dict)) diff --git a/lib/crewai/tests/memory/test_unified_memory.py b/lib/crewai/tests/memory/test_unified_memory.py index cbd9cc3ee..8a3d52e7a 100644 --- a/lib/crewai/tests/memory/test_unified_memory.py +++ b/lib/crewai/tests/memory/test_unified_memory.py @@ -19,6 +19,39 @@ from crewai.memory.types import ( ) +def test_memory_analysis_llm_is_isolated_from_streaming_agent_llm( + tmp_path: Path, +) -> None: + """Memory analysis should not share a mutable streaming LLM with the agent UI.""" + from crewai.llms.base_llm import BaseLLM + from crewai.memory.unified_memory import Memory + from crewai.utilities.types import LLMMessage + + class FakeStreamingLLM(BaseLLM): + def call( + self, + messages: str | list[LLMMessage], + tools: list[dict] | None = None, + callbacks: list | None = None, + available_functions: dict | None = None, + from_task: object | None = None, + from_agent: object | None = None, + response_model: type | None = None, + ) -> str: + return "" + + agent_llm = FakeStreamingLLM(model="fake-model", stream=True) + mem = Memory( + storage=str(tmp_path / "db"), + llm=agent_llm, + embedder=lambda texts: [[0.1] for _ in texts], + ) + + assert mem._llm is not agent_llm + assert mem._llm.stream is False + + agent_llm.stream = True + assert mem._llm.stream is False def test_memory_record_defaults() -> None: