diff --git a/lib/crewai/src/crewai/memory/storage/valkey_storage.py b/lib/crewai/src/crewai/memory/storage/valkey_storage.py index ceae25c3b..d30a3c59c 100644 --- a/lib/crewai/src/crewai/memory/storage/valkey_storage.py +++ b/lib/crewai/src/crewai/memory/storage/valkey_storage.py @@ -1277,25 +1277,28 @@ class ValkeyStorage: cat_query = "|".join(escaped_categories) query_parts.append(f"@categories:{{{cat_query}}}") - # Metadata filters (AND logic) - # Format: @{key}:{value} - if metadata_filter: - for key, value in metadata_filter.items(): - # Escape key and value - escaped_key = self._escape_search_query(key) - escaped_value = self._escape_search_query(str(value)) - query_parts.append(f"@{escaped_key}:{{{escaped_value}}}") + # Metadata filters are NOT included in the FT.SEARCH query because the + # `memory_index` schema only defines a fixed set of fields + # (embedding/scope/categories/created_at/importance). Referencing + # arbitrary metadata keys via @{key}:{value} would cause FT.SEARCH to + # error or silently return wrong results. They are applied as a + # post-filter on the deserialized records below. # Combine filters filter_query = " ".join(query_parts) if query_parts else "*" + # When a post-filter (metadata) will be applied, oversample the KNN + # results so we don't under-return after filtering. Mirrors the + # approach used by `LanceDBStorage`. 
+ knn_limit = limit * 3 if metadata_filter else limit + # Build KNN query with filters # Format: (filter)=>[KNN limit @field $BLOB AS score] # Note: Don't wrap single "*" in parentheses if filter_query == "*": - query = f"{filter_query}=>[KNN {limit} @embedding $BLOB AS score]" + query = f"{filter_query}=>[KNN {knn_limit} @embedding $BLOB AS score]" else: - query = f"({filter_query})=>[KNN {limit} @embedding $BLOB AS score]" + query = f"({filter_query})=>[KNN {knn_limit} @embedding $BLOB AS score]" # Prepare embedding blob for PARAMS embedding_blob = self._embedding_to_bytes(query_embedding) @@ -1320,7 +1323,7 @@ class ValkeyStorage: search_options = FtSearchOptions( return_fields=return_fields, params={"BLOB": embedding_blob}, - limit=FtSearchLimit(0, limit), + limit=FtSearchLimit(0, knn_limit), ) try: @@ -1365,6 +1368,22 @@ class ValkeyStorage: if rec.scope == normalized or rec.scope.startswith(f"{normalized}/") ] + # Post-filter on metadata. The FT index does not materialize + # arbitrary metadata fields, so we apply the filter in Python + # against the deserialized records. Compare directly first, then + # fall back to a string comparison so callers can pass either the + # native value (e.g. 42) or its string form ("42"). + if metadata_filter: + records = [ + (rec, score) + for rec, score in records + if self._metadata_matches(rec.metadata, metadata_filter) + ] + + # Trim to the originally requested limit after post-filtering. + if len(records) > limit: + records = records[:limit] + return records except Exception as e: @@ -1432,6 +1451,37 @@ class ValkeyStorage: return None return (record, score) + @staticmethod + def _metadata_matches( + record_metadata: dict[str, Any], + metadata_filter: dict[str, Any], + ) -> bool: + """Check whether a record's metadata satisfies all filter pairs. 
+ + Performs an equality check on each key; values are compared directly + first, then as strings to allow callers to pass either the native + value or its string form (matching the behaviour of the metadata + index used by ``_find_records_by_metadata``). + + Args: + record_metadata: Metadata dict from the stored record. + metadata_filter: Key-value pairs that must all match (AND logic). + + Returns: + True if every key in ``metadata_filter`` is present in + ``record_metadata`` with an equal value, False otherwise. + """ + for key, expected in metadata_filter.items(): + if key not in record_metadata: + return False + actual = record_metadata[key] + if actual == expected: + continue + if str(actual) == str(expected): + continue + return False + return True + def _escape_search_query(self, text: str) -> str: """Escape special characters in Valkey Search query. diff --git a/lib/crewai/tests/memory/storage/test_valkey_storage_search.py b/lib/crewai/tests/memory/storage/test_valkey_storage_search.py index 73ee98920..4c8fa9d47 100644 --- a/lib/crewai/tests/memory/storage/test_valkey_storage_search.py +++ b/lib/crewai/tests/memory/storage/test_valkey_storage_search.py @@ -248,12 +248,15 @@ class TestValkeyStorageVectorSearch: limit=10 ) - # Verify query contains metadata filters (AND logic) + # Metadata filters must NOT appear in the FT.SEARCH query because the + # `memory_index` schema does not index arbitrary metadata fields. + # They are applied as a Python post-filter instead. See issue #5795. call_args = mock_ft_search.call_args query = call_args[0][2] - assert "@agent_id:{agent\\-1}" in query or "@agent_id:{agent-1}" in query - assert "@priority:{high}" in query - assert "=>[KNN 10 @embedding $BLOB AS score]" in query + assert "@agent_id" not in query + assert "@priority" not in query + # KNN limit should be oversampled (limit * 3) when post-filtering. 
+ assert "=>[KNN 30 @embedding $BLOB AS score]" in query # Verify results assert len(results) == 1 @@ -290,13 +293,15 @@ class TestValkeyStorageVectorSearch: limit=10 ) - # Verify query contains all filters + # Scope/category filters are server-side; metadata is post-filtered + # (see issue #5795) so it must NOT appear in the FT query. call_args = mock_ft_search.call_args query = call_args[0][2] assert "@scope:{/agent*}" in query assert "@categories:{planning}" in query - assert "@agent_id:{agent\\-1}" in query or "@agent_id:{agent-1}" in query - assert "=>[KNN 10 @embedding $BLOB AS score]" in query + assert "@agent_id" not in query + # Metadata filter triggers oversampling of the KNN limit (limit * 3). + assert "=>[KNN 30 @embedding $BLOB AS score]" in query # Verify results assert len(results) == 1 @@ -539,11 +544,17 @@ class TestValkeyStorageVectorSearch: limit=10 ) - # Verify query contains string-converted metadata values + # Metadata filters (including numeric values) must not leak into the + # FT.SEARCH query — they are applied as a Python post-filter. + # See issue #5795. call_args = mock_ft_search.call_args query = call_args[0][2] - assert "@count:{42}" in query - assert "@score:{3" in query and "14}" in query + assert "@count" not in query + assert "@score" not in query + + # Verify post-filter accepts both native and string-converted values. + assert len(results) == 1 + assert results[0][0].id == "record-1" @pytest.mark.asyncio @patch("crewai.memory.storage.valkey_storage.ft.search") @@ -902,12 +913,13 @@ class TestValkeyStorageVectorSearch: limit=10 ) - # Verify query contains AND logic for metadata + # Metadata filters must NOT be in the FT query (see issue #5795). + # AND logic is enforced by the Python post-filter. 
class TestValkeyStorageMetadataFilterPostFilter:
    """Regression tests for issue #5795.

    The `memory_index` FT schema only defines fields for
    `embedding`, `scope`, `categories`, `created_at`, and `importance`.
    `metadata_filter` cannot be applied via `@key:{value}` clauses in
    FT.SEARCH because the referenced metadata fields are not indexed; doing
    so causes the query to error or return wrong results on a real Valkey
    Search backend. Instead, `metadata_filter` is applied as a Python
    post-filter on the deserialized records. These tests guard that
    behaviour.

    Every async test patches `ft.search` and `ft.list` in
    `crewai.memory.storage.valkey_storage`, so no real backend is contacted.
    """

    @pytest.mark.asyncio
    @patch("crewai.memory.storage.valkey_storage.ft.search")
    @patch("crewai.memory.storage.valkey_storage.ft.list")
    async def test_metadata_filter_does_not_reference_unindexed_fields(
        self, mock_ft_list: AsyncMock, mock_ft_search: AsyncMock,
        valkey_storage: ValkeyStorage, mock_glide_client: AsyncMock,
    ) -> None:
        """FT.SEARCH query must not contain `@<metadata_key>` clauses.

        The `memory_index` schema only indexes scope/categories/created_at/
        importance/embedding. Referencing arbitrary metadata keys would make
        FT.SEARCH error or silently return wrong results.
        """
        record = MemoryRecord(
            id="record-1",
            content="indexed record",
            scope="/test",
            metadata={"agent_id": "agent-1", "page": 5, "tag-name": "v1"},
            embedding=[0.1, 0.2, 0.3, 0.4],
        )
        mock_ft_list.return_value = [b"memory_index"]
        mock_ft_search.return_value = create_mock_ft_search_response(
            [(record, 0.9)]
        )

        await valkey_storage.asearch(
            [0.1, 0.2, 0.3, 0.4],
            metadata_filter={"agent_id": "agent-1", "page": 5, "tag-name": "v1"},
            limit=10,
        )

        # The third positional argument of the patched ft.search call is
        # inspected as the query string.
        query = mock_ft_search.call_args[0][2]
        # No metadata key should leak into the FT query.
        assert "@agent_id" not in query
        assert "@page" not in query
        # "@tag" is a prefix check covering the hyphenated key "tag-name".
        assert "@tag" not in query
        # The KNN clause itself must still be present.
        assert "[KNN" in query and "@embedding $BLOB" in query

    @pytest.mark.asyncio
    @patch("crewai.memory.storage.valkey_storage.ft.search")
    @patch("crewai.memory.storage.valkey_storage.ft.list")
    async def test_metadata_filter_excludes_non_matching_records(
        self, mock_ft_list: AsyncMock, mock_ft_search: AsyncMock,
        valkey_storage: ValkeyStorage, mock_glide_client: AsyncMock,
    ) -> None:
        """Records that don't satisfy metadata_filter must be excluded.

        Prior to the fix the filter was silently ignored on the server side,
        so both records would be returned. The Python post-filter must
        exclude `record-2` because its `agent_id` differs.
        """
        match = MemoryRecord(
            id="record-1",
            content="should match",
            scope="/test",
            metadata={"agent_id": "agent-1", "priority": "high"},
            embedding=[0.1, 0.2, 0.3, 0.4],
        )
        not_match = MemoryRecord(
            id="record-2",
            content="wrong agent",
            scope="/test",
            metadata={"agent_id": "agent-2", "priority": "high"},
            embedding=[0.1, 0.2, 0.3, 0.4],
        )
        mock_ft_list.return_value = [b"memory_index"]
        # The mocked backend returns both records; only the post-filter can
        # remove the non-matching one.
        mock_ft_search.return_value = create_mock_ft_search_response(
            [(match, 0.95), (not_match, 0.94)]
        )

        results = await valkey_storage.asearch(
            [0.1, 0.2, 0.3, 0.4],
            metadata_filter={"agent_id": "agent-1"},
            limit=10,
        )

        assert [r.id for r, _ in results] == ["record-1"]

    @pytest.mark.asyncio
    @patch("crewai.memory.storage.valkey_storage.ft.search")
    @patch("crewai.memory.storage.valkey_storage.ft.list")
    async def test_metadata_filter_excludes_records_missing_the_key(
        self, mock_ft_list: AsyncMock, mock_ft_search: AsyncMock,
        valkey_storage: ValkeyStorage, mock_glide_client: AsyncMock,
    ) -> None:
        """Records that don't have the filter key at all must be excluded."""
        has_key = MemoryRecord(
            id="record-1",
            content="has the key",
            scope="/test",
            metadata={"agent_id": "agent-1"},
            embedding=[0.1, 0.2, 0.3, 0.4],
        )
        missing_key = MemoryRecord(
            id="record-2",
            content="missing key",
            scope="/test",
            metadata={"other": "value"},
            embedding=[0.1, 0.2, 0.3, 0.4],
        )
        mock_ft_list.return_value = [b"memory_index"]
        mock_ft_search.return_value = create_mock_ft_search_response(
            [(has_key, 0.9), (missing_key, 0.89)]
        )

        results = await valkey_storage.asearch(
            [0.1, 0.2, 0.3, 0.4],
            metadata_filter={"agent_id": "agent-1"},
            limit=10,
        )

        assert [r.id for r, _ in results] == ["record-1"]

    @pytest.mark.asyncio
    @patch("crewai.memory.storage.valkey_storage.ft.search")
    @patch("crewai.memory.storage.valkey_storage.ft.list")
    async def test_metadata_filter_numeric_value_post_filter(
        self, mock_ft_list: AsyncMock, mock_ft_search: AsyncMock,
        valkey_storage: ValkeyStorage, mock_glide_client: AsyncMock,
    ) -> None:
        """Numeric metadata values are post-filtered (native or string form)."""
        match = MemoryRecord(
            id="record-1",
            content="numeric match",
            scope="/test",
            metadata={"count": 42},
            embedding=[0.1, 0.2, 0.3, 0.4],
        )
        not_match = MemoryRecord(
            id="record-2",
            content="numeric mismatch",
            scope="/test",
            metadata={"count": 7},
            embedding=[0.1, 0.2, 0.3, 0.4],
        )
        mock_ft_list.return_value = [b"memory_index"]
        mock_ft_search.return_value = create_mock_ft_search_response(
            [(match, 0.95), (not_match, 0.94)]
        )

        # Caller passes the native value.
        results_native = await valkey_storage.asearch(
            [0.1, 0.2, 0.3, 0.4],
            metadata_filter={"count": 42},
            limit=10,
        )
        assert [r.id for r, _ in results_native] == ["record-1"]

        # Caller passes the string form — must still match because the
        # post-filter compares both native and stringified values.
        # A fresh mock response is built before the second search.
        mock_ft_search.return_value = create_mock_ft_search_response(
            [(match, 0.95), (not_match, 0.94)]
        )
        results_str = await valkey_storage.asearch(
            [0.1, 0.2, 0.3, 0.4],
            metadata_filter={"count": "42"},
            limit=10,
        )
        assert [r.id for r, _ in results_str] == ["record-1"]

    @pytest.mark.asyncio
    @patch("crewai.memory.storage.valkey_storage.ft.search")
    @patch("crewai.memory.storage.valkey_storage.ft.list")
    async def test_metadata_filter_oversamples_knn_limit(
        self, mock_ft_list: AsyncMock, mock_ft_search: AsyncMock,
        valkey_storage: ValkeyStorage, mock_glide_client: AsyncMock,
    ) -> None:
        """KNN limit must be oversampled when metadata_filter is set.

        Without oversampling, post-filtering can under-return: a caller asking
        for top-K may get fewer than K matching records even though more
        exist. The implementation multiplies `limit` by 3 (matching the
        approach used by `LanceDBStorage`).
        """
        mock_ft_list.return_value = [b"memory_index"]
        # Only the generated query text is inspected in this test, so a
        # minimal mocked response is used.
        mock_ft_search.return_value = [0]

        # Without metadata_filter: KNN limit equals limit.
        await valkey_storage.asearch(
            [0.1, 0.2, 0.3, 0.4], limit=5,
        )
        query_no_filter = mock_ft_search.call_args[0][2]
        assert "[KNN 5 @embedding" in query_no_filter

        # With metadata_filter: KNN limit is oversampled to limit * 3.
        # call_args reflects the most recent call, i.e. this second search.
        await valkey_storage.asearch(
            [0.1, 0.2, 0.3, 0.4],
            metadata_filter={"agent_id": "agent-1"},
            limit=5,
        )
        query_with_filter = mock_ft_search.call_args[0][2]
        assert "[KNN 15 @embedding" in query_with_filter

    @pytest.mark.asyncio
    @patch("crewai.memory.storage.valkey_storage.ft.search")
    @patch("crewai.memory.storage.valkey_storage.ft.list")
    async def test_metadata_filter_trims_results_to_limit(
        self, mock_ft_list: AsyncMock, mock_ft_search: AsyncMock,
        valkey_storage: ValkeyStorage, mock_glide_client: AsyncMock,
    ) -> None:
        """After post-filtering, results are trimmed back to ``limit``."""
        # Six matching records with strictly decreasing scores
        # (0.99, 0.98, ...), so the expected order is record-0..record-5.
        records: list[tuple[MemoryRecord, float]] = []
        for i in range(6):
            records.append(
                (
                    MemoryRecord(
                        id=f"record-{i}",
                        content=f"record {i}",
                        scope="/test",
                        metadata={"agent_id": "agent-1"},
                        embedding=[0.1, 0.2, 0.3, 0.4],
                    ),
                    0.99 - i * 0.01,
                )
            )

        mock_ft_list.return_value = [b"memory_index"]
        mock_ft_search.return_value = create_mock_ft_search_response(records)

        results = await valkey_storage.asearch(
            [0.1, 0.2, 0.3, 0.4],
            metadata_filter={"agent_id": "agent-1"},
            limit=3,
        )

        # All 6 records satisfy the filter, but the caller asked for 3.
        assert len(results) == 3
        assert [r.id for r, _ in results] == ["record-0", "record-1", "record-2"]

    def test_metadata_matches_helper_handles_types(self) -> None:
        """`_metadata_matches` should accept native and string-form values."""
        # Accessed through the class: the helper is a static method and
        # needs no storage instance.
        helper = ValkeyStorage._metadata_matches

        assert helper({"a": 1}, {"a": 1}) is True
        assert helper({"a": 1}, {"a": "1"}) is True
        assert helper({"a": "1"}, {"a": 1}) is True
        assert helper({"a": True}, {"a": True}) is True
        assert helper({"a": 1}, {"a": 2}) is False
        # AND logic across multiple keys.
        assert helper({"a": 1, "b": 2}, {"a": 1, "b": 2}) is True
        assert helper({"a": 1, "b": 2}, {"a": 1, "b": 3}) is False
        # Missing keys must fail the match (not be silently ignored).
        assert helper({"a": 1}, {"a": 1, "b": 2}) is False
        # Empty filter trivially matches.
        assert helper({"a": 1}, {}) is True