From 583ac5711f153975464261333e9b1178930fba52 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 4 Mar 2025 19:50:54 +0000 Subject: [PATCH] Add support for custom memory storage implementations (fixes #2278) Co-Authored-By: Joe Moura --- docs/concepts/custom_memory_storage.mdx | 151 ++++++++++++++++++ src/crewai/crew.py | 2 +- src/crewai/memory/entity/entity_memory.py | 76 ++++++--- .../memory/long_term/long_term_memory.py | 94 +++++++++-- src/crewai/memory/memory.py | 4 +- .../memory/short_term/short_term_memory.py | 50 ++++-- src/crewai/memory/user/user_memory.py | 57 ++++++- tests/memory/long_term_memory_test.py | 2 +- tests/memory/test_custom_storage.py | 140 ++++++++++++++++ 9 files changed, 506 insertions(+), 70 deletions(-) create mode 100644 docs/concepts/custom_memory_storage.mdx create mode 100644 tests/memory/test_custom_storage.py diff --git a/docs/concepts/custom_memory_storage.mdx b/docs/concepts/custom_memory_storage.mdx new file mode 100644 index 000000000..f483d02ff --- /dev/null +++ b/docs/concepts/custom_memory_storage.mdx @@ -0,0 +1,151 @@ +# Custom Memory Storage + +CrewAI supports custom memory storage implementations for different memory types. You can provide your own storage implementation by extending the `Storage` interface and passing it to the memory instances or through the `memory_config` parameter. + +## Implementing a Custom Storage + +To create a custom storage implementation, you need to extend the `Storage` interface and implement the required methods: + +```python +from typing import Any, Dict, List +from crewai.memory.storage.interface import Storage + +class CustomStorage(Storage): + """Custom storage implementation.""" + + def __init__(self): + # Initialize your storage backend + self.data = [] + + def save(self, value: Any, metadata: Dict[str, Any]) -> None: + """Save a value with metadata to the storage.""" + # Implement your save logic + self.data.append({"value": value, "metadata": metadata}) + + def search( + self, query: str, limit: int = 3, score_threshold: float = 0.35 + ) -> List[Any]: + """Search for values in the storage.""" + # Implement your search logic + return [{"context": item["value"], "metadata": item["metadata"]} for item in self.data] + + def reset(self) -> None: + """Reset the storage.""" + # Implement your reset logic + self.data = [] +``` + +## Using Custom Storage + +There are two ways to provide custom storage implementations to CrewAI: + +### 1. Pass Custom Storage to Memory Instances + +You can create memory instances with custom storage and pass them to the Crew: + +```python +from crewai import Crew, Agent +from crewai.memory.short_term.short_term_memory import ShortTermMemory +from crewai.memory.long_term.long_term_memory import LongTermMemory +from crewai.memory.entity.entity_memory import EntityMemory +from crewai.memory.user.user_memory import UserMemory + +# Create custom storage instances +short_term_storage = CustomStorage() +long_term_storage = CustomStorage() +entity_storage = CustomStorage() +user_storage = CustomStorage() + +# Create memory instances with custom storage +short_term_memory = ShortTermMemory(storage=short_term_storage) +long_term_memory = LongTermMemory(storage=long_term_storage) +entity_memory = EntityMemory(storage=entity_storage) +user_memory = UserMemory(storage=user_storage) + +# Create a crew with custom memory instances +crew = Crew( + agents=[Agent(role="researcher", goal="research", backstory="I am a researcher")], + memory=True, + short_term_memory=short_term_memory, + long_term_memory=long_term_memory, + entity_memory=entity_memory, + memory_config={"user_memory": user_memory}, +) +``` + +### 2. Pass Custom Storage through Memory Config + +You can also provide custom storage implementations through the `memory_config` parameter: + +```python +from crewai import Crew, Agent + +# Create a crew with custom storage in memory_config +crew = Crew( + agents=[Agent(role="researcher", goal="research", backstory="I am a researcher")], + memory=True, + memory_config={ + "storage": { + "short_term": CustomStorage(), + "long_term": CustomStorage(), + "entity": CustomStorage(), + "user": CustomStorage(), + } + }, +) +``` + +## Example: Redis Storage + +Here's an example of a custom storage implementation using Redis: + +```python +import json +import redis +from typing import Any, Dict, List +from crewai.memory.storage.interface import Storage + +class RedisStorage(Storage): + """Redis-based storage implementation.""" + + def __init__(self, redis_url="redis://localhost:6379/0", prefix="crewai"): + self.redis = redis.from_url(redis_url) + self.prefix = prefix + + def save(self, value: Any, metadata: Dict[str, Any]) -> None: + """Save a value with metadata to Redis.""" + key = f"{self.prefix}:{len(self.redis.keys(f'{self.prefix}:*'))}" + data = {"value": value, "metadata": metadata} + self.redis.set(key, json.dumps(data)) + + def search( + self, query: str, limit: int = 3, score_threshold: float = 0.35 + ) -> List[Any]: + """Search for values in Redis.""" + # This is a simple implementation that returns all values + # In a real implementation, you would use Redis search capabilities + results = [] + for key in self.redis.keys(f"{self.prefix}:*"): + data = json.loads(self.redis.get(key)) + results.append({"context": data["value"], "metadata": data["metadata"]}) + if len(results) >= limit: + break + return results + + def reset(self) -> None: + """Reset the Redis storage.""" + for key in self.redis.keys(f"{self.prefix}:*"): + self.redis.delete(key) +``` + +## Benefits of Custom Storage + +Using custom storage implementations allows you to: + +1. Store memory data in external databases or services +2. Implement custom search algorithms +3. Share memory between different crews or applications +4. Persist memory across application restarts +5. Implement custom memory retention policies + +By extending the `Storage` interface, you can integrate CrewAI with any storage backend that suits your needs. diff --git a/src/crewai/crew.py b/src/crewai/crew.py index 9cecfed3a..91d37c720 100644 --- a/src/crewai/crew.py +++ b/src/crewai/crew.py @@ -263,7 +263,7 @@ class Crew(BaseModel): """Set private attributes.""" if self.memory: self._long_term_memory = ( - self.long_term_memory if self.long_term_memory else LongTermMemory() + self.long_term_memory if self.long_term_memory else LongTermMemory(crew=self, embedder_config=self.embedder) ) self._short_term_memory = ( self.short_term_memory diff --git a/src/crewai/memory/entity/entity_memory.py b/src/crewai/memory/entity/entity_memory.py index 264b64103..bba6a6774 100644 --- a/src/crewai/memory/entity/entity_memory.py +++ b/src/crewai/memory/entity/entity_memory.py @@ -1,4 +1,4 @@ -from typing import Optional +from typing import Any, Dict, Optional from pydantic import PrivateAttr @@ -17,47 +17,73 @@ class EntityMemory(Memory): _memory_provider: Optional[str] = PrivateAttr() def __init__(self, crew=None, embedder_config=None, storage=None, path=None): + memory_provider = None + entity_storage = None + if crew and hasattr(crew, "memory_config") and crew.memory_config is not None: memory_provider = crew.memory_config.get("provider") - else: - memory_provider = None + storage_config = crew.memory_config.get("storage", {}) + entity_storage = storage_config.get("entity") + + super().__init__( + storage=storage, + embedder_config=embedder_config, + memory_provider=memory_provider + ) - if memory_provider == "mem0": + if storage: + # Use the provided storage + super().__init__(storage=storage, embedder_config=embedder_config) + elif entity_storage: + # Use the storage from memory_config + super().__init__(storage=entity_storage, embedder_config=embedder_config) + elif memory_provider == "mem0": try: from crewai.memory.storage.mem0_storage import Mem0Storage except ImportError: raise ImportError( "Mem0 is not installed. Please install it with `pip install mem0ai`." ) - storage = Mem0Storage(type="entities", crew=crew) + super().__init__( + storage=Mem0Storage(type="entities", crew=crew), + embedder_config=embedder_config, + ) else: - storage = ( - storage - if storage - else RAGStorage( + # Use RAGStorage (default) + super().__init__( + storage=RAGStorage( type="entities", allow_reset=True, - embedder_config=embedder_config, crew=crew, + embedder_config=embedder_config, path=path, - ) + ), + embedder_config=embedder_config, ) + - super().__init__(storage=storage) - self._memory_provider = memory_provider - - def save(self, item: EntityMemoryItem) -> None: # type: ignore # BUG?: Signature of "save" incompatible with supertype "Memory" - """Saves an entity item into the SQLite storage.""" - if self._memory_provider == "mem0": - data = f""" - Remember details about the following entity: - Name: {item.name} - Type: {item.type} - Entity Description: {item.description} - """ + def save( + self, + value: Any, + metadata: Dict[str, Any] = None, + agent: str = None, + ) -> None: + """Saves an entity item or value into the storage.""" + if isinstance(value, EntityMemoryItem): + item = value + if self.memory_provider == "mem0": + data = f""" + Remember details about the following entity: + Name: {item.name} + Type: {item.type} + Entity Description: {item.description} + """ + else: + data = f"{item.name}({item.type}): {item.description}" + super().save(data, item.metadata) else: - data = f"{item.name}({item.type}): {item.description}" - super().save(data, item.metadata) + # Handle regular value and metadata + super().save(value, metadata, agent) def reset(self) -> None: try: diff --git a/src/crewai/memory/long_term/long_term_memory.py b/src/crewai/memory/long_term/long_term_memory.py index 94aac3a97..3a68b83b7 100644 --- a/src/crewai/memory/long_term/long_term_memory.py +++ b/src/crewai/memory/long_term/long_term_memory.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional from crewai.memory.long_term.long_term_memory_item import LongTermMemoryItem from crewai.memory.memory import Memory @@ -14,23 +14,85 @@ class LongTermMemory(Memory): LongTermMemoryItem instances. """ - def __init__(self, storage=None, path=None): - if not storage: - storage = LTMSQLiteStorage(db_path=path) if path else LTMSQLiteStorage() - super().__init__(storage=storage) - - def save(self, item: LongTermMemoryItem) -> None: # type: ignore # BUG?: Signature of "save" incompatible with supertype "Memory" - metadata = item.metadata - metadata.update({"agent": item.agent, "expected_output": item.expected_output}) - self.storage.save( # type: ignore # BUG?: Unexpected keyword argument "task_description","score","datetime" for "save" of "Storage" - task_description=item.task, - score=metadata["quality"], - metadata=metadata, - datetime=item.datetime, + def __init__(self, crew=None, embedder_config=None, storage=None, path=None): + memory_provider = None + long_term_storage = None + + if crew and hasattr(crew, "memory_config") and crew.memory_config is not None: + memory_provider = crew.memory_config.get("provider") + storage_config = crew.memory_config.get("storage", {}) + long_term_storage = storage_config.get("long_term") + + super().__init__( + storage=storage, + embedder_config=embedder_config, + memory_provider=memory_provider ) - def search(self, task: str, latest_n: int = 3) -> List[Dict[str, Any]]: # type: ignore # signature of "search" incompatible with supertype "Memory" - return self.storage.load(task, latest_n) # type: ignore # BUG?: "Storage" has no attribute "load" + if storage: + # Use the provided storage + super().__init__(storage=storage, embedder_config=embedder_config) + elif long_term_storage: + # Use the storage from memory_config + super().__init__(storage=long_term_storage, embedder_config=embedder_config) + elif memory_provider == "mem0": + try: + from crewai.memory.storage.mem0_storage import Mem0Storage + except ImportError: + raise ImportError( + "Mem0 is not installed. Please install it with `pip install mem0ai`." + ) + super().__init__( + storage=Mem0Storage(type="long_term", crew=crew), + embedder_config=embedder_config, + ) + else: + # Use LTMSQLiteStorage (default) + storage = LTMSQLiteStorage(db_path=path) if path else LTMSQLiteStorage() + super().__init__(storage=storage, embedder_config=embedder_config) + + def save( + self, + value: Any, + metadata: Dict[str, Any] = None, + agent: str = None, + ) -> None: + """Saves a value into the memory.""" + if isinstance(value, LongTermMemoryItem): + item = value + item_metadata = item.metadata or {} + item_metadata.update({"agent": item.agent, "expected_output": item.expected_output}) + + # Handle special storage types like Mem0Storage + if hasattr(self.storage, "save") and callable(getattr(self.storage, "save")) and hasattr(self.storage.save, "__code__") and "task_description" in self.storage.save.__code__.co_varnames: + self.storage.save( + task_description=item.task, + score=item_metadata.get("quality", 0), + metadata=item_metadata, + datetime=item.datetime, + ) + else: + # Use standard storage interface + self.storage.save(item.task, item_metadata) + else: + # Handle regular value and metadata + super().save(value, metadata, agent) + + def search( + self, + query: str, + limit: int = 3, + score_threshold: float = 0.35, + ) -> List[Any]: + """Search for values in the memory.""" + # Try to use the standard storage interface first + if hasattr(self.storage, "search") and callable(getattr(self.storage, "search")): + return self.storage.search(query=query, limit=limit, score_threshold=score_threshold) + # Fall back to load method for backward compatibility + elif hasattr(self.storage, "load") and callable(getattr(self.storage, "load")): + return self.storage.load(query, limit) + else: + raise AttributeError("Storage does not implement search or load method") def reset(self) -> None: self.storage.reset() diff --git a/src/crewai/memory/memory.py b/src/crewai/memory/memory.py index 9a362a512..d81ae21d3 100644 --- a/src/crewai/memory/memory.py +++ b/src/crewai/memory/memory.py @@ -1,6 +1,6 @@ from typing import Any, Dict, List, Optional -from pydantic import BaseModel +from pydantic import BaseModel, Field class Memory(BaseModel): @@ -9,8 +9,8 @@ class Memory(BaseModel): """ embedder_config: Optional[Dict[str, Any]] = None - storage: Any + memory_provider: Optional[str] = Field(default=None, exclude=True) def __init__(self, storage: Any, **data: Any): super().__init__(storage=storage, **data) diff --git a/src/crewai/memory/short_term/short_term_memory.py b/src/crewai/memory/short_term/short_term_memory.py index b7581f400..b0dad4155 100644 --- a/src/crewai/memory/short_term/short_term_memory.py +++ b/src/crewai/memory/short_term/short_term_memory.py @@ -19,32 +19,48 @@ class ShortTermMemory(Memory): _memory_provider: Optional[str] = PrivateAttr() def __init__(self, crew=None, embedder_config=None, storage=None, path=None): + memory_provider = None + short_term_storage = None + if crew and hasattr(crew, "memory_config") and crew.memory_config is not None: memory_provider = crew.memory_config.get("provider") - else: - memory_provider = None + storage_config = crew.memory_config.get("storage", {}) + short_term_storage = storage_config.get("short_term") + + super().__init__( + storage=storage, + embedder_config=embedder_config, + memory_provider=memory_provider + ) - if memory_provider == "mem0": + if storage: + # Use the provided storage + super().__init__(storage=storage, embedder_config=embedder_config) + elif short_term_storage: + # Use the storage from memory_config + super().__init__(storage=short_term_storage, embedder_config=embedder_config) + elif memory_provider == "mem0": try: from crewai.memory.storage.mem0_storage import Mem0Storage except ImportError: raise ImportError( "Mem0 is not installed. Please install it with `pip install mem0ai`." ) - storage = Mem0Storage(type="short_term", crew=crew) - else: - storage = ( - storage - if storage - else RAGStorage( - type="short_term", - embedder_config=embedder_config, - crew=crew, - path=path, - ) + super().__init__( + storage=Mem0Storage(type="short_term", crew=crew), + embedder_config=embedder_config, + ) + else: + # Use RAGStorage (default) + super().__init__( + storage=RAGStorage( + type="short_term", + crew=crew, + embedder_config=embedder_config, + path=path, + ), + embedder_config=embedder_config, ) - super().__init__(storage=storage) - self._memory_provider = memory_provider def save( self, @@ -53,7 +69,7 @@ class ShortTermMemory(Memory): agent: Optional[str] = None, ) -> None: item = ShortTermMemoryItem(data=value, metadata=metadata, agent=agent) - if self._memory_provider == "mem0": + if self.memory_provider == "mem0": item.data = f"Remember the following insights from Agent run: {item.data}" super().save(value=item.data, metadata=item.metadata, agent=item.agent) diff --git a/src/crewai/memory/user/user_memory.py b/src/crewai/memory/user/user_memory.py index 24e5fe035..6d81b31ff 100644 --- a/src/crewai/memory/user/user_memory.py +++ b/src/crewai/memory/user/user_memory.py @@ -11,15 +11,50 @@ class UserMemory(Memory): MemoryItem instances. """ - def __init__(self, crew=None): - try: - from crewai.memory.storage.mem0_storage import Mem0Storage - except ImportError: - raise ImportError( - "Mem0 is not installed. Please install it with `pip install mem0ai`." + def __init__(self, crew=None, embedder_config=None, storage=None, path=None, **kwargs): + memory_provider = None + user_storage = None + + if crew and hasattr(crew, "memory_config") and crew.memory_config is not None: + memory_provider = crew.memory_config.get("provider") + storage_config = crew.memory_config.get("storage", {}) + user_storage = storage_config.get("user") + + super().__init__( + storage=storage, + embedder_config=embedder_config, + memory_provider=memory_provider + ) + + if storage: + # Use the provided storage + super().__init__(storage=storage, embedder_config=embedder_config) + elif user_storage: + # Use the storage from memory_config + super().__init__(storage=user_storage, embedder_config=embedder_config) + elif memory_provider == "mem0": + try: + from crewai.memory.storage.mem0_storage import Mem0Storage + except ImportError: + raise ImportError( + "Mem0 is not installed. Please install it with `pip install mem0ai`." + ) + super().__init__( + storage=Mem0Storage(type="user", crew=crew), + embedder_config=embedder_config, + ) + else: + # Use RAGStorage (default) + from crewai.memory.storage.rag_storage import RAGStorage + super().__init__( + storage=RAGStorage( + type="user", + crew=crew, + embedder_config=embedder_config, + path=path, + ), + embedder_config=embedder_config, ) - storage = Mem0Storage(type="user", crew=crew) - super().__init__(storage) def save( self, @@ -43,3 +78,9 @@ class UserMemory(Memory): score_threshold=score_threshold, ) return results + + def reset(self) -> None: + try: + self.storage.reset() + except Exception as e: + raise Exception(f"An error occurred while resetting the user memory: {e}") diff --git a/tests/memory/long_term_memory_test.py b/tests/memory/long_term_memory_test.py index 3639054e3..9bb81ee8e 100644 --- a/tests/memory/long_term_memory_test.py +++ b/tests/memory/long_term_memory_test.py @@ -20,7 +20,7 @@ def test_save_and_search(long_term_memory): metadata={"task": "test_task", "quality": 0.5}, ) long_term_memory.save(memory) - find = long_term_memory.search("test_task", latest_n=5)[0] + find = long_term_memory.search(query="test_task", limit=5)[0] assert find["score"] == 0.5 assert find["datetime"] == "test_datetime" assert find["metadata"]["agent"] == "test_agent" diff --git a/tests/memory/test_custom_storage.py b/tests/memory/test_custom_storage.py new file mode 100644 index 000000000..dd632381d --- /dev/null +++ b/tests/memory/test_custom_storage.py @@ -0,0 +1,140 @@ +import pytest +from typing import Any, Dict, List + +from crewai.crew import Crew +from crewai.agent import Agent +from crewai.memory.storage.interface import Storage +from crewai.memory.short_term.short_term_memory import ShortTermMemory +from crewai.memory.long_term.long_term_memory import LongTermMemory +from crewai.memory.entity.entity_memory import EntityMemory +from crewai.memory.user.user_memory import UserMemory + + +class CustomStorage(Storage): + """Custom storage implementation for testing.""" + + def __init__(self): + self.data = [] + + def save(self, value: Any, metadata: Dict[str, Any]) -> None: + self.data.append({"value": value, "metadata": metadata}) + + def search( + self, query: str, limit: int = 3, score_threshold: float = 0.35 + ) -> List[Any]: + return [{"context": item["value"], "metadata": item["metadata"]} for item in self.data] + + def reset(self) -> None: + self.data = [] + + +def test_custom_storage_with_short_term_memory(): + """Test that custom storage works with short term memory.""" + custom_storage = CustomStorage() + memory = ShortTermMemory(storage=custom_storage) + + memory.save("test value", {"key": "value"}) + results = memory.search("test") + + assert len(results) > 0 + assert results[0]["context"] == "test value" + assert results[0]["metadata"]["key"] == "value" + + +def test_custom_storage_with_long_term_memory(): + """Test that custom storage works with long term memory.""" + custom_storage = CustomStorage() + memory = LongTermMemory(storage=custom_storage) + + memory.save("test value", {"key": "value"}) + results = memory.search("test") + + assert len(results) > 0 + assert results[0]["context"] == "test value" + assert results[0]["metadata"]["key"] == "value" + + +def test_custom_storage_with_entity_memory(): + """Test that custom storage works with entity memory.""" + custom_storage = CustomStorage() + memory = EntityMemory(storage=custom_storage) + + memory.save("test value", {"key": "value"}) + results = memory.search("test") + + assert len(results) > 0 + assert results[0]["context"] == "test value" + assert results[0]["metadata"]["key"] == "value" + + +def test_custom_storage_with_user_memory(): + """Test that custom storage works with user memory.""" + custom_storage = CustomStorage() + memory = UserMemory(storage=custom_storage) + + memory.save("test value", {"key": "value"}) + results = memory.search("test") + + assert len(results) > 0 + # UserMemory prepends "Remember the details about the user: " to the value + assert "test value" in results[0]["context"] + assert results[0]["metadata"]["key"] == "value" + + +def test_custom_storage_with_crew(): + """Test that custom storage works with crew.""" + short_term_storage = CustomStorage() + long_term_storage = CustomStorage() + entity_storage = CustomStorage() + user_storage = CustomStorage() + + # Create memory instances with custom storage + short_term_memory = ShortTermMemory(storage=short_term_storage) + long_term_memory = LongTermMemory(storage=long_term_storage) + entity_memory = EntityMemory(storage=entity_storage) + user_memory = UserMemory(storage=user_storage) + + # Create a crew with custom memory instances + crew = Crew( + agents=[Agent(role="test", goal="test", backstory="test")], + memory=True, + short_term_memory=short_term_memory, + long_term_memory=long_term_memory, + entity_memory=entity_memory, + memory_config={"user_memory": user_memory}, + ) + + # Test that the crew has the custom memory instances + assert crew._short_term_memory.storage == short_term_storage + assert crew._long_term_memory.storage == long_term_storage + assert crew._entity_memory.storage == entity_storage + assert crew._user_memory.storage == user_storage + + +def test_custom_storage_with_memory_config(): + """Test that custom storage works with memory_config.""" + short_term_storage = CustomStorage() + long_term_storage = CustomStorage() + entity_storage = CustomStorage() + user_storage = CustomStorage() + + # Create a crew with custom storage in memory_config + crew = Crew( + agents=[Agent(role="test", goal="test", backstory="test")], + memory=True, + memory_config={ + "storage": { + "short_term": short_term_storage, + "long_term": long_term_storage, + "entity": entity_storage, + "user": user_storage, + }, + "user_memory": {} # Enable user memory + }, + ) + + # Test that the crew has the custom storage instances + assert crew._short_term_memory.storage == short_term_storage + assert crew._long_term_memory.storage == long_term_storage + assert crew._entity_memory.storage == entity_storage + assert crew._user_memory.storage == user_storage