Add support for External Memory (the future replacement for UserMemory) (#2510)

* fix: surfacing properly supported types by Mem0Storage

* feat: prepare Mem0Storage to accept config paramenter

We're planning to remove `memory_config` soon. This commit kindly prepare this storage to accept the config provided directly

* feat: add external memory

* fix: cleanup Mem0 warning while adding messages to the memory

* feat: support set the current crew in memory

This can be useful when a memory is initialized before the crew, but the crew might still be a very relevant attribute

* fix: allow to reset only an external_memory from crew

* test: add external memory test

* test: ensure the config takes precedence over memory_config when setting mem0

* fix: support to provide a custom storage to External Memory

* docs: add docs about external memory

* chore: add warning messages about the deprecation of UserMemory

* fix: fix typing check

---------

Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
This commit is contained in:
Lucas Gomide
2025-04-07 13:40:35 -04:00
committed by GitHub
parent 918c0589eb
commit d7fa8464c7
19 changed files with 3870 additions and 76 deletions

View File

@@ -207,6 +207,7 @@ class Agent(BaseAgent):
self.crew._long_term_memory,
self.crew._entity_memory,
self.crew._user_memory,
self.crew._external_memory,
)
memory = contextual_memory.build_context_for_task(task, context)
if memory.strip() != "":

View File

@@ -47,6 +47,27 @@ class CrewAgentExecutorMixin:
print(f"Failed to add to short term memory: {e}")
pass
def _create_external_memory(self, output) -> None:
"""Create and save a external-term memory item if conditions are met."""
if (
self.crew
and self.agent
and self.task
and hasattr(self.crew, "_external_memory")
and self.crew._external_memory
):
try:
self.crew._external_memory.save(
value=output.text,
metadata={
"description": self.task.description,
},
agent=self.agent.role,
)
except Exception as e:
print(f"Failed to add to external memory: {e}")
pass
def _create_long_term_memory(self, output) -> None:
"""Create and save long-term and entity memory items based on evaluation."""
if (

View File

@@ -129,6 +129,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self._create_short_term_memory(formatted_answer)
self._create_long_term_memory(formatted_answer)
self._create_external_memory(formatted_answer)
return {"output": formatted_answer.output}
def _invoke_loop(self) -> AgentFinish:

View File

@@ -28,6 +28,7 @@ from crewai.knowledge.knowledge import Knowledge
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.llm import LLM, BaseLLM
from crewai.memory.entity.entity_memory import EntityMemory
from crewai.memory.external.external_memory import ExternalMemory
from crewai.memory.long_term.long_term_memory import LongTermMemory
from crewai.memory.short_term.short_term_memory import ShortTermMemory
from crewai.memory.user.user_memory import UserMemory
@@ -105,6 +106,7 @@ class Crew(BaseModel):
_long_term_memory: Optional[InstanceOf[LongTermMemory]] = PrivateAttr()
_entity_memory: Optional[InstanceOf[EntityMemory]] = PrivateAttr()
_user_memory: Optional[InstanceOf[UserMemory]] = PrivateAttr()
_external_memory: Optional[InstanceOf[ExternalMemory]] = PrivateAttr()
_train: Optional[bool] = PrivateAttr(default=False)
_train_iteration: Optional[int] = PrivateAttr()
_inputs: Optional[Dict[str, Any]] = PrivateAttr(default=None)
@@ -145,6 +147,10 @@ class Crew(BaseModel):
default=None,
description="An instance of the UserMemory to be used by the Crew to store/fetch memories of a specific user.",
)
external_memory: Optional[InstanceOf[ExternalMemory]] = Field(
default=None,
description="An Instance of the ExternalMemory to be used by the Crew",
)
embedder: Optional[dict] = Field(
default=None,
description="Configuration for the embedder to be used for the crew.",
@@ -289,6 +295,12 @@ class Crew(BaseModel):
if self.entity_memory
else EntityMemory(crew=self, embedder_config=self.embedder)
)
self._external_memory = (
# External memory doesnt support a default value since it was designed to be managed entirely externally
self.external_memory.set_crew(self)
if self.external_memory
else None
)
if (
self.memory_config
and "user_memory" in self.memory_config
@@ -1167,6 +1179,7 @@ class Crew(BaseModel):
"_short_term_memory",
"_long_term_memory",
"_entity_memory",
"_external_memory",
"_telemetry",
"agents",
"tasks",
@@ -1313,7 +1326,15 @@ class Crew(BaseModel):
RuntimeError: If memory reset operation fails.
"""
VALID_TYPES = frozenset(
["long", "short", "entity", "knowledge", "kickoff_outputs", "all"]
[
"long",
"short",
"entity",
"knowledge",
"kickoff_outputs",
"all",
"external",
]
)
if command_type not in VALID_TYPES:
@@ -1339,6 +1360,7 @@ class Crew(BaseModel):
memory_systems = [
("short term", getattr(self, "_short_term_memory", None)),
("entity", getattr(self, "_entity_memory", None)),
("external", getattr(self, "_external_memory", None)),
("long term", getattr(self, "_long_term_memory", None)),
("task output", getattr(self, "_task_output_handler", None)),
("knowledge", getattr(self, "knowledge", None)),
@@ -1366,6 +1388,7 @@ class Crew(BaseModel):
"entity": (self._entity_memory, "entity"),
"knowledge": (self.knowledge, "knowledge"),
"kickoff_outputs": (self._task_output_handler, "task output"),
"external": (self._external_memory, "external"),
}
memory_system, name = reset_functions[memory_type]

View File

@@ -2,5 +2,12 @@ from .entity.entity_memory import EntityMemory
from .long_term.long_term_memory import LongTermMemory
from .short_term.short_term_memory import ShortTermMemory
from .user.user_memory import UserMemory
from .external.external_memory import ExternalMemory
__all__ = ["UserMemory", "EntityMemory", "LongTermMemory", "ShortTermMemory"]
__all__ = [
"UserMemory",
"EntityMemory",
"LongTermMemory",
"ShortTermMemory",
"ExternalMemory",
]

View File

@@ -1,6 +1,12 @@
from typing import Any, Dict, Optional
from crewai.memory import EntityMemory, LongTermMemory, ShortTermMemory, UserMemory
from crewai.memory import (
EntityMemory,
ExternalMemory,
LongTermMemory,
ShortTermMemory,
UserMemory,
)
class ContextualMemory:
@@ -11,6 +17,7 @@ class ContextualMemory:
ltm: LongTermMemory,
em: EntityMemory,
um: UserMemory,
exm: ExternalMemory,
):
if memory_config is not None:
self.memory_provider = memory_config.get("provider")
@@ -20,6 +27,7 @@ class ContextualMemory:
self.ltm = ltm
self.em = em
self.um = um
self.exm = exm
def build_context_for_task(self, task, context) -> str:
"""
@@ -35,6 +43,7 @@ class ContextualMemory:
context.append(self._fetch_ltm_context(task.description))
context.append(self._fetch_stm_context(query))
context.append(self._fetch_entity_context(query))
context.append(self._fetch_external_context(query))
if self.memory_provider == "mem0":
context.append(self._fetch_user_context(query))
return "\n".join(filter(None, context))
@@ -106,3 +115,24 @@ class ContextualMemory:
f"- {result['memory']}" for result in user_memories
)
return f"User memories/preferences:\n{formatted_memories}"
def _fetch_external_context(self, query: str) -> str:
"""
Fetches and formats relevant information from External Memory.
Args:
query (str): The search query to find relevant information.
Returns:
str: Formatted information as bullet points, or an empty string if none found.
"""
if self.exm is None:
return ""
external_memories = self.exm.search(query)
if not external_memories:
return ""
formatted_memories = "\n".join(
f"- {result['memory']}" for result in external_memories
)
return f"External memories:\n{formatted_memories}"

View File

View File

@@ -0,0 +1,61 @@
from typing import TYPE_CHECKING, Any, Dict, Optional, Self
from crewai.memory.external.external_memory_item import ExternalMemoryItem
from crewai.memory.memory import Memory
from crewai.memory.storage.interface import Storage
if TYPE_CHECKING:
from crewai.memory.storage.mem0_storage import Mem0Storage
class ExternalMemory(Memory):
def __init__(self, storage: Optional[Storage] = None, **data: Any):
super().__init__(storage=storage, **data)
@staticmethod
def _configure_mem0(crew: Any, config: Dict[str, Any]) -> "Mem0Storage":
from crewai.memory.storage.mem0_storage import Mem0Storage
return Mem0Storage(type="external", crew=crew, config=config)
@staticmethod
def external_supported_storages() -> Dict[str, Any]:
return {
"mem0": ExternalMemory._configure_mem0,
}
@staticmethod
def create_storage(crew: Any, embedder_config: Optional[Dict[str, Any]]) -> Storage:
if not embedder_config:
raise ValueError("embedder_config is required")
if "provider" not in embedder_config:
raise ValueError("embedder_config must include a 'provider' key")
provider = embedder_config["provider"]
supported_storages = ExternalMemory.external_supported_storages()
if provider not in supported_storages:
raise ValueError(f"Provider {provider} not supported")
return supported_storages[provider](crew, embedder_config.get("config", {}))
def save(
self,
value: Any,
metadata: Optional[Dict[str, Any]] = None,
agent: Optional[str] = None,
) -> None:
"""Saves a value into the external storage."""
item = ExternalMemoryItem(value=value, metadata=metadata, agent=agent)
super().save(value=item.value, metadata=item.metadata, agent=item.agent)
def reset(self) -> None:
self.storage.reset()
def set_crew(self, crew: Any) -> Self:
super().set_crew(crew)
if not self.storage:
self.storage = self.create_storage(crew, self.embedder_config)
return self

View File

@@ -0,0 +1,13 @@
from typing import Any, Dict, Optional
class ExternalMemoryItem:
def __init__(
self,
value: Any,
metadata: Optional[Dict[str, Any]] = None,
agent: Optional[str] = None,
):
self.value = value
self.metadata = metadata
self.agent = agent

View File

@@ -1,4 +1,4 @@
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Self
from pydantic import BaseModel
@@ -9,6 +9,7 @@ class Memory(BaseModel):
"""
embedder_config: Optional[Dict[str, Any]] = None
crew: Optional[Any] = None
storage: Any
@@ -36,3 +37,7 @@ class Memory(BaseModel):
return self.storage.search(
query=query, limit=limit, score_threshold=score_threshold
)
def set_crew(self, crew: Any) -> Self:
self.crew = crew
return self

View File

@@ -11,15 +11,20 @@ class Mem0Storage(Storage):
Extends Storage to handle embedding and searching across entities using Mem0.
"""
def __init__(self, type, crew=None):
def __init__(self, type, crew=None, config=None):
super().__init__()
if type not in ["user", "short_term", "long_term", "entities"]:
raise ValueError("Invalid type for Mem0Storage. Must be 'user' or 'agent'.")
supported_types = ["user", "short_term", "long_term", "entities", "external"]
if type not in supported_types:
raise ValueError(
f"Invalid type '{type}' for Mem0Storage. Must be one of: "
+ ", ".join(supported_types)
)
self.memory_type = type
self.crew = crew
self.memory_config = crew.memory_config
self.config = config or {}
# TODO: Memory config will be removed in the future the config will be passed as a parameter
self.memory_config = self.config or getattr(crew, "memory_config", {}) or {}
# User ID is required for user memory type "user" since it's used as a unique identifier for the user.
user_id = self._get_user_id()
@@ -27,7 +32,7 @@ class Mem0Storage(Storage):
raise ValueError("User ID is required for user memory type")
# API key in memory config overrides the environment variable
config = self.memory_config.get("config", {})
config = self._get_config()
mem0_api_key = config.get("api_key") or os.getenv("MEM0_API_KEY")
mem0_org_id = config.get("org_id")
mem0_project_id = config.get("project_id")
@@ -56,26 +61,34 @@ class Mem0Storage(Storage):
def save(self, value: Any, metadata: Dict[str, Any]) -> None:
user_id = self._get_user_id()
agent_name = self._get_agent_name()
if self.memory_type == "user":
self.memory.add(value, user_id=user_id, metadata={**metadata})
elif self.memory_type == "short_term":
agent_name = self._get_agent_name()
self.memory.add(
value, agent_id=agent_name, metadata={"type": "short_term", **metadata}
)
params = None
if self.memory_type == "short_term":
params = {
"agent_id": agent_name,
"infer": False,
"metadata": {"type": "short_term", **metadata},
}
elif self.memory_type == "long_term":
agent_name = self._get_agent_name()
self.memory.add(
value,
agent_id=agent_name,
infer=False,
metadata={"type": "long_term", **metadata},
)
params = {
"agent_id": agent_name,
"infer": False,
"metadata": {"type": "long_term", **metadata},
}
elif self.memory_type == "entities":
entity_name = self._get_agent_name()
self.memory.add(
value, user_id=entity_name, metadata={"type": "entity", **metadata}
)
params = {
"agent_id": agent_name,
"infer": False,
"metadata": {"type": "entity", **metadata},
}
elif self.memory_type == "external":
params = {
"user_id": user_id,
"agent_id": agent_name,
"metadata": {"type": "external", **metadata},
}
if params:
self.memory.add(value, **params | {"output_format": "v1.1"})
def search(
self,
@@ -84,41 +97,43 @@ class Mem0Storage(Storage):
score_threshold: float = 0.35,
) -> List[Any]:
params = {"query": query, "limit": limit}
if self.memory_type == "user":
user_id = self._get_user_id()
if user_id := self._get_user_id():
params["user_id"] = user_id
elif self.memory_type == "short_term":
agent_name = self._get_agent_name()
agent_name = self._get_agent_name()
if self.memory_type == "short_term":
params["agent_id"] = agent_name
params["metadata"] = {"type": "short_term"}
elif self.memory_type == "long_term":
agent_name = self._get_agent_name()
params["agent_id"] = agent_name
params["metadata"] = {"type": "long_term"}
elif self.memory_type == "entities":
agent_name = self._get_agent_name()
params["agent_id"] = agent_name
params["metadata"] = {"type": "entity"}
elif self.memory_type == "external":
params["agent_id"] = agent_name
params["metadata"] = {"type": "external"}
# Discard the filters for now since we create the filters
# automatically when the crew is created.
results = self.memory.search(**params)
return [r for r in results if r["score"] >= score_threshold]
def _get_user_id(self):
if self.memory_type == "user":
if hasattr(self, "memory_config") and self.memory_config is not None:
return self.memory_config.get("config", {}).get("user_id")
else:
return None
return None
def _get_user_id(self) -> str:
return self._get_config().get("user_id", "")
def _get_agent_name(self):
agents = self.crew.agents if self.crew else []
def _get_agent_name(self) -> str:
if not self.crew:
return ""
agents = self.crew.agents
agents = [self._sanitize_role(agent.role) for agent in agents]
agents = "_".join(agents)
return agents
def _get_config(self) -> Dict[str, Any]:
return self.config or getattr(self, "memory_config", {}).get("config", {}) or {}
def reset(self):
if self.memory:
self.memory.reset()

View File

@@ -1,3 +1,4 @@
import warnings
from typing import Any, Dict, Optional
from crewai.memory.memory import Memory
@@ -12,6 +13,12 @@ class UserMemory(Memory):
"""
def __init__(self, crew=None):
warnings.warn(
"UserMemory is deprecated and will be removed in a future version. "
"Please use ExternalMemory instead.",
DeprecationWarning,
stacklevel=2,
)
try:
from crewai.memory.storage.mem0_storage import Mem0Storage
except ImportError:
@@ -48,6 +55,4 @@ class UserMemory(Memory):
try:
self.storage.reset()
except Exception as e:
raise Exception(
f"An error occurred while resetting the user memory: {e}"
)
raise Exception(f"An error occurred while resetting the user memory: {e}")