Dropping messages from metadata in Mem0 Storage (#3390)

* Dropped messages from metadata and added user-assistant interaction directly

* Fixed test cases for this

* Fixed static type checking issue

* Changed logic to take latest user and assistant messages

* Added default value to be string

* Linting checks

* Removed duplication of tool calling

* Fixed Linting Changes

* Ruff check

* Removed console formatter file from commit

* Linting fixed

* Linting checks

* Ignoring missing imports error

* Added suggested changes

* Fixed import untyped error
This commit is contained in:
Vidit Ostwal
2025-09-13 00:55:29 +05:30
committed by GitHub
parent 45d0c9912c
commit 1b00cc71ef
2 changed files with 55 additions and 25 deletions

View File

@@ -1,10 +1,12 @@
import os
from typing import Any, Dict, List
import re
from collections import defaultdict
from mem0 import Memory, MemoryClient
from crewai.utilities.chromadb import sanitize_collection_name
from typing import Any, Iterable
from mem0 import Memory, MemoryClient # type: ignore[import-untyped]
from crewai.memory.storage.interface import Storage
from crewai.utilities.chromadb import sanitize_collection_name
MAX_AGENT_ID_LENGTH_MEM0 = 255
@@ -86,9 +88,28 @@ class Mem0Storage(Storage):
return filter
def save(self, value: Any, metadata: Dict[str, Any]) -> None:
def save(self, value: Any, metadata: dict[str, Any]) -> None:
def _last_content(messages: Iterable[dict[str, Any]], role: str) -> str:
return next(
(m.get("content", "") for m in reversed(list(messages)) if m.get("role") == role),
""
)
conversations = []
messages = metadata.pop("messages", None)
if messages:
last_user = _last_content(messages, "user")
last_assistant = _last_content(messages, "assistant")
if user_msg := self._get_user_message(last_user):
conversations.append({"role": "user", "content": user_msg})
if assistant_msg := self._get_assistant_message(last_assistant):
conversations.append({"role": "assistant", "content": assistant_msg})
else:
conversations.append({"role": "assistant", "content": value})
user_id = self.config.get("user_id", "")
assistant_message = [{"role" : "assistant","content" : value}]
base_metadata = {
"short_term": "short_term",
@@ -119,9 +140,9 @@ class Mem0Storage(Storage):
if agent_id := self.config.get("agent_id", self._get_agent_name()):
params["agent_id"] = agent_id
self.memory.add(assistant_message, **params)
self.memory.add(conversations, **params)
def search(self,query: str,limit: int = 3,score_threshold: float = 0.35) -> List[Any]:
def search(self,query: str,limit: int = 3,score_threshold: float = 0.35) -> list[Any]:
params = {
"query": query,
"limit": limit,
@@ -160,7 +181,7 @@ class Mem0Storage(Storage):
# This makes it compatible for Contextual Memory to retrieve
for result in results["results"]:
result["context"] = result["memory"]
return [r for r in results["results"]]
def reset(self):
@@ -181,3 +202,16 @@ class Mem0Storage(Storage):
agents = [self._sanitize_role(agent.role) for agent in agents]
agents = "_".join(agents)
return sanitize_collection_name(name=agents, max_collection_length=MAX_AGENT_ID_LENGTH_MEM0)
def _get_assistant_message(self, text: str) -> str:
marker = "Final Answer:"
if marker in text:
return text.split(marker, 1)[1].strip()
return text
def _get_user_message(self, text: str) -> str:
pattern = r"User message:\s*(.*)"
match = re.search(pattern, text)
if match:
return match.group(1).strip()
return text