mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-08 07:38:29 +00:00
* fix: surfacing properly supported types by Mem0Storage * feat: prepare Mem0Storage to accept config parameter We're planning to remove `memory_config` soon. This commit prepares this storage to accept the config provided directly * feat: add external memory * fix: cleanup Mem0 warning while adding messages to the memory * feat: support setting the current crew in memory This can be useful when a memory is initialized before the crew, but the crew might still be a very relevant attribute * fix: allow resetting only an external_memory from crew * test: add external memory test * test: ensure the config takes precedence over memory_config when setting mem0 * fix: support providing a custom storage to External Memory * docs: add docs about external memory * chore: add warning messages about the deprecation of UserMemory * fix: fix typing check --------- Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
59 lines
1.8 KiB
Python
59 lines
1.8 KiB
Python
import warnings
|
|
from typing import Any, Dict, Optional
|
|
|
|
from crewai.memory.memory import Memory
|
|
|
|
|
|
class UserMemory(Memory):
    """
    Deprecated memory class for storing and retrieving details about the user.

    Inherits from the Memory class and delegates persistence to a
    ``Mem0Storage`` instance created with ``type="user"``. Instantiating this
    class emits a ``DeprecationWarning`` that points to ExternalMemory as the
    replacement.
    """

    def __init__(self, crew=None):
        """
        Warn about the deprecation and set up the Mem0-backed storage.

        Args:
            crew: Optional crew object, forwarded unchanged to ``Mem0Storage``.

        Raises:
            ImportError: If ``crewai.memory.storage.mem0_storage`` cannot be
                imported. The original import error is attached as the cause.
        """
        warnings.warn(
            "UserMemory is deprecated and will be removed in a future version. "
            "Please use ExternalMemory instead.",
            DeprecationWarning,
            stacklevel=2,  # attribute the warning to the code instantiating UserMemory
        )

        # The storage backend is imported here rather than at module level, so
        # a missing Mem0 install only fails when UserMemory is instantiated.
        try:
            from crewai.memory.storage.mem0_storage import Mem0Storage
        except ImportError as exc:
            # Chain explicitly so the underlying import failure stays visible.
            raise ImportError(
                "Mem0 is not installed. Please install it with `pip install mem0ai`."
            ) from exc

        storage = Mem0Storage(type="user", crew=crew)
        super().__init__(storage)

    def save(
        self,
        value,
        metadata: Optional[Dict[str, Any]] = None,
        agent: Optional[str] = None,
    ) -> None:
        """
        Save a detail about the user.

        The value is embedded in a "Remember the details about the user: ..."
        sentence before being handed to the parent class's ``save``.

        Args:
            value: The detail to remember; it is interpolated into the stored
                text with an f-string.
            metadata: Optional metadata passed through to the parent ``save``.
            agent: Accepted for signature compatibility; it is currently not
                forwarded to the parent ``save``.
        """
        # TODO: Change this function since we want to take care of the case where we save memories for the user
        # NOTE(review): `agent` is dropped here - confirm whether it should be
        # passed on to the parent `save`.
        data = f"Remember the details about the user: {value}"
        super().save(data, metadata)

    def search(
        self,
        query: str,
        limit: int = 3,
        score_threshold: float = 0.35,
    ):
        """
        Search the underlying storage and return its results unchanged.

        Args:
            query: The search query.
            limit: Forwarded to the storage's ``search`` as ``limit``.
            score_threshold: Forwarded to the storage's ``search`` as
                ``score_threshold``.

        Returns:
            Whatever ``self.storage.search`` returns.
        """
        return self.storage.search(
            query=query,
            limit=limit,
            score_threshold=score_threshold,
        )

    def reset(self) -> None:
        """
        Reset the underlying storage.

        Raises:
            Exception: If the storage's ``reset`` raises. The original
                exception is attached as the cause.
        """
        try:
            self.storage.reset()
        except Exception as e:
            # Keep the generic Exception type callers may already catch, but
            # chain the original error so its traceback is preserved.
            raise Exception(
                f"An error occurred while resetting the user memory: {e}"
            ) from e
|