Compare commits

...

4 Commits

Author SHA1 Message Date
Lucas Gomide
174215980b test: use scoped_handlers while listener Memory events 2025-07-01 23:30:35 -03:00
Lucas Gomide
15c5654a70 fixed linter issues 2025-07-01 23:30:35 -03:00
Lucas Gomide
41906badd7 test: add tests about memory events usage 2025-07-01 23:30:35 -03:00
Lucas Gomide
071bc4faea feat: emit events about memory usage 2025-07-01 23:30:35 -03:00
17 changed files with 3264 additions and 40 deletions

View File

@@ -1,5 +1,6 @@
import shutil
import subprocess
import time
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple, Type, Union
from pydantic import Field, InstanceOf, PrivateAttr, model_validator
@@ -32,6 +33,10 @@ from crewai.utilities.events.agent_events import (
AgentExecutionStartedEvent,
)
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.memory_events import (
MemoryRetrievalStartedEvent,
MemoryRetrievalCompletedEvent,
)
from crewai.utilities.events.knowledge_events import (
KnowledgeQueryCompletedEvent,
KnowledgeQueryFailedEvent,
@@ -302,6 +307,15 @@ class Agent(BaseAgent):
)
if self._is_any_available_memory():
crewai_event_bus.emit(
self,
event=MemoryRetrievalStartedEvent(
task_id=str(task.id) if task else None,
source_type="agent",
),
)
start_time = time.time()
contextual_memory = ContextualMemory(
self.crew.memory_config,
self.crew._short_term_memory,
@@ -313,6 +327,16 @@ class Agent(BaseAgent):
memory = contextual_memory.build_context_for_task(task, context)
if memory.strip() != "":
task_prompt += self.i18n.slice("memory").format(memory=memory)
crewai_event_bus.emit(
self,
event=MemoryRetrievalCompletedEvent(
task_id=str(task.id) if task else None,
memory_content=memory,
retrieval_time_ms=(time.time() - start_time) * 1000,
source_type="agent",
),
)
knowledge_config = (
self.knowledge_config.model_dump() if self.knowledge_config else {}
)

View File

@@ -1,10 +1,20 @@
from typing import Optional
import time
from pydantic import PrivateAttr
from crewai.memory.entity.entity_memory_item import EntityMemoryItem
from crewai.memory.memory import Memory
from crewai.memory.storage.rag_storage import RAGStorage
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.memory_events import (
MemoryQueryStartedEvent,
MemoryQueryCompletedEvent,
MemoryQueryFailedEvent,
MemorySaveStartedEvent,
MemorySaveCompletedEvent,
MemorySaveFailedEvent,
)
class EntityMemory(Memory):
@@ -48,16 +58,96 @@ class EntityMemory(Memory):
def save(self, item: EntityMemoryItem) -> None: # type: ignore # BUG?: Signature of "save" incompatible with supertype "Memory"
"""Saves an entity item into the SQLite storage."""
if self._memory_provider == "mem0":
data = f"""
Remember details about the following entity:
Name: {item.name}
Type: {item.type}
Entity Description: {item.description}
"""
else:
data = f"{item.name}({item.type}): {item.description}"
super().save(data, item.metadata)
crewai_event_bus.emit(
self,
event=MemorySaveStartedEvent(
metadata=item.metadata,
source_type="entity_memory",
),
)
start_time = time.time()
try:
if self._memory_provider == "mem0":
data = f"""
Remember details about the following entity:
Name: {item.name}
Type: {item.type}
Entity Description: {item.description}
"""
else:
data = f"{item.name}({item.type}): {item.description}"
super().save(data, item.metadata)
# Emit memory save completed event
crewai_event_bus.emit(
self,
event=MemorySaveCompletedEvent(
value=data,
metadata=item.metadata,
save_time_ms=(time.time() - start_time) * 1000,
source_type="entity_memory",
),
)
except Exception as e:
crewai_event_bus.emit(
self,
event=MemorySaveFailedEvent(
metadata=item.metadata,
error=str(e),
source_type="entity_memory",
),
)
raise
def search(
self,
query: str,
limit: int = 3,
score_threshold: float = 0.35,
):
crewai_event_bus.emit(
self,
event=MemoryQueryStartedEvent(
query=query,
limit=limit,
score_threshold=score_threshold,
source_type="entity_memory",
),
)
start_time = time.time()
try:
results = super().search(
query=query, limit=limit, score_threshold=score_threshold
)
crewai_event_bus.emit(
self,
event=MemoryQueryCompletedEvent(
query=query,
results=results,
limit=limit,
score_threshold=score_threshold,
query_time_ms=(time.time() - start_time) * 1000,
source_type="entity_memory",
),
)
return results
except Exception as e:
crewai_event_bus.emit(
self,
event=MemoryQueryFailedEvent(
query=query,
limit=limit,
score_threshold=score_threshold,
error=str(e),
source_type="entity_memory",
),
)
raise
def reset(self) -> None:
try:

View File

@@ -1,8 +1,18 @@
from typing import TYPE_CHECKING, Any, Dict, Optional
import time
from crewai.memory.external.external_memory_item import ExternalMemoryItem
from crewai.memory.memory import Memory
from crewai.memory.storage.interface import Storage
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.memory_events import (
MemoryQueryStartedEvent,
MemoryQueryCompletedEvent,
MemoryQueryFailedEvent,
MemorySaveStartedEvent,
MemorySaveCompletedEvent,
MemorySaveFailedEvent,
)
if TYPE_CHECKING:
from crewai.memory.storage.mem0_storage import Mem0Storage
@@ -46,8 +56,91 @@ class ExternalMemory(Memory):
agent: Optional[str] = None,
) -> None:
"""Saves a value into the external storage."""
item = ExternalMemoryItem(value=value, metadata=metadata, agent=agent)
super().save(value=item.value, metadata=item.metadata, agent=item.agent)
crewai_event_bus.emit(
self,
event=MemorySaveStartedEvent(
value=value,
metadata=metadata,
agent_role=agent,
source_type="external_memory",
),
)
start_time = time.time()
try:
item = ExternalMemoryItem(value=value, metadata=metadata, agent=agent)
super().save(value=item.value, metadata=item.metadata, agent=item.agent)
crewai_event_bus.emit(
self,
event=MemorySaveCompletedEvent(
value=value,
metadata=metadata,
agent_role=agent,
save_time_ms=(time.time() - start_time) * 1000,
source_type="external_memory",
),
)
except Exception as e:
crewai_event_bus.emit(
self,
event=MemorySaveFailedEvent(
value=value,
metadata=metadata,
agent_role=agent,
error=str(e),
source_type="external_memory",
),
)
raise
def search(
self,
query: str,
limit: int = 3,
score_threshold: float = 0.35,
):
crewai_event_bus.emit(
self,
event=MemoryQueryStartedEvent(
query=query,
limit=limit,
score_threshold=score_threshold,
source_type="external_memory",
),
)
start_time = time.time()
try:
results = super().search(
query=query, limit=limit, score_threshold=score_threshold
)
crewai_event_bus.emit(
self,
event=MemoryQueryCompletedEvent(
query=query,
results=results,
limit=limit,
score_threshold=score_threshold,
query_time_ms=(time.time() - start_time) * 1000,
source_type="external_memory",
),
)
return results
except Exception as e:
crewai_event_bus.emit(
self,
event=MemoryQueryFailedEvent(
query=query,
limit=limit,
score_threshold=score_threshold,
error=str(e),
source_type="external_memory",
),
)
raise
def reset(self) -> None:
self.storage.reset()

View File

@@ -1,7 +1,17 @@
from typing import Any, Dict, List
import time
from crewai.memory.long_term.long_term_memory_item import LongTermMemoryItem
from crewai.memory.memory import Memory
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.memory_events import (
MemoryQueryStartedEvent,
MemoryQueryCompletedEvent,
MemoryQueryFailedEvent,
MemorySaveStartedEvent,
MemorySaveCompletedEvent,
MemorySaveFailedEvent,
)
from crewai.memory.storage.ltm_sqlite_storage import LTMSQLiteStorage
@@ -20,17 +30,87 @@ class LongTermMemory(Memory):
super().__init__(storage=storage)
def save(self, item: LongTermMemoryItem) -> None: # type: ignore # BUG?: Signature of "save" incompatible with supertype "Memory"
metadata = item.metadata
metadata.update({"agent": item.agent, "expected_output": item.expected_output})
self.storage.save( # type: ignore # BUG?: Unexpected keyword argument "task_description","score","datetime" for "save" of "Storage"
task_description=item.task,
score=metadata["quality"],
metadata=metadata,
datetime=item.datetime,
crewai_event_bus.emit(
self,
event=MemorySaveStartedEvent(
value=item.task,
metadata=item.metadata,
agent_role=item.agent,
source_type="long_term_memory",
),
)
start_time = time.time()
try:
metadata = item.metadata
metadata.update({"agent": item.agent, "expected_output": item.expected_output})
self.storage.save( # type: ignore # BUG?: Unexpected keyword argument "task_description","score","datetime" for "save" of "Storage"
task_description=item.task,
score=metadata["quality"],
metadata=metadata,
datetime=item.datetime,
)
crewai_event_bus.emit(
self,
event=MemorySaveCompletedEvent(
value=item.task,
metadata=item.metadata,
agent_role=item.agent,
save_time_ms=(time.time() - start_time) * 1000,
source_type="long_term_memory",
),
)
except Exception as e:
crewai_event_bus.emit(
self,
event=MemorySaveFailedEvent(
value=item.task,
metadata=item.metadata,
agent_role=item.agent,
error=str(e),
source_type="long_term_memory",
),
)
raise
def search(self, task: str, latest_n: int = 3) -> List[Dict[str, Any]]: # type: ignore # signature of "search" incompatible with supertype "Memory"
return self.storage.load(task, latest_n) # type: ignore # BUG?: "Storage" has no attribute "load"
crewai_event_bus.emit(
self,
event=MemoryQueryStartedEvent(
query=task,
limit=latest_n,
source_type="long_term_memory",
),
)
start_time = time.time()
try:
results = self.storage.load(task, latest_n) # type: ignore # BUG?: "Storage" has no attribute "load"
crewai_event_bus.emit(
self,
event=MemoryQueryCompletedEvent(
query=task,
results=results,
limit=latest_n,
query_time_ms=(time.time() - start_time) * 1000,
source_type="long_term_memory",
),
)
return results
except Exception as e:
crewai_event_bus.emit(
self,
event=MemoryQueryFailedEvent(
query=task,
limit=latest_n,
error=str(e),
source_type="long_term_memory",
),
)
raise
def reset(self) -> None:
self.storage.reset()

View File

@@ -1,10 +1,20 @@
from typing import Any, Dict, Optional
import time
from pydantic import PrivateAttr
from crewai.memory.memory import Memory
from crewai.memory.short_term.short_term_memory_item import ShortTermMemoryItem
from crewai.memory.storage.rag_storage import RAGStorage
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.memory_events import (
MemoryQueryStartedEvent,
MemoryQueryCompletedEvent,
MemoryQueryFailedEvent,
MemorySaveStartedEvent,
MemorySaveCompletedEvent,
MemorySaveFailedEvent,
)
class ShortTermMemory(Memory):
@@ -52,11 +62,46 @@ class ShortTermMemory(Memory):
metadata: Optional[Dict[str, Any]] = None,
agent: Optional[str] = None,
) -> None:
item = ShortTermMemoryItem(data=value, metadata=metadata, agent=agent)
if self._memory_provider == "mem0":
item.data = f"Remember the following insights from Agent run: {item.data}"
crewai_event_bus.emit(
self,
event=MemorySaveStartedEvent(
value=value,
metadata=metadata,
agent_role=agent,
source_type="short_term_memory",
),
)
super().save(value=item.data, metadata=item.metadata, agent=item.agent)
start_time = time.time()
try:
item = ShortTermMemoryItem(data=value, metadata=metadata, agent=agent)
if self._memory_provider == "mem0":
item.data = f"Remember the following insights from Agent run: {item.data}"
super().save(value=item.data, metadata=item.metadata, agent=item.agent)
crewai_event_bus.emit(
self,
event=MemorySaveCompletedEvent(
value=value,
metadata=metadata,
agent_role=agent,
save_time_ms=(time.time() - start_time) * 1000,
source_type="short_term_memory",
),
)
except Exception as e:
crewai_event_bus.emit(
self,
event=MemorySaveFailedEvent(
value=value,
metadata=metadata,
agent_role=agent,
error=str(e),
source_type="short_term_memory",
),
)
raise
def search(
self,
@@ -64,9 +109,47 @@ class ShortTermMemory(Memory):
limit: int = 3,
score_threshold: float = 0.35,
):
return self.storage.search(
query=query, limit=limit, score_threshold=score_threshold
) # type: ignore # BUG? The reference is to the parent class, but the parent class does not have this parameters
crewai_event_bus.emit(
self,
event=MemoryQueryStartedEvent(
query=query,
limit=limit,
score_threshold=score_threshold,
source_type="short_term_memory",
),
)
start_time = time.time()
try:
results = self.storage.search(
query=query, limit=limit, score_threshold=score_threshold
) # type: ignore # BUG? The reference is to the parent class, but the parent class does not have this parameters
crewai_event_bus.emit(
self,
event=MemoryQueryCompletedEvent(
query=query,
results=results,
limit=limit,
score_threshold=score_threshold,
query_time_ms=(time.time() - start_time) * 1000,
source_type="short_term_memory",
),
)
return results
except Exception as e:
crewai_event_bus.emit(
self,
event=MemoryQueryFailedEvent(
query=query,
limit=limit,
score_threshold=score_threshold,
error=str(e),
source_type="short_term_memory",
),
)
raise
def reset(self) -> None:
try:

View File

@@ -51,6 +51,71 @@ from .llm_events import (
LLMStreamChunkEvent,
)
from .memory_events import (
MemorySaveStartedEvent,
MemorySaveCompletedEvent,
MemorySaveFailedEvent,
MemoryQueryStartedEvent,
MemoryQueryCompletedEvent,
MemoryQueryFailedEvent,
MemoryRetrievalStartedEvent,
MemoryRetrievalCompletedEvent,
)
# events
from .event_listener import EventListener
from .third_party.agentops_listener import agentops_listener
__all__ = [
"EventListener",
"agentops_listener",
"CrewAIEventsBus",
"crewai_event_bus",
"AgentExecutionStartedEvent",
"AgentExecutionCompletedEvent",
"AgentExecutionErrorEvent",
"TaskStartedEvent",
"TaskCompletedEvent",
"TaskFailedEvent",
"TaskEvaluationEvent",
"FlowCreatedEvent",
"FlowStartedEvent",
"FlowFinishedEvent",
"FlowPlotEvent",
"MethodExecutionStartedEvent",
"MethodExecutionFinishedEvent",
"MethodExecutionFailedEvent",
"LLMCallCompletedEvent",
"LLMCallFailedEvent",
"LLMCallStartedEvent",
"LLMCallType",
"LLMStreamChunkEvent",
"MemorySaveStartedEvent",
"MemorySaveCompletedEvent",
"MemorySaveFailedEvent",
"MemoryQueryStartedEvent",
"MemoryQueryCompletedEvent",
"MemoryQueryFailedEvent",
"MemoryRetrievalStartedEvent",
"MemoryRetrievalCompletedEvent",
"EventListener",
"agentops_listener",
"CrewKickoffStartedEvent",
"CrewKickoffCompletedEvent",
"CrewKickoffFailedEvent",
"CrewTrainStartedEvent",
"CrewTrainCompletedEvent",
"CrewTrainFailedEvent",
"CrewTestStartedEvent",
"CrewTestCompletedEvent",
"CrewTestFailedEvent",
"LLMGuardrailCompletedEvent",
"LLMGuardrailStartedEvent",
"ToolUsageFinishedEvent",
"ToolUsageErrorEvent",
"ToolUsageStartedEvent",
"ToolExecutionErrorEvent",
"ToolSelectionErrorEvent",
"ToolUsageEvent",
"ToolValidateInputErrorEvent",
]

View File

@@ -12,7 +12,7 @@ class BaseEvent(BaseModel):
timestamp: datetime = Field(default_factory=datetime.now)
type: str
source_fingerprint: Optional[str] = None # UUID string of the source entity
source_type: Optional[str] = None # "agent", "task", "crew"
source_type: Optional[str] = None # "agent", "task", "crew", "memory", "entity_memory", "short_term_memory", "long_term_memory", "external_memory"
fingerprint_metadata: Optional[Dict[str, Any]] = None # Any relevant metadata
def to_json(self, exclude: set[str] | None = None):

View File

@@ -57,6 +57,17 @@ from .knowledge_events import (
KnowledgeSearchQueryFailedEvent,
)
from .memory_events import (
MemorySaveStartedEvent,
MemorySaveCompletedEvent,
MemorySaveFailedEvent,
MemoryQueryStartedEvent,
MemoryQueryCompletedEvent,
MemoryQueryFailedEvent,
MemoryRetrievalStartedEvent,
MemoryRetrievalCompletedEvent,
)
EventTypes = Union[
CrewKickoffStartedEvent,
CrewKickoffCompletedEvent,
@@ -96,4 +107,12 @@ EventTypes = Union[
KnowledgeQueryCompletedEvent,
KnowledgeQueryFailedEvent,
KnowledgeSearchQueryFailedEvent,
MemorySaveStartedEvent,
MemorySaveCompletedEvent,
MemorySaveFailedEvent,
MemoryQueryStartedEvent,
MemoryQueryCompletedEvent,
MemoryQueryFailedEvent,
MemoryRetrievalStartedEvent,
MemoryRetrievalCompletedEvent,
]

View File

@@ -0,0 +1,78 @@
from typing import Any, Dict, Optional
from crewai.utilities.events.base_events import BaseEvent
class MemoryQueryStartedEvent(BaseEvent):
"""Event emitted when a memory query is started"""
type: str = "memory_query_started"
query: str
limit: int
score_threshold: Optional[float] = None
class MemoryQueryCompletedEvent(BaseEvent):
"""Event emitted when a memory query is completed successfully"""
type: str = "memory_query_completed"
query: str
results: Any
limit: int
score_threshold: Optional[float] = None
query_time_ms: float
class MemoryQueryFailedEvent(BaseEvent):
"""Event emitted when a memory query fails"""
type: str = "memory_query_failed"
query: str
limit: int
score_threshold: Optional[float] = None
error: str
class MemorySaveStartedEvent(BaseEvent):
"""Event emitted when a memory save operation is started"""
type: str = "memory_save_started"
value: Optional[str] = None
metadata: Optional[Dict[str, Any]] = None
agent_role: Optional[str] = None
class MemorySaveCompletedEvent(BaseEvent):
"""Event emitted when a memory save operation is completed successfully"""
type: str = "memory_save_completed"
value: str
metadata: Optional[Dict[str, Any]] = None
agent_role: Optional[str] = None
save_time_ms: float
class MemorySaveFailedEvent(BaseEvent):
"""Event emitted when a memory save operation fails"""
type: str = "memory_save_failed"
value: Optional[str] = None
metadata: Optional[Dict[str, Any]] = None
agent_role: Optional[str] = None
error: str
class MemoryRetrievalStartedEvent(BaseEvent):
"""Event emitted when memory retrieval for a task prompt starts"""
type: str = "memory_retrieval_started"
task_id: Optional[str] = None
class MemoryRetrievalCompletedEvent(BaseEvent):
"""Event emitted when memory retrieval for a task prompt completes successfully"""
type: str = "memory_retrieval_completed"
task_id: Optional[str] = None
memory_content: str
retrieval_time_ms: float

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -5,6 +5,7 @@ import json
from concurrent.futures import Future
from unittest import mock
from unittest.mock import ANY, MagicMock, patch
from collections import defaultdict
import pydantic_core
import pytest
@@ -40,6 +41,16 @@ from crewai.utilities.events.event_listener import EventListener
from crewai.utilities.rpm_controller import RPMController
from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler
from crewai.utilities.events.memory_events import (
MemorySaveStartedEvent,
MemorySaveCompletedEvent,
MemorySaveFailedEvent,
MemoryQueryStartedEvent,
MemoryQueryCompletedEvent,
MemoryQueryFailedEvent,
MemoryRetrievalStartedEvent,
MemoryRetrievalCompletedEvent,
)
@pytest.fixture
def ceo():
@@ -2478,11 +2489,79 @@ def test_using_contextual_memory():
memory=True,
)
with patch.object(ContextualMemory, "build_context_for_task") as contextual_mem:
with patch.object(ContextualMemory, "build_context_for_task", return_value="") as contextual_mem:
crew.kickoff()
contextual_mem.assert_called_once()
@pytest.mark.vcr(filter_headers=["authorization"])
def test_memory_events_are_emitted():
events = defaultdict(list)
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(MemorySaveStartedEvent)
def handle_memory_save_started(source, event):
events["MemorySaveStartedEvent"].append(event)
@crewai_event_bus.on(MemorySaveCompletedEvent)
def handle_memory_save_completed(source, event):
events["MemorySaveCompletedEvent"].append(event)
@crewai_event_bus.on(MemorySaveFailedEvent)
def handle_memory_save_failed(source, event):
events["MemorySaveFailedEvent"].append(event)
@crewai_event_bus.on(MemoryQueryStartedEvent)
def handle_memory_query_started(source, event):
events["MemoryQueryStartedEvent"].append(event)
@crewai_event_bus.on(MemoryQueryCompletedEvent)
def handle_memory_query_completed(source, event):
events["MemoryQueryCompletedEvent"].append(event)
@crewai_event_bus.on(MemoryQueryFailedEvent)
def handle_memory_query_failed(source, event):
events["MemoryQueryFailedEvent"].append(event)
@crewai_event_bus.on(MemoryRetrievalStartedEvent)
def handle_memory_retrieval_started(source, event):
events["MemoryRetrievalStartedEvent"].append(event)
@crewai_event_bus.on(MemoryRetrievalCompletedEvent)
def handle_memory_retrieval_completed(source, event):
events["MemoryRetrievalCompletedEvent"].append(event)
math_researcher = Agent(
role="Researcher",
goal="You research about math.",
backstory="You're an expert in research and you love to learn new things.",
allow_delegation=False,
)
task1 = Task(
description="Research a topic to teach a kid aged 6 about math.",
expected_output="A topic, explanation, angle, and examples.",
agent=math_researcher,
)
crew = Crew(
agents=[math_researcher],
tasks=[task1],
memory=True,
)
crew.kickoff()
assert len(events["MemorySaveStartedEvent"]) == 6
assert len(events["MemorySaveCompletedEvent"]) == 6
assert len(events["MemorySaveFailedEvent"]) == 0
assert len(events["MemoryQueryStartedEvent"]) == 3
assert len(events["MemoryQueryCompletedEvent"]) == 3
assert len(events["MemoryQueryFailedEvent"]) == 0
assert len(events["MemoryRetrievalStartedEvent"]) == 1
assert len(events["MemoryRetrievalCompletedEvent"]) == 1
@pytest.mark.vcr(filter_headers=["authorization"])
def test_using_contextual_memory_with_long_term_memory():
from unittest.mock import patch
@@ -2506,7 +2585,7 @@ def test_using_contextual_memory_with_long_term_memory():
long_term_memory=LongTermMemory(),
)
with patch.object(ContextualMemory, "build_context_for_task") as contextual_mem:
with patch.object(ContextualMemory, "build_context_for_task", return_value="") as contextual_mem:
crew.kickoff()
contextual_mem.assert_called_once()
assert crew.memory is False
@@ -2607,7 +2686,7 @@ def test_using_contextual_memory_with_short_term_memory():
short_term_memory=ShortTermMemory(),
)
with patch.object(ContextualMemory, "build_context_for_task") as contextual_mem:
with patch.object(ContextualMemory, "build_context_for_task", return_value="") as contextual_mem:
crew.kickoff()
contextual_mem.assert_called_once()
assert crew.memory is False
@@ -2636,7 +2715,7 @@ def test_disabled_memory_using_contextual_memory():
memory=False,
)
with patch.object(ContextualMemory, "build_context_for_task") as contextual_mem:
with patch.object(ContextualMemory, "build_context_for_task", return_value="") as contextual_mem:
crew.kickoff()
contextual_mem.assert_not_called()

View File

@@ -1,5 +1,12 @@
from unittest.mock import MagicMock, patch
from unittest.mock import MagicMock, patch, ANY
from collections import defaultdict
from crewai.utilities.events import crewai_event_bus
from crewai.utilities.events.memory_events import (
MemorySaveStartedEvent,
MemorySaveCompletedEvent,
MemoryQueryStartedEvent,
MemoryQueryCompletedEvent,
)
import pytest
from mem0.memory.main import Memory
@@ -10,7 +17,6 @@ from crewai.memory.external.external_memory_item import ExternalMemoryItem
from crewai.memory.storage.interface import Storage
from crewai.task import Task
@pytest.fixture
def mock_mem0_memory():
mock_memory = MagicMock(spec=Memory)
@@ -188,7 +194,8 @@ def test_crew_external_memory_save_using_crew_without_memory_flag(
assert mock_method.call_count > 0
def test_external_memory_custom_storage(crew_with_external_memory):
@pytest.fixture
def custom_storage():
class CustomStorage(Storage):
def __init__(self):
self.memories = []
@@ -203,6 +210,9 @@ def test_external_memory_custom_storage(crew_with_external_memory):
self.memories = []
custom_storage = CustomStorage()
return custom_storage
def test_external_memory_custom_storage(custom_storage, crew_with_external_memory):
external_memory = ExternalMemory(storage=custom_storage)
# by ensuring the crew is set, we can test that the storage is used
@@ -221,3 +231,101 @@ def test_external_memory_custom_storage(crew_with_external_memory):
external_memory.reset()
results = external_memory.search("test")
assert len(results) == 0
def test_external_memory_search_events(custom_storage, external_memory_with_mocked_config):
events = defaultdict(list)
external_memory_with_mocked_config.storage = custom_storage
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(MemoryQueryStartedEvent)
def on_search_started(source, event):
events["MemoryQueryStartedEvent"].append(event)
@crewai_event_bus.on(MemoryQueryCompletedEvent)
def on_search_completed(source, event):
events["MemoryQueryCompletedEvent"].append(event)
external_memory_with_mocked_config.search(
query="test value",
limit=3,
score_threshold=0.35,
)
assert len(events["MemoryQueryStartedEvent"]) == 1
assert len(events["MemoryQueryCompletedEvent"]) == 1
assert len(events["MemoryQueryFailedEvent"]) == 0
assert dict(events["MemoryQueryStartedEvent"][0]) == {
'timestamp': ANY,
'type': 'memory_query_started',
'source_fingerprint': None,
'source_type': 'external_memory',
'fingerprint_metadata': None,
'query': 'test value',
'limit': 3,
'score_threshold': 0.35
}
assert dict(events["MemoryQueryCompletedEvent"][0]) == {
'timestamp': ANY,
'type': 'memory_query_completed',
'source_fingerprint': None,
'source_type': 'external_memory',
'fingerprint_metadata': None,
'query': 'test value',
'results': [],
'limit': 3,
'score_threshold': 0.35,
'query_time_ms': ANY
}
def test_external_memory_save_events(custom_storage, external_memory_with_mocked_config):
events = defaultdict(list)
external_memory_with_mocked_config.storage = custom_storage
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(MemorySaveStartedEvent)
def on_save_started(source, event):
events["MemorySaveStartedEvent"].append(event)
@crewai_event_bus.on(MemorySaveCompletedEvent)
def on_save_completed(source, event):
events["MemorySaveCompletedEvent"].append(event)
external_memory_with_mocked_config.save(
value="saving value",
metadata={"task": "test_task"},
agent="test_agent",
)
assert len(events["MemorySaveStartedEvent"]) == 1
assert len(events["MemorySaveCompletedEvent"]) == 1
assert len(events["MemorySaveFailedEvent"]) == 0
assert dict(events["MemorySaveStartedEvent"][0]) == {
'timestamp': ANY,
'type': 'memory_save_started',
'source_fingerprint': None,
'source_type': 'external_memory',
'fingerprint_metadata': None,
'value': 'saving value',
'metadata': {'task': 'test_task'},
'agent_role': "test_agent"
}
assert dict(events["MemorySaveCompletedEvent"][0]) == {
'timestamp': ANY,
'type': 'memory_save_completed',
'source_fingerprint': None,
'source_type': 'external_memory',
'fingerprint_metadata': None,
'value': 'saving value',
'metadata': {'task': 'test_task', 'agent': 'test_agent'},
'agent_role': "test_agent",
'save_time_ms': ANY
}

View File

@@ -1,8 +1,15 @@
import pytest
from unittest.mock import ANY
from collections import defaultdict
from crewai.utilities.events import crewai_event_bus
from crewai.memory.long_term.long_term_memory import LongTermMemory
from crewai.memory.long_term.long_term_memory_item import LongTermMemoryItem
from crewai.utilities.events.memory_events import (
MemorySaveStartedEvent,
MemorySaveCompletedEvent,
MemoryQueryStartedEvent,
MemoryQueryCompletedEvent,
)
@pytest.fixture
def long_term_memory():
@@ -10,6 +17,103 @@ def long_term_memory():
return LongTermMemory()
def test_long_term_memory_save_events(long_term_memory):
events = defaultdict(list)
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(MemorySaveStartedEvent)
def on_save_started(source, event):
events["MemorySaveStartedEvent"].append(event)
@crewai_event_bus.on(MemorySaveCompletedEvent)
def on_save_completed(source, event):
events["MemorySaveCompletedEvent"].append(event)
memory = LongTermMemoryItem(
agent="test_agent",
task="test_task",
expected_output="test_output",
datetime="test_datetime",
quality=0.5,
metadata={"task": "test_task", "quality": 0.5},
)
long_term_memory.save(memory)
assert len(events["MemorySaveStartedEvent"]) == 1
assert len(events["MemorySaveCompletedEvent"]) == 1
assert len(events["MemorySaveFailedEvent"]) == 0
assert dict(events["MemorySaveStartedEvent"][0]) == {
"timestamp": ANY,
"type": "memory_save_started",
"source_fingerprint": None,
"source_type": "long_term_memory",
"fingerprint_metadata": None,
"value": "test_task",
"metadata": {"task": "test_task", "quality": 0.5},
"agent_role": "test_agent",
}
assert dict(events["MemorySaveCompletedEvent"][0]) == {
"timestamp": ANY,
"type": "memory_save_completed",
"source_fingerprint": None,
"source_type": "long_term_memory",
"fingerprint_metadata": None,
"value": "test_task",
"metadata": {"task": "test_task", "quality": 0.5, "agent": "test_agent", "expected_output": "test_output"},
"agent_role": "test_agent",
"save_time_ms": ANY,
}
def test_long_term_memory_search_events(long_term_memory):
events = defaultdict(list)
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(MemoryQueryStartedEvent)
def on_search_started(source, event):
events["MemoryQueryStartedEvent"].append(event)
@crewai_event_bus.on(MemoryQueryCompletedEvent)
def on_search_completed(source, event):
events["MemoryQueryCompletedEvent"].append(event)
test_query = "test query"
long_term_memory.search(
test_query,
latest_n=5
)
assert len(events["MemoryQueryStartedEvent"]) == 1
assert len(events["MemoryQueryCompletedEvent"]) == 1
assert len(events["MemoryQueryFailedEvent"]) == 0
assert dict(events["MemoryQueryStartedEvent"][0]) == {
'timestamp': ANY,
'type': 'memory_query_started',
'source_fingerprint': None,
'source_type': 'long_term_memory',
'fingerprint_metadata': None,
'query': 'test query',
'limit': 5,
'score_threshold': None
}
assert dict(events["MemoryQueryCompletedEvent"][0]) == {
'timestamp': ANY,
'type': 'memory_query_completed',
'source_fingerprint': None,
'source_type': 'long_term_memory',
'fingerprint_metadata': None,
'query': 'test query',
'results': None,
'limit': 5,
'score_threshold': None,
'query_time_ms': ANY
}
def test_save_and_search(long_term_memory):
memory = LongTermMemoryItem(
agent="test_agent",

View File

@@ -1,5 +1,5 @@
from unittest.mock import patch
from unittest.mock import patch, ANY
from collections import defaultdict
import pytest
from crewai.agent import Agent
@@ -7,6 +7,13 @@ from crewai.crew import Crew
from crewai.memory.short_term.short_term_memory import ShortTermMemory
from crewai.memory.short_term.short_term_memory_item import ShortTermMemoryItem
from crewai.task import Task
from crewai.utilities.events import crewai_event_bus
from crewai.utilities.events.memory_events import (
MemorySaveStartedEvent,
MemorySaveCompletedEvent,
MemoryQueryStartedEvent,
MemoryQueryCompletedEvent,
)
@pytest.fixture
@@ -28,6 +35,98 @@ def short_term_memory():
return ShortTermMemory(crew=Crew(agents=[agent], tasks=[task]))
def test_short_term_memory_search_events(short_term_memory):
events = defaultdict(list)
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(MemoryQueryStartedEvent)
def on_search_started(source, event):
events["MemoryQueryStartedEvent"].append(event)
@crewai_event_bus.on(MemoryQueryCompletedEvent)
def on_search_completed(source, event):
events["MemoryQueryCompletedEvent"].append(event)
# Call the save method
short_term_memory.search(
query="test value",
limit=3,
score_threshold=0.35,
)
assert len(events["MemoryQueryStartedEvent"]) == 1
assert len(events["MemoryQueryCompletedEvent"]) == 1
assert len(events["MemoryQueryFailedEvent"]) == 0
assert dict(events["MemoryQueryStartedEvent"][0]) == {
'timestamp': ANY,
'type': 'memory_query_started',
'source_fingerprint': None,
'source_type': 'short_term_memory',
'fingerprint_metadata': None,
'query': 'test value',
'limit': 3,
'score_threshold': 0.35
}
assert dict(events["MemoryQueryCompletedEvent"][0]) == {
'timestamp': ANY,
'type': 'memory_query_completed',
'source_fingerprint': None,
'source_type': 'short_term_memory',
'fingerprint_metadata': None,
'query': 'test value',
'results': [],
'limit': 3,
'score_threshold': 0.35,
'query_time_ms': ANY
}
def test_short_term_memory_save_events(short_term_memory):
events = defaultdict(list)
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(MemorySaveStartedEvent)
def on_save_started(source, event):
events["MemorySaveStartedEvent"].append(event)
@crewai_event_bus.on(MemorySaveCompletedEvent)
def on_save_completed(source, event):
events["MemorySaveCompletedEvent"].append(event)
short_term_memory.save(
value="test value",
metadata={"task": "test_task"},
agent="test_agent",
)
assert len(events["MemorySaveStartedEvent"]) == 1
assert len(events["MemorySaveCompletedEvent"]) == 1
assert len(events["MemorySaveFailedEvent"]) == 0
assert dict(events["MemorySaveStartedEvent"][0]) == {
'timestamp': ANY,
'type': 'memory_save_started',
'source_fingerprint': None,
'source_type': 'short_term_memory',
'fingerprint_metadata': None,
'value': 'test value',
'metadata': {'task': 'test_task'},
'agent_role': "test_agent"
}
assert dict(events["MemorySaveCompletedEvent"][0]) == {
'timestamp': ANY,
'type': 'memory_save_completed',
'source_fingerprint': None,
'source_type': 'short_term_memory',
'fingerprint_metadata': None,
'value': 'test value',
'metadata': {'task': 'test_task', 'agent': 'test_agent'},
'agent_role': "test_agent",
'save_time_ms': ANY
}
def test_save_and_search(short_term_memory):
memory = ShortTermMemoryItem(
data="""test value test value test value test value test value test value