mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-11 00:58:30 +00:00
test: add tests about memory events usage
This commit is contained in:
1969
tests/cassettes/test_memory_events_are_emitted.yaml
Normal file
1969
tests/cassettes/test_memory_events_are_emitted.yaml
Normal file
File diff suppressed because one or more lines are too long
@@ -5,6 +5,7 @@ import json
|
||||
from concurrent.futures import Future
|
||||
from unittest import mock
|
||||
from unittest.mock import ANY, MagicMock, patch
|
||||
from collections import defaultdict
|
||||
|
||||
import pydantic_core
|
||||
import pytest
|
||||
@@ -40,6 +41,16 @@ from crewai.utilities.events.event_listener import EventListener
|
||||
from crewai.utilities.rpm_controller import RPMController
|
||||
from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler
|
||||
|
||||
from crewai.utilities.events.memory_events import (
|
||||
MemorySaveStartedEvent,
|
||||
MemorySaveCompletedEvent,
|
||||
MemorySaveFailedEvent,
|
||||
MemoryQueryStartedEvent,
|
||||
MemoryQueryCompletedEvent,
|
||||
MemoryQueryFailedEvent,
|
||||
MemoryRetrievalStartedEvent,
|
||||
MemoryRetrievalCompletedEvent,
|
||||
)
|
||||
|
||||
@pytest.fixture
|
||||
def ceo():
|
||||
@@ -2483,6 +2494,73 @@ def test_using_contextual_memory():
|
||||
contextual_mem.assert_called_once()
|
||||
|
||||
|
||||
|
||||
@pytest.mark.vcr(filter_headers=["authorization"])
|
||||
def test_memory_events_are_emitted():
|
||||
events = defaultdict(list)
|
||||
|
||||
@crewai_event_bus.on(MemorySaveStartedEvent)
|
||||
def handle_llm_failed(source, event):
|
||||
events["MemorySaveStartedEvent"].append(event)
|
||||
|
||||
@crewai_event_bus.on(MemorySaveCompletedEvent)
|
||||
def handle_llm_started(source, event):
|
||||
events["MemorySaveCompletedEvent"].append(event)
|
||||
|
||||
@crewai_event_bus.on(MemorySaveFailedEvent)
|
||||
def handle_llm_completed(source, event):
|
||||
events["MemorySaveFailedEvent"].append(event)
|
||||
|
||||
@crewai_event_bus.on(MemoryQueryStartedEvent)
|
||||
def handle_llm_failed(source, event):
|
||||
events["MemoryQueryStartedEvent"].append(event)
|
||||
|
||||
@crewai_event_bus.on(MemoryQueryCompletedEvent)
|
||||
def handle_llm_started(source, event):
|
||||
events["MemoryQueryCompletedEvent"].append(event)
|
||||
|
||||
@crewai_event_bus.on(MemoryQueryFailedEvent)
|
||||
def handle_llm_completed(source, event):
|
||||
events["MemoryQueryFailedEvent"].append(event)
|
||||
|
||||
@crewai_event_bus.on(MemoryRetrievalStartedEvent)
|
||||
def handle_llm_completed(source, event):
|
||||
events["MemoryRetrievalStartedEvent"].append(event)
|
||||
|
||||
@crewai_event_bus.on(MemoryRetrievalCompletedEvent)
|
||||
def handle_llm_completed(source, event):
|
||||
events["MemoryRetrievalCompletedEvent"].append(event)
|
||||
|
||||
math_researcher = Agent(
|
||||
role="Researcher",
|
||||
goal="You research about math.",
|
||||
backstory="You're an expert in research and you love to learn new things.",
|
||||
allow_delegation=False,
|
||||
)
|
||||
|
||||
task1 = Task(
|
||||
description="Research a topic to teach a kid aged 6 about math.",
|
||||
expected_output="A topic, explanation, angle, and examples.",
|
||||
agent=math_researcher,
|
||||
)
|
||||
|
||||
crew = Crew(
|
||||
agents=[math_researcher],
|
||||
tasks=[task1],
|
||||
memory=True,
|
||||
)
|
||||
|
||||
crew.kickoff()
|
||||
|
||||
assert len(events["MemorySaveStartedEvent"]) == 6
|
||||
assert len(events["MemorySaveCompletedEvent"]) == 6
|
||||
assert len(events["MemorySaveFailedEvent"]) == 0
|
||||
assert len(events["MemoryQueryStartedEvent"]) == 3
|
||||
assert len(events["MemoryQueryCompletedEvent"]) == 3
|
||||
assert len(events["MemoryQueryFailedEvent"]) == 0
|
||||
assert len(events["MemoryRetrievalStartedEvent"]) == 1
|
||||
assert len(events["MemoryRetrievalCompletedEvent"]) == 1
|
||||
|
||||
@pytest.mark.vcr(filter_headers=["authorization"])
|
||||
def test_using_contextual_memory_with_long_term_memory():
|
||||
from unittest.mock import patch
|
||||
|
||||
@@ -1,5 +1,12 @@
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from unittest.mock import MagicMock, patch, ANY
|
||||
from collections import defaultdict
|
||||
from crewai.utilities.events import crewai_event_bus
|
||||
from crewai.utilities.events.memory_events import (
|
||||
MemorySaveStartedEvent,
|
||||
MemorySaveCompletedEvent,
|
||||
MemoryQueryStartedEvent,
|
||||
MemoryQueryCompletedEvent,
|
||||
)
|
||||
import pytest
|
||||
from mem0.memory.main import Memory
|
||||
|
||||
@@ -10,7 +17,6 @@ from crewai.memory.external.external_memory_item import ExternalMemoryItem
|
||||
from crewai.memory.storage.interface import Storage
|
||||
from crewai.task import Task
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_mem0_memory():
|
||||
mock_memory = MagicMock(spec=Memory)
|
||||
@@ -188,7 +194,8 @@ def test_crew_external_memory_save_using_crew_without_memory_flag(
|
||||
assert mock_method.call_count > 0
|
||||
|
||||
|
||||
def test_external_memory_custom_storage(crew_with_external_memory):
|
||||
@pytest.fixture
|
||||
def custom_storage():
|
||||
class CustomStorage(Storage):
|
||||
def __init__(self):
|
||||
self.memories = []
|
||||
@@ -203,6 +210,9 @@ def test_external_memory_custom_storage(crew_with_external_memory):
|
||||
self.memories = []
|
||||
|
||||
custom_storage = CustomStorage()
|
||||
return custom_storage
|
||||
|
||||
def test_external_memory_custom_storage(custom_storage, crew_with_external_memory):
|
||||
external_memory = ExternalMemory(storage=custom_storage)
|
||||
|
||||
# by ensuring the crew is set, we can test that the storage is used
|
||||
@@ -221,3 +231,103 @@ def test_external_memory_custom_storage(crew_with_external_memory):
|
||||
external_memory.reset()
|
||||
results = external_memory.search("test")
|
||||
assert len(results) == 0
|
||||
|
||||
|
||||
|
||||
def test_external_memory_search_events(custom_storage, external_memory_with_mocked_config):
|
||||
events = defaultdict(list)
|
||||
|
||||
external_memory_with_mocked_config.storage = custom_storage
|
||||
|
||||
@crewai_event_bus.on(MemoryQueryStartedEvent)
|
||||
def on_search_started(source, event):
|
||||
events["MemoryQueryStartedEvent"].append(event)
|
||||
|
||||
@crewai_event_bus.on(MemoryQueryCompletedEvent)
|
||||
def on_search_completed(source, event):
|
||||
events["MemoryQueryCompletedEvent"].append(event)
|
||||
|
||||
external_memory_with_mocked_config.search(
|
||||
query="test value",
|
||||
limit=3,
|
||||
score_threshold=0.35,
|
||||
)
|
||||
|
||||
assert len(events["MemoryQueryStartedEvent"]) == 1
|
||||
assert len(events["MemoryQueryCompletedEvent"]) == 1
|
||||
assert len(events["MemoryQueryFailedEvent"]) == 0
|
||||
|
||||
assert dict(events["MemoryQueryStartedEvent"][0]) == {
|
||||
'timestamp': ANY,
|
||||
'type': 'memory_query_started',
|
||||
'source_fingerprint': None,
|
||||
'source_type': 'external_memory',
|
||||
'fingerprint_metadata': None,
|
||||
'query': 'test value',
|
||||
'limit': 3,
|
||||
'score_threshold': 0.35
|
||||
}
|
||||
|
||||
assert dict(events["MemoryQueryCompletedEvent"][0]) == {
|
||||
'timestamp': ANY,
|
||||
'type': 'memory_query_completed',
|
||||
'source_fingerprint': None,
|
||||
'source_type': 'external_memory',
|
||||
'fingerprint_metadata': None,
|
||||
'query': 'test value',
|
||||
'results': [],
|
||||
'limit': 3,
|
||||
'score_threshold': 0.35,
|
||||
'query_time_ms': ANY
|
||||
}
|
||||
|
||||
crewai_event_bus._handlers.clear()
|
||||
|
||||
|
||||
def test_external_memory_save_events(custom_storage, external_memory_with_mocked_config):
|
||||
events = defaultdict(list)
|
||||
|
||||
external_memory_with_mocked_config.storage = custom_storage
|
||||
|
||||
@crewai_event_bus.on(MemorySaveStartedEvent)
|
||||
def on_save_started(source, event):
|
||||
events["MemorySaveStartedEvent"].append(event)
|
||||
|
||||
@crewai_event_bus.on(MemorySaveCompletedEvent)
|
||||
def on_save_completed(source, event):
|
||||
events["MemorySaveCompletedEvent"].append(event)
|
||||
|
||||
external_memory_with_mocked_config.save(
|
||||
value="saving value",
|
||||
metadata={"task": "test_task"},
|
||||
agent="test_agent",
|
||||
)
|
||||
|
||||
assert len(events["MemorySaveStartedEvent"]) == 1
|
||||
assert len(events["MemorySaveCompletedEvent"]) == 1
|
||||
assert len(events["MemorySaveFailedEvent"]) == 0
|
||||
|
||||
assert dict(events["MemorySaveStartedEvent"][0]) == {
|
||||
'timestamp': ANY,
|
||||
'type': 'memory_save_started',
|
||||
'source_fingerprint': None,
|
||||
'source_type': 'external_memory',
|
||||
'fingerprint_metadata': None,
|
||||
'value': 'saving value',
|
||||
'metadata': {'task': 'test_task'},
|
||||
'agent_role': "test_agent"
|
||||
}
|
||||
|
||||
assert dict(events["MemorySaveCompletedEvent"][0]) == {
|
||||
'timestamp': ANY,
|
||||
'type': 'memory_save_completed',
|
||||
'source_fingerprint': None,
|
||||
'source_type': 'external_memory',
|
||||
'fingerprint_metadata': None,
|
||||
'value': 'saving value',
|
||||
'metadata': {'task': 'test_task', 'agent': 'test_agent'},
|
||||
'agent_role': "test_agent",
|
||||
'save_time_ms': ANY
|
||||
}
|
||||
|
||||
crewai_event_bus._handlers.clear()
|
||||
|
||||
@@ -1,8 +1,15 @@
|
||||
import pytest
|
||||
|
||||
from unittest.mock import ANY
|
||||
from collections import defaultdict
|
||||
from crewai.utilities.events import crewai_event_bus
|
||||
from crewai.memory.long_term.long_term_memory import LongTermMemory
|
||||
from crewai.memory.long_term.long_term_memory_item import LongTermMemoryItem
|
||||
|
||||
from crewai.utilities.events.memory_events import (
|
||||
MemorySaveStartedEvent,
|
||||
MemorySaveCompletedEvent,
|
||||
MemoryQueryStartedEvent,
|
||||
MemoryQueryCompletedEvent,
|
||||
)
|
||||
|
||||
@pytest.fixture
|
||||
def long_term_memory():
|
||||
@@ -10,6 +17,104 @@ def long_term_memory():
|
||||
return LongTermMemory()
|
||||
|
||||
|
||||
def test_long_term_memory_save_events(long_term_memory):
|
||||
events = defaultdict(list)
|
||||
|
||||
@crewai_event_bus.on(MemorySaveStartedEvent)
|
||||
def on_save_started(source, event):
|
||||
events["MemorySaveStartedEvent"].append(event)
|
||||
|
||||
@crewai_event_bus.on(MemorySaveCompletedEvent)
|
||||
def on_save_completed(source, event):
|
||||
events["MemorySaveCompletedEvent"].append(event)
|
||||
|
||||
memory = LongTermMemoryItem(
|
||||
agent="test_agent",
|
||||
task="test_task",
|
||||
expected_output="test_output",
|
||||
datetime="test_datetime",
|
||||
quality=0.5,
|
||||
metadata={"task": "test_task", "quality": 0.5},
|
||||
)
|
||||
long_term_memory.save(memory)
|
||||
|
||||
assert len(events["MemorySaveStartedEvent"]) == 1
|
||||
assert len(events["MemorySaveCompletedEvent"]) == 1
|
||||
assert len(events["MemorySaveFailedEvent"]) == 0
|
||||
|
||||
assert dict(events["MemorySaveStartedEvent"][0]) == {
|
||||
"timestamp": ANY,
|
||||
"type": "memory_save_started",
|
||||
"source_fingerprint": None,
|
||||
"source_type": "long_term_memory",
|
||||
"fingerprint_metadata": None,
|
||||
"value": "test_task",
|
||||
"metadata": {"task": "test_task", "quality": 0.5},
|
||||
"agent_role": "test_agent",
|
||||
}
|
||||
assert dict(events["MemorySaveCompletedEvent"][0]) == {
|
||||
"timestamp": ANY,
|
||||
"type": "memory_save_completed",
|
||||
"source_fingerprint": None,
|
||||
"source_type": "long_term_memory",
|
||||
"fingerprint_metadata": None,
|
||||
"value": "test_task",
|
||||
"metadata": {"task": "test_task", "quality": 0.5, "agent": "test_agent", "expected_output": "test_output"},
|
||||
"agent_role": "test_agent",
|
||||
"save_time_ms": ANY,
|
||||
}
|
||||
|
||||
crewai_event_bus._handlers.clear()
|
||||
|
||||
|
||||
def test_long_term_memory_search_events(long_term_memory):
|
||||
events = defaultdict(list)
|
||||
|
||||
@crewai_event_bus.on(MemoryQueryStartedEvent)
|
||||
def on_search_started(source, event):
|
||||
events["MemoryQueryStartedEvent"].append(event)
|
||||
|
||||
@crewai_event_bus.on(MemoryQueryCompletedEvent)
|
||||
def on_search_completed(source, event):
|
||||
events["MemoryQueryCompletedEvent"].append(event)
|
||||
|
||||
test_query = "test query"
|
||||
|
||||
long_term_memory.search(
|
||||
test_query,
|
||||
latest_n=5
|
||||
)
|
||||
|
||||
assert len(events["MemoryQueryStartedEvent"]) == 1
|
||||
assert len(events["MemoryQueryCompletedEvent"]) == 1
|
||||
assert len(events["MemoryQueryFailedEvent"]) == 0
|
||||
|
||||
assert dict(events["MemoryQueryStartedEvent"][0]) == {
|
||||
'timestamp': ANY,
|
||||
'type': 'memory_query_started',
|
||||
'source_fingerprint': None,
|
||||
'source_type': 'long_term_memory',
|
||||
'fingerprint_metadata': None,
|
||||
'query': 'test query',
|
||||
'limit': 5,
|
||||
'score_threshold': None
|
||||
}
|
||||
|
||||
assert dict(events["MemoryQueryCompletedEvent"][0]) == {
|
||||
'timestamp': ANY,
|
||||
'type': 'memory_query_completed',
|
||||
'source_fingerprint': None,
|
||||
'source_type': 'long_term_memory',
|
||||
'fingerprint_metadata': None,
|
||||
'query': 'test query',
|
||||
'results': None,
|
||||
'limit': 5,
|
||||
'score_threshold': None,
|
||||
'query_time_ms': ANY
|
||||
}
|
||||
|
||||
crewai_event_bus._handlers.clear()
|
||||
|
||||
def test_save_and_search(long_term_memory):
|
||||
memory = LongTermMemoryItem(
|
||||
agent="test_agent",
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
from unittest.mock import patch
|
||||
|
||||
from unittest.mock import patch, ANY
|
||||
from collections import defaultdict
|
||||
import pytest
|
||||
|
||||
from crewai.agent import Agent
|
||||
@@ -7,6 +7,13 @@ from crewai.crew import Crew
|
||||
from crewai.memory.short_term.short_term_memory import ShortTermMemory
|
||||
from crewai.memory.short_term.short_term_memory_item import ShortTermMemoryItem
|
||||
from crewai.task import Task
|
||||
from crewai.utilities.events import crewai_event_bus
|
||||
from crewai.utilities.events.memory_events import (
|
||||
MemorySaveStartedEvent,
|
||||
MemorySaveCompletedEvent,
|
||||
MemoryQueryStartedEvent,
|
||||
MemoryQueryCompletedEvent,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@@ -28,6 +35,101 @@ def short_term_memory():
|
||||
return ShortTermMemory(crew=Crew(agents=[agent], tasks=[task]))
|
||||
|
||||
|
||||
def test_short_term_memory_search_events(short_term_memory):
|
||||
events = defaultdict(list)
|
||||
|
||||
@crewai_event_bus.on(MemoryQueryStartedEvent)
|
||||
def on_search_started(source, event):
|
||||
events["MemoryQueryStartedEvent"].append(event)
|
||||
|
||||
@crewai_event_bus.on(MemoryQueryCompletedEvent)
|
||||
def on_search_completed(source, event):
|
||||
events["MemoryQueryCompletedEvent"].append(event)
|
||||
|
||||
# Call the save method
|
||||
short_term_memory.search(
|
||||
query="test value",
|
||||
limit=3,
|
||||
score_threshold=0.35,
|
||||
)
|
||||
|
||||
assert len(events["MemoryQueryStartedEvent"]) == 1
|
||||
assert len(events["MemoryQueryCompletedEvent"]) == 1
|
||||
assert len(events["MemoryQueryFailedEvent"]) == 0
|
||||
|
||||
assert dict(events["MemoryQueryStartedEvent"][0]) == {
|
||||
'timestamp': ANY,
|
||||
'type': 'memory_query_started',
|
||||
'source_fingerprint': None,
|
||||
'source_type': 'short_term_memory',
|
||||
'fingerprint_metadata': None,
|
||||
'query': 'test value',
|
||||
'limit': 3,
|
||||
'score_threshold': 0.35
|
||||
}
|
||||
|
||||
assert dict(events["MemoryQueryCompletedEvent"][0]) == {
|
||||
'timestamp': ANY,
|
||||
'type': 'memory_query_completed',
|
||||
'source_fingerprint': None,
|
||||
'source_type': 'short_term_memory',
|
||||
'fingerprint_metadata': None,
|
||||
'query': 'test value',
|
||||
'results': [],
|
||||
'limit': 3,
|
||||
'score_threshold': 0.35,
|
||||
'query_time_ms': ANY
|
||||
}
|
||||
|
||||
crewai_event_bus._handlers.clear()
|
||||
|
||||
|
||||
def test_short_term_memory_save_events(short_term_memory):
|
||||
events = defaultdict(list)
|
||||
|
||||
@crewai_event_bus.on(MemorySaveStartedEvent)
|
||||
def on_save_started(source, event):
|
||||
events["MemorySaveStartedEvent"].append(event)
|
||||
|
||||
@crewai_event_bus.on(MemorySaveCompletedEvent)
|
||||
def on_save_completed(source, event):
|
||||
events["MemorySaveCompletedEvent"].append(event)
|
||||
|
||||
short_term_memory.save(
|
||||
value="test value",
|
||||
metadata={"task": "test_task"},
|
||||
agent="test_agent",
|
||||
)
|
||||
|
||||
assert len(events["MemorySaveStartedEvent"]) == 1
|
||||
assert len(events["MemorySaveCompletedEvent"]) == 1
|
||||
assert len(events["MemorySaveFailedEvent"]) == 0
|
||||
|
||||
assert dict(events["MemorySaveStartedEvent"][0]) == {
|
||||
'timestamp': ANY,
|
||||
'type': 'memory_save_started',
|
||||
'source_fingerprint': None,
|
||||
'source_type': 'short_term_memory',
|
||||
'fingerprint_metadata': None,
|
||||
'value': 'test value',
|
||||
'metadata': {'task': 'test_task'},
|
||||
'agent_role': "test_agent"
|
||||
}
|
||||
|
||||
assert dict(events["MemorySaveCompletedEvent"][0]) == {
|
||||
'timestamp': ANY,
|
||||
'type': 'memory_save_completed',
|
||||
'source_fingerprint': None,
|
||||
'source_type': 'short_term_memory',
|
||||
'fingerprint_metadata': None,
|
||||
'value': 'test value',
|
||||
'metadata': {'task': 'test_task', 'agent': 'test_agent'},
|
||||
'agent_role': "test_agent",
|
||||
'save_time_ms': ANY
|
||||
}
|
||||
|
||||
crewai_event_bus._handlers.clear()
|
||||
|
||||
def test_save_and_search(short_term_memory):
|
||||
memory = ShortTermMemoryItem(
|
||||
data="""test value test value test value test value test value test value
|
||||
|
||||
Reference in New Issue
Block a user