Introduce MemoryEvents to monitor their usage (#3098)
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled

* feat: emit events about memory usage

* test: add tests about memory events usage

* fixed linter issues

* test: use scoped_handlers while listener Memory events
This commit is contained in:
Lucas Gomide
2025-07-01 23:50:39 -03:00
committed by GitHub
parent 640e1a7bc2
commit ab39753a75
17 changed files with 3264 additions and 40 deletions

View File

@@ -1,5 +1,12 @@
from unittest.mock import MagicMock, patch
from unittest.mock import MagicMock, patch, ANY
from collections import defaultdict
from crewai.utilities.events import crewai_event_bus
from crewai.utilities.events.memory_events import (
MemorySaveStartedEvent,
MemorySaveCompletedEvent,
MemoryQueryStartedEvent,
MemoryQueryCompletedEvent,
)
import pytest
from mem0.memory.main import Memory
@@ -10,7 +17,6 @@ from crewai.memory.external.external_memory_item import ExternalMemoryItem
from crewai.memory.storage.interface import Storage
from crewai.task import Task
@pytest.fixture
def mock_mem0_memory():
mock_memory = MagicMock(spec=Memory)
@@ -188,7 +194,8 @@ def test_crew_external_memory_save_using_crew_without_memory_flag(
assert mock_method.call_count > 0
def test_external_memory_custom_storage(crew_with_external_memory):
@pytest.fixture
def custom_storage():
class CustomStorage(Storage):
def __init__(self):
self.memories = []
@@ -203,6 +210,9 @@ def test_external_memory_custom_storage(crew_with_external_memory):
self.memories = []
custom_storage = CustomStorage()
return custom_storage
def test_external_memory_custom_storage(custom_storage, crew_with_external_memory):
external_memory = ExternalMemory(storage=custom_storage)
# by ensuring the crew is set, we can test that the storage is used
@@ -221,3 +231,101 @@ def test_external_memory_custom_storage(crew_with_external_memory):
external_memory.reset()
results = external_memory.search("test")
assert len(results) == 0
def test_external_memory_search_events(custom_storage, external_memory_with_mocked_config):
events = defaultdict(list)
external_memory_with_mocked_config.storage = custom_storage
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(MemoryQueryStartedEvent)
def on_search_started(source, event):
events["MemoryQueryStartedEvent"].append(event)
@crewai_event_bus.on(MemoryQueryCompletedEvent)
def on_search_completed(source, event):
events["MemoryQueryCompletedEvent"].append(event)
external_memory_with_mocked_config.search(
query="test value",
limit=3,
score_threshold=0.35,
)
assert len(events["MemoryQueryStartedEvent"]) == 1
assert len(events["MemoryQueryCompletedEvent"]) == 1
assert len(events["MemoryQueryFailedEvent"]) == 0
assert dict(events["MemoryQueryStartedEvent"][0]) == {
'timestamp': ANY,
'type': 'memory_query_started',
'source_fingerprint': None,
'source_type': 'external_memory',
'fingerprint_metadata': None,
'query': 'test value',
'limit': 3,
'score_threshold': 0.35
}
assert dict(events["MemoryQueryCompletedEvent"][0]) == {
'timestamp': ANY,
'type': 'memory_query_completed',
'source_fingerprint': None,
'source_type': 'external_memory',
'fingerprint_metadata': None,
'query': 'test value',
'results': [],
'limit': 3,
'score_threshold': 0.35,
'query_time_ms': ANY
}
def test_external_memory_save_events(custom_storage, external_memory_with_mocked_config):
events = defaultdict(list)
external_memory_with_mocked_config.storage = custom_storage
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(MemorySaveStartedEvent)
def on_save_started(source, event):
events["MemorySaveStartedEvent"].append(event)
@crewai_event_bus.on(MemorySaveCompletedEvent)
def on_save_completed(source, event):
events["MemorySaveCompletedEvent"].append(event)
external_memory_with_mocked_config.save(
value="saving value",
metadata={"task": "test_task"},
agent="test_agent",
)
assert len(events["MemorySaveStartedEvent"]) == 1
assert len(events["MemorySaveCompletedEvent"]) == 1
assert len(events["MemorySaveFailedEvent"]) == 0
assert dict(events["MemorySaveStartedEvent"][0]) == {
'timestamp': ANY,
'type': 'memory_save_started',
'source_fingerprint': None,
'source_type': 'external_memory',
'fingerprint_metadata': None,
'value': 'saving value',
'metadata': {'task': 'test_task'},
'agent_role': "test_agent"
}
assert dict(events["MemorySaveCompletedEvent"][0]) == {
'timestamp': ANY,
'type': 'memory_save_completed',
'source_fingerprint': None,
'source_type': 'external_memory',
'fingerprint_metadata': None,
'value': 'saving value',
'metadata': {'task': 'test_task', 'agent': 'test_agent'},
'agent_role': "test_agent",
'save_time_ms': ANY
}

View File

@@ -1,8 +1,15 @@
import pytest
from unittest.mock import ANY
from collections import defaultdict
from crewai.utilities.events import crewai_event_bus
from crewai.memory.long_term.long_term_memory import LongTermMemory
from crewai.memory.long_term.long_term_memory_item import LongTermMemoryItem
from crewai.utilities.events.memory_events import (
MemorySaveStartedEvent,
MemorySaveCompletedEvent,
MemoryQueryStartedEvent,
MemoryQueryCompletedEvent,
)
@pytest.fixture
def long_term_memory():
@@ -10,6 +17,103 @@ def long_term_memory():
return LongTermMemory()
def test_long_term_memory_save_events(long_term_memory):
events = defaultdict(list)
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(MemorySaveStartedEvent)
def on_save_started(source, event):
events["MemorySaveStartedEvent"].append(event)
@crewai_event_bus.on(MemorySaveCompletedEvent)
def on_save_completed(source, event):
events["MemorySaveCompletedEvent"].append(event)
memory = LongTermMemoryItem(
agent="test_agent",
task="test_task",
expected_output="test_output",
datetime="test_datetime",
quality=0.5,
metadata={"task": "test_task", "quality": 0.5},
)
long_term_memory.save(memory)
assert len(events["MemorySaveStartedEvent"]) == 1
assert len(events["MemorySaveCompletedEvent"]) == 1
assert len(events["MemorySaveFailedEvent"]) == 0
assert dict(events["MemorySaveStartedEvent"][0]) == {
"timestamp": ANY,
"type": "memory_save_started",
"source_fingerprint": None,
"source_type": "long_term_memory",
"fingerprint_metadata": None,
"value": "test_task",
"metadata": {"task": "test_task", "quality": 0.5},
"agent_role": "test_agent",
}
assert dict(events["MemorySaveCompletedEvent"][0]) == {
"timestamp": ANY,
"type": "memory_save_completed",
"source_fingerprint": None,
"source_type": "long_term_memory",
"fingerprint_metadata": None,
"value": "test_task",
"metadata": {"task": "test_task", "quality": 0.5, "agent": "test_agent", "expected_output": "test_output"},
"agent_role": "test_agent",
"save_time_ms": ANY,
}
def test_long_term_memory_search_events(long_term_memory):
events = defaultdict(list)
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(MemoryQueryStartedEvent)
def on_search_started(source, event):
events["MemoryQueryStartedEvent"].append(event)
@crewai_event_bus.on(MemoryQueryCompletedEvent)
def on_search_completed(source, event):
events["MemoryQueryCompletedEvent"].append(event)
test_query = "test query"
long_term_memory.search(
test_query,
latest_n=5
)
assert len(events["MemoryQueryStartedEvent"]) == 1
assert len(events["MemoryQueryCompletedEvent"]) == 1
assert len(events["MemoryQueryFailedEvent"]) == 0
assert dict(events["MemoryQueryStartedEvent"][0]) == {
'timestamp': ANY,
'type': 'memory_query_started',
'source_fingerprint': None,
'source_type': 'long_term_memory',
'fingerprint_metadata': None,
'query': 'test query',
'limit': 5,
'score_threshold': None
}
assert dict(events["MemoryQueryCompletedEvent"][0]) == {
'timestamp': ANY,
'type': 'memory_query_completed',
'source_fingerprint': None,
'source_type': 'long_term_memory',
'fingerprint_metadata': None,
'query': 'test query',
'results': None,
'limit': 5,
'score_threshold': None,
'query_time_ms': ANY
}
def test_save_and_search(long_term_memory):
memory = LongTermMemoryItem(
agent="test_agent",

View File

@@ -1,5 +1,5 @@
from unittest.mock import patch
from unittest.mock import patch, ANY
from collections import defaultdict
import pytest
from crewai.agent import Agent
@@ -7,6 +7,13 @@ from crewai.crew import Crew
from crewai.memory.short_term.short_term_memory import ShortTermMemory
from crewai.memory.short_term.short_term_memory_item import ShortTermMemoryItem
from crewai.task import Task
from crewai.utilities.events import crewai_event_bus
from crewai.utilities.events.memory_events import (
MemorySaveStartedEvent,
MemorySaveCompletedEvent,
MemoryQueryStartedEvent,
MemoryQueryCompletedEvent,
)
@pytest.fixture
@@ -28,6 +35,98 @@ def short_term_memory():
return ShortTermMemory(crew=Crew(agents=[agent], tasks=[task]))
def test_short_term_memory_search_events(short_term_memory):
events = defaultdict(list)
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(MemoryQueryStartedEvent)
def on_search_started(source, event):
events["MemoryQueryStartedEvent"].append(event)
@crewai_event_bus.on(MemoryQueryCompletedEvent)
def on_search_completed(source, event):
events["MemoryQueryCompletedEvent"].append(event)
# Call the save method
short_term_memory.search(
query="test value",
limit=3,
score_threshold=0.35,
)
assert len(events["MemoryQueryStartedEvent"]) == 1
assert len(events["MemoryQueryCompletedEvent"]) == 1
assert len(events["MemoryQueryFailedEvent"]) == 0
assert dict(events["MemoryQueryStartedEvent"][0]) == {
'timestamp': ANY,
'type': 'memory_query_started',
'source_fingerprint': None,
'source_type': 'short_term_memory',
'fingerprint_metadata': None,
'query': 'test value',
'limit': 3,
'score_threshold': 0.35
}
assert dict(events["MemoryQueryCompletedEvent"][0]) == {
'timestamp': ANY,
'type': 'memory_query_completed',
'source_fingerprint': None,
'source_type': 'short_term_memory',
'fingerprint_metadata': None,
'query': 'test value',
'results': [],
'limit': 3,
'score_threshold': 0.35,
'query_time_ms': ANY
}
def test_short_term_memory_save_events(short_term_memory):
events = defaultdict(list)
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(MemorySaveStartedEvent)
def on_save_started(source, event):
events["MemorySaveStartedEvent"].append(event)
@crewai_event_bus.on(MemorySaveCompletedEvent)
def on_save_completed(source, event):
events["MemorySaveCompletedEvent"].append(event)
short_term_memory.save(
value="test value",
metadata={"task": "test_task"},
agent="test_agent",
)
assert len(events["MemorySaveStartedEvent"]) == 1
assert len(events["MemorySaveCompletedEvent"]) == 1
assert len(events["MemorySaveFailedEvent"]) == 0
assert dict(events["MemorySaveStartedEvent"][0]) == {
'timestamp': ANY,
'type': 'memory_save_started',
'source_fingerprint': None,
'source_type': 'short_term_memory',
'fingerprint_metadata': None,
'value': 'test value',
'metadata': {'task': 'test_task'},
'agent_role': "test_agent"
}
assert dict(events["MemorySaveCompletedEvent"][0]) == {
'timestamp': ANY,
'type': 'memory_save_completed',
'source_fingerprint': None,
'source_type': 'short_term_memory',
'fingerprint_metadata': None,
'value': 'test value',
'metadata': {'task': 'test_task', 'agent': 'test_agent'},
'agent_role': "test_agent",
'save_time_ms': ANY
}
def test_save_and_search(short_term_memory):
memory = ShortTermMemoryItem(
data="""test value test value test value test value test value test value