fix: support to add memories to Mem0 with agent_id (#3217)

* fix: support to add memories to Mem0 with agent_id

* feat: removing memory_type checkings from Mem0Storage

* feat: ensure agent_id is always present while saving memory into Mem0

* fix: use OR operator when querying Mem0 memories with both user_id and agent_id
This commit is contained in:
Lucas Gomide
2025-07-30 12:56:46 -03:00
committed by GitHub
parent 498e8dc6e8
commit 34c3075fdb
2 changed files with 145 additions and 44 deletions

View File

@@ -1,7 +1,8 @@
import os
from typing import Any, Dict, List
from collections import defaultdict
from mem0 import Memory, MemoryClient
from crewai.utilities.chromadb import sanitize_collection_name
from crewai.memory.storage.interface import Storage
@@ -70,26 +71,32 @@ class Mem0Storage(Storage):
"""
Returns:
dict: A filter dictionary containing AND conditions for querying data.
- Includes user_id if memory_type is 'external'.
- Includes user_id and agent_id if both are present.
- Includes user_id if only user_id is present.
- Includes agent_id if only agent_id is present.
- Includes run_id if memory_type is 'short_term' and mem0_run_id is present.
"""
filter = {
"AND": []
}
filter = defaultdict(list)
# Add user_id condition if the memory type is external
if self.memory_type == "external":
filter["AND"].append({"user_id": self.config.get("user_id", "")})
# Add run_id condition if the memory type is short_term and a run ID is set
if self.memory_type == "short_term" and self.mem0_run_id:
filter["AND"].append({"run_id": self.mem0_run_id})
else:
user_id = self.config.get("user_id", "")
agent_id = self.config.get("agent_id", "")
if user_id and agent_id:
filter["OR"].append({"user_id": user_id})
filter["OR"].append({"agent_id": agent_id})
elif user_id:
filter["AND"].append({"user_id": user_id})
elif agent_id:
filter["AND"].append({"agent_id": agent_id})
return filter
def save(self, value: Any, metadata: Dict[str, Any]) -> None:
user_id = self.config.get("user_id", "")
assistant_message = [{"role" : "assistant","content" : value}]
assistant_message = [{"role" : "assistant","content" : value}]
base_metadata = {
"short_term": "short_term",
@@ -104,31 +111,32 @@ class Mem0Storage(Storage):
"infer": self.infer
}
if self.memory_type == "external":
# MemoryClient-specific overrides
if isinstance(self.memory, MemoryClient):
params["includes"] = self.includes
params["excludes"] = self.excludes
params["output_format"] = "v1.1"
params["version"] = "v2"
if self.memory_type == "short_term" and self.mem0_run_id:
params["run_id"] = self.mem0_run_id
if user_id:
params["user_id"] = user_id
if params:
# MemoryClient-specific overrides
if isinstance(self.memory, MemoryClient):
params["includes"] = self.includes
params["excludes"] = self.excludes
params["output_format"] = "v1.1"
params["version"]="v2"
if agent_id := self.config.get("agent_id", self._get_agent_name()):
params["agent_id"] = agent_id
if self.memory_type == "short_term":
params["run_id"] = self.mem0_run_id
self.memory.add(assistant_message, **params)
self.memory.add(assistant_message, **params)
def search(self,query: str,limit: int = 3,score_threshold: float = 0.35) -> List[Any]:
params = {
"query": query,
"limit": limit,
"query": query,
"limit": limit,
"version": "v2",
"output_format": "v1.1"
}
if user_id := self.config.get("user_id", ""):
params["user_id"] = user_id
@@ -138,7 +146,7 @@ class Mem0Storage(Storage):
"entities": {"type": "entity"},
"external": {"type": "external"},
}
if self.memory_type in memory_type_map:
params["metadata"] = memory_type_map[self.memory_type]
if self.memory_type == "short_term":
@@ -151,11 +159,28 @@ class Mem0Storage(Storage):
params['threshold'] = score_threshold
if isinstance(self.memory, Memory):
del params["metadata"], params["version"], params["run_id"], params['output_format']
del params["metadata"], params["version"], params['output_format']
if params.get("run_id"):
del params["run_id"]
results = self.memory.search(**params)
return [r for r in results["results"]]
def reset(self):
if self.memory:
self.memory.reset()
def _sanitize_role(self, role: str) -> str:
"""
Sanitizes agent roles to ensure valid directory names.
"""
return role.replace("\n", "").replace(" ", "_").replace("/", "_")
def _get_agent_name(self) -> str:
if not self.crew:
return ""
agents = self.crew.agents
agents = [self._sanitize_role(agent.role) for agent in agents]
agents = "_".join(agents)
return sanitize_collection_name(name=agents, max_collection_length=MAX_AGENT_ID_LENGTH_MEM0)