Compare commits

..

4 Commits

Author SHA1 Message Date
Devin AI
ddcc5bb2e9 Fix type-checker and lint issues in rag_storage.py
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-03-04 07:44:21 +00:00
Devin AI
d9c5ebe1cf Address PR comments: Improve code quality and add validation
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-03-04 07:39:27 +00:00
Devin AI
0427da467a Fix linting issues
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-03-04 07:32:52 +00:00
Devin AI
7a21564743 Fix #2271: Handle SQLite3 version check gracefully for ChromaDB
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-03-04 07:28:09 +00:00
18 changed files with 283 additions and 753 deletions

View File

@@ -1,151 +0,0 @@
# Custom Memory Storage
CrewAI supports custom memory storage implementations for different memory types. You can provide your own storage implementation by extending the `Storage` interface and passing it to the memory instances or through the `memory_config` parameter.
## Implementing a Custom Storage
To create a custom storage implementation, you need to extend the `Storage` interface and implement the required methods:
```python
from typing import Any, Dict, List
from crewai.memory.storage.interface import Storage
class CustomStorage(Storage):
"""Custom storage implementation."""
def __init__(self):
# Initialize your storage backend
self.data = []
def save(self, value: Any, metadata: Dict[str, Any]) -> None:
"""Save a value with metadata to the storage."""
# Implement your save logic
self.data.append({"value": value, "metadata": metadata})
def search(
self, query: str, limit: int = 3, score_threshold: float = 0.35
) -> List[Any]:
"""Search for values in the storage."""
# Implement your search logic
return [{"context": item["value"], "metadata": item["metadata"]} for item in self.data]
def reset(self) -> None:
"""Reset the storage."""
# Implement your reset logic
self.data = []
```
## Using Custom Storage
There are two ways to provide custom storage implementations to CrewAI:
### 1. Pass Custom Storage to Memory Instances
You can create memory instances with custom storage and pass them to the Crew:
```python
from crewai import Crew, Agent
from crewai.memory.short_term.short_term_memory import ShortTermMemory
from crewai.memory.long_term.long_term_memory import LongTermMemory
from crewai.memory.entity.entity_memory import EntityMemory
from crewai.memory.user.user_memory import UserMemory
# Create custom storage instances
short_term_storage = CustomStorage()
long_term_storage = CustomStorage()
entity_storage = CustomStorage()
user_storage = CustomStorage()
# Create memory instances with custom storage
short_term_memory = ShortTermMemory(storage=short_term_storage)
long_term_memory = LongTermMemory(storage=long_term_storage)
entity_memory = EntityMemory(storage=entity_storage)
user_memory = UserMemory(storage=user_storage)
# Create a crew with custom memory instances
crew = Crew(
agents=[Agent(role="researcher", goal="research", backstory="I am a researcher")],
memory=True,
short_term_memory=short_term_memory,
long_term_memory=long_term_memory,
entity_memory=entity_memory,
memory_config={"user_memory": user_memory},
)
```
### 2. Pass Custom Storage through Memory Config
You can also provide custom storage implementations through the `memory_config` parameter:
```python
from crewai import Crew, Agent
# Create a crew with custom storage in memory_config
crew = Crew(
agents=[Agent(role="researcher", goal="research", backstory="I am a researcher")],
memory=True,
memory_config={
"storage": {
"short_term": CustomStorage(),
"long_term": CustomStorage(),
"entity": CustomStorage(),
"user": CustomStorage(),
}
},
)
```
## Example: Redis Storage
Here's an example of a custom storage implementation using Redis:
```python
import json
import redis
from typing import Any, Dict, List
from crewai.memory.storage.interface import Storage
class RedisStorage(Storage):
"""Redis-based storage implementation."""
def __init__(self, redis_url="redis://localhost:6379/0", prefix="crewai"):
self.redis = redis.from_url(redis_url)
self.prefix = prefix
def save(self, value: Any, metadata: Dict[str, Any]) -> None:
"""Save a value with metadata to Redis."""
key = f"{self.prefix}:{len(self.redis.keys(f'{self.prefix}:*'))}"
data = {"value": value, "metadata": metadata}
self.redis.set(key, json.dumps(data))
def search(
self, query: str, limit: int = 3, score_threshold: float = 0.35
) -> List[Any]:
"""Search for values in Redis."""
# This is a simple implementation that returns all values
# In a real implementation, you would use Redis search capabilities
results = []
for key in self.redis.keys(f"{self.prefix}:*"):
data = json.loads(self.redis.get(key))
results.append({"context": data["value"], "metadata": data["metadata"]})
if len(results) >= limit:
break
return results
def reset(self) -> None:
"""Reset the Redis storage."""
for key in self.redis.keys(f"{self.prefix}:*"):
self.redis.delete(key)
```
## Benefits of Custom Storage
Using custom storage implementations allows you to:
1. Store memory data in external databases or services
2. Implement custom search algorithms
3. Share memory between different crews or applications
4. Persist memory across application restarts
5. Implement custom memory retention policies
By extending the `Storage` interface, you can integrate CrewAI with any storage backend that suits your needs.

View File

@@ -262,19 +262,8 @@ class Crew(BaseModel):
def create_crew_memory(self) -> "Crew":
"""Set private attributes."""
if self.memory:
from crewai.memory.storage.rag_storage import RAGStorage
# Create default storage instances for each memory type if needed
long_term_storage = RAGStorage(type="long_term", crew=self, embedder_config=self.embedder)
short_term_storage = RAGStorage(type="short_term", crew=self, embedder_config=self.embedder)
entity_storage = RAGStorage(type="entity", crew=self, embedder_config=self.embedder)
self._long_term_memory = (
self.long_term_memory if self.long_term_memory else LongTermMemory(
crew=self,
embedder_config=self.embedder,
storage=long_term_storage
)
self.long_term_memory if self.long_term_memory else LongTermMemory()
)
self._short_term_memory = (
self.short_term_memory
@@ -282,17 +271,12 @@ class Crew(BaseModel):
else ShortTermMemory(
crew=self,
embedder_config=self.embedder,
storage=short_term_storage
)
)
self._entity_memory = (
self.entity_memory
if self.entity_memory
else EntityMemory(
crew=self,
embedder_config=self.embedder,
storage=entity_storage
)
else EntityMemory(crew=self, embedder_config=self.embedder)
)
if (
self.memory_config and "user_memory" in self.memory_config

View File

@@ -83,28 +83,42 @@ class KnowledgeStorage(BaseKnowledgeStorage):
raise Exception("Collection not initialized")
def initialize_knowledge_storage(self):
base_path = os.path.join(db_storage_path(), "knowledge")
chroma_client = chromadb.PersistentClient(
path=base_path,
settings=Settings(allow_reset=True),
)
self.app = chroma_client
"""Initialize the knowledge storage with ChromaDB.
Handles SQLite3 version incompatibility gracefully by logging a warning
and continuing without ChromaDB functionality.
"""
try:
base_path = os.path.join(db_storage_path(), "knowledge")
chroma_client = chromadb.PersistentClient(
path=base_path,
settings=Settings(allow_reset=True),
)
self.app = chroma_client
collection_name = (
f"knowledge_{self.collection_name}"
if self.collection_name
else "knowledge"
)
if self.app:
self.collection = self.app.get_or_create_collection(
name=collection_name, embedding_function=self.embedder
)
else:
if not self.app:
raise Exception("Vector Database Client not initialized")
except Exception:
raise Exception("Failed to create or get collection")
self.collection = self.app.get_or_create_collection(
name=collection_name, embedding_function=self.embedder
)
except RuntimeError as e:
if "unsupported version of sqlite3" in str(e).lower():
# Log a warning but continue without ChromaDB
logging.warning("ChromaDB requires SQLite3 >= 3.35.0. Current version is too old. Some features may be limited. Error: %s", e)
self.app = None
self.collection = None
else:
raise
except Exception as e:
raise Exception(f"Failed to create or get collection: {e}")
def reset(self):
base_path = os.path.join(db_storage_path(), KNOWLEDGE_DIRECTORY)

View File

@@ -47,7 +47,7 @@ class ContextualMemory:
stm_results = self.stm.search(query)
formatted_results = "\n".join(
[
f"- {result.get('memory', result.get('context', ''))}"
f"- {result['memory'] if self.memory_provider == 'mem0' else result['context']}"
for result in stm_results
]
)
@@ -58,7 +58,7 @@ class ContextualMemory:
Fetches historical data or insights from LTM that are relevant to the task's description and expected_output,
formatted as bullet points.
"""
ltm_results = self.ltm.search(query=task, limit=2)
ltm_results = self.ltm.search(task, latest_n=2)
if not ltm_results:
return None
@@ -80,9 +80,9 @@ class ContextualMemory:
em_results = self.em.search(query)
formatted_results = "\n".join(
[
f"- {result.get('memory', result.get('context', ''))}"
f"- {result['memory'] if self.memory_provider == 'mem0' else result['context']}"
for result in em_results
]
] # type: ignore # Invalid index type "str" for "str"; expected type "SupportsIndex | slice"
)
return f"Entities:\n{formatted_results}" if em_results else ""
@@ -99,6 +99,6 @@ class ContextualMemory:
return ""
formatted_memories = "\n".join(
f"- {result.get('memory', result.get('context', ''))}" for result in user_memories
f"- {result['memory']}" for result in user_memories
)
return f"User memories/preferences:\n{formatted_memories}"

View File

@@ -1,4 +1,4 @@
from typing import Any, Dict, Optional
from typing import Optional
from pydantic import PrivateAttr
@@ -17,71 +17,47 @@ class EntityMemory(Memory):
_memory_provider: Optional[str] = PrivateAttr()
def __init__(self, crew=None, embedder_config=None, storage=None, path=None):
memory_provider = None
memory_config = None
if crew and hasattr(crew, "memory_config") and crew.memory_config is not None:
memory_config = crew.memory_config
memory_provider = memory_config.get("provider")
# If no storage is provided, try to create one
if storage is None:
memory_provider = crew.memory_config.get("provider")
else:
memory_provider = None
if memory_provider == "mem0":
try:
# Try to select storage using helper method
storage = self._select_storage(
storage=storage,
memory_config=memory_config,
storage_type="entity",
crew=crew,
path=path,
default_storage_factory=lambda path, crew: RAGStorage(
type="entities",
allow_reset=True,
crew=crew,
embedder_config=embedder_config,
path=path,
)
from crewai.memory.storage.mem0_storage import Mem0Storage
except ImportError:
raise ImportError(
"Mem0 is not installed. Please install it with `pip install mem0ai`."
)
except ValueError:
# Fallback to default storage
storage = RAGStorage(
storage = Mem0Storage(type="entities", crew=crew)
else:
storage = (
storage
if storage
else RAGStorage(
type="entities",
allow_reset=True,
crew=crew,
embedder_config=embedder_config,
crew=crew,
path=path,
)
# Initialize with parameters
super().__init__(
storage=storage,
embedder_config=embedder_config,
memory_provider=memory_provider
)
)
def save(
self,
value: Any,
metadata: Optional[Dict[str, Any]] = None,
agent: Optional[str] = None,
) -> None:
"""Saves an entity item or value into the storage."""
if isinstance(value, EntityMemoryItem):
item = value
if self.memory_provider == "mem0":
data = f"""
Remember details about the following entity:
Name: {item.name}
Type: {item.type}
Entity Description: {item.description}
"""
else:
data = f"{item.name}({item.type}): {item.description}"
super().save(data, item.metadata)
super().__init__(storage=storage)
self._memory_provider = memory_provider
def save(self, item: EntityMemoryItem) -> None: # type: ignore # BUG?: Signature of "save" incompatible with supertype "Memory"
"""Saves an entity item into the SQLite storage."""
if self._memory_provider == "mem0":
data = f"""
Remember details about the following entity:
Name: {item.name}
Type: {item.type}
Entity Description: {item.description}
"""
else:
# Handle regular value and metadata
super().save(value, metadata, agent)
data = f"{item.name}({item.type}): {item.description}"
super().save(data, item.metadata)
def reset(self) -> None:
try:

View File

@@ -1,4 +1,4 @@
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List
from crewai.memory.long_term.long_term_memory_item import LongTermMemoryItem
from crewai.memory.memory import Memory
@@ -14,77 +14,23 @@ class LongTermMemory(Memory):
LongTermMemoryItem instances.
"""
def __init__(self, crew=None, embedder_config=None, storage=None, path=None):
memory_provider = None
memory_config = None
if crew and hasattr(crew, "memory_config") and crew.memory_config is not None:
memory_config = crew.memory_config
memory_provider = memory_config.get("provider")
# Initialize with basic parameters
super().__init__(
storage=storage,
embedder_config=embedder_config,
memory_provider=memory_provider
def __init__(self, storage=None, path=None):
if not storage:
storage = LTMSQLiteStorage(db_path=path) if path else LTMSQLiteStorage()
super().__init__(storage=storage)
def save(self, item: LongTermMemoryItem) -> None: # type: ignore # BUG?: Signature of "save" incompatible with supertype "Memory"
metadata = item.metadata
metadata.update({"agent": item.agent, "expected_output": item.expected_output})
self.storage.save( # type: ignore # BUG?: Unexpected keyword argument "task_description","score","datetime" for "save" of "Storage"
task_description=item.task,
score=metadata["quality"],
metadata=metadata,
datetime=item.datetime,
)
try:
# Try to select storage using helper method
self.storage = self._select_storage(
storage=storage,
memory_config=memory_config,
storage_type="long_term",
crew=crew,
path=path,
default_storage_factory=lambda path, crew: LTMSQLiteStorage(db_path=path) if path else LTMSQLiteStorage()
)
except ValueError:
# Fallback to default storage
self.storage = LTMSQLiteStorage(db_path=path) if path else LTMSQLiteStorage()
def save(
self,
value: Any,
metadata: Optional[Dict[str, Any]] = None,
agent: Optional[str] = None,
) -> None:
"""Saves a value into the memory."""
if isinstance(value, LongTermMemoryItem):
item = value
item_metadata = item.metadata or {}
item_metadata.update({"agent": item.agent, "expected_output": item.expected_output})
# Handle special storage types like Mem0Storage
if hasattr(self.storage, "save") and callable(getattr(self.storage, "save")) and hasattr(self.storage.save, "__code__") and "task_description" in self.storage.save.__code__.co_varnames:
self.storage.save(
task_description=item.task,
score=item_metadata.get("quality", 0),
metadata=item_metadata,
datetime=item.datetime,
)
else:
# Use standard storage interface
self.storage.save(item.task, item_metadata)
else:
# Handle regular value and metadata
super().save(value, metadata, agent)
def search(
self,
query: str,
limit: int = 3,
score_threshold: float = 0.35,
) -> List[Any]:
"""Search for values in the memory."""
# Try to use the standard storage interface first
if hasattr(self.storage, "search") and callable(getattr(self.storage, "search")):
return self.storage.search(query=query, limit=limit, score_threshold=score_threshold)
# Fall back to load method for backward compatibility
elif hasattr(self.storage, "load") and callable(getattr(self.storage, "load")):
return self.storage.load(query, limit)
else:
raise AttributeError("Storage does not implement search or load method")
def search(self, task: str, latest_n: int = 3) -> List[Dict[str, Any]]: # type: ignore # signature of "search" incompatible with supertype "Memory"
return self.storage.load(task, latest_n) # type: ignore # BUG?: "Storage" has no attribute "load"
def reset(self) -> None:
self.storage.reset()

View File

@@ -1,62 +1,20 @@
from typing import Any, Callable, Dict, Generic, List, Optional, TypeVar, cast
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, ConfigDict, Field
from pydantic import BaseModel
from crewai.memory.storage.interface import SearchResult, Storage
T = TypeVar('T', bound=Storage)
class Memory(BaseModel, Generic[T]):
class Memory(BaseModel):
"""
Base class for memory, now supporting agent tags and generic metadata.
"""
model_config = ConfigDict(arbitrary_types_allowed=True)
embedder_config: Optional[Dict[str, Any]] = None
storage: T
memory_provider: Optional[str] = Field(default=None, exclude=True)
def __init__(self, storage: T, **data: Any):
storage: Any
def __init__(self, storage: Any, **data: Any):
super().__init__(storage=storage, **data)
def _select_storage(
self,
storage: Optional[T] = None,
memory_config: Optional[Dict[str, Any]] = None,
storage_type: str = "",
crew=None,
path: Optional[str] = None,
default_storage_factory: Optional[Callable] = None,
) -> T:
"""Helper method to select the appropriate storage based on configuration"""
# Use the provided storage if available
if storage:
return storage
# Use storage from memory_config if available
if memory_config and "storage" in memory_config:
storage_config = memory_config.get("storage", {})
if storage_type in storage_config and storage_config[storage_type]:
return cast(T, storage_config[storage_type])
# Use Mem0Storage if specified in memory_config
if memory_config and memory_config.get("provider") == "mem0":
try:
from crewai.memory.storage.mem0_storage import Mem0Storage
return cast(T, Mem0Storage(type=storage_type, crew=crew))
except ImportError:
raise ImportError(
"Mem0 is not installed. Please install it with `pip install mem0ai`."
)
# Use default storage if provided
if default_storage_factory:
return cast(T, default_storage_factory(path=path, crew=crew))
# Fallback to empty storage
raise ValueError(f"No storage available for {storage_type}")
def save(
self,
value: Any,
@@ -67,19 +25,14 @@ class Memory(BaseModel, Generic[T]):
if agent:
metadata["agent"] = agent
if self.storage:
self.storage.save(value, metadata)
else:
raise ValueError("Storage is not initialized")
self.storage.save(value, metadata)
def search(
self,
query: str,
limit: int = 3,
score_threshold: float = 0.35,
) -> List[SearchResult]:
if not self.storage:
raise ValueError("Storage is not initialized")
) -> List[Any]:
return self.storage.search(
query=query, limit=limit, score_threshold=score_threshold
)

View File

@@ -19,43 +19,32 @@ class ShortTermMemory(Memory):
_memory_provider: Optional[str] = PrivateAttr()
def __init__(self, crew=None, embedder_config=None, storage=None, path=None):
memory_provider = None
memory_config = None
if crew and hasattr(crew, "memory_config") and crew.memory_config is not None:
memory_config = crew.memory_config
memory_provider = memory_config.get("provider")
# Initialize with basic parameters
super().__init__(
storage=storage,
embedder_config=embedder_config,
memory_provider=memory_provider
)
try:
# Try to select storage using helper method
self.storage = self._select_storage(
storage=storage,
memory_config=memory_config,
storage_type="short_term",
crew=crew,
path=path,
default_storage_factory=lambda path, crew: RAGStorage(
memory_provider = crew.memory_config.get("provider")
else:
memory_provider = None
if memory_provider == "mem0":
try:
from crewai.memory.storage.mem0_storage import Mem0Storage
except ImportError:
raise ImportError(
"Mem0 is not installed. Please install it with `pip install mem0ai`."
)
storage = Mem0Storage(type="short_term", crew=crew)
else:
storage = (
storage
if storage
else RAGStorage(
type="short_term",
crew=crew,
embedder_config=embedder_config,
crew=crew,
path=path,
)
)
except ValueError:
# Fallback to default storage
self.storage = RAGStorage(
type="short_term",
crew=crew,
embedder_config=embedder_config,
path=path,
)
super().__init__(storage=storage)
self._memory_provider = memory_provider
def save(
self,
@@ -64,7 +53,7 @@ class ShortTermMemory(Memory):
agent: Optional[str] = None,
) -> None:
item = ShortTermMemoryItem(data=value, metadata=metadata, agent=agent)
if self.memory_provider == "mem0":
if self._memory_provider == "mem0":
item.data = f"Remember the following insights from Agent run: {item.data}"
super().save(value=item.data, metadata=item.metadata, agent=item.agent)

View File

@@ -1,10 +1,8 @@
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
from crewai.memory.storage.interface import SearchResult, Storage
class BaseRAGStorage(Storage[Any], ABC):
class BaseRAGStorage(ABC):
"""
Base class for RAG-based Storage implementations.
"""
@@ -46,8 +44,9 @@ class BaseRAGStorage(Storage[Any], ABC):
self,
query: str,
limit: int = 3,
filter: Optional[dict] = None,
score_threshold: float = 0.35,
) -> List[SearchResult]:
) -> List[Any]:
"""Search for entries in the storage."""
pass

View File

@@ -1,39 +1,16 @@
from abc import ABC, abstractmethod
from typing import Any, ClassVar, Dict, Generic, List, Protocol, TypeVar, TypedDict, runtime_checkable
from typing import Any, Dict, List
from pydantic import BaseModel, ConfigDict
class SearchResult(TypedDict, total=False):
"""Type definition for search results"""
context: str
metadata: Dict[str, Any]
score: float
memory: str # For Mem0Storage compatibility
T = TypeVar('T')
@runtime_checkable
class StorageProtocol(Protocol):
"""Protocol defining the storage interface"""
def save(self, value: Any, metadata: Dict[str, Any]) -> None: ...
def search(self, query: str, limit: int, score_threshold: float) -> List[Any]: ...
def reset(self) -> None: ...
class Storage(ABC, Generic[T]):
class Storage:
"""Abstract base class defining the storage interface"""
model_config: ClassVar[ConfigDict] = ConfigDict(arbitrary_types_allowed=True)
@abstractmethod
def save(self, value: Any, metadata: Dict[str, Any]) -> None:
pass
@abstractmethod
def search(
self, query: str, limit: int, score_threshold: float
) -> List[SearchResult]:
pass
) -> Dict[str, Any] | List[Any]:
return {}
@abstractmethod
def reset(self) -> None:
pass

View File

@@ -111,9 +111,3 @@ class Mem0Storage(Storage):
agents = [self._sanitize_role(agent.role) for agent in agents]
agents = "_".join(agents)
return agents
def reset(self) -> None:
"""Reset the storage by clearing all memories."""
# Mem0 doesn't have a direct reset method, but we can implement
# this in the future if needed. For now, we'll just pass.
pass

View File

@@ -4,16 +4,19 @@ import logging
import os
import shutil
import uuid
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Union, Collection as TypeCollection
from chromadb.api import ClientAPI
from chromadb.api.models.Collection import Collection as ChromaCollection
from crewai.memory.storage.base_rag_storage import BaseRAGStorage
from crewai.memory.storage.interface import SearchResult
from crewai.utilities import EmbeddingConfigurator
from crewai.utilities.constants import MAX_FILE_NAME_LENGTH
from crewai.utilities.paths import db_storage_path
# Constants
SQLITE_VERSION_ERROR = "ChromaDB requires SQLite3 >= 3.35.0. Current version is too old. Some features may be limited. Error: {}"
@contextlib.contextmanager
def suppress_logging(
@@ -38,7 +41,7 @@ class RAGStorage(BaseRAGStorage):
search efficiency.
"""
app: Optional[ClientAPI] = None
app: ClientAPI | None = None
def __init__(
self, type, allow_reset=True, embedder_config=None, crew=None, path=None
@@ -61,26 +64,41 @@ class RAGStorage(BaseRAGStorage):
self.embedder_config = configurator.configure_embedder(self.embedder_config)
def _initialize_app(self):
import chromadb
from chromadb.config import Settings
self._set_embedder_config()
chroma_client = chromadb.PersistentClient(
path=self.path if self.path else self.storage_file_name,
settings=Settings(allow_reset=self.allow_reset),
)
self.app = chroma_client
try:
self.collection = self.app.get_collection(
name=self.type, embedding_function=self.embedder_config
)
except Exception:
self.collection = self.app.create_collection(
name=self.type, embedding_function=self.embedder_config
import chromadb
from chromadb.config import Settings
self._set_embedder_config()
if self.embedder_config is None:
# ChromaDB is not available, skip initialization
self.app = None
self.collection = None
return
chroma_client = chromadb.PersistentClient(
path=self.path if self.path else self.storage_file_name,
settings=Settings(allow_reset=self.allow_reset),
)
self.app = chroma_client
try:
self.collection = self.app.get_collection(
name=self.type, embedding_function=self.embedder_config
)
except Exception:
self.collection = self.app.create_collection(
name=self.type, embedding_function=self.embedder_config
)
except RuntimeError as e:
if "unsupported version of sqlite3" in str(e).lower():
# Log a warning but continue without ChromaDB
logging.warning(SQLITE_VERSION_ERROR.format(e))
self.app = None
self.collection = None
else:
raise
def _sanitize_role(self, role: str) -> str:
"""
Sanitizes agent roles to ensure valid directory names.
@@ -113,8 +131,9 @@ class RAGStorage(BaseRAGStorage):
self,
query: str,
limit: int = 3,
filter: Optional[dict] = None,
score_threshold: float = 0.35,
) -> List[SearchResult]:
) -> List[Any]:
if not hasattr(self, "app"):
self._initialize_app()
@@ -124,7 +143,8 @@ class RAGStorage(BaseRAGStorage):
results = []
for i in range(len(response["ids"][0])):
result: SearchResult = {
result = {
"id": response["ids"][0][i],
"metadata": response["metadatas"][0][i],
"context": response["documents"][0][i],
"score": response["distances"][0][i],
@@ -137,7 +157,7 @@ class RAGStorage(BaseRAGStorage):
logging.error(f"Error during {self.type} search: {str(e)}")
return []
def _generate_embedding(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> Any:
def _generate_embedding(self, text: str, metadata: Dict[str, Any]) -> None: # type: ignore
if not hasattr(self, "app") or not hasattr(self, "collection"):
self._initialize_app()

View File

@@ -11,46 +11,15 @@ class UserMemory(Memory):
MemoryItem instances.
"""
def __init__(self, crew=None, embedder_config=None, storage=None, path=None, **kwargs):
memory_provider = None
memory_config = None
if crew and hasattr(crew, "memory_config") and crew.memory_config is not None:
memory_config = crew.memory_config
memory_provider = memory_config.get("provider")
# Initialize with basic parameters
super().__init__(
storage=storage,
embedder_config=embedder_config,
memory_provider=memory_provider
)
def __init__(self, crew=None):
try:
# Try to select storage using helper method
from crewai.memory.storage.rag_storage import RAGStorage
self.storage = self._select_storage(
storage=storage,
memory_config=memory_config,
storage_type="user",
crew=crew,
path=path,
default_storage_factory=lambda path, crew: RAGStorage(
type="user",
crew=crew,
embedder_config=embedder_config,
path=path,
)
)
except ValueError:
# Fallback to default storage
from crewai.memory.storage.rag_storage import RAGStorage
self.storage = RAGStorage(
type="user",
crew=crew,
embedder_config=embedder_config,
path=path,
from crewai.memory.storage.mem0_storage import Mem0Storage
except ImportError:
raise ImportError(
"Mem0 is not installed. Please install it with `pip install mem0ai`."
)
storage = Mem0Storage(type="user", crew=crew)
super().__init__(storage)
def save(
self,
@@ -74,9 +43,3 @@ class UserMemory(Memory):
score_threshold=score_threshold,
)
return results
def reset(self) -> None:
try:
self.storage.reset()
except Exception as e:
raise Exception(f"An error occurred while resetting the user memory: {e}")

View File

@@ -1,12 +1,31 @@
import logging
import os
from typing import Any, Dict, Optional, cast
from chromadb import Documents, EmbeddingFunction, Embeddings
from chromadb.api.types import validate_embedding_function
# Import chromadb conditionally to handle SQLite3 version errors
try:
from chromadb import Documents, EmbeddingFunction, Embeddings
from chromadb.api.types import validate_embedding_function
CHROMADB_AVAILABLE = True
except RuntimeError as e:
if "unsupported version of sqlite3" in str(e).lower():
logging.warning(f"ChromaDB requires SQLite3 >= 3.35.0. Current version is too old. Some features may be limited. Error: {e}")
CHROMADB_AVAILABLE = False
# Define placeholder types for type hints
Documents = Any
EmbeddingFunction = Any
Embeddings = Any
validate_embedding_function = lambda x: x # noqa: E731
else:
raise
class EmbeddingConfigurator:
def __init__(self):
if not CHROMADB_AVAILABLE:
self.embedding_functions = {}
return
self.embedding_functions = {
"openai": self._configure_openai,
"azure": self._configure_azure,
@@ -21,13 +40,45 @@ class EmbeddingConfigurator:
"custom": self._configure_custom,
}
def _validate_config(self, config: Dict[str, Any]) -> bool:
"""Validates that the configuration contains the required keys.
Args:
config: The configuration dictionary to validate
Returns:
bool: True if the configuration is valid, False otherwise
"""
if not config:
return False
required_keys = {'provider'}
return all(key in config for key in required_keys)
def configure_embedder(
self,
embedder_config: Optional[Dict[str, Any]] = None,
) -> EmbeddingFunction:
"""Configures and returns an embedding function based on the provided config."""
) -> Optional[EmbeddingFunction]:
"""Configures and returns an embedding function based on the provided config.
Args:
embedder_config: Configuration dictionary for the embedder
Returns:
Optional[EmbeddingFunction]: The configured embedding function or None if ChromaDB is not available
Raises:
ValueError: If the configuration is invalid
Exception: If the provider is not supported
"""
if not CHROMADB_AVAILABLE:
return None
if embedder_config is None:
return self._create_default_embedding_function()
if not self._validate_config(embedder_config):
raise ValueError("Invalid embedder configuration: missing required keys")
provider = embedder_config.get("provider")
config = embedder_config.get("config", {})
@@ -47,6 +98,9 @@ class EmbeddingConfigurator:
@staticmethod
def _create_default_embedding_function():
if not CHROMADB_AVAILABLE:
return None
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)

View File

@@ -7,31 +7,7 @@ from crewai.memory.long_term.long_term_memory_item import LongTermMemoryItem
@pytest.fixture
def long_term_memory():
"""Fixture to create a LongTermMemory instance"""
# Create a mock storage for testing
from crewai.memory.storage.interface import Storage
class MockStorage(Storage):
def __init__(self):
self.data = []
def save(self, value, metadata):
self.data.append({"value": value, "metadata": metadata})
def search(self, query, limit=3, score_threshold=0.35):
return [
{
"context": item["value"],
"metadata": item["metadata"],
"score": 0.5,
"datetime": item["metadata"].get("datetime", "test_datetime")
}
for item in self.data
]
def reset(self):
self.data = []
return LongTermMemory(storage=MockStorage())
return LongTermMemory()
def test_save_and_search(long_term_memory):
@@ -44,7 +20,7 @@ def test_save_and_search(long_term_memory):
metadata={"task": "test_task", "quality": 0.5},
)
long_term_memory.save(memory)
find = long_term_memory.search(query="test_task", limit=5)[0]
find = long_term_memory.search("test_task", latest_n=5)[0]
assert find["score"] == 0.5
assert find["datetime"] == "test_datetime"
assert find["metadata"]["agent"] == "test_agent"

View File

@@ -12,8 +12,6 @@ from crewai.task import Task
@pytest.fixture
def short_term_memory():
"""Fixture to create a ShortTermMemory instance"""
from crewai.memory.storage.rag_storage import RAGStorage
agent = Agent(
role="Researcher",
goal="Search relevant data and provide results",
@@ -27,10 +25,7 @@ def short_term_memory():
expected_output="A list of relevant URLs based on the search query.",
agent=agent,
)
storage = RAGStorage(type="short_term")
crew = Crew(agents=[agent], tasks=[task])
return ShortTermMemory(storage=storage, crew=crew)
return ShortTermMemory(crew=Crew(agents=[agent], tasks=[task]))
def test_save_and_search(short_term_memory):

View File

@@ -1,211 +0,0 @@
from typing import Any, Dict, List
import pytest
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.memory.entity.entity_memory import EntityMemory
from crewai.memory.long_term.long_term_memory import LongTermMemory
from crewai.memory.short_term.short_term_memory import ShortTermMemory
from crewai.memory.storage.interface import SearchResult, Storage
from crewai.memory.user.user_memory import UserMemory
class CustomStorage(Storage[Any]):
"""Custom storage implementation for testing."""
def __init__(self):
self.data = []
def save(self, value: Any, metadata: Dict[str, Any]) -> None:
self.data.append({"value": value, "metadata": metadata})
def search(
self, query: str, limit: int = 3, score_threshold: float = 0.35
) -> List[SearchResult]:
return [{"context": item["value"], "metadata": item["metadata"], "score": 0.9} for item in self.data]
def reset(self) -> None:
self.data = []
def test_custom_storage_with_short_term_memory():
"""Test that custom storage works with short term memory."""
custom_storage = CustomStorage()
memory = ShortTermMemory(storage=custom_storage)
memory.save("test value", {"key": "value"})
results = memory.search("test")
assert len(results) > 0
assert results[0]["context"] == "test value"
assert results[0]["metadata"]["key"] == "value"
def test_custom_storage_with_long_term_memory():
"""Test that custom storage works with long term memory."""
custom_storage = CustomStorage()
memory = LongTermMemory(storage=custom_storage)
memory.save("test value", {"key": "value"})
results = memory.search("test")
assert len(results) > 0
assert results[0]["context"] == "test value"
assert results[0]["metadata"]["key"] == "value"
def test_custom_storage_with_entity_memory():
"""Test that custom storage works with entity memory."""
custom_storage = CustomStorage()
memory = EntityMemory(storage=custom_storage)
memory.save("test value", {"key": "value"})
results = memory.search("test")
assert len(results) > 0
assert results[0]["context"] == "test value"
assert results[0]["metadata"]["key"] == "value"
def test_custom_storage_with_user_memory():
"""Test that custom storage works with user memory."""
custom_storage = CustomStorage()
memory = UserMemory(storage=custom_storage)
memory.save("test value", {"key": "value"})
results = memory.search("test")
assert len(results) > 0
# UserMemory prepends "Remember the details about the user: " to the value
assert "test value" in results[0]["context"]
assert results[0]["metadata"]["key"] == "value"
def test_custom_storage_with_crew():
"""Test that custom storage works with crew."""
short_term_storage = CustomStorage()
long_term_storage = CustomStorage()
entity_storage = CustomStorage()
user_storage = CustomStorage()
# Create memory instances with custom storage
short_term_memory = ShortTermMemory(storage=short_term_storage)
long_term_memory = LongTermMemory(storage=long_term_storage)
entity_memory = EntityMemory(storage=entity_storage)
user_memory = UserMemory(storage=user_storage)
# Create a crew with custom memory instances
crew = Crew(
agents=[Agent(role="test", goal="test", backstory="test")],
memory=True,
short_term_memory=short_term_memory,
long_term_memory=long_term_memory,
entity_memory=entity_memory,
memory_config={"user_memory": user_memory},
)
# Test that the crew has the custom memory instances
assert crew._short_term_memory.storage == short_term_storage
assert crew._long_term_memory.storage == long_term_storage
assert crew._entity_memory.storage == entity_storage
assert crew._user_memory.storage == user_storage
def test_custom_storage_with_memory_config():
"""Test that custom storage works with memory_config."""
short_term_storage = CustomStorage()
long_term_memory = LongTermMemory(storage=CustomStorage())
entity_memory = EntityMemory(storage=CustomStorage())
user_memory = UserMemory(storage=CustomStorage())
# Create a crew with custom storage in memory_config
crew = Crew(
agents=[Agent(role="test", goal="test", backstory="test")],
memory=True,
short_term_memory=ShortTermMemory(storage=short_term_storage),
long_term_memory=long_term_memory,
entity_memory=entity_memory,
memory_config={
"user_memory": user_memory
},
)
# Test that the crew has the custom storage instances
assert crew._short_term_memory.storage == short_term_storage
assert crew._long_term_memory == long_term_memory
assert crew._entity_memory == entity_memory
assert crew._user_memory == user_memory
def test_custom_storage_error_handling():
"""Test error handling with custom storage."""
# Test exception propagation
class ErrorStorage(Storage[Any]):
"""Storage implementation that raises exceptions."""
def __init__(self):
self.data = []
def save(self, value: Any, metadata: Dict[str, Any]) -> None:
raise ValueError("Save error")
def search(
self, query: str, limit: int = 3, score_threshold: float = 0.35
) -> List[SearchResult]:
raise ValueError("Search error")
def reset(self) -> None:
raise ValueError("Reset error")
storage = ErrorStorage()
memory = ShortTermMemory(storage=storage)
with pytest.raises(ValueError, match="Save error"):
memory.save("test", {})
with pytest.raises(ValueError, match="Search error"):
memory.search("test")
with pytest.raises(Exception, match="An error occurred while resetting the short-term memory: Reset error"):
memory.reset()
def test_custom_storage_edge_cases():
"""Test edge cases with custom storage."""
class EdgeCaseStorage(Storage[Any]):
"""Storage implementation for testing edge cases."""
def __init__(self):
self.data = []
def save(self, value: Any, metadata: Dict[str, Any]) -> None:
self.data.append({"value": value, "metadata": metadata})
def search(
self, query: str, limit: int = 3, score_threshold: float = 0.35
) -> List[SearchResult]:
return [{"context": item["value"], "metadata": item["metadata"], "score": 0.5} for item in self.data]
def reset(self) -> None:
self.data = []
storage = EdgeCaseStorage()
memory = ShortTermMemory(storage=storage)
# Test empty query
memory.save("test value", {"key": "value"})
results = memory.search("")
assert len(results) > 0
# Test very large metadata
large_metadata = {"key" + str(i): "value" * 100 for i in range(100)}
memory.save("test value", large_metadata)
results = memory.search("test")
assert len(results) > 0
assert results[1]["metadata"] == large_metadata
# Test unicode and special characters
unicode_value = "测试值 with special chars: !@#$%^&*()"
memory.save(unicode_value, {"key": "value"})
results = memory.search("测试")
assert len(results) > 0
assert unicode_value in results[2]["context"]

View File

@@ -0,0 +1,52 @@
import unittest
from unittest.mock import MagicMock, patch
class TestEmbeddingConfigurator(unittest.TestCase):
@patch('crewai.utilities.embedding_configurator.CHROMADB_AVAILABLE', False)
def test_embedding_configurator_with_chromadb_unavailable(self):
from crewai.utilities.embedding_configurator import EmbeddingConfigurator
# Create an instance of EmbeddingConfigurator
configurator = EmbeddingConfigurator()
# Verify that embedding_functions is empty
self.assertEqual(configurator.embedding_functions, {})
# Verify that configure_embedder returns None
self.assertIsNone(configurator.configure_embedder())
@patch('crewai.utilities.embedding_configurator.CHROMADB_AVAILABLE', True)
def test_embedding_configurator_with_chromadb_available(self):
from crewai.utilities.embedding_configurator import EmbeddingConfigurator
# Create an instance of EmbeddingConfigurator
configurator = EmbeddingConfigurator()
# Verify that embedding_functions is not empty
self.assertNotEqual(configurator.embedding_functions, {})
# Mock the _create_default_embedding_function method
configurator._create_default_embedding_function = MagicMock(return_value="mock_embedding_function")
# Verify that configure_embedder returns the mock embedding function
self.assertEqual(configurator.configure_embedder(), "mock_embedding_function")
@patch('crewai.utilities.embedding_configurator.CHROMADB_AVAILABLE', True)
def test_embedding_configurator_with_invalid_config(self):
from crewai.utilities.embedding_configurator import EmbeddingConfigurator
# Create an instance of EmbeddingConfigurator
configurator = EmbeddingConfigurator()
# Test with empty config
with self.assertRaises(ValueError):
configurator.configure_embedder({})
# Test with missing required keys
with self.assertRaises(ValueError):
configurator.configure_embedder({"config": {}})
# Test with unsupported provider
with self.assertRaises(Exception):
configurator.configure_embedder({"provider": "unsupported_provider", "config": {}})