feat: emit events about memory usage

This commit is contained in:
Lucas Gomide
2025-07-01 12:47:00 -03:00
parent 640e1a7bc2
commit 071bc4faea
12 changed files with 844 additions and 30 deletions

View File

@@ -1,5 +1,9 @@
import shutil
import subprocess
import os
import re
import sys
import time
import traceback
import uuid
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple, Type, Union
from pydantic import Field, InstanceOf, PrivateAttr, model_validator
@@ -32,6 +36,10 @@ from crewai.utilities.events.agent_events import (
AgentExecutionStartedEvent,
)
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.memory_events import (
MemoryRetrievalStartedEvent,
MemoryRetrievalCompletedEvent,
)
from crewai.utilities.events.knowledge_events import (
KnowledgeQueryCompletedEvent,
KnowledgeQueryFailedEvent,
@@ -302,6 +310,15 @@ class Agent(BaseAgent):
)
if self._is_any_available_memory():
crewai_event_bus.emit(
self,
event=MemoryRetrievalStartedEvent(
task_id=str(task.id) if task else None,
source_type="agent",
),
)
start_time = time.time()
contextual_memory = ContextualMemory(
self.crew.memory_config,
self.crew._short_term_memory,
@@ -313,6 +330,16 @@ class Agent(BaseAgent):
memory = contextual_memory.build_context_for_task(task, context)
if memory.strip() != "":
task_prompt += self.i18n.slice("memory").format(memory=memory)
crewai_event_bus.emit(
self,
event=MemoryRetrievalCompletedEvent(
task_id=str(task.id) if task else None,
memory_content=memory,
retrieval_time_ms=(time.time() - start_time) * 1000,
source_type="agent",
),
)
knowledge_config = (
self.knowledge_config.model_dump() if self.knowledge_config else {}
)

View File

@@ -1,10 +1,20 @@
from typing import Optional
import time
from pydantic import PrivateAttr
from crewai.memory.entity.entity_memory_item import EntityMemoryItem
from crewai.memory.memory import Memory
from crewai.memory.storage.rag_storage import RAGStorage
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.memory_events import (
MemoryQueryStartedEvent,
MemoryQueryCompletedEvent,
MemoryQueryFailedEvent,
MemorySaveStartedEvent,
MemorySaveCompletedEvent,
MemorySaveFailedEvent,
)
class EntityMemory(Memory):
@@ -48,6 +58,16 @@ class EntityMemory(Memory):
def save(self, item: EntityMemoryItem) -> None: # type: ignore # BUG?: Signature of "save" incompatible with supertype "Memory"
"""Saves an entity item into the SQLite storage."""
crewai_event_bus.emit(
self,
event=MemorySaveStartedEvent(
metadata=item.metadata,
source_type="entity_memory",
),
)
start_time = time.time()
try:
if self._memory_provider == "mem0":
data = f"""
Remember details about the following entity:
@@ -57,8 +77,78 @@ class EntityMemory(Memory):
"""
else:
data = f"{item.name}({item.type}): {item.description}"
super().save(data, item.metadata)
# Emit memory save completed event
crewai_event_bus.emit(
self,
event=MemorySaveCompletedEvent(
value=data,
metadata=item.metadata,
save_time_ms=(time.time() - start_time) * 1000,
source_type="entity_memory",
),
)
except Exception as e:
crewai_event_bus.emit(
self,
event=MemorySaveFailedEvent(
metadata=item.metadata,
error=str(e),
source_type="entity_memory",
),
)
raise
def search(
self,
query: str,
limit: int = 3,
score_threshold: float = 0.35,
):
crewai_event_bus.emit(
self,
event=MemoryQueryStartedEvent(
query=query,
limit=limit,
score_threshold=score_threshold,
source_type="entity_memory",
),
)
start_time = time.time()
try:
results = super().search(
query=query, limit=limit, score_threshold=score_threshold
)
crewai_event_bus.emit(
self,
event=MemoryQueryCompletedEvent(
query=query,
results=results,
limit=limit,
score_threshold=score_threshold,
query_time_ms=(time.time() - start_time) * 1000,
source_type="entity_memory",
),
)
return results
except Exception as e:
crewai_event_bus.emit(
self,
event=MemoryQueryFailedEvent(
query=query,
limit=limit,
score_threshold=score_threshold,
error=str(e),
source_type="entity_memory",
),
)
raise
def reset(self) -> None:
try:
self.storage.reset()

View File

@@ -1,8 +1,18 @@
from typing import TYPE_CHECKING, Any, Dict, Optional
import time
from crewai.memory.external.external_memory_item import ExternalMemoryItem
from crewai.memory.memory import Memory
from crewai.memory.storage.interface import Storage
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.memory_events import (
MemoryQueryStartedEvent,
MemoryQueryCompletedEvent,
MemoryQueryFailedEvent,
MemorySaveStartedEvent,
MemorySaveCompletedEvent,
MemorySaveFailedEvent,
)
if TYPE_CHECKING:
from crewai.memory.storage.mem0_storage import Mem0Storage
@@ -46,9 +56,92 @@ class ExternalMemory(Memory):
agent: Optional[str] = None,
) -> None:
"""Saves a value into the external storage."""
crewai_event_bus.emit(
self,
event=MemorySaveStartedEvent(
value=value,
metadata=metadata,
agent_role=agent,
source_type="external_memory",
),
)
start_time = time.time()
try:
item = ExternalMemoryItem(value=value, metadata=metadata, agent=agent)
super().save(value=item.value, metadata=item.metadata, agent=item.agent)
crewai_event_bus.emit(
self,
event=MemorySaveCompletedEvent(
value=value,
metadata=metadata,
agent_role=agent,
save_time_ms=(time.time() - start_time) * 1000,
source_type="external_memory",
),
)
except Exception as e:
crewai_event_bus.emit(
self,
event=MemorySaveFailedEvent(
value=value,
metadata=metadata,
agent_role=agent,
error=str(e),
source_type="external_memory",
),
)
raise
def search(
self,
query: str,
limit: int = 3,
score_threshold: float = 0.35,
):
crewai_event_bus.emit(
self,
event=MemoryQueryStartedEvent(
query=query,
limit=limit,
score_threshold=score_threshold,
source_type="external_memory",
),
)
start_time = time.time()
try:
results = super().search(
query=query, limit=limit, score_threshold=score_threshold
)
crewai_event_bus.emit(
self,
event=MemoryQueryCompletedEvent(
query=query,
results=results,
limit=limit,
score_threshold=score_threshold,
query_time_ms=(time.time() - start_time) * 1000,
source_type="external_memory",
),
)
return results
except Exception as e:
crewai_event_bus.emit(
self,
event=MemoryQueryFailedEvent(
query=query,
limit=limit,
score_threshold=score_threshold,
error=str(e),
source_type="external_memory",
),
)
raise
def reset(self) -> None:
self.storage.reset()

View File

@@ -1,7 +1,17 @@
from typing import Any, Dict, List
import time
from crewai.memory.long_term.long_term_memory_item import LongTermMemoryItem
from crewai.memory.memory import Memory
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.memory_events import (
MemoryQueryStartedEvent,
MemoryQueryCompletedEvent,
MemoryQueryFailedEvent,
MemorySaveStartedEvent,
MemorySaveCompletedEvent,
MemorySaveFailedEvent,
)
from crewai.memory.storage.ltm_sqlite_storage import LTMSQLiteStorage
@@ -20,6 +30,18 @@ class LongTermMemory(Memory):
super().__init__(storage=storage)
def save(self, item: LongTermMemoryItem) -> None: # type: ignore # BUG?: Signature of "save" incompatible with supertype "Memory"
crewai_event_bus.emit(
self,
event=MemorySaveStartedEvent(
value=item.task,
metadata=item.metadata,
agent_role=item.agent,
source_type="long_term_memory",
),
)
start_time = time.time()
try:
metadata = item.metadata
metadata.update({"agent": item.agent, "expected_output": item.expected_output})
self.storage.save( # type: ignore # BUG?: Unexpected keyword argument "task_description","score","datetime" for "save" of "Storage"
@@ -29,8 +51,66 @@ class LongTermMemory(Memory):
datetime=item.datetime,
)
crewai_event_bus.emit(
self,
event=MemorySaveCompletedEvent(
value=item.task,
metadata=item.metadata,
agent_role=item.agent,
save_time_ms=(time.time() - start_time) * 1000,
source_type="long_term_memory",
),
)
except Exception as e:
crewai_event_bus.emit(
self,
event=MemorySaveFailedEvent(
value=item.task,
metadata=item.metadata,
agent_role=item.agent,
error=str(e),
source_type="long_term_memory",
),
)
raise
def search(self, task: str, latest_n: int = 3) -> List[Dict[str, Any]]: # type: ignore # signature of "search" incompatible with supertype "Memory"
return self.storage.load(task, latest_n) # type: ignore # BUG?: "Storage" has no attribute "load"
crewai_event_bus.emit(
self,
event=MemoryQueryStartedEvent(
query=task,
limit=latest_n,
source_type="long_term_memory",
),
)
start_time = time.time()
try:
results = self.storage.load(task, latest_n) # type: ignore # BUG?: "Storage" has no attribute "load"
crewai_event_bus.emit(
self,
event=MemoryQueryCompletedEvent(
query=task,
results=results,
limit=latest_n,
query_time_ms=(time.time() - start_time) * 1000,
source_type="long_term_memory",
),
)
return results
except Exception as e:
crewai_event_bus.emit(
self,
event=MemoryQueryFailedEvent(
query=task,
limit=latest_n,
error=str(e),
source_type="long_term_memory",
),
)
raise
def reset(self) -> None:
self.storage.reset()

View File

@@ -1,10 +1,20 @@
from typing import Any, Dict, Optional
import time
from pydantic import PrivateAttr
from crewai.memory.memory import Memory
from crewai.memory.short_term.short_term_memory_item import ShortTermMemoryItem
from crewai.memory.storage.rag_storage import RAGStorage
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.memory_events import (
MemoryQueryStartedEvent,
MemoryQueryCompletedEvent,
MemoryQueryFailedEvent,
MemorySaveStartedEvent,
MemorySaveCompletedEvent,
MemorySaveFailedEvent,
)
class ShortTermMemory(Memory):
@@ -52,22 +62,95 @@ class ShortTermMemory(Memory):
metadata: Optional[Dict[str, Any]] = None,
agent: Optional[str] = None,
) -> None:
crewai_event_bus.emit(
self,
event=MemorySaveStartedEvent(
value=value,
metadata=metadata,
agent_role=agent,
source_type="short_term_memory",
),
)
start_time = time.time()
try:
item = ShortTermMemoryItem(data=value, metadata=metadata, agent=agent)
if self._memory_provider == "mem0":
item.data = f"Remember the following insights from Agent run: {item.data}"
super().save(value=item.data, metadata=item.metadata, agent=item.agent)
crewai_event_bus.emit(
self,
event=MemorySaveCompletedEvent(
value=value,
metadata=metadata,
agent=agent,
save_time_ms=(time.time() - start_time) * 1000,
source_type="short_term_memory",
),
)
except Exception as e:
crewai_event_bus.emit(
self,
event=MemorySaveFailedEvent(
value=value,
metadata=metadata,
agent=agent,
error=str(e),
source_type="short_term_memory",
),
)
raise
def search(
self,
query: str,
limit: int = 3,
score_threshold: float = 0.35,
):
return self.storage.search(
crewai_event_bus.emit(
self,
event=MemoryQueryStartedEvent(
query=query,
limit=limit,
score_threshold=score_threshold,
source_type="short_term_memory",
),
)
start_time = time.time()
try:
results = self.storage.search(
query=query, limit=limit, score_threshold=score_threshold
) # type: ignore # BUG? The reference is to the parent class, but the parent class does not have this parameters
crewai_event_bus.emit(
self,
event=MemoryQueryCompletedEvent(
query=query,
results=results,
limit=limit,
score_threshold=score_threshold,
query_time_ms=(time.time() - start_time) * 1000,
source_type="short_term_memory",
),
)
return results
except Exception as e:
crewai_event_bus.emit(
self,
event=MemoryQueryFailedEvent(
query=query,
limit=limit,
score_threshold=score_threshold,
error=str(e),
source_type="short_term_memory",
),
)
raise
def reset(self) -> None:
try:
self.storage.reset()

View File

@@ -51,6 +51,17 @@ from .llm_events import (
LLMStreamChunkEvent,
)
from .memory_events import (
MemorySaveStartedEvent,
MemorySaveCompletedEvent,
MemorySaveFailedEvent,
MemoryQueryStartedEvent,
MemoryQueryCompletedEvent,
MemoryQueryFailedEvent,
MemoryRetrievalStartedEvent,
MemoryRetrievalCompletedEvent,
)
# events
from .event_listener import EventListener
from .third_party.agentops_listener import agentops_listener

View File

@@ -12,7 +12,7 @@ class BaseEvent(BaseModel):
timestamp: datetime = Field(default_factory=datetime.now)
type: str
source_fingerprint: Optional[str] = None # UUID string of the source entity
source_type: Optional[str] = None # "agent", "task", "crew"
source_type: Optional[str] = None # "agent", "task", "crew", "memory", "entity_memory", "short_term_memory", "long_term_memory", "external_memory"
fingerprint_metadata: Optional[Dict[str, Any]] = None # Any relevant metadata
def to_json(self, exclude: set[str] | None = None):

View File

@@ -57,6 +57,17 @@ from .knowledge_events import (
KnowledgeSearchQueryFailedEvent,
)
from .memory_events import (
MemorySaveStartedEvent,
MemorySaveCompletedEvent,
MemorySaveFailedEvent,
MemoryQueryStartedEvent,
MemoryQueryCompletedEvent,
MemoryQueryFailedEvent,
MemoryRetrievalStartedEvent,
MemoryRetrievalCompletedEvent,
)
EventTypes = Union[
CrewKickoffStartedEvent,
CrewKickoffCompletedEvent,
@@ -96,4 +107,12 @@ EventTypes = Union[
KnowledgeQueryCompletedEvent,
KnowledgeQueryFailedEvent,
KnowledgeSearchQueryFailedEvent,
MemorySaveStartedEvent,
MemorySaveCompletedEvent,
MemorySaveFailedEvent,
MemoryQueryStartedEvent,
MemoryQueryCompletedEvent,
MemoryQueryFailedEvent,
MemoryRetrievalStartedEvent,
MemoryRetrievalCompletedEvent,
]

View File

@@ -0,0 +1,78 @@
from typing import Any, Dict, Optional
from crewai.utilities.events.base_events import BaseEvent
class MemoryQueryStartedEvent(BaseEvent):
"""Event emitted when a memory query is started"""
type: str = "memory_query_started"
query: str
limit: int
score_threshold: Optional[float] = None
class MemoryQueryCompletedEvent(BaseEvent):
"""Event emitted when a memory query is completed successfully"""
type: str = "memory_query_completed"
query: str
results: Any
limit: int
score_threshold: Optional[float] = None
query_time_ms: float
class MemoryQueryFailedEvent(BaseEvent):
"""Event emitted when a memory query fails"""
type: str = "memory_query_failed"
query: str
limit: int
score_threshold: Optional[float] = None
error: str
class MemorySaveStartedEvent(BaseEvent):
"""Event emitted when a memory save operation is started"""
type: str = "memory_save_started"
value: Optional[str] = None
metadata: Optional[Dict[str, Any]] = None
agent: Optional[str] = None
class MemorySaveCompletedEvent(BaseEvent):
"""Event emitted when a memory save operation is completed successfully"""
type: str = "memory_save_completed"
value: str
metadata: Optional[Dict[str, Any]] = None
agent: Optional[str] = None
save_time_ms: float
class MemorySaveFailedEvent(BaseEvent):
"""Event emitted when a memory save operation fails"""
type: str = "memory_save_failed"
value: Optional[str] = None
metadata: Optional[Dict[str, Any]] = None
agent: Optional[str] = None
error: str
class MemoryRetrievalStartedEvent(BaseEvent):
"""Event emitted when memory retrieval for a task prompt starts"""
type: str = "memory_retrieval_started"
task_id: Optional[str] = None
class MemoryRetrievalCompletedEvent(BaseEvent):
"""Event emitted when memory retrieval for a task prompt completes successfully"""
type: str = "memory_retrieval_completed"
task_id: Optional[str] = None
memory_content: str
retrieval_time_ms: float

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long