Compare commits

...

2 Commits

4 changed files with 198 additions and 2 deletions

View File

@@ -1,3 +1,4 @@
from datetime import datetime
from typing import Any, Dict, Optional
from pydantic import PrivateAttr
@@ -52,10 +53,34 @@ class ShortTermMemory(Memory):
metadata: Optional[Dict[str, Any]] = None,
agent: Optional[str] = None,
) -> None:
"""
Save a memory item to the storage.
Args:
value: The data to save.
metadata: Optional metadata to associate with the memory.
agent: Optional agent identifier.
Raises:
ValueError: If the item's timestamp is in the future.
"""
import logging
item = ShortTermMemoryItem(data=value, metadata=metadata, agent=agent)
if item.timestamp > datetime.now():
raise ValueError("Cannot save memory item with future timestamp")
logging.debug(f"Saving memory item with timestamp: {item.timestamp}")
if self._memory_provider == "mem0":
item.data = f"Remember the following insights from Agent run: {item.data}"
# Include timestamp in metadata
if item.metadata is None:
item.metadata = {}
item.metadata["timestamp"] = item.timestamp.isoformat()
super().save(value=item.data, metadata=item.metadata, agent=item.agent)
def search(

View File

@@ -1,3 +1,4 @@
from datetime import datetime
from typing import Any, Dict, Optional
@@ -7,7 +8,11 @@ class ShortTermMemoryItem:
data: Any,
agent: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
timestamp: Optional[datetime] = None,
):
if timestamp is not None and timestamp > datetime.now():
raise ValueError("Timestamp cannot be in the future")
self.data = data
self.agent = agent
self.metadata = metadata if metadata is not None else {}
self.timestamp = timestamp if timestamp is not None else datetime.now()

View File

@@ -114,13 +114,32 @@ class RAGStorage(BaseRAGStorage):
limit: int = 3,
filter: Optional[dict] = None,
score_threshold: float = 0.35,
recency_weight: float = 0.3,
time_decay_days: float = 1.0,
) -> List[Any]:
"""
Search for entries in the storage based on semantic similarity and recency.
Args:
query: The search query string.
limit: Maximum number of results to return.
filter: Optional filter to apply to the search.
score_threshold: Minimum score threshold for results.
recency_weight: Weight given to recency vs. semantic similarity (0.0-1.0).
Higher values prioritize recent memories more strongly.
time_decay_days: Number of days over which recency factor decays to zero.
Smaller values make older memories lose relevance faster.
Returns:
List of search results, each containing id, metadata, context, and score.
Results are sorted by combined semantic similarity and recency score.
"""
if not hasattr(self, "app"):
self._initialize_app()
try:
with suppress_logging():
response = self.collection.query(query_texts=query, n_results=limit)
response = self.collection.query(query_texts=query, n_results=limit * 2) # Get more results to allow for recency filtering
results = []
for i in range(len(response["ids"][0])):
@@ -130,10 +149,27 @@ class RAGStorage(BaseRAGStorage):
"context": response["documents"][0][i],
"score": response["distances"][0][i],
}
# Apply recency boost if timestamp exists in metadata
if "timestamp" in result["metadata"]:
try:
from datetime import datetime
timestamp = datetime.fromisoformat(result["metadata"]["timestamp"])
now = datetime.now()
# Calculate recency factor (newer = higher score)
time_diff_seconds = (now - timestamp).total_seconds()
recency_factor = max(0, 1 - (time_diff_seconds / (time_decay_days * 24 * 60 * 60)))
# Adjust score with recency factor
result["score"] = result["score"] * (1 - recency_weight) + recency_factor * recency_weight
except (ValueError, TypeError):
pass # If timestamp parsing fails, use original score
if result["score"] >= score_threshold:
results.append(result)
return results
# Sort by adjusted score (higher is better)
results.sort(key=lambda x: x["score"], reverse=True)
return results[:limit] # Return only the requested number of results
except Exception as e:
logging.error(f"Error during {self.type} search: {str(e)}")
return []

View File

@@ -0,0 +1,130 @@
import time
from datetime import datetime, timedelta
from unittest.mock import patch
import pytest
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.memory.short_term.short_term_memory import ShortTermMemory
from crewai.memory.short_term.short_term_memory_item import ShortTermMemoryItem
from crewai.memory.storage.rag_storage import RAGStorage
from crewai.task import Task
@pytest.fixture
def short_term_memory():
"""Fixture to create a ShortTermMemory instance"""
agent = Agent(
role="Tutor",
goal="Teach programming concepts",
backstory="You are a programming tutor helping students learn.",
tools=[],
verbose=True,
)
task = Task(
description="Explain programming concepts to students.",
expected_output="Clear explanations of programming concepts.",
agent=agent,
)
return ShortTermMemory(crew=Crew(agents=[agent], tasks=[task]))
def test_memory_prioritizes_recent_topic(short_term_memory):
"""Test that memory retrieval prioritizes the most recent topic in a conversation."""
# First topic: Python variables
topic1_data = "Variables in Python are dynamically typed. You can assign any value to a variable without declaring its type."
topic1_timestamp = datetime.now() - timedelta(minutes=10) # Older memory
# Second topic: Python abstract classes
topic2_data = "Abstract classes in Python are created using the ABC module. They cannot be instantiated and are used as a blueprint for other classes."
topic2_timestamp = datetime.now() # More recent memory
# Mock search results to simulate what would be returned by RAGStorage
mock_results = [
{
"id": "2",
"metadata": {
"agent": "Tutor",
"topic": "python_abstract_classes",
"timestamp": topic2_timestamp.isoformat()
},
"context": topic2_data,
"score": 0.85, # Higher score due to recency boost
},
{
"id": "1",
"metadata": {
"agent": "Tutor",
"topic": "python_variables",
"timestamp": topic1_timestamp.isoformat()
},
"context": topic1_data,
"score": 0.75, # Lower score due to being older
}
]
# Mock the search method to return our predefined results
with patch.object(RAGStorage, 'search', return_value=mock_results):
# Query that could match both topics but should prioritize the more recent one
query = "Can you give me another example of that?"
# Search with recency consideration
results = short_term_memory.search(query)
# Verify that the most recent topic (abstract classes) is prioritized
assert len(results) > 0, "No search results returned"
# The first result should be about abstract classes (the more recent topic)
assert "abstract classes" in results[0]["context"].lower(), "Recent topic (abstract classes) not prioritized"
# If there are multiple results, check if the older topic is also returned but with lower priority
if len(results) > 1:
assert "variables" in results[1]["context"].lower(), "Older topic should be second"
# Verify that the scores reflect the recency prioritization
assert results[0]["score"] > results[1]["score"], "Recent topic should have higher score"
def test_future_timestamp_validation():
"""Test that ShortTermMemoryItem raises ValueError for future timestamps."""
# Setup agent and task for memory
agent = Agent(
role="Tutor",
goal="Teach programming concepts",
backstory="You are a programming tutor helping students learn.",
tools=[],
verbose=True,
)
task = Task(
description="Explain programming concepts to students.",
expected_output="Clear explanations of programming concepts.",
agent=agent,
)
# Create a future timestamp
future_timestamp = datetime.now() + timedelta(days=1)
# Test constructor validation
with pytest.raises(ValueError, match="Timestamp cannot be in the future"):
ShortTermMemoryItem(data="Test data", timestamp=future_timestamp)
# Test save method validation
memory = ShortTermMemory(crew=Crew(agents=[agent], tasks=[task]))
# Create a memory item with a future timestamp
future_data = "Test data with future timestamp"
# We need to pass the data directly to the save method
# The save method will create a ShortTermMemoryItem internally
# and then we'll modify its timestamp before it's saved
# Mock datetime.now to return a fixed time
with patch('crewai.memory.short_term.short_term_memory_item.datetime') as mock_datetime:
# Set up the mock to return our future timestamp when now() is called
mock_datetime.now.return_value = future_timestamp
with pytest.raises(ValueError, match="Cannot save memory item with future timestamp"):
memory.save(value=future_data)