fix(valkey): post-filter metadata in _vector_search instead of @key FT clauses

The memory_index FT schema only defines fields for embedding, scope,
categories, created_at and importance. Building @{key}:{value} clauses
from metadata_filter referenced fields that are not indexed, causing
FT.SEARCH to error or silently return wrong results.

Apply metadata_filter as a Python post-filter on the deserialized
records (matching the pattern used by LanceDBStorage) and oversample
the KNN limit (limit * 3) when a metadata_filter is supplied so we
don't under-return after filtering. Updated tests that previously
asserted the buggy server-side query and added regression tests for
the post-filter behaviour.

Closes #5795

Co-Authored-By: João <joao@crewai.com>
This commit is contained in:
Devin AI
2026-05-13 17:31:40 +00:00
parent c5a9a8da50
commit 91656dc476
2 changed files with 349 additions and 26 deletions

View File

@@ -1277,25 +1277,28 @@ class ValkeyStorage:
cat_query = "|".join(escaped_categories)
query_parts.append(f"@categories:{{{cat_query}}}")
# Metadata filters (AND logic)
# Format: @{key}:{value}
if metadata_filter:
for key, value in metadata_filter.items():
# Escape key and value
escaped_key = self._escape_search_query(key)
escaped_value = self._escape_search_query(str(value))
query_parts.append(f"@{escaped_key}:{{{escaped_value}}}")
# Metadata filters are NOT included in the FT.SEARCH query because the
# `memory_index` schema only defines a fixed set of fields
# (embedding/scope/categories/created_at/importance). Referencing
# arbitrary metadata keys via @{key}:{value} would cause FT.SEARCH to
# error or silently return wrong results. They are applied as a
# post-filter on the deserialized records below.
# Combine filters
filter_query = " ".join(query_parts) if query_parts else "*"
# When a post-filter (metadata) will be applied, oversample the KNN
# results so we don't under-return after filtering. Mirrors the
# approach used by `LanceDBStorage`.
knn_limit = limit * 3 if metadata_filter else limit
# Build KNN query with filters
# Format: (filter)=>[KNN limit @field $BLOB AS score]
# Note: Don't wrap single "*" in parentheses
if filter_query == "*":
query = f"{filter_query}=>[KNN {limit} @embedding $BLOB AS score]"
query = f"{filter_query}=>[KNN {knn_limit} @embedding $BLOB AS score]"
else:
query = f"({filter_query})=>[KNN {limit} @embedding $BLOB AS score]"
query = f"({filter_query})=>[KNN {knn_limit} @embedding $BLOB AS score]"
# Prepare embedding blob for PARAMS
embedding_blob = self._embedding_to_bytes(query_embedding)
@@ -1320,7 +1323,7 @@ class ValkeyStorage:
search_options = FtSearchOptions(
return_fields=return_fields,
params={"BLOB": embedding_blob},
limit=FtSearchLimit(0, limit),
limit=FtSearchLimit(0, knn_limit),
)
try:
@@ -1365,6 +1368,22 @@ class ValkeyStorage:
if rec.scope == normalized or rec.scope.startswith(f"{normalized}/")
]
# Post-filter on metadata. The FT index does not materialize
# arbitrary metadata fields, so we apply the filter in Python
# against the deserialized records. Compare directly first, then
# fall back to a string comparison so callers can pass either the
# native value (e.g. 42) or its string form ("42").
if metadata_filter:
records = [
(rec, score)
for rec, score in records
if self._metadata_matches(rec.metadata, metadata_filter)
]
# Trim to the originally requested limit after post-filtering.
if len(records) > limit:
records = records[:limit]
return records
except Exception as e:
@@ -1432,6 +1451,37 @@ class ValkeyStorage:
return None
return (record, score)
@staticmethod
def _metadata_matches(
record_metadata: dict[str, Any],
metadata_filter: dict[str, Any],
) -> bool:
"""Check whether a record's metadata satisfies all filter pairs.
Performs an equality check on each key; values are compared directly
first, then as strings to allow callers to pass either the native
value or its string form (matching the behaviour of the metadata
index used by ``_find_records_by_metadata``).
Args:
record_metadata: Metadata dict from the stored record.
metadata_filter: Key-value pairs that must all match (AND logic).
Returns:
True if every key in ``metadata_filter`` is present in
``record_metadata`` with an equal value, False otherwise.
"""
for key, expected in metadata_filter.items():
if key not in record_metadata:
return False
actual = record_metadata[key]
if actual == expected:
continue
if str(actual) == str(expected):
continue
return False
return True
def _escape_search_query(self, text: str) -> str:
"""Escape special characters in Valkey Search query.

View File

@@ -248,12 +248,15 @@ class TestValkeyStorageVectorSearch:
limit=10
)
# Verify query contains metadata filters (AND logic)
# Metadata filters must NOT appear in the FT.SEARCH query because the
# `memory_index` schema does not index arbitrary metadata fields.
# They are applied as a Python post-filter instead. See issue #5795.
call_args = mock_ft_search.call_args
query = call_args[0][2]
assert "@agent_id:{agent\\-1}" in query or "@agent_id:{agent-1}" in query
assert "@priority:{high}" in query
assert "=>[KNN 10 @embedding $BLOB AS score]" in query
assert "@agent_id" not in query
assert "@priority" not in query
# KNN limit should be oversampled (limit * 3) when post-filtering.
assert "=>[KNN 30 @embedding $BLOB AS score]" in query
# Verify results
assert len(results) == 1
@@ -290,13 +293,15 @@ class TestValkeyStorageVectorSearch:
limit=10
)
# Verify query contains all filters
# Scope/category filters are server-side; metadata is post-filtered
# (see issue #5795) so it must NOT appear in the FT query.
call_args = mock_ft_search.call_args
query = call_args[0][2]
assert "@scope:{/agent*}" in query
assert "@categories:{planning}" in query
assert "@agent_id:{agent\\-1}" in query or "@agent_id:{agent-1}" in query
assert "=>[KNN 10 @embedding $BLOB AS score]" in query
assert "@agent_id" not in query
# Metadata filter triggers oversampling of the KNN limit (limit * 3).
assert "=>[KNN 30 @embedding $BLOB AS score]" in query
# Verify results
assert len(results) == 1
@@ -539,11 +544,17 @@ class TestValkeyStorageVectorSearch:
limit=10
)
# Verify query contains string-converted metadata values
# Metadata filters (including numeric values) must not leak into the
# FT.SEARCH query — they are applied as a Python post-filter.
# See issue #5795.
call_args = mock_ft_search.call_args
query = call_args[0][2]
assert "@count:{42}" in query
assert "@score:{3" in query and "14}" in query
assert "@count" not in query
assert "@score" not in query
# Verify post-filter accepts both native and string-converted values.
assert len(results) == 1
assert results[0][0].id == "record-1"
@pytest.mark.asyncio
@patch("crewai.memory.storage.valkey_storage.ft.search")
@@ -902,12 +913,13 @@ class TestValkeyStorageVectorSearch:
limit=10
)
# Verify query contains AND logic for metadata
# Metadata filters must NOT be in the FT query (see issue #5795).
# AND logic is enforced by the Python post-filter.
call_args = mock_ft_search.call_args
query = call_args[0][2]
assert "@agent_id:" in query
assert "@priority:" in query
assert "@status:" in query
assert "@agent_id" not in query
assert "@priority" not in query
assert "@status" not in query
# Verify record matching all metadata is returned
assert len(results) == 1
@@ -995,4 +1007,265 @@ class TestValkeyStorageVectorSearch:
# Verify all results are returned
assert len(results) == 2
assert results[0][0].id == "record-1"
assert results[1][0].id == "record-2"
assert results[1][0].id == "record-2"
class TestValkeyStorageMetadataFilterPostFilter:
"""Regression tests for issue #5795.
The `memory_index` FT schema only defines fields for
`embedding`, `scope`, `categories`, `created_at`, and `importance`.
`metadata_filter` cannot be applied via `@key:{value}` clauses in
FT.SEARCH because the referenced metadata fields are not indexed; doing
so causes the query to error or return wrong results on a real Valkey
Search backend. Instead, `metadata_filter` is applied as a Python
post-filter on the deserialized records. These tests guard that
behaviour.
"""
@pytest.mark.asyncio
@patch("crewai.memory.storage.valkey_storage.ft.search")
@patch("crewai.memory.storage.valkey_storage.ft.list")
async def test_metadata_filter_does_not_reference_unindexed_fields(
self, mock_ft_list: AsyncMock, mock_ft_search: AsyncMock,
valkey_storage: ValkeyStorage, mock_glide_client: AsyncMock,
) -> None:
"""FT.SEARCH query must not contain @<metadata_key> clauses.
The `memory_index` schema only indexes scope/categories/created_at/
importance/embedding. Referencing arbitrary metadata keys would make
FT.SEARCH error or silently return wrong results.
"""
record = MemoryRecord(
id="record-1",
content="indexed record",
scope="/test",
metadata={"agent_id": "agent-1", "page": 5, "tag-name": "v1"},
embedding=[0.1, 0.2, 0.3, 0.4],
)
mock_ft_list.return_value = [b"memory_index"]
mock_ft_search.return_value = create_mock_ft_search_response(
[(record, 0.9)]
)
await valkey_storage.asearch(
[0.1, 0.2, 0.3, 0.4],
metadata_filter={"agent_id": "agent-1", "page": 5, "tag-name": "v1"},
limit=10,
)
query = mock_ft_search.call_args[0][2]
# No metadata key should leak into the FT query.
assert "@agent_id" not in query
assert "@page" not in query
assert "@tag" not in query
# The KNN clause itself must still be present.
assert "[KNN" in query and "@embedding $BLOB" in query
@pytest.mark.asyncio
@patch("crewai.memory.storage.valkey_storage.ft.search")
@patch("crewai.memory.storage.valkey_storage.ft.list")
async def test_metadata_filter_excludes_non_matching_records(
self, mock_ft_list: AsyncMock, mock_ft_search: AsyncMock,
valkey_storage: ValkeyStorage, mock_glide_client: AsyncMock,
) -> None:
"""Records that don't satisfy metadata_filter must be excluded.
Prior to the fix the filter was silently ignored on the server side,
so both records would be returned. The Python post-filter must
exclude `record-2` because its `agent_id` differs.
"""
match = MemoryRecord(
id="record-1",
content="should match",
scope="/test",
metadata={"agent_id": "agent-1", "priority": "high"},
embedding=[0.1, 0.2, 0.3, 0.4],
)
not_match = MemoryRecord(
id="record-2",
content="wrong agent",
scope="/test",
metadata={"agent_id": "agent-2", "priority": "high"},
embedding=[0.1, 0.2, 0.3, 0.4],
)
mock_ft_list.return_value = [b"memory_index"]
mock_ft_search.return_value = create_mock_ft_search_response(
[(match, 0.95), (not_match, 0.94)]
)
results = await valkey_storage.asearch(
[0.1, 0.2, 0.3, 0.4],
metadata_filter={"agent_id": "agent-1"},
limit=10,
)
assert [r.id for r, _ in results] == ["record-1"]
@pytest.mark.asyncio
@patch("crewai.memory.storage.valkey_storage.ft.search")
@patch("crewai.memory.storage.valkey_storage.ft.list")
async def test_metadata_filter_excludes_records_missing_the_key(
self, mock_ft_list: AsyncMock, mock_ft_search: AsyncMock,
valkey_storage: ValkeyStorage, mock_glide_client: AsyncMock,
) -> None:
"""Records that don't have the filter key at all must be excluded."""
has_key = MemoryRecord(
id="record-1",
content="has the key",
scope="/test",
metadata={"agent_id": "agent-1"},
embedding=[0.1, 0.2, 0.3, 0.4],
)
missing_key = MemoryRecord(
id="record-2",
content="missing key",
scope="/test",
metadata={"other": "value"},
embedding=[0.1, 0.2, 0.3, 0.4],
)
mock_ft_list.return_value = [b"memory_index"]
mock_ft_search.return_value = create_mock_ft_search_response(
[(has_key, 0.9), (missing_key, 0.89)]
)
results = await valkey_storage.asearch(
[0.1, 0.2, 0.3, 0.4],
metadata_filter={"agent_id": "agent-1"},
limit=10,
)
assert [r.id for r, _ in results] == ["record-1"]
@pytest.mark.asyncio
@patch("crewai.memory.storage.valkey_storage.ft.search")
@patch("crewai.memory.storage.valkey_storage.ft.list")
async def test_metadata_filter_numeric_value_post_filter(
self, mock_ft_list: AsyncMock, mock_ft_search: AsyncMock,
valkey_storage: ValkeyStorage, mock_glide_client: AsyncMock,
) -> None:
"""Numeric metadata values are post-filtered (native or string form)."""
match = MemoryRecord(
id="record-1",
content="numeric match",
scope="/test",
metadata={"count": 42},
embedding=[0.1, 0.2, 0.3, 0.4],
)
not_match = MemoryRecord(
id="record-2",
content="numeric mismatch",
scope="/test",
metadata={"count": 7},
embedding=[0.1, 0.2, 0.3, 0.4],
)
mock_ft_list.return_value = [b"memory_index"]
mock_ft_search.return_value = create_mock_ft_search_response(
[(match, 0.95), (not_match, 0.94)]
)
# Caller passes the native value.
results_native = await valkey_storage.asearch(
[0.1, 0.2, 0.3, 0.4],
metadata_filter={"count": 42},
limit=10,
)
assert [r.id for r, _ in results_native] == ["record-1"]
# Caller passes the string form — must still match because the
# post-filter compares both native and stringified values.
mock_ft_search.return_value = create_mock_ft_search_response(
[(match, 0.95), (not_match, 0.94)]
)
results_str = await valkey_storage.asearch(
[0.1, 0.2, 0.3, 0.4],
metadata_filter={"count": "42"},
limit=10,
)
assert [r.id for r, _ in results_str] == ["record-1"]
@pytest.mark.asyncio
@patch("crewai.memory.storage.valkey_storage.ft.search")
@patch("crewai.memory.storage.valkey_storage.ft.list")
async def test_metadata_filter_oversamples_knn_limit(
self, mock_ft_list: AsyncMock, mock_ft_search: AsyncMock,
valkey_storage: ValkeyStorage, mock_glide_client: AsyncMock,
) -> None:
"""KNN limit must be oversampled when metadata_filter is set.
Without oversampling, post-filtering can under-return: a caller asking
for top-K may get fewer than K matching records even though more
exist. The implementation multiplies `limit` by 3 (matching the
approach used by `LanceDBStorage`).
"""
mock_ft_list.return_value = [b"memory_index"]
mock_ft_search.return_value = [0]
# Without metadata_filter: KNN limit equals limit.
await valkey_storage.asearch(
[0.1, 0.2, 0.3, 0.4], limit=5,
)
query_no_filter = mock_ft_search.call_args[0][2]
assert "[KNN 5 @embedding" in query_no_filter
# With metadata_filter: KNN limit is oversampled to limit * 3.
await valkey_storage.asearch(
[0.1, 0.2, 0.3, 0.4],
metadata_filter={"agent_id": "agent-1"},
limit=5,
)
query_with_filter = mock_ft_search.call_args[0][2]
assert "[KNN 15 @embedding" in query_with_filter
@pytest.mark.asyncio
@patch("crewai.memory.storage.valkey_storage.ft.search")
@patch("crewai.memory.storage.valkey_storage.ft.list")
async def test_metadata_filter_trims_results_to_limit(
self, mock_ft_list: AsyncMock, mock_ft_search: AsyncMock,
valkey_storage: ValkeyStorage, mock_glide_client: AsyncMock,
) -> None:
"""After post-filtering, results are trimmed back to ``limit``."""
records: list[tuple[MemoryRecord, float]] = []
for i in range(6):
records.append(
(
MemoryRecord(
id=f"record-{i}",
content=f"record {i}",
scope="/test",
metadata={"agent_id": "agent-1"},
embedding=[0.1, 0.2, 0.3, 0.4],
),
0.99 - i * 0.01,
)
)
mock_ft_list.return_value = [b"memory_index"]
mock_ft_search.return_value = create_mock_ft_search_response(records)
results = await valkey_storage.asearch(
[0.1, 0.2, 0.3, 0.4],
metadata_filter={"agent_id": "agent-1"},
limit=3,
)
# All 6 records satisfy the filter, but the caller asked for 3.
assert len(results) == 3
assert [r.id for r, _ in results] == ["record-0", "record-1", "record-2"]
def test_metadata_matches_helper_handles_types(self) -> None:
"""`_metadata_matches` should accept native and string-form values."""
helper = ValkeyStorage._metadata_matches
assert helper({"a": 1}, {"a": 1}) is True
assert helper({"a": 1}, {"a": "1"}) is True
assert helper({"a": "1"}, {"a": 1}) is True
assert helper({"a": True}, {"a": True}) is True
assert helper({"a": 1}, {"a": 2}) is False
# AND logic across multiple keys.
assert helper({"a": 1, "b": 2}, {"a": 1, "b": 2}) is True
assert helper({"a": 1, "b": 2}, {"a": 1, "b": 3}) is False
# Missing keys must fail the match (not be silently ignored).
assert helper({"a": 1}, {"a": 1, "b": 2}) is False
# Empty filter trivially matches.
assert helper({"a": 1}, {}) is True