fix: support to provide a custom storage to External Memory

This commit is contained in:
lucasgomide
2025-04-01 18:26:14 -03:00
committed by Lucas Gomide
parent dd6d29a767
commit d12306a71b
2 changed files with 42 additions and 9 deletions

View File

@@ -7,6 +7,7 @@ from crewai.agent import Agent
from crewai.crew import Crew, Process
from crewai.memory.external.external_memory import ExternalMemory
from crewai.memory.external.external_memory_item import ExternalMemoryItem
from crewai.memory.storage.interface import Storage
from crewai.task import Task
@@ -142,3 +143,38 @@ def test_crew_external_memory_save(mem_method, crew_with_external_memory):
) as mock_method:
crew_with_external_memory.kickoff()
assert mock_method.call_count > 0
def test_external_memory_custom_storage(crew_with_external_memory):
class CustomStorage(Storage):
def __init__(self):
self.memories = []
def save(self, value, metadata=None, agent=None):
self.memories.append({"value": value, "metadata": metadata, "agent": agent})
def search(self, query, limit=10, score_threshold=0.5):
return self.memories
def reset(self):
self.memories = []
custom_storage = CustomStorage()
external_memory = ExternalMemory(storage=custom_storage)
# by ensuring the crew is set, we can test that the storage is used
external_memory.set_crew(crew_with_external_memory)
test_value = "test value"
test_metadata = {"source": "test"}
test_agent = "test_agent"
external_memory.save(value=test_value, metadata=test_metadata, agent=test_agent)
results = external_memory.search("test")
assert len(results) == 1
assert results[0]["value"] == test_value
assert results[0]["metadata"] == test_metadata | {"agent": test_agent}
external_memory.reset()
results = external_memory.search("test")
assert len(results) == 0