diff --git a/lib/crewai/src/crewai/memory/external/external_memory.py b/lib/crewai/src/crewai/memory/external/external_memory.py index c48ffd1e3..a92005c5e 100644 --- a/lib/crewai/src/crewai/memory/external/external_memory.py +++ b/lib/crewai/src/crewai/memory/external/external_memory.py @@ -1,17 +1,9 @@ from __future__ import annotations -import time -from typing import TYPE_CHECKING, Any +from collections.abc import Callable +from typing import TYPE_CHECKING, Any, cast -from crewai.events.event_bus import crewai_event_bus -from crewai.events.types.memory_events import ( - MemoryQueryCompletedEvent, - MemoryQueryFailedEvent, - MemoryQueryStartedEvent, - MemorySaveCompletedEvent, - MemorySaveFailedEvent, - MemorySaveStartedEvent, -) +from crewai.events.lifecycle_decorator import with_lifecycle_events from crewai.memory.external.external_memory_item import ExternalMemoryItem from crewai.memory.memory import Memory from crewai.memory.storage.interface import Storage @@ -19,29 +11,31 @@ from crewai.rag.embeddings.types import ProviderSpec if TYPE_CHECKING: - from crewai.memory.storage.mem0_storage import Mem0Storage + from crewai.crew import Crew class ExternalMemory(Memory): - def __init__(self, storage: Storage | None = None, **data: Any): + def __init__(self, storage: Storage | None = None, **data: Any) -> None: super().__init__(storage=storage, **data) @staticmethod - def _configure_mem0(crew: Any, config: dict[str, Any]) -> Mem0Storage: - from crewai.memory.storage.mem0_storage import Mem0Storage + def _configure_mem0(crew: Crew, config: dict[str, Any]) -> Storage: + from crewai.memory.storage.mem0_storage import Mem0Config, Mem0Storage - return Mem0Storage(type="external", crew=crew, config=config) + return Mem0Storage( + type="external", crew=crew, config=cast(Mem0Config, cast(object, config)) + ) @staticmethod - def external_supported_storages() -> dict[str, Any]: + def external_supported_storages() -> dict[ + str, Callable[[Crew, dict[str, Any]], Storage] + ]: return { "mem0": ExternalMemory._configure_mem0, } @staticmethod - def create_storage( - crew: Any, embedder_config: dict[str, Any] | ProviderSpec | None - ) -> Storage: + def create_storage(crew: Crew, embedder_config: ProviderSpec | None) -> Storage: if not embedder_config: raise ValueError("embedder_config is required") @@ -53,115 +47,59 @@ class ExternalMemory(Memory): if provider not in supported_storages: raise ValueError(f"Provider {provider} not supported") - return supported_storages[provider](crew, embedder_config.get("config", {})) + config = embedder_config.get("config", {}) + return supported_storages[provider](crew, cast(dict[str, Any], config)) + @with_lifecycle_events( + "memory_save", + args_map={"value": "value", "metadata": "metadata"}, + context={ + "source_type": "external_memory", + "from_agent": lambda self: self.agent, + "from_task": lambda self: self.task, + }, + elapsed_name="save_time_ms", + ) def save( self, value: Any, metadata: dict[str, Any] | None = None, ) -> None: """Saves a value into the external storage.""" - crewai_event_bus.emit( - self, - event=MemorySaveStartedEvent( - value=value, - metadata=metadata, - source_type="external_memory", - from_agent=self.agent, - from_task=self.task, - ), + item = ExternalMemoryItem( + value=value, + metadata=metadata, + agent=self.agent.role if self.agent else None, ) + super().save(value=item.value, metadata=item.metadata) - start_time = time.time() - try: - item = ExternalMemoryItem( - value=value, - metadata=metadata, - agent=self.agent.role if self.agent else None, - ) - super().save(value=item.value, metadata=item.metadata) - - crewai_event_bus.emit( - self, - event=MemorySaveCompletedEvent( - value=value, - metadata=metadata, - save_time_ms=(time.time() - start_time) * 1000, - source_type="external_memory", - from_agent=self.agent, - from_task=self.task, - ), - ) - except Exception as e: - crewai_event_bus.emit( - self, - event=MemorySaveFailedEvent( - value=value, - metadata=metadata, - error=str(e), - source_type="external_memory", - from_agent=self.agent, - from_task=self.task, - ), - ) - raise - + @with_lifecycle_events( + "memory_query", + args_map={ + "query": "query", + "limit": "limit", + "score_threshold": "score_threshold", + }, + context={ + "source_type": "external_memory", + "from_agent": lambda self: self.agent, + "from_task": lambda self: self.task, + }, + result_name="results", + elapsed_name="query_time_ms", + ) def search( self, query: str, limit: int = 5, score_threshold: float = 0.6, - ): - crewai_event_bus.emit( - self, - event=MemoryQueryStartedEvent( - query=query, - limit=limit, - score_threshold=score_threshold, - source_type="external_memory", - from_agent=self.agent, - from_task=self.task, - ), - ) - - start_time = time.time() - try: - results = super().search( - query=query, limit=limit, score_threshold=score_threshold - ) - - crewai_event_bus.emit( - self, - event=MemoryQueryCompletedEvent( - query=query, - results=results, - limit=limit, - score_threshold=score_threshold, - query_time_ms=(time.time() - start_time) * 1000, - source_type="external_memory", - from_agent=self.agent, - from_task=self.task, - ), - ) - - return results - except Exception as e: - crewai_event_bus.emit( - self, - event=MemoryQueryFailedEvent( - query=query, - limit=limit, - score_threshold=score_threshold, - error=str(e), - source_type="external_memory", - ), - ) - raise + ) -> Any: + return super().search(query=query, limit=limit, score_threshold=score_threshold) def reset(self) -> None: self.storage.reset() - def set_crew(self, crew: Any) -> ExternalMemory: + def set_crew(self, crew: Crew) -> ExternalMemory: super().set_crew(crew) if not self.storage: diff --git a/lib/crewai/src/crewai/memory/memory.py b/lib/crewai/src/crewai/memory/memory.py index 74297f9e4..fbc72760a 100644 --- a/lib/crewai/src/crewai/memory/memory.py +++ b/lib/crewai/src/crewai/memory/memory.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, cast from pydantic import BaseModel @@ -24,9 +24,6 @@ class Memory(BaseModel): _agent: Agent | None = None _task: Task | None = None - def __init__(self, storage: Any, **data: Any): - super().__init__(storage=storage, **data) - @property def task(self) -> Task | None: """Get the current task associated with this memory.""" @@ -62,8 +59,11 @@ class Memory(BaseModel): limit: int = 5, score_threshold: float = 0.6, ) -> list[Any]: - return self.storage.search( - query=query, limit=limit, score_threshold=score_threshold + return cast( + list[Any], + self.storage.search( + query=query, limit=limit, score_threshold=score_threshold + ), ) def set_crew(self, crew: Any) -> Memory: