diff --git a/lib/crewai/src/crewai/memory/unified_memory.py b/lib/crewai/src/crewai/memory/unified_memory.py index 8bd2a573a..5a8718eda 100644 --- a/lib/crewai/src/crewai/memory/unified_memory.py +++ b/lib/crewai/src/crewai/memory/unified_memory.py @@ -149,6 +149,7 @@ class Memory(BaseModel): ) _pending_saves: list[Future[Any]] = PrivateAttr(default_factory=list) _pending_lock: threading.Lock = PrivateAttr(default_factory=threading.Lock) + _reset_lock: Any = PrivateAttr(default_factory=threading.RLock) def __deepcopy__(self, memo: dict[int, Any] | None = None) -> Memory: """Deepcopy that handles unpickleable private attrs (ThreadPoolExecutor, Lock).""" @@ -168,7 +169,10 @@ class Memory(BaseModel): ) private = {} for k, v in (self.__pydantic_private__ or {}).items(): - if isinstance(v, (ThreadPoolExecutor, threading.Lock)): + if k in {"_save_pool", "_pending_lock", "_reset_lock"}: + attr = self.__private_attributes__[k] + private[k] = attr.get_default() + elif isinstance(v, (ThreadPoolExecutor, threading.Lock)): attr = self.__private_attributes__[k] private[k] = attr.get_default() else: @@ -275,22 +279,25 @@ class Memory(BaseModel): If the pool has been shut down (e.g. after ``close()``), the save runs synchronously as a fallback so late saves still succeed. """ - ctx = contextvars.copy_context() - try: - future: Future[Any] = self._save_pool.submit(ctx.run, fn, *args, **kwargs) - except RuntimeError: - # Pool shut down -- run synchronously as fallback - future = Future() + with self._reset_lock: + ctx = contextvars.copy_context() try: - result = fn(*args, **kwargs) - future.set_result(result) - except Exception as exc: - future.set_exception(exc) + future: Future[Any] = self._save_pool.submit( + ctx.run, fn, *args, **kwargs + ) + except RuntimeError: + # Pool shut down -- run synchronously as fallback + future = Future() + try: + result = fn(*args, **kwargs) + future.set_result(result) + except Exception as exc: + future.set_exception(exc) + return future + with self._pending_lock: + self._pending_saves.append(future) + future.add_done_callback(self._on_save_done) return future - with self._pending_lock: - self._pending_saves.append(future) - future.add_done_callback(self._on_save_done) - return future def _on_save_done(self, future: Future[Any]) -> None: """Remove a completed future from the pending list and emit failure event if needed. @@ -990,18 +997,20 @@ class Memory(BaseModel): scope: Scope to reset. If None and root_scope is set, resets only within root_scope. If None and no root_scope, resets all. """ - self.drain_writes() - effective_scope = scope - if effective_scope is None and self.root_scope: - effective_scope = self.root_scope - elif effective_scope is not None and self.root_scope: - effective_scope = join_scope_paths(self.root_scope, effective_scope) - self._storage.reset(scope_prefix=effective_scope) + with self._reset_lock: + self.drain_writes() + effective_scope = scope + if effective_scope is None and self.root_scope: + effective_scope = self.root_scope + elif effective_scope is not None and self.root_scope: + effective_scope = join_scope_paths(self.root_scope, effective_scope) + self._storage.reset(scope_prefix=effective_scope) def reset_all(self) -> None: """Reset the entire backing memory store, ignoring ``root_scope``.""" - self.drain_writes() - self._storage.reset(scope_prefix=None) + with self._reset_lock: + self.drain_writes() + self._storage.reset(scope_prefix=None) async def aextract_memories(self, content: str) -> list[str]: """Async variant of extract_memories.""" diff --git a/lib/crewai/src/crewai/utilities/reset_memories.py b/lib/crewai/src/crewai/utilities/reset_memories.py index e6af9d55a..e9f4d525d 100644 --- a/lib/crewai/src/crewai/utilities/reset_memories.py +++ b/lib/crewai/src/crewai/utilities/reset_memories.py @@ -63,7 +63,14 @@ def _get_json_crew() -> Any | None: if crew_path is None: return None - crew, _ = load_crew(crew_path) + try: + crew, _ = load_crew(crew_path) + except Exception as exc: + click.echo( + f"Skipping JSON crew at {crew_path}: failed to load ({exc}).", + err=True, + ) + return None return crew diff --git a/lib/crewai/tests/cli/test_cli.py b/lib/crewai/tests/cli/test_cli.py index c2508d0df..8b65c865f 100644 --- a/lib/crewai/tests/cli/test_cli.py +++ b/lib/crewai/tests/cli/test_cli.py @@ -9,6 +9,7 @@ from unittest import mock from click.testing import CliRunner from crewai.crew import Crew +from crewai.memory.unified_memory import Memory from crewai_cli.cli import reset_memories import pytest @@ -185,6 +186,33 @@ def test_reset_flow_memory(mock_get_flows, mock_flow, runner): assert "[Flow (TestFlow)] Memory has been reset." in result.output +def test_reset_flow_unified_memory_uses_full_reset(runner, tmp_path): + flow = mock.Mock() + flow.name = "TestFlow" + flow.memory = Memory( + storage=str(tmp_path / "db"), + llm=mock.Mock(), + embedder=lambda texts: [[0.1] * 4 for _ in texts], + ) + + with mock.patch( + "crewai.utilities.reset_memories.get_flows", return_value=[flow] + ), mock.patch( + "crewai.utilities.reset_memories.get_crews", return_value=[] + ), mock.patch( + "crewai.utilities.reset_memories._get_json_crew", return_value=None + ), mock.patch.object( + Memory, "reset_all" + ) as reset_all, mock.patch.object( + Memory, "reset" + ) as reset: + result = runner.invoke(reset_memories, ["-m"]) + + reset_all.assert_called_once_with() + reset.assert_not_called() + assert "[Flow (TestFlow)] Memory has been reset." in result.output + + def test_reset_flow_all_memories(mock_get_flows, mock_flow, runner): result = runner.invoke(reset_memories, ["-a"]) mock_flow.memory.reset.assert_called_once() @@ -228,6 +256,28 @@ def test_reset_json_crew_memory(mock_crew, runner, monkeypatch, tmp_path): assert f"[Crew ({mock_crew.name})] Memory has been reset." in result.output +def test_reset_invalid_json_crew_does_not_block_classic_crew( + mock_crew, runner, monkeypatch, tmp_path +): + monkeypatch.chdir(tmp_path) + (tmp_path / "crew.jsonc").write_text("{invalid") + + with mock.patch( + "crewai.utilities.reset_memories.get_crews", return_value=[mock_crew] + ), mock.patch( + "crewai.utilities.reset_memories.get_flows", return_value=[] + ), mock.patch( + "crewai.utilities.reset_memories.load_crew", + side_effect=ValueError("invalid JSON"), + ) as mock_load_crew: + result = runner.invoke(reset_memories, ["-m"]) + + mock_load_crew.assert_called_once_with(Path("crew.jsonc")) + mock_crew.reset_memories.assert_called_once_with(command_type="memory") + assert "Skipping JSON crew at crew.jsonc: failed to load (invalid JSON)." in result.output + assert f"[Crew ({mock_crew.name})] Memory has been reset." in result.output + + def test_reset_json_crew_skipped_for_declared_flow_project( mock_crew, runner, monkeypatch, tmp_path ): diff --git a/lib/crewai/tests/memory/test_dimension_mismatch.py b/lib/crewai/tests/memory/test_dimension_mismatch.py index d4f3310e4..80dba17d8 100644 --- a/lib/crewai/tests/memory/test_dimension_mismatch.py +++ b/lib/crewai/tests/memory/test_dimension_mismatch.py @@ -8,6 +8,7 @@ not silently zero-fill vectors or return empty search results. from __future__ import annotations from pathlib import Path +from unittest.mock import MagicMock import pytest @@ -97,6 +98,33 @@ def test_lancedb_reopened_store_detects_mismatch(lancedb_path: Path) -> None: reopened.search([0.1] * 8) +def test_memory_reset_all_rebuilds_reopened_store_with_new_dimension( + lancedb_path: Path, +) -> None: + from crewai.memory.storage.lancedb_storage import LanceDBStorage + from crewai.memory.unified_memory import Memory + + old = LanceDBStorage(path=str(lancedb_path), vector_dim=4) + old.save([_record(4)]) + + mem = Memory( + storage=str(lancedb_path), + llm=MagicMock(), + embedder=lambda texts: [[0.1] * 8 for _ in texts], + root_scope="/crew/test", + ) + + mem.reset_all() + mem.remember( + "new embedder output", + scope="/facts", + categories=["test"], + importance=0.5, + ) + + assert mem.recall("new embedder output", scope="/facts", depth="shallow") + + def test_lancedb_matching_dim_still_works(lancedb_path: Path) -> None: from crewai.memory.storage.lancedb_storage import LanceDBStorage diff --git a/lib/crewai/tests/memory/test_unified_memory.py b/lib/crewai/tests/memory/test_unified_memory.py index 665893233..cbd9cc3ee 100644 --- a/lib/crewai/tests/memory/test_unified_memory.py +++ b/lib/crewai/tests/memory/test_unified_memory.py @@ -954,6 +954,54 @@ def test_remember_many_returns_immediately(tmp_path: Path) -> None: assert mem._storage.count() == 2 +def test_reset_all_blocks_new_save_submission_until_reset_completes( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + """A save cannot be submitted between draining writes and resetting storage.""" + from crewai.memory.unified_memory import Memory + + mem = Memory( + storage=str(tmp_path / "db"), + llm=MagicMock(), + embedder=lambda texts: [[0.1] * 4 for _ in texts], + ) + reset_started = threading.Event() + release_reset = threading.Event() + submission_returned = threading.Event() + order: list[str] = [] + original_reset = mem._storage.reset + + def blocking_reset(scope_prefix: str | None = None) -> None: + order.append("reset-start") + reset_started.set() + assert release_reset.wait(timeout=2) + original_reset(scope_prefix=scope_prefix) + order.append("reset-end") + + def submit_save() -> None: + mem._submit_save(lambda: order.append("save")) + order.append("submit-returned") + submission_returned.set() + + monkeypatch.setattr(mem._storage, "reset", blocking_reset) + + reset_thread = threading.Thread(target=mem.reset_all) + reset_thread.start() + assert reset_started.wait(timeout=2) + + submit_thread = threading.Thread(target=submit_save) + submit_thread.start() + assert not submission_returned.wait(timeout=0.1) + + release_reset.set() + reset_thread.join(timeout=2) + submit_thread.join(timeout=2) + + assert not reset_thread.is_alive() + assert not submit_thread.is_alive() + assert order.index("reset-end") < order.index("submit-returned") + + def test_recall_drains_pending_writes(tmp_path: Path, mock_embedder: MagicMock) -> None: """recall() should automatically wait for pending background saves.""" from crewai.memory.unified_memory import Memory diff --git a/lib/crewai/tests/test_crew.py b/lib/crewai/tests/test_crew.py index 5b06685d8..82f3207dd 100644 --- a/lib/crewai/tests/test_crew.py +++ b/lib/crewai/tests/test_crew.py @@ -4584,6 +4584,26 @@ def test_reset_knowledge_with_no_crew_knowledge(researcher, writer): ) +def test_reset_memory_uses_full_unified_memory_reset(researcher): + crew = Crew( + agents=[researcher], + process=Process.sequential, + tasks=[ + Task(description="Task 1", expected_output="output", agent=researcher), + ], + memory=True, + ) + + assert isinstance(crew._memory, Memory) + with patch.object(Memory, "reset_all") as reset_all, patch.object( + Memory, "reset" + ) as reset: + crew.reset_memories(command_type="memory") + + reset_all.assert_called_once_with() + reset.assert_not_called() + + def test_reset_knowledge_with_only_crew_knowledge(researcher, writer): mock_ks = MagicMock(spec=Knowledge)