Compare commits

...

2 Commits

Author SHA1 Message Date
Devin AI
65c57fc43b refactor: Improve memory_verbose implementation per PR feedback (#2859)
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-05-18 18:01:38 +00:00
Devin AI
ce68323815 feat: Add memory_verbose flag to provide visibility into memory operations (#2858)
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-05-18 17:48:51 +00:00
7 changed files with 541 additions and 68 deletions

View File

@@ -113,6 +113,10 @@ class Crew(BaseModel):
default=False,
description="Whether the crew should use memory to store memories of it's execution",
)
memory_verbose: bool = Field(
default=False,
description="Whether to show verbose logs about memory operations",
)
memory_config: Optional[Dict[str, Any]] = Field(
default=None,
description="Configuration for the memory to be used for the crew.",
@@ -257,7 +261,7 @@ class Crew(BaseModel):
"""Set private attributes."""
if self.memory:
self._long_term_memory = (
self.long_term_memory if self.long_term_memory else LongTermMemory()
self.long_term_memory if self.long_term_memory else LongTermMemory(memory_verbose=self.memory_verbose)
)
self._short_term_memory = (
self.short_term_memory
@@ -265,16 +269,17 @@ class Crew(BaseModel):
else ShortTermMemory(
crew=self,
embedder_config=self.embedder,
memory_verbose=self.memory_verbose,
)
)
self._entity_memory = (
self.entity_memory
if self.entity_memory
else EntityMemory(crew=self, embedder_config=self.embedder)
else EntityMemory(crew=self, embedder_config=self.embedder, memory_verbose=self.memory_verbose)
)
if hasattr(self, "memory_config") and self.memory_config is not None:
self._user_memory = (
self.user_memory if self.user_memory else UserMemory(crew=self)
self.user_memory if self.user_memory else UserMemory(crew=self, memory_verbose=self.memory_verbose)
)
else:
self._user_memory = None

View File

@@ -1,5 +1,7 @@
from typing import Any, Dict, List, Optional
from crewai.memory.entity.entity_memory_item import EntityMemoryItem
from crewai.memory.memory import Memory
from crewai.memory.memory import Memory, MemoryOperationError
from crewai.memory.storage.rag_storage import RAGStorage
@@ -8,9 +10,24 @@ class EntityMemory(Memory):
EntityMemory class for managing structured information about entities
and their relationships using SQLite storage.
Inherits from the Memory class.
Attributes:
memory_provider: The memory provider to use, if any.
storage: The storage backend for the memory.
memory_verbose: Whether to log memory operations.
"""
def __init__(self, crew=None, embedder_config=None, storage=None, path=None):
def __init__(self, crew=None, embedder_config=None, storage=None, path=None, memory_verbose=False):
"""
Initialize an EntityMemory instance.
Args:
crew: The crew to associate with this memory.
embedder_config: Configuration for the embedder.
storage: The storage backend for the memory.
path: Path to the storage file, if any.
memory_verbose: Whether to log memory operations.
"""
if hasattr(crew, "memory_config") and crew.memory_config is not None:
self.memory_provider = crew.memory_config.get("provider")
else:
@@ -36,23 +53,48 @@ class EntityMemory(Memory):
path=path,
)
)
super().__init__(storage)
super().__init__(storage, memory_verbose=memory_verbose)
def save(self, item: EntityMemoryItem) -> None: # type: ignore # BUG?: Signature of "save" incompatible with supertype "Memory"
"""Saves an entity item into the SQLite storage."""
if self.memory_provider == "mem0":
data = f"""
Remember details about the following entity:
Name: {item.name}
Type: {item.type}
Entity Description: {item.description}
"""
else:
data = f"{item.name}({item.type}): {item.description}"
super().save(data, item.metadata)
"""
Saves an entity item into storage.
Args:
item: The entity memory item to save.
Raises:
MemoryOperationError: If there's an error saving the entity to memory.
"""
try:
if self.memory_verbose:
self._log_operation("Saving entity", f"{item.name} ({item.type})")
self._log_operation("Description", item.description)
if self.memory_provider == "mem0":
data = f"""
Remember details about the following entity:
Name: {item.name}
Type: {item.type}
Entity Description: {item.description}
"""
else:
data = f"{item.name}({item.type}): {item.description}"
super().save(data, item.metadata)
except Exception as e:
if self.memory_verbose:
self._log_operation("Error saving entity", str(e), level="error", color="red")
raise MemoryOperationError(str(e), "save entity", self.__class__.__name__)
def reset(self) -> None:
"""
Reset the entity memory.
Raises:
MemoryOperationError: If there's an error resetting the memory.
"""
try:
self.storage.reset()
except Exception as e:
raise Exception(f"An error occurred while resetting the entity memory: {e}")
if self.memory_verbose:
self._log_operation("Error resetting", str(e), level="error", color="red")
raise MemoryOperationError(str(e), "reset", self.__class__.__name__)

View File

@@ -1,7 +1,7 @@
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional
from crewai.memory.long_term.long_term_memory_item import LongTermMemoryItem
from crewai.memory.memory import Memory
from crewai.memory.memory import Memory, MemoryOperationError
from crewai.memory.storage.ltm_sqlite_storage import LTMSQLiteStorage
@@ -12,25 +12,90 @@ class LongTermMemory(Memory):
Inherits from the Memory class and utilizes an instance of a class that
adheres to the Storage for data storage, specifically working with
LongTermMemoryItem instances.
Attributes:
storage: The storage backend for the memory.
memory_verbose: Whether to log memory operations.
"""
def __init__(self, storage=None, path=None):
def __init__(self, storage=None, path=None, memory_verbose=False):
"""
Initialize a LongTermMemory instance.
Args:
storage: The storage backend for the memory.
path: Path to the storage file, if any.
memory_verbose: Whether to log memory operations.
"""
if not storage:
storage = LTMSQLiteStorage(db_path=path) if path else LTMSQLiteStorage()
super().__init__(storage)
super().__init__(storage, memory_verbose=memory_verbose)
def save(self, item: LongTermMemoryItem) -> None: # type: ignore # BUG?: Signature of "save" incompatible with supertype "Memory"
metadata = item.metadata
metadata.update({"agent": item.agent, "expected_output": item.expected_output})
self.storage.save( # type: ignore # BUG?: Unexpected keyword argument "task_description","score","datetime" for "save" of "Storage"
task_description=item.task,
score=metadata["quality"],
metadata=metadata,
datetime=item.datetime,
)
"""
Save a long-term memory item to storage.
Args:
item: The long-term memory item to save.
Raises:
MemoryOperationError: If there's an error saving the item to memory.
"""
try:
if self.memory_verbose:
self._log_operation("Saving task", item.task)
self._log_operation("Agent", item.agent)
self._log_operation("Quality", str(item.metadata.get('quality')))
metadata = item.metadata
metadata.update({"agent": item.agent, "expected_output": item.expected_output})
self.storage.save( # type: ignore # BUG?: Unexpected keyword argument "task_description","score","datetime" for "save" of "Storage"
task_description=item.task,
score=metadata["quality"],
metadata=metadata,
datetime=item.datetime,
)
except Exception as e:
if self.memory_verbose:
self._log_operation("Error saving task", str(e), level="error", color="red")
raise MemoryOperationError(str(e), "save task", self.__class__.__name__)
def search(self, task: str, latest_n: int = 3) -> List[Dict[str, Any]]: # type: ignore # signature of "search" incompatible with supertype "Memory"
return self.storage.load(task, latest_n) # type: ignore # BUG?: "Storage" has no attribute "load"
"""
Search for long-term memories related to a task.
Args:
task: The task description to search for.
latest_n: Maximum number of results to return.
Returns:
A list of matching long-term memories.
Raises:
MemoryOperationError: If there's an error searching memory.
"""
try:
if self.memory_verbose:
self._log_operation("Searching for task", task)
results = self.storage.load(task, latest_n) # type: ignore # BUG?: "Storage" has no attribute "load"
if self.memory_verbose and results:
self._log_operation("Found", f"{len(results)} results")
return results
except Exception as e:
if self.memory_verbose:
self._log_operation("Error searching", str(e), level="error", color="red")
raise MemoryOperationError(str(e), "search", self.__class__.__name__)
def reset(self) -> None:
self.storage.reset()
"""
Reset the long-term memory.
Raises:
MemoryOperationError: If there's an error resetting the memory.
"""
try:
self.storage.reset()
except Exception as e:
if self.memory_verbose:
self._log_operation("Error resetting", str(e), level="error", color="red")
raise MemoryOperationError(str(e), "reset", self.__class__.__name__)

View File

@@ -1,15 +1,67 @@
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Union
from crewai.memory.storage.rag_storage import RAGStorage
from crewai.utilities.logger import Logger
class MemoryOperationError(Exception):
"""
Exception raised for errors in memory operations.
Attributes:
message: Explanation of the error
operation: The operation that failed (e.g., "save", "search")
memory_type: The type of memory where the error occurred
"""
def __init__(self, message: str, operation: str, memory_type: str):
self.operation = operation
self.memory_type = memory_type
super().__init__(f"{memory_type} {operation} error: {message}")
class Memory:
"""
Base class for memory, now supporting agent tags and generic metadata.
Attributes:
storage: The storage backend for the memory.
memory_verbose: Whether to log memory operations.
"""
def __init__(self, storage: RAGStorage):
def __init__(self, storage: RAGStorage, memory_verbose: bool = False):
"""
Initialize a Memory instance.
Args:
storage: The storage backend for the memory.
memory_verbose: Whether to log memory operations.
"""
self.storage = storage
self.memory_verbose = memory_verbose
self._logger = Logger(verbose=memory_verbose)
def _log_operation(self, operation: str, details: str, agent: Optional[str] = None, level: str = "info", color: str = "cyan") -> None:
"""
Log a memory operation if memory_verbose is enabled.
Args:
operation: The type of operation (e.g., "Saving", "Searching").
details: Details about the operation.
agent: The agent performing the operation, if any.
level: The log level.
color: The color to use for the log message.
"""
if not self.memory_verbose:
return
sanitized_details = str(details)
if len(sanitized_details) > 100:
sanitized_details = f"{sanitized_details[:100]}..."
memory_type = self.__class__.__name__
agent_info = f" from agent '{agent}'" if agent else ""
self._logger.log(level, f"{memory_type}: {operation}{agent_info}: {sanitized_details}", color=color)
def save(
self,
@@ -17,11 +69,30 @@ class Memory:
metadata: Optional[Dict[str, Any]] = None,
agent: Optional[str] = None,
) -> None:
"""
Save a value to memory.
Args:
value: The value to save.
metadata: Additional metadata to store with the value.
agent: The agent saving the value, if any.
Raises:
MemoryOperationError: If there's an error saving the value to memory.
"""
metadata = metadata or {}
if agent:
metadata["agent"] = agent
if self.memory_verbose:
self._log_operation("Saving", str(value), agent)
self.storage.save(value, metadata)
try:
self.storage.save(value, metadata)
except Exception as e:
if self.memory_verbose:
self._log_operation("Error saving", str(e), agent, level="error", color="red")
raise MemoryOperationError(str(e), "save", self.__class__.__name__)
def search(
self,
@@ -29,6 +100,33 @@ class Memory:
limit: int = 3,
score_threshold: float = 0.35,
) -> List[Any]:
return self.storage.search(
query=query, limit=limit, score_threshold=score_threshold
)
"""
Search for values in memory.
Args:
query: The search query.
limit: Maximum number of results to return.
score_threshold: Minimum similarity score for results.
Returns:
A list of matching values.
Raises:
MemoryOperationError: If there's an error searching memory.
"""
if self.memory_verbose:
self._log_operation("Searching for", query)
try:
results = self.storage.search(
query=query, limit=limit, score_threshold=score_threshold
)
if self.memory_verbose and results:
self._log_operation("Found", f"{len(results)} results")
return results
except Exception as e:
if self.memory_verbose:
self._log_operation("Error searching", str(e), level="error", color="red")
raise MemoryOperationError(str(e), "search", self.__class__.__name__)

View File

@@ -1,6 +1,6 @@
from typing import Any, Dict, Optional
from typing import Any, Dict, List, Optional
from crewai.memory.memory import Memory
from crewai.memory.memory import Memory, MemoryOperationError
from crewai.memory.short_term.short_term_memory_item import ShortTermMemoryItem
from crewai.memory.storage.rag_storage import RAGStorage
@@ -12,9 +12,24 @@ class ShortTermMemory(Memory):
Inherits from the Memory class and utilizes an instance of a class that
adheres to the Storage for data storage, specifically working with
MemoryItem instances.
Attributes:
memory_provider: The memory provider to use, if any.
storage: The storage backend for the memory.
memory_verbose: Whether to log memory operations.
"""
def __init__(self, crew=None, embedder_config=None, storage=None, path=None):
def __init__(self, crew=None, embedder_config=None, storage=None, path=None, memory_verbose=False):
"""
Initialize a ShortTermMemory instance.
Args:
crew: The crew to associate with this memory.
embedder_config: Configuration for the embedder.
storage: The storage backend for the memory.
path: Path to the storage file, if any.
memory_verbose: Whether to log memory operations.
"""
if hasattr(crew, "memory_config") and crew.memory_config is not None:
self.memory_provider = crew.memory_config.get("provider")
else:
@@ -39,7 +54,7 @@ class ShortTermMemory(Memory):
path=path,
)
)
super().__init__(storage)
super().__init__(storage, memory_verbose=memory_verbose)
def save(
self,
@@ -47,26 +62,68 @@ class ShortTermMemory(Memory):
metadata: Optional[Dict[str, Any]] = None,
agent: Optional[str] = None,
) -> None:
item = ShortTermMemoryItem(data=value, metadata=metadata, agent=agent)
if self.memory_provider == "mem0":
item.data = f"Remember the following insights from Agent run: {item.data}"
"""
Save a value to short-term memory.
Args:
value: The value to save.
metadata: Additional metadata to store with the value.
agent: The agent saving the value, if any.
Raises:
MemoryOperationError: If there's an error saving to memory.
"""
try:
item = ShortTermMemoryItem(data=value, metadata=metadata, agent=agent)
if self.memory_verbose:
self._log_operation("Saving item", str(item.data), agent)
if self.memory_provider == "mem0":
item.data = f"Remember the following insights from Agent run: {item.data}"
super().save(value=item.data, metadata=item.metadata, agent=item.agent)
super().save(value=item.data, metadata=item.metadata, agent=item.agent)
except Exception as e:
if self.memory_verbose:
self._log_operation("Error saving item", str(e), level="error", color="red")
raise MemoryOperationError(str(e), "save", self.__class__.__name__)
def search(
self,
query: str,
limit: int = 3,
score_threshold: float = 0.35,
):
return self.storage.search(
query=query, limit=limit, score_threshold=score_threshold
) # type: ignore # BUG? The reference is to the parent class, but the parent class does not have this parameters
) -> List[Any]:
"""
Search for values in short-term memory.
Args:
query: The search query.
limit: Maximum number of results to return.
score_threshold: Minimum similarity score for results.
Returns:
A list of matching values.
Raises:
MemoryOperationError: If there's an error searching memory.
"""
try:
return super().search(query=query, limit=limit, score_threshold=score_threshold)
except Exception as e:
if self.memory_verbose:
self._log_operation("Error searching", str(e), level="error", color="red")
raise MemoryOperationError(str(e), "search", self.__class__.__name__)
def reset(self) -> None:
"""
Reset the short-term memory.
Raises:
MemoryOperationError: If there's an error resetting the memory.
"""
try:
self.storage.reset()
except Exception as e:
raise Exception(
f"An error occurred while resetting the short-term memory: {e}"
)
if self.memory_verbose:
self._log_operation("Error resetting", str(e), level="error", color="red")
raise MemoryOperationError(str(e), "reset", self.__class__.__name__)

View File

@@ -1,6 +1,6 @@
from typing import Any, Dict, Optional
from typing import Any, Dict, List, Optional
from crewai.memory.memory import Memory
from crewai.memory.memory import Memory, MemoryOperationError
class UserMemory(Memory):
@@ -9,9 +9,23 @@ class UserMemory(Memory):
Inherits from the Memory class and utilizes an instance of a class that
adheres to the Storage for data storage, specifically working with
MemoryItem instances.
Attributes:
storage: The storage backend for the memory.
memory_verbose: Whether to log memory operations.
"""
def __init__(self, crew=None):
def __init__(self, crew=None, memory_verbose=False):
"""
Initialize a UserMemory instance.
Args:
crew: The crew to associate with this memory.
memory_verbose: Whether to log memory operations.
Raises:
ImportError: If Mem0 is not installed.
"""
try:
from crewai.memory.storage.mem0_storage import Mem0Storage
except ImportError:
@@ -19,27 +33,72 @@ class UserMemory(Memory):
"Mem0 is not installed. Please install it with `pip install mem0ai`."
)
storage = Mem0Storage(type="user", crew=crew)
super().__init__(storage)
super().__init__(storage, memory_verbose=memory_verbose)
def save(
self,
value,
value: Any,
metadata: Optional[Dict[str, Any]] = None,
agent: Optional[str] = None,
) -> None:
# TODO: Change this function since we want to take care of the case where we save memories for the usr
data = f"Remember the details about the user: {value}"
super().save(data, metadata)
"""
Save user memory.
Args:
value: The value to save.
metadata: Additional metadata to store with the value.
agent: The agent saving the value, if any.
Raises:
MemoryOperationError: If there's an error saving to memory.
"""
try:
if self.memory_verbose:
self._log_operation("Saving user memory", str(value))
# TODO: Change this function since we want to take care of the case where we save memories for the usr
data = f"Remember the details about the user: {value}"
super().save(data, metadata)
except Exception as e:
if self.memory_verbose:
self._log_operation("Error saving user memory", str(e), level="error", color="red")
raise MemoryOperationError(str(e), "save", self.__class__.__name__)
def search(
self,
query: str,
limit: int = 3,
score_threshold: float = 0.35,
):
results = self.storage.search(
query=query,
limit=limit,
score_threshold=score_threshold,
)
return results
) -> List[Any]:
"""
Search for user memories.
Args:
query: The search query.
limit: Maximum number of results to return.
score_threshold: Minimum similarity score for results.
Returns:
A list of matching user memories.
Raises:
MemoryOperationError: If there's an error searching memory.
"""
try:
if self.memory_verbose:
self._log_operation("Searching user memory", query)
results = self.storage.search(
query=query,
limit=limit,
score_threshold=score_threshold,
)
if self.memory_verbose and results:
self._log_operation("Found", f"{len(results)} results")
return results
except Exception as e:
if self.memory_verbose:
self._log_operation("Error searching user memory", str(e), level="error", color="red")
raise MemoryOperationError(str(e), "search", self.__class__.__name__)

View File

@@ -0,0 +1,147 @@
from unittest.mock import patch, MagicMock
import pytest
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.memory.memory import Memory, MemoryOperationError
from crewai.memory.short_term.short_term_memory import ShortTermMemory
from crewai.memory.short_term.short_term_memory_item import ShortTermMemoryItem
from crewai.task import Task
from crewai.utilities.logger import Logger
def test_memory_verbose_flag_in_crew():
"""Test that memory_verbose flag is correctly set in Crew"""
agent = Agent(
role="Researcher",
goal="Research goal",
backstory="Researcher backstory",
)
task = Task(
description="Test task",
expected_output="Test output",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task], memory=True, memory_verbose=True)
assert crew.memory_verbose is True
def test_memory_verbose_logging_in_memory():
"""Test that memory operations are logged when memory_verbose is enabled"""
storage = MagicMock()
mock_logger = MagicMock(spec=Logger)
memory = Memory(storage=storage, memory_verbose=True)
memory._logger = mock_logger
memory.save("test value", {"test": "metadata"}, "test_agent")
mock_logger.log.assert_called_once()
args = mock_logger.log.call_args[0]
assert args[0] == "info"
assert "Saving" in args[1]
mock_logger.log.reset_mock()
memory.search("test query")
assert mock_logger.log.call_count == 2
first_call_args = mock_logger.log.call_args_list[0][0]
assert first_call_args[0] == "info"
assert "Searching" in first_call_args[1]
second_call_args = mock_logger.log.call_args_list[1][0]
assert "Found" in second_call_args[1]
def test_no_logging_when_memory_verbose_disabled():
"""Test that no logging occurs when memory_verbose is disabled"""
storage = MagicMock()
mock_logger = MagicMock(spec=Logger)
memory = Memory(storage=storage, memory_verbose=False)
memory._logger = mock_logger
memory.save("test value", {"test": "metadata"}, "test_agent")
mock_logger.log.assert_not_called()
memory.search("test query")
mock_logger.log.assert_not_called()
def test_memory_verbose_in_short_term_memory():
"""Test that memory_verbose flag is correctly passed to ShortTermMemory"""
with patch('crewai.memory.short_term.short_term_memory.RAGStorage') as mock_storage_class:
mock_storage = MagicMock()
mock_storage_class.return_value = mock_storage
memory = ShortTermMemory(memory_verbose=True)
assert memory.memory_verbose is True
mock_logger = MagicMock()
memory._logger = mock_logger
memory.save("test value", {"test": "metadata"}, "test_agent")
assert mock_logger.log.call_count >= 1
def test_memory_verbose_passed_from_crew_to_memory():
"""Test that memory_verbose flag is correctly passed from Crew to memory instances"""
with patch('crewai.crew.LongTermMemory') as mock_ltm, \
patch('crewai.crew.ShortTermMemory') as mock_stm, \
patch('crewai.crew.EntityMemory') as mock_em, \
patch('crewai.crew.UserMemory') as mock_um:
mock_ltm_instance = MagicMock()
mock_stm_instance = MagicMock()
mock_em_instance = MagicMock()
mock_um_instance = MagicMock()
mock_ltm.return_value = mock_ltm_instance
mock_stm.return_value = mock_stm_instance
mock_em.return_value = mock_em_instance
mock_um.return_value = mock_um_instance
agent = Agent(
role="Researcher",
goal="Research goal",
backstory="Researcher backstory",
)
task = Task(
description="Test task",
expected_output="Test output",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task], memory=True, memory_verbose=True, memory_config={})
mock_ltm.assert_called_once_with(memory_verbose=True)
mock_stm.assert_called_with(crew=crew, embedder_config=None, memory_verbose=True)
mock_em.assert_called_with(crew=crew, embedder_config=None, memory_verbose=True)
mock_um.assert_called_with(crew=crew, memory_verbose=True)
def test_memory_verbose_error_handling():
"""Test that memory operations errors are properly handled when memory_verbose is enabled"""
storage = MagicMock()
storage.save.side_effect = Exception("Test error")
storage.search.side_effect = Exception("Test error")
mock_logger = MagicMock()
with patch('crewai.memory.memory.Logger', return_value=mock_logger):
memory = Memory(storage=storage, memory_verbose=True)
with pytest.raises(MemoryOperationError) as exc_info:
memory.save("test value", {"test": "metadata"}, "test_agent")
assert "save" in str(exc_info.value)
assert "Test error" in str(exc_info.value)
assert "Memory" in str(exc_info.value)
with pytest.raises(MemoryOperationError) as exc_info:
memory.search("test query")
assert "search" in str(exc_info.value)
assert "Test error" in str(exc_info.value)