diff --git a/docs/en/concepts/memory.mdx b/docs/en/concepts/memory.mdx index cdabc8f2c..67c13ee56 100644 --- a/docs/en/concepts/memory.mdx +++ b/docs/en/concepts/memory.mdx @@ -720,7 +720,16 @@ crew = Crew( ``` ### Advanced Mem0 Configuration +When using Mem0 Client, you can customize the memory configuration further, by using parameters like 'includes', 'excludes', 'custom_categories' and 'run_id' (this is only for short-term memory). +You can find more details in the [Mem0 documentation](https://docs.mem0.ai/). ```python + +new_categories = [ + {"lifestyle_management_concerns": "Tracks daily routines, habits, hobbies and interests including cooking, time management and work-life balance"}, + {"seeking_structure": "Documents goals around creating routines, schedules, and organized systems in various life areas"}, + {"personal_information": "Basic information about the user including name, preferences, and personality traits"} +] + crew = Crew( agents=[...], tasks=[...], @@ -732,6 +741,11 @@ crew = Crew( "org_id": "my_org_id", # Optional "project_id": "my_project_id", # Optional "api_key": "custom-api-key" # Optional - overrides env var + "run_id": "my_run_id", # Optional - for short-term memory + "includes": "include1", # Optional + "excludes": "exclude1", # Optional + "infer": True + "custom_categories": new_categories # Optional - custom categories for user memory }, "user_memory": {} } @@ -761,7 +775,8 @@ crew = Crew( "provider": "openai", "config": {"api_key": "your-api-key", "model": "text-embedding-3-small"} } - } + }, + "infer": True }, "user_memory": {} } diff --git a/src/crewai/memory/storage/mem0_storage.py b/src/crewai/memory/storage/mem0_storage.py index bf1b5ef1e..e0e8af890 100644 --- a/src/crewai/memory/storage/mem0_storage.py +++ b/src/crewai/memory/storage/mem0_storage.py @@ -4,7 +4,6 @@ from typing import Any, Dict, List from mem0 import Memory, MemoryClient from crewai.memory.storage.interface import Storage -from crewai.utilities.chromadb import sanitize_collection_name MAX_AGENT_ID_LENGTH_MEM0 = 255 @@ -13,137 +12,150 @@ class Mem0Storage(Storage): """ Extends Storage to handle embedding and searching across entities using Mem0. """ - def __init__(self, type, crew=None, config=None): super().__init__() - supported_types = ["user", "short_term", "long_term", "entities", "external"] - if type not in supported_types: - raise ValueError( - f"Invalid type '{type}' for Mem0Storage. Must be one of: " - + ", ".join(supported_types) - ) + self._validate_type(type) self.memory_type = type self.crew = crew - self.config = config or {} - # TODO: Memory config will be removed in the future the config will be passed as a parameter - self.memory_config = self.config or getattr(crew, "memory_config", {}) or {} - # User ID is required for user memory type "user" since it's used as a unique identifier for the user. - user_id = self._get_user_id() - if type == "user" and not user_id: + # TODO: Memory config will be removed in the future the config will be passed as a parameter + self.config = config or getattr(crew, "memory_config", {}).get("config", {}) or {} + + self._validate_user_id() + self._extract_config_values() + self._initialize_memory() + + def _validate_type(self, type): + supported_types = {"user", "short_term", "long_term", "entities", "external"} + if type not in supported_types: + raise ValueError( + f"Invalid type '{type}' for Mem0Storage. Must be one of: {', '.join(supported_types)}" + ) + + def _validate_user_id(self): + if self.memory_type == "user" and not self.config.get("user_id", ""): raise ValueError("User ID is required for user memory type") - # API key in memory config overrides the environment variable - config = self._get_config() - mem0_api_key = config.get("api_key") or os.getenv("MEM0_API_KEY") - mem0_org_id = config.get("org_id") - mem0_project_id = config.get("project_id") - mem0_local_config = config.get("local_mem0_config") + def _extract_config_values(self): + cfg = self.config + self.mem0_run_id = cfg.get("run_id") + self.includes = cfg.get("includes") + self.excludes = cfg.get("excludes") + self.custom_categories = cfg.get("custom_categories") + self.infer = cfg.get("infer", False) - # Initialize MemoryClient or Memory based on the presence of the mem0_api_key - if mem0_api_key: - if mem0_org_id and mem0_project_id: - self.memory = MemoryClient( - api_key=mem0_api_key, org_id=mem0_org_id, project_id=mem0_project_id - ) - else: - self.memory = MemoryClient(api_key=mem0_api_key) + def _initialize_memory(self): + api_key = self.config.get("api_key") or os.getenv("MEM0_API_KEY") + org_id = self.config.get("org_id") + project_id = self.config.get("project_id") + local_config = self.config.get("local_mem0_config") + + if api_key: + self.memory = ( + MemoryClient(api_key=api_key, org_id=org_id, project_id=project_id) + if org_id and project_id + else MemoryClient(api_key=api_key) + ) + if self.custom_categories: + self.memory.update_project(custom_categories=self.custom_categories) else: - if mem0_local_config and len(mem0_local_config): - self.memory = Memory.from_config(mem0_local_config) - else: - self.memory = Memory() + self.memory = ( + Memory.from_config(local_config) + if local_config and len(local_config) + else Memory() + ) - def _sanitize_role(self, role: str) -> str: + def _create_filter_for_search(self): """ - Sanitizes agent roles to ensure valid directory names. + Returns: + dict: A filter dictionary containing AND conditions for querying data. + - Includes user_id if memory_type is 'external'. + - Includes run_id if memory_type is 'short_term' and mem0_run_id is present. """ - return role.replace("\n", "").replace(" ", "_").replace("/", "_") + filter = { + "AND": [] + } + + # Add user_id condition if the memory type is external + if self.memory_type == "external": + filter["AND"].append({"user_id": self.config.get("user_id", "")}) + + # Add run_id condition if the memory type is short_term and a run ID is set + if self.memory_type == "short_term" and self.mem0_run_id: + filter["AND"].append({"run_id": self.mem0_run_id}) + + return filter def save(self, value: Any, metadata: Dict[str, Any]) -> None: - user_id = self._get_user_id() - agent_name = self._get_agent_name() + user_id = self.config.get("user_id", "") assistant_message = [{"role" : "assistant","content" : value}] - params = None - if self.memory_type == "short_term": - params = { - "agent_id": agent_name, - "infer": False, - "metadata": {"type": "short_term", **metadata}, - } - elif self.memory_type == "long_term": - params = { - "agent_id": agent_name, - "infer": False, - "metadata": {"type": "long_term", **metadata}, - } - elif self.memory_type == "entities": - params = { - "agent_id": agent_name, - "infer": False, - "metadata": {"type": "entity", **metadata}, - } - elif self.memory_type == "external": - params = { - "user_id": user_id, - "agent_id": agent_name, - "metadata": {"type": "external", **metadata}, - } - if params: - if isinstance(self.memory, MemoryClient): - params["output_format"] = "v1.1" - - self.memory.add(assistant_message, **params) + base_metadata = { + "short_term": "short_term", + "long_term": "long_term", + "entities": "entity", + "external": "external" + } - def search( - self, - query: str, - limit: int = 3, - score_threshold: float = 0.35, - ) -> List[Any]: - params = {"query": query, "limit": limit, "output_format": "v1.1"} - if user_id := self._get_user_id(): + # Shared base params + params: dict[str, Any] = { + "metadata": {"type": base_metadata[self.memory_type], **metadata}, + "infer": self.infer + } + + if self.memory_type == "external": params["user_id"] = user_id - agent_name = self._get_agent_name() - if self.memory_type == "short_term": - params["agent_id"] = agent_name - params["metadata"] = {"type": "short_term"} - elif self.memory_type == "long_term": - params["agent_id"] = agent_name - params["metadata"] = {"type": "long_term"} - elif self.memory_type == "entities": - params["agent_id"] = agent_name - params["metadata"] = {"type": "entity"} - elif self.memory_type == "external": - params["agent_id"] = agent_name - params["metadata"] = {"type": "external"} + + if params: + # MemoryClient-specific overrides + if isinstance(self.memory, MemoryClient): + params["includes"] = self.includes + params["excludes"] = self.excludes + params["output_format"] = "v1.1" + params["version"]="v2" + + if self.memory_type == "short_term": + params["run_id"] = self.mem0_run_id + + self.memory.add(assistant_message, **params) + + def search(self,query: str,limit: int = 3,score_threshold: float = 0.35) -> List[Any]: + params = { + "query": query, + "limit": limit, + "version": "v2", + "output_format": "v1.1" + } + + if user_id := self.config.get("user_id", ""): + params["user_id"] = user_id + + memory_type_map = { + "short_term": {"type": "short_term"}, + "long_term": {"type": "long_term"}, + "entities": {"type": "entity"}, + "external": {"type": "external"}, + } + + if self.memory_type in memory_type_map: + params["metadata"] = memory_type_map[self.memory_type] + if self.memory_type == "short_term": + params["run_id"] = self.mem0_run_id # Discard the filters for now since we create the filters # automatically when the crew is created. + + params["filters"] = self._create_filter_for_search() + params['threshold'] = score_threshold + if isinstance(self.memory, Memory): - del params["metadata"], params["output_format"] - + del params["metadata"], params["version"], params["run_id"], params['output_format'] + results = self.memory.search(**params) - return [r for r in results["results"] if r["score"] >= score_threshold] - - def _get_user_id(self) -> str: - return self._get_config().get("user_id", "") - - def _get_agent_name(self) -> str: - if not self.crew: - return "" - - agents = self.crew.agents - agents = [self._sanitize_role(agent.role) for agent in agents] - agents = "_".join(agents) - return sanitize_collection_name(name=agents,max_collection_length=MAX_AGENT_ID_LENGTH_MEM0) - - def _get_config(self) -> Dict[str, Any]: - return self.config or getattr(self, "memory_config", {}).get("config", {}) or {} - + return [r for r in results["results"]] + def reset(self): if self.memory: self.memory.reset() diff --git a/tests/storage/test_mem0_storage.py b/tests/storage/test_mem0_storage.py index c630b8298..6c4cf3c6e 100644 --- a/tests/storage/test_mem0_storage.py +++ b/tests/storage/test_mem0_storage.py @@ -55,10 +55,11 @@ def mem0_storage_with_mocked_config(mock_mem0_memory): } # Instantiate the class with memory_config + # Parameters like run_id, includes, and excludes doesn't matter in Memory OSS crew = MockCrew( memory_config={ "provider": "mem0", - "config": {"user_id": "test_user", "local_mem0_config": config}, + "config": {"user_id": "test_user", "local_mem0_config": config, "run_id": "my_run_id", "includes": "include1","excludes": "exclude1", "infer" : True}, } ) @@ -95,6 +96,10 @@ def mem0_storage_with_memory_client_using_config_from_crew(mock_mem0_memory_clie "api_key": "ABCDEFGH", "org_id": "my_org_id", "project_id": "my_project_id", + "run_id": "my_run_id", + "includes": "include1", + "excludes": "exclude1", + "infer": True }, } ) @@ -150,11 +155,37 @@ def test_mem0_storage_with_explict_config( assert ( mem0_storage_with_memory_client_using_explictly_config.config == expected_config ) - assert ( - mem0_storage_with_memory_client_using_explictly_config.memory_config - == expected_config + + +def test_mem0_storage_updates_project_with_custom_categories(mock_mem0_memory_client): + mock_mem0_memory_client.update_project = MagicMock() + + new_categories = [ + {"lifestyle_management_concerns": "Tracks daily routines, habits, hobbies and interests including cooking, time management and work-life balance"}, + ] + + crew = MockCrew( + memory_config={ + "provider": "mem0", + "config": { + "user_id": "test_user", + "api_key": "ABCDEFGH", + "org_id": "my_org_id", + "project_id": "my_project_id", + "custom_categories": new_categories, + }, + } ) + with patch.object(MemoryClient, "__new__", return_value=mock_mem0_memory_client): + _ = Mem0Storage(type="short_term", crew=crew) + + mock_mem0_memory_client.update_project.assert_called_once_with( + custom_categories=new_categories + ) + + + def test_save_method_with_memory_oss(mem0_storage_with_mocked_config): """Test save method for different memory types""" @@ -169,8 +200,7 @@ def test_save_method_with_memory_oss(mem0_storage_with_mocked_config): mem0_storage.memory.add.assert_called_once_with( [{'role': 'assistant' , 'content': test_value}], - agent_id="Test_Agent", - infer=False, + infer=True, metadata={"type": "short_term", "key": "value"}, ) @@ -188,10 +218,13 @@ def test_save_method_with_memory_client(mem0_storage_with_memory_client_using_co mem0_storage.memory.add.assert_called_once_with( [{'role': 'assistant' , 'content': test_value}], - agent_id="Test_Agent", - infer=False, + infer=True, metadata={"type": "short_term", "key": "value"}, - output_format="v1.1" + version="v2", + run_id="my_run_id", + includes="include1", + excludes="exclude1", + output_format='v1.1' ) @@ -206,11 +239,12 @@ def test_search_method_with_memory_oss(mem0_storage_with_mocked_config): mem0_storage.memory.search.assert_called_once_with( query="test query", limit=5, - agent_id="Test_Agent", - user_id="test_user" + user_id="test_user", + filters={'AND': [{'run_id': 'my_run_id'}]}, + threshold=0.5 ) - assert len(results) == 1 + assert len(results) == 2 assert results[0]["content"] == "Result 1" @@ -225,11 +259,14 @@ def test_search_method_with_memory_client(mem0_storage_with_memory_client_using_ mem0_storage.memory.search.assert_called_once_with( query="test query", limit=5, - agent_id="Test_Agent", metadata={"type": "short_term"}, user_id="test_user", - output_format='v1.1' + version='v2', + run_id="my_run_id", + output_format='v1.1', + filters={'AND': [{'run_id': 'my_run_id'}]}, + threshold=0.5 ) - assert len(results) == 1 + assert len(results) == 2 assert results[0]["content"] == "Result 1"