Add support for custom memory storage implementations (fixes #2278)

Co-Authored-By: Joe Moura <joao@crewai.com>
This commit is contained in:
Devin AI
2025-03-04 19:50:54 +00:00
parent 00eede0d5d
commit 583ac5711f
9 changed files with 506 additions and 70 deletions

View File

@@ -0,0 +1,151 @@
# Custom Memory Storage
CrewAI supports custom memory storage implementations for different memory types. You can provide your own storage implementation by extending the `Storage` interface and passing it to the memory instances or through the `memory_config` parameter.
## Implementing a Custom Storage
To create a custom storage implementation, extend the `Storage` interface and implement its `save`, `search`, and `reset` methods:
```python
from typing import Any, Dict, List
from crewai.memory.storage.interface import Storage
class CustomStorage(Storage):
    """Custom storage implementation that keeps entries in a Python list."""

    def __init__(self):
        # Initialize your storage backend
        self.data = []

    def save(self, value: Any, metadata: Dict[str, Any]) -> None:
        """Save a value with metadata to the storage."""
        # Implement your save logic
        self.data.append({"value": value, "metadata": metadata})

    def search(
        self, query: str, limit: int = 3, score_threshold: float = 0.35
    ) -> List[Any]:
        """Return up to ``limit`` of the most recently saved entries.

        This example does no relevance ranking, so ``query`` and
        ``score_threshold`` are accepted but not used. Replace the body with
        your own search logic.
        """
        if limit <= 0:
            # A plain ``self.data[-0:]`` slice would return every entry.
            return []
        return [
            {"context": item["value"], "metadata": item["metadata"]}
            for item in self.data[-limit:]
        ]

    def reset(self) -> None:
        """Reset the storage."""
        # Implement your reset logic
        self.data = []
```
## Using Custom Storage
There are two ways to provide custom storage implementations to CrewAI:
### 1. Pass Custom Storage to Memory Instances
You can create memory instances with custom storage and pass them to the Crew:
```python
from crewai import Crew, Agent
from crewai.memory.short_term.short_term_memory import ShortTermMemory
from crewai.memory.long_term.long_term_memory import LongTermMemory
from crewai.memory.entity.entity_memory import EntityMemory
from crewai.memory.user.user_memory import UserMemory
# Give every memory type its own custom storage backend
short_term_storage = CustomStorage()
short_term_memory = ShortTermMemory(storage=short_term_storage)

long_term_storage = CustomStorage()
long_term_memory = LongTermMemory(storage=long_term_storage)

entity_storage = CustomStorage()
entity_memory = EntityMemory(storage=entity_storage)

user_storage = CustomStorage()
user_memory = UserMemory(storage=user_storage)

# Hand the pre-built memory instances to the crew
# (user memory is passed through memory_config)
researcher = Agent(role="researcher", goal="research", backstory="I am a researcher")
crew = Crew(
    agents=[researcher],
    memory=True,
    short_term_memory=short_term_memory,
    long_term_memory=long_term_memory,
    entity_memory=entity_memory,
    memory_config={"user_memory": user_memory},
)
```
### 2. Pass Custom Storage through Memory Config
You can also provide custom storage implementations through the `memory_config` parameter:
```python
from crewai import Crew, Agent
# Describe the crew's single agent, then map each memory type to a storage
researcher = Agent(role="researcher", goal="research", backstory="I am a researcher")
custom_storages = {
    "short_term": CustomStorage(),
    "long_term": CustomStorage(),
    "entity": CustomStorage(),
    "user": CustomStorage(),
}

# Create the crew, supplying the storages under memory_config["storage"]
crew = Crew(
    agents=[researcher],
    memory=True,
    memory_config={"storage": custom_storages},
)
```
## Example: Redis Storage
Here's an example of a custom storage implementation using Redis:
```python
import json
import redis
from typing import Any, Dict, List
from crewai.memory.storage.interface import Storage
class RedisStorage(Storage):
    """Redis-based storage implementation.

    Every saved entry is a JSON document stored under
    ``<prefix>:item:<id>``. Ids are drawn from a counter kept at
    ``<prefix>:next_id``.
    """

    def __init__(self, redis_url="redis://localhost:6379/0", prefix="crewai"):
        self.redis = redis.from_url(redis_url)
        self.prefix = prefix

    def _counter_key(self) -> str:
        """Return the key of the counter that hands out entry ids."""
        return f"{self.prefix}:next_id"

    def _item_pattern(self) -> str:
        """Return the glob pattern matching every entry key of this storage."""
        return f"{self.prefix}:item:*"

    def save(self, value: Any, metadata: Dict[str, Any]) -> None:
        """Save a value with metadata to Redis."""
        # INCR is atomic, so two writers can never be handed the same id.
        # Counting the existing keys instead is racy and scans the keyspace.
        item_id = self.redis.incr(self._counter_key())
        key = f"{self.prefix}:item:{item_id}"
        data = {"value": value, "metadata": metadata}
        self.redis.set(key, json.dumps(data))

    def search(
        self, query: str, limit: int = 3, score_threshold: float = 0.35
    ) -> List[Any]:
        """Return up to ``limit`` stored entries from Redis.

        This is a simple implementation that does not rank by ``query`` or
        ``score_threshold``. In a real implementation, you would use Redis
        search capabilities.
        """
        results: List[Any] = []
        if limit <= 0:
            return results
        # SCAN walks the keyspace incrementally rather than in one blocking call.
        for key in self.redis.scan_iter(match=self._item_pattern()):
            raw = self.redis.get(key)
            if raw is None:
                # The key was removed between SCAN and GET; skip it.
                continue
            data = json.loads(raw)
            results.append({"context": data["value"], "metadata": data["metadata"]})
            if len(results) >= limit:
                break
        return results

    def reset(self) -> None:
        """Delete every entry of this storage, and its id counter, from Redis."""
        for key in self.redis.scan_iter(match=self._item_pattern()):
            self.redis.delete(key)
        self.redis.delete(self._counter_key())
```
## Benefits of Custom Storage
Using custom storage implementations allows you to:
1. Store memory data in external databases or services
2. Implement custom search algorithms
3. Share memory between different crews or applications
4. Persist memory across application restarts
5. Implement custom memory retention policies
By extending the `Storage` interface, you can integrate CrewAI with any storage backend that suits your needs.

View File

@@ -263,7 +263,7 @@ class Crew(BaseModel):
"""Set private attributes."""
if self.memory:
self._long_term_memory = (
self.long_term_memory if self.long_term_memory else LongTermMemory()
self.long_term_memory if self.long_term_memory else LongTermMemory(crew=self, embedder_config=self.embedder)
)
self._short_term_memory = (
self.short_term_memory

View File

@@ -1,4 +1,4 @@
from typing import Optional
from typing import Any, Dict, Optional
from pydantic import PrivateAttr
@@ -17,47 +17,73 @@ class EntityMemory(Memory):
_memory_provider: Optional[str] = PrivateAttr()
def __init__(self, crew=None, embedder_config=None, storage=None, path=None):
memory_provider = None
entity_storage = None
if crew and hasattr(crew, "memory_config") and crew.memory_config is not None:
memory_provider = crew.memory_config.get("provider")
else:
memory_provider = None
storage_config = crew.memory_config.get("storage", {})
entity_storage = storage_config.get("entity")
super().__init__(
storage=storage,
embedder_config=embedder_config,
memory_provider=memory_provider
)
if memory_provider == "mem0":
if storage:
# Use the provided storage
super().__init__(storage=storage, embedder_config=embedder_config)
elif entity_storage:
# Use the storage from memory_config
super().__init__(storage=entity_storage, embedder_config=embedder_config)
elif memory_provider == "mem0":
try:
from crewai.memory.storage.mem0_storage import Mem0Storage
except ImportError:
raise ImportError(
"Mem0 is not installed. Please install it with `pip install mem0ai`."
)
storage = Mem0Storage(type="entities", crew=crew)
super().__init__(
storage=Mem0Storage(type="entities", crew=crew),
embedder_config=embedder_config,
)
else:
storage = (
storage
if storage
else RAGStorage(
# Use RAGStorage (default)
super().__init__(
storage=RAGStorage(
type="entities",
allow_reset=True,
embedder_config=embedder_config,
crew=crew,
embedder_config=embedder_config,
path=path,
)
),
embedder_config=embedder_config,
)
super().__init__(storage=storage)
self._memory_provider = memory_provider
def save(self, item: EntityMemoryItem) -> None: # type: ignore # BUG?: Signature of "save" incompatible with supertype "Memory"
"""Saves an entity item into the SQLite storage."""
if self._memory_provider == "mem0":
data = f"""
Remember details about the following entity:
Name: {item.name}
Type: {item.type}
Entity Description: {item.description}
"""
def save(
self,
value: Any,
metadata: Dict[str, Any] = None,
agent: str = None,
) -> None:
"""Saves an entity item or value into the storage."""
if isinstance(value, EntityMemoryItem):
item = value
if self.memory_provider == "mem0":
data = f"""
Remember details about the following entity:
Name: {item.name}
Type: {item.type}
Entity Description: {item.description}
"""
else:
data = f"{item.name}({item.type}): {item.description}"
super().save(data, item.metadata)
else:
data = f"{item.name}({item.type}): {item.description}"
super().save(data, item.metadata)
# Handle regular value and metadata
super().save(value, metadata, agent)
def reset(self) -> None:
try:

View File

@@ -1,4 +1,4 @@
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional
from crewai.memory.long_term.long_term_memory_item import LongTermMemoryItem
from crewai.memory.memory import Memory
@@ -14,23 +14,85 @@ class LongTermMemory(Memory):
LongTermMemoryItem instances.
"""
def __init__(self, storage=None, path=None):
if not storage:
storage = LTMSQLiteStorage(db_path=path) if path else LTMSQLiteStorage()
super().__init__(storage=storage)
def save(self, item: LongTermMemoryItem) -> None: # type: ignore # BUG?: Signature of "save" incompatible with supertype "Memory"
metadata = item.metadata
metadata.update({"agent": item.agent, "expected_output": item.expected_output})
self.storage.save( # type: ignore # BUG?: Unexpected keyword argument "task_description","score","datetime" for "save" of "Storage"
task_description=item.task,
score=metadata["quality"],
metadata=metadata,
datetime=item.datetime,
def __init__(self, crew=None, embedder_config=None, storage=None, path=None):
memory_provider = None
long_term_storage = None
if crew and hasattr(crew, "memory_config") and crew.memory_config is not None:
memory_provider = crew.memory_config.get("provider")
storage_config = crew.memory_config.get("storage", {})
long_term_storage = storage_config.get("long_term")
super().__init__(
storage=storage,
embedder_config=embedder_config,
memory_provider=memory_provider
)
def search(self, task: str, latest_n: int = 3) -> List[Dict[str, Any]]: # type: ignore # signature of "search" incompatible with supertype "Memory"
return self.storage.load(task, latest_n) # type: ignore # BUG?: "Storage" has no attribute "load"
if storage:
# Use the provided storage
super().__init__(storage=storage, embedder_config=embedder_config)
elif long_term_storage:
# Use the storage from memory_config
super().__init__(storage=long_term_storage, embedder_config=embedder_config)
elif memory_provider == "mem0":
try:
from crewai.memory.storage.mem0_storage import Mem0Storage
except ImportError:
raise ImportError(
"Mem0 is not installed. Please install it with `pip install mem0ai`."
)
super().__init__(
storage=Mem0Storage(type="long_term", crew=crew),
embedder_config=embedder_config,
)
else:
# Use LTMSQLiteStorage (default)
storage = LTMSQLiteStorage(db_path=path) if path else LTMSQLiteStorage()
super().__init__(storage=storage, embedder_config=embedder_config)
def save(
    self,
    value: Any,
    metadata: Optional[Dict[str, Any]] = None,
    agent: Optional[str] = None,
) -> None:
    """Save a ``LongTermMemoryItem`` or a plain value into the storage.

    Args:
        value: Either a ``LongTermMemoryItem`` or an arbitrary value.
        metadata: Metadata for a plain ``value``. Not used when ``value`` is a
            ``LongTermMemoryItem``; the item's own metadata is stored instead.
        agent: Agent name for a plain ``value``. Not used for items.
    """
    # Function-scope import so the method does not depend on the module's
    # import block.
    import inspect

    if not isinstance(value, LongTermMemoryItem):
        # Handle regular value and metadata
        super().save(value, metadata, agent)
        return

    item = value
    item_metadata = item.metadata or {}
    item_metadata.update({"agent": item.agent, "expected_output": item.expected_output})

    # Some storages take keyword arguments (task_description, score, datetime)
    # instead of the standard ``save(value, metadata)`` -- presumably the
    # default LTMSQLiteStorage; confirm against that class.
    # inspect.signature looks only at real parameters, whereas co_varnames
    # also lists local variables and is missing on wrapped callables.
    try:
        save_params = inspect.signature(self.storage.save).parameters
    except (TypeError, ValueError):
        # No introspectable signature: fall back to the standard interface.
        save_params = {}

    if "task_description" in save_params:
        self.storage.save(
            task_description=item.task,
            score=item_metadata.get("quality", 0),
            metadata=item_metadata,
            datetime=item.datetime,
        )
    else:
        # Use standard storage interface
        self.storage.save(item.task, item_metadata)
def search(
    self,
    query: str,
    limit: int = 3,
    score_threshold: float = 0.35,
) -> List[Any]:
    """Look up entries in the underlying storage.

    Uses the storage's ``search`` method when it has a callable one.
    Otherwise falls back to ``load(query, limit)``.

    Raises:
        AttributeError: If the storage offers neither a callable ``search``
            nor a callable ``load``.
    """
    backend = self.storage

    # Standard storage interface takes priority.
    search_fn = getattr(backend, "search", None)
    if callable(search_fn):
        return search_fn(query=query, limit=limit, score_threshold=score_threshold)

    # Backward-compatible path for storages that only expose ``load``.
    load_fn = getattr(backend, "load", None)
    if callable(load_fn):
        return load_fn(query, limit)

    raise AttributeError("Storage does not implement search or load method")
def reset(self) -> None:
    """Reset the underlying storage by delegating to its ``reset`` method."""
    backend = self.storage
    backend.reset()

View File

@@ -1,6 +1,6 @@
from typing import Any, Dict, List, Optional
from pydantic import BaseModel
from pydantic import BaseModel, Field
class Memory(BaseModel):
@@ -9,8 +9,8 @@ class Memory(BaseModel):
"""
embedder_config: Optional[Dict[str, Any]] = None
storage: Any
memory_provider: Optional[str] = Field(default=None, exclude=True)
def __init__(self, storage: Any, **data: Any):
super().__init__(storage=storage, **data)

View File

@@ -19,32 +19,48 @@ class ShortTermMemory(Memory):
_memory_provider: Optional[str] = PrivateAttr()
def __init__(self, crew=None, embedder_config=None, storage=None, path=None):
memory_provider = None
short_term_storage = None
if crew and hasattr(crew, "memory_config") and crew.memory_config is not None:
memory_provider = crew.memory_config.get("provider")
else:
memory_provider = None
storage_config = crew.memory_config.get("storage", {})
short_term_storage = storage_config.get("short_term")
super().__init__(
storage=storage,
embedder_config=embedder_config,
memory_provider=memory_provider
)
if memory_provider == "mem0":
if storage:
# Use the provided storage
super().__init__(storage=storage, embedder_config=embedder_config)
elif short_term_storage:
# Use the storage from memory_config
super().__init__(storage=short_term_storage, embedder_config=embedder_config)
elif memory_provider == "mem0":
try:
from crewai.memory.storage.mem0_storage import Mem0Storage
except ImportError:
raise ImportError(
"Mem0 is not installed. Please install it with `pip install mem0ai`."
)
storage = Mem0Storage(type="short_term", crew=crew)
else:
storage = (
storage
if storage
else RAGStorage(
type="short_term",
embedder_config=embedder_config,
crew=crew,
path=path,
)
super().__init__(
storage=Mem0Storage(type="short_term", crew=crew),
embedder_config=embedder_config,
)
else:
# Use RAGStorage (default)
super().__init__(
storage=RAGStorage(
type="short_term",
crew=crew,
embedder_config=embedder_config,
path=path,
),
embedder_config=embedder_config,
)
super().__init__(storage=storage)
self._memory_provider = memory_provider
def save(
self,
@@ -53,7 +69,7 @@ class ShortTermMemory(Memory):
agent: Optional[str] = None,
) -> None:
item = ShortTermMemoryItem(data=value, metadata=metadata, agent=agent)
if self._memory_provider == "mem0":
if self.memory_provider == "mem0":
item.data = f"Remember the following insights from Agent run: {item.data}"
super().save(value=item.data, metadata=item.metadata, agent=item.agent)

View File

@@ -11,15 +11,50 @@ class UserMemory(Memory):
MemoryItem instances.
"""
def __init__(self, crew=None):
try:
from crewai.memory.storage.mem0_storage import Mem0Storage
except ImportError:
raise ImportError(
"Mem0 is not installed. Please install it with `pip install mem0ai`."
def __init__(self, crew=None, embedder_config=None, storage=None, path=None, **kwargs):
memory_provider = None
user_storage = None
if crew and hasattr(crew, "memory_config") and crew.memory_config is not None:
memory_provider = crew.memory_config.get("provider")
storage_config = crew.memory_config.get("storage", {})
user_storage = storage_config.get("user")
super().__init__(
storage=storage,
embedder_config=embedder_config,
memory_provider=memory_provider
)
if storage:
# Use the provided storage
super().__init__(storage=storage, embedder_config=embedder_config)
elif user_storage:
# Use the storage from memory_config
super().__init__(storage=user_storage, embedder_config=embedder_config)
elif memory_provider == "mem0":
try:
from crewai.memory.storage.mem0_storage import Mem0Storage
except ImportError:
raise ImportError(
"Mem0 is not installed. Please install it with `pip install mem0ai`."
)
super().__init__(
storage=Mem0Storage(type="user", crew=crew),
embedder_config=embedder_config,
)
else:
# Use RAGStorage (default)
from crewai.memory.storage.rag_storage import RAGStorage
super().__init__(
storage=RAGStorage(
type="user",
crew=crew,
embedder_config=embedder_config,
path=path,
),
embedder_config=embedder_config,
)
storage = Mem0Storage(type="user", crew=crew)
super().__init__(storage)
def save(
self,
@@ -43,3 +78,9 @@ class UserMemory(Memory):
score_threshold=score_threshold,
)
return results
def reset(self) -> None:
    """Reset the underlying storage by delegating to its ``reset`` method.

    Raises:
        Exception: If the storage's ``reset`` fails. The original error is
            attached as ``__cause__`` so its traceback is kept.
    """
    try:
        self.storage.reset()
    except Exception as e:
        # Chain explicitly so the root cause is reported as the direct cause.
        raise Exception(
            f"An error occurred while resetting the user memory: {e}"
        ) from e

View File

@@ -20,7 +20,7 @@ def test_save_and_search(long_term_memory):
metadata={"task": "test_task", "quality": 0.5},
)
long_term_memory.save(memory)
find = long_term_memory.search("test_task", latest_n=5)[0]
find = long_term_memory.search(query="test_task", limit=5)[0]
assert find["score"] == 0.5
assert find["datetime"] == "test_datetime"
assert find["metadata"]["agent"] == "test_agent"

View File

@@ -0,0 +1,140 @@
import pytest
from typing import Any, Dict, List
from crewai.crew import Crew
from crewai.agent import Agent
from crewai.memory.storage.interface import Storage
from crewai.memory.short_term.short_term_memory import ShortTermMemory
from crewai.memory.long_term.long_term_memory import LongTermMemory
from crewai.memory.entity.entity_memory import EntityMemory
from crewai.memory.user.user_memory import UserMemory
class CustomStorage(Storage):
    """In-memory ``Storage`` stand-in: keeps every saved entry in a list."""

    def __init__(self):
        self.data = []

    def save(self, value: Any, metadata: Dict[str, Any]) -> None:
        """Append the value and its metadata as one record."""
        record = {"value": value, "metadata": metadata}
        self.data.append(record)

    def search(
        self, query: str, limit: int = 3, score_threshold: float = 0.35
    ) -> List[Any]:
        """Return every stored record; the arguments are not used."""
        hits = []
        for record in self.data:
            hits.append({"context": record["value"], "metadata": record["metadata"]})
        return hits

    def reset(self) -> None:
        """Drop all records by rebinding ``data`` to a fresh list."""
        self.data = []
def test_custom_storage_with_short_term_memory():
    """Round-trip a value through ShortTermMemory backed by CustomStorage."""
    backend = CustomStorage()
    stm = ShortTermMemory(storage=backend)

    stm.save("test value", {"key": "value"})
    hits = stm.search("test")

    assert len(hits) > 0
    first_hit = hits[0]
    assert first_hit["context"] == "test value"
    assert first_hit["metadata"]["key"] == "value"
def test_custom_storage_with_long_term_memory():
    """Round-trip a value through LongTermMemory backed by CustomStorage."""
    backend = CustomStorage()
    ltm = LongTermMemory(storage=backend)

    ltm.save("test value", {"key": "value"})
    hits = ltm.search("test")

    assert len(hits) > 0
    first_hit = hits[0]
    assert first_hit["context"] == "test value"
    assert first_hit["metadata"]["key"] == "value"
def test_custom_storage_with_entity_memory():
    """Round-trip a value through EntityMemory backed by CustomStorage."""
    backend = CustomStorage()
    entities = EntityMemory(storage=backend)

    entities.save("test value", {"key": "value"})
    hits = entities.search("test")

    assert len(hits) > 0
    first_hit = hits[0]
    assert first_hit["context"] == "test value"
    assert first_hit["metadata"]["key"] == "value"
def test_custom_storage_with_user_memory():
    """Round-trip a value through UserMemory backed by CustomStorage."""
    backend = CustomStorage()
    user_mem = UserMemory(storage=backend)

    user_mem.save("test value", {"key": "value"})
    hits = user_mem.search("test")

    assert len(hits) > 0
    first_hit = hits[0]
    # UserMemory prepends "Remember the details about the user: " to the value,
    # so only check that the saved text is contained in the context.
    assert "test value" in first_hit["context"]
    assert first_hit["metadata"]["key"] == "value"
def test_custom_storage_with_crew():
    """Memory instances built on custom storage are the ones the crew ends up holding."""
    stm_backend = CustomStorage()
    ltm_backend = CustomStorage()
    entity_backend = CustomStorage()
    user_backend = CustomStorage()

    # Wrap each backend in its memory type
    stm = ShortTermMemory(storage=stm_backend)
    ltm = LongTermMemory(storage=ltm_backend)
    entities = EntityMemory(storage=entity_backend)
    user_mem = UserMemory(storage=user_backend)

    # Build the crew from the pre-made memory instances
    test_agent = Agent(role="test", goal="test", backstory="test")
    crew = Crew(
        agents=[test_agent],
        memory=True,
        short_term_memory=stm,
        long_term_memory=ltm,
        entity_memory=entities,
        memory_config={"user_memory": user_mem},
    )

    # Each private memory slot must be backed by the storage supplied above
    assert crew._short_term_memory.storage == stm_backend
    assert crew._long_term_memory.storage == ltm_backend
    assert crew._entity_memory.storage == entity_backend
    assert crew._user_memory.storage == user_backend
def test_custom_storage_with_memory_config():
    """Storages listed under memory_config["storage"] end up behind the crew's memories."""
    stm_backend = CustomStorage()
    ltm_backend = CustomStorage()
    entity_backend = CustomStorage()
    user_backend = CustomStorage()

    # Build the crew, supplying the storages only through memory_config
    test_agent = Agent(role="test", goal="test", backstory="test")
    config = {
        "storage": {
            "short_term": stm_backend,
            "long_term": ltm_backend,
            "entity": entity_backend,
            "user": user_backend,
        },
        "user_memory": {},  # Enable user memory
    }
    crew = Crew(
        agents=[test_agent],
        memory=True,
        memory_config=config,
    )

    # Each private memory slot must be backed by the configured storage
    assert crew._short_term_memory.storage == stm_backend
    assert crew._long_term_memory.storage == ltm_backend
    assert crew._entity_memory.storage == entity_backend
    assert crew._user_memory.storage == user_backend