feat: use emit decorator on external, improve typing

This commit is contained in:
Greyson Lalonde
2025-10-22 13:48:37 -04:00
parent 8d8772d607
commit 68c9990eef
2 changed files with 55 additions and 117 deletions

View File

@@ -1,17 +1,9 @@
from __future__ import annotations
import time
from typing import TYPE_CHECKING, Any
from collections.abc import Callable
from typing import TYPE_CHECKING, Any, cast
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.memory_events import (
MemoryQueryCompletedEvent,
MemoryQueryFailedEvent,
MemoryQueryStartedEvent,
MemorySaveCompletedEvent,
MemorySaveFailedEvent,
MemorySaveStartedEvent,
)
from crewai.events.lifecycle_decorator import with_lifecycle_events
from crewai.memory.external.external_memory_item import ExternalMemoryItem
from crewai.memory.memory import Memory
from crewai.memory.storage.interface import Storage
@@ -19,29 +11,31 @@ from crewai.rag.embeddings.types import ProviderSpec
if TYPE_CHECKING:
from crewai.memory.storage.mem0_storage import Mem0Storage
from crewai.crew import Crew
class ExternalMemory(Memory):
def __init__(self, storage: Storage | None = None, **data: Any):
def __init__(self, storage: Storage | None = None, **data: Any) -> None:
super().__init__(storage=storage, **data)
@staticmethod
def _configure_mem0(crew: Any, config: dict[str, Any]) -> Mem0Storage:
from crewai.memory.storage.mem0_storage import Mem0Storage
def _configure_mem0(crew: Crew, config: dict[str, Any]) -> Storage:
from crewai.memory.storage.mem0_storage import Mem0Config, Mem0Storage
return Mem0Storage(type="external", crew=crew, config=config)
return Mem0Storage(
type="external", crew=crew, config=cast(Mem0Config, cast(object, config))
)
@staticmethod
def external_supported_storages() -> dict[str, Any]:
def external_supported_storages() -> dict[
str, Callable[[Crew, dict[str, Any]], Storage]
]:
return {
"mem0": ExternalMemory._configure_mem0,
}
@staticmethod
def create_storage(
crew: Any, embedder_config: dict[str, Any] | ProviderSpec | None
) -> Storage:
def create_storage(crew: Crew, embedder_config: ProviderSpec | None) -> Storage:
if not embedder_config:
raise ValueError("embedder_config is required")
@@ -53,115 +47,59 @@ class ExternalMemory(Memory):
if provider not in supported_storages:
raise ValueError(f"Provider {provider} not supported")
return supported_storages[provider](crew, embedder_config.get("config", {}))
config = embedder_config.get("config", {})
return supported_storages[provider](crew, cast(dict[str, Any], config))
@with_lifecycle_events(
"memory_save",
args_map={"value": "value", "metadata": "metadata"},
context={
"source_type": "external_memory",
"from_agent": lambda self: self.agent,
"from_task": lambda self: self.task,
},
elapsed_name="save_time_ms",
)
def save(
self,
value: Any,
metadata: dict[str, Any] | None = None,
) -> None:
"""Saves a value into the external storage."""
crewai_event_bus.emit(
self,
event=MemorySaveStartedEvent(
value=value,
metadata=metadata,
source_type="external_memory",
from_agent=self.agent,
from_task=self.task,
),
item = ExternalMemoryItem(
value=value,
metadata=metadata,
agent=self.agent.role if self.agent else None,
)
super().save(value=item.value, metadata=item.metadata)
start_time = time.time()
try:
item = ExternalMemoryItem(
value=value,
metadata=metadata,
agent=self.agent.role if self.agent else None,
)
super().save(value=item.value, metadata=item.metadata)
crewai_event_bus.emit(
self,
event=MemorySaveCompletedEvent(
value=value,
metadata=metadata,
save_time_ms=(time.time() - start_time) * 1000,
source_type="external_memory",
from_agent=self.agent,
from_task=self.task,
),
)
except Exception as e:
crewai_event_bus.emit(
self,
event=MemorySaveFailedEvent(
value=value,
metadata=metadata,
error=str(e),
source_type="external_memory",
from_agent=self.agent,
from_task=self.task,
),
)
raise
@with_lifecycle_events(
"memory_query",
args_map={
"query": "query",
"limit": "limit",
"score_threshold": "score_threshold",
},
context={
"source_type": "external_memory",
"from_agent": lambda self: self.agent,
"from_task": lambda self: self.task,
},
result_name="results",
elapsed_name="query_time_ms",
)
def search(
self,
query: str,
limit: int = 5,
score_threshold: float = 0.6,
):
crewai_event_bus.emit(
self,
event=MemoryQueryStartedEvent(
query=query,
limit=limit,
score_threshold=score_threshold,
source_type="external_memory",
from_agent=self.agent,
from_task=self.task,
),
)
start_time = time.time()
try:
results = super().search(
query=query, limit=limit, score_threshold=score_threshold
)
crewai_event_bus.emit(
self,
event=MemoryQueryCompletedEvent(
query=query,
results=results,
limit=limit,
score_threshold=score_threshold,
query_time_ms=(time.time() - start_time) * 1000,
source_type="external_memory",
from_agent=self.agent,
from_task=self.task,
),
)
return results
except Exception as e:
crewai_event_bus.emit(
self,
event=MemoryQueryFailedEvent(
query=query,
limit=limit,
score_threshold=score_threshold,
error=str(e),
source_type="external_memory",
),
)
raise
) -> Any:
return super().search(query=query, limit=limit, score_threshold=score_threshold)
def reset(self) -> None:
self.storage.reset()
def set_crew(self, crew: Any) -> ExternalMemory:
def set_crew(self, crew: Crew) -> ExternalMemory:
super().set_crew(crew)
if not self.storage:

View File

@@ -1,6 +1,6 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, cast
from pydantic import BaseModel
@@ -24,9 +24,6 @@ class Memory(BaseModel):
_agent: Agent | None = None
_task: Task | None = None
def __init__(self, storage: Any, **data: Any):
super().__init__(storage=storage, **data)
@property
def task(self) -> Task | None:
"""Get the current task associated with this memory."""
@@ -62,8 +59,11 @@ class Memory(BaseModel):
limit: int = 5,
score_threshold: float = 0.6,
) -> list[Any]:
return self.storage.search(
query=query, limit=limit, score_threshold=score_threshold
return cast(
list[Any],
self.storage.search(
query=query, limit=limit, score_threshold=score_threshold
),
)
def set_crew(self, crew: Any) -> Memory: