Changed Mem0 Storage v1.1 -> v2 (#2893)

* Changed v1.1 -> v2

* Fixed Test Cases:

* Fixed linting issues

* Changed docs

* Refractored the storage

* Fixed test cases

* Fixing run-time checks

* Fixed Test Case

* Updated docs and added test case for custom categories

* Add the TODO back

* Minor Changes

* Added output_format in search

* Minor changes

* Added output_format and version in both search and save

* Small change

* Minor bugs

* Fixed test cases

* Changed docs

---------

Co-authored-by: Lucas Gomide <lucaslg200@gmail.com>
This commit is contained in:
Vidit Ostwal
2025-07-23 18:00:52 +05:30
committed by GitHub
parent 9a65573955
commit 30541239ad
3 changed files with 187 additions and 123 deletions

View File

@@ -4,7 +4,6 @@ from typing import Any, Dict, List
from mem0 import Memory, MemoryClient
from crewai.memory.storage.interface import Storage
from crewai.utilities.chromadb import sanitize_collection_name
MAX_AGENT_ID_LENGTH_MEM0 = 255
@@ -13,137 +12,150 @@ class Mem0Storage(Storage):
"""
Extends Storage to handle embedding and searching across entities using Mem0.
"""
def __init__(self, type, crew=None, config=None):
super().__init__()
supported_types = ["user", "short_term", "long_term", "entities", "external"]
if type not in supported_types:
raise ValueError(
f"Invalid type '{type}' for Mem0Storage. Must be one of: "
+ ", ".join(supported_types)
)
self._validate_type(type)
self.memory_type = type
self.crew = crew
self.config = config or {}
# TODO: Memory config will be removed in the future the config will be passed as a parameter
self.memory_config = self.config or getattr(crew, "memory_config", {}) or {}
# User ID is required for user memory type "user" since it's used as a unique identifier for the user.
user_id = self._get_user_id()
if type == "user" and not user_id:
# TODO: Memory config will be removed in the future the config will be passed as a parameter
self.config = config or getattr(crew, "memory_config", {}).get("config", {}) or {}
self._validate_user_id()
self._extract_config_values()
self._initialize_memory()
def _validate_type(self, type):
supported_types = {"user", "short_term", "long_term", "entities", "external"}
if type not in supported_types:
raise ValueError(
f"Invalid type '{type}' for Mem0Storage. Must be one of: {', '.join(supported_types)}"
)
def _validate_user_id(self):
if self.memory_type == "user" and not self.config.get("user_id", ""):
raise ValueError("User ID is required for user memory type")
# API key in memory config overrides the environment variable
config = self._get_config()
mem0_api_key = config.get("api_key") or os.getenv("MEM0_API_KEY")
mem0_org_id = config.get("org_id")
mem0_project_id = config.get("project_id")
mem0_local_config = config.get("local_mem0_config")
def _extract_config_values(self):
cfg = self.config
self.mem0_run_id = cfg.get("run_id")
self.includes = cfg.get("includes")
self.excludes = cfg.get("excludes")
self.custom_categories = cfg.get("custom_categories")
self.infer = cfg.get("infer", False)
# Initialize MemoryClient or Memory based on the presence of the mem0_api_key
if mem0_api_key:
if mem0_org_id and mem0_project_id:
self.memory = MemoryClient(
api_key=mem0_api_key, org_id=mem0_org_id, project_id=mem0_project_id
)
else:
self.memory = MemoryClient(api_key=mem0_api_key)
def _initialize_memory(self):
api_key = self.config.get("api_key") or os.getenv("MEM0_API_KEY")
org_id = self.config.get("org_id")
project_id = self.config.get("project_id")
local_config = self.config.get("local_mem0_config")
if api_key:
self.memory = (
MemoryClient(api_key=api_key, org_id=org_id, project_id=project_id)
if org_id and project_id
else MemoryClient(api_key=api_key)
)
if self.custom_categories:
self.memory.update_project(custom_categories=self.custom_categories)
else:
if mem0_local_config and len(mem0_local_config):
self.memory = Memory.from_config(mem0_local_config)
else:
self.memory = Memory()
self.memory = (
Memory.from_config(local_config)
if local_config and len(local_config)
else Memory()
)
def _sanitize_role(self, role: str) -> str:
def _create_filter_for_search(self):
"""
Sanitizes agent roles to ensure valid directory names.
Returns:
dict: A filter dictionary containing AND conditions for querying data.
- Includes user_id if memory_type is 'external'.
- Includes run_id if memory_type is 'short_term' and mem0_run_id is present.
"""
return role.replace("\n", "").replace(" ", "_").replace("/", "_")
filter = {
"AND": []
}
# Add user_id condition if the memory type is external
if self.memory_type == "external":
filter["AND"].append({"user_id": self.config.get("user_id", "")})
# Add run_id condition if the memory type is short_term and a run ID is set
if self.memory_type == "short_term" and self.mem0_run_id:
filter["AND"].append({"run_id": self.mem0_run_id})
return filter
def save(self, value: Any, metadata: Dict[str, Any]) -> None:
user_id = self._get_user_id()
agent_name = self._get_agent_name()
user_id = self.config.get("user_id", "")
assistant_message = [{"role" : "assistant","content" : value}]
params = None
if self.memory_type == "short_term":
params = {
"agent_id": agent_name,
"infer": False,
"metadata": {"type": "short_term", **metadata},
}
elif self.memory_type == "long_term":
params = {
"agent_id": agent_name,
"infer": False,
"metadata": {"type": "long_term", **metadata},
}
elif self.memory_type == "entities":
params = {
"agent_id": agent_name,
"infer": False,
"metadata": {"type": "entity", **metadata},
}
elif self.memory_type == "external":
params = {
"user_id": user_id,
"agent_id": agent_name,
"metadata": {"type": "external", **metadata},
}
if params:
if isinstance(self.memory, MemoryClient):
params["output_format"] = "v1.1"
self.memory.add(assistant_message, **params)
base_metadata = {
"short_term": "short_term",
"long_term": "long_term",
"entities": "entity",
"external": "external"
}
def search(
self,
query: str,
limit: int = 3,
score_threshold: float = 0.35,
) -> List[Any]:
params = {"query": query, "limit": limit, "output_format": "v1.1"}
if user_id := self._get_user_id():
# Shared base params
params: dict[str, Any] = {
"metadata": {"type": base_metadata[self.memory_type], **metadata},
"infer": self.infer
}
if self.memory_type == "external":
params["user_id"] = user_id
agent_name = self._get_agent_name()
if self.memory_type == "short_term":
params["agent_id"] = agent_name
params["metadata"] = {"type": "short_term"}
elif self.memory_type == "long_term":
params["agent_id"] = agent_name
params["metadata"] = {"type": "long_term"}
elif self.memory_type == "entities":
params["agent_id"] = agent_name
params["metadata"] = {"type": "entity"}
elif self.memory_type == "external":
params["agent_id"] = agent_name
params["metadata"] = {"type": "external"}
if params:
# MemoryClient-specific overrides
if isinstance(self.memory, MemoryClient):
params["includes"] = self.includes
params["excludes"] = self.excludes
params["output_format"] = "v1.1"
params["version"]="v2"
if self.memory_type == "short_term":
params["run_id"] = self.mem0_run_id
self.memory.add(assistant_message, **params)
def search(self,query: str,limit: int = 3,score_threshold: float = 0.35) -> List[Any]:
params = {
"query": query,
"limit": limit,
"version": "v2",
"output_format": "v1.1"
}
if user_id := self.config.get("user_id", ""):
params["user_id"] = user_id
memory_type_map = {
"short_term": {"type": "short_term"},
"long_term": {"type": "long_term"},
"entities": {"type": "entity"},
"external": {"type": "external"},
}
if self.memory_type in memory_type_map:
params["metadata"] = memory_type_map[self.memory_type]
if self.memory_type == "short_term":
params["run_id"] = self.mem0_run_id
# Discard the filters for now since we create the filters
# automatically when the crew is created.
params["filters"] = self._create_filter_for_search()
params['threshold'] = score_threshold
if isinstance(self.memory, Memory):
del params["metadata"], params["output_format"]
del params["metadata"], params["version"], params["run_id"], params['output_format']
results = self.memory.search(**params)
return [r for r in results["results"] if r["score"] >= score_threshold]
def _get_user_id(self) -> str:
return self._get_config().get("user_id", "")
def _get_agent_name(self) -> str:
if not self.crew:
return ""
agents = self.crew.agents
agents = [self._sanitize_role(agent.role) for agent in agents]
agents = "_".join(agents)
return sanitize_collection_name(name=agents,max_collection_length=MAX_AGENT_ID_LENGTH_MEM0)
def _get_config(self) -> Dict[str, Any]:
return self.config or getattr(self, "memory_config", {}).get("config", {}) or {}
return [r for r in results["results"]]
def reset(self):
if self.memory:
self.memory.reset()