mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-10 00:28:31 +00:00
Add Couchbase as a tool (#264)
* - Added CouchbaseFTSVectorStore as a CrewAI tool. - Wrote a README to setup the tool. - Wrote test cases. - Added Couchbase as an optional dependency in the project. * Fixed naming in some places. Added docstrings. Added instructions on how to create a vector search index. * Fixed pyproject.toml * error handling and response format - Removed unnecessary ImportError for missing 'couchbase' package. - Changed response format from a concatenated string to a JSON array for search results. - Updated error handling to return error messages instead of raising exceptions in certain cases. - Adjusted tests to reflect changes in response format and error handling. * Update dependencies in pyproject.toml and uv.lock - Changed pydantic version from 2.6.1 to 2.10.6 in both pyproject.toml and uv.lock. - Updated crewai-tools version from 0.42.2 to 0.42.3 in uv.lock. - Adjusted pydantic-core version from 2.33.1 to 2.27.2 in uv.lock, reflecting the new pydantic version. * Removed restrictive pydantic version and updated uv.lock * synced lockfile * regenerated lockfile * updated lockfile * regenerated lockfile * Update tool specifications for * Fix test cases --------- Co-authored-by: AayushTyagi1 <tyagiaayush5@gmail.com> Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
This commit is contained in:
@@ -14,6 +14,7 @@ from .tools import (
|
||||
CodeDocsSearchTool,
|
||||
CodeInterpreterTool,
|
||||
ComposioTool,
|
||||
CouchbaseFTSVectorSearchTool,
|
||||
CrewaiEnterpriseTools,
|
||||
CSVSearchTool,
|
||||
DallETool,
|
||||
|
||||
@@ -5,6 +5,7 @@ from .browserbase_load_tool.browserbase_load_tool import BrowserbaseLoadTool
|
||||
from .code_docs_search_tool.code_docs_search_tool import CodeDocsSearchTool
|
||||
from .code_interpreter_tool.code_interpreter_tool import CodeInterpreterTool
|
||||
from .composio_tool.composio_tool import ComposioTool
|
||||
from .couchbase_tool.couchbase_tool import CouchbaseFTSVectorSearchTool
|
||||
from .crewai_enterprise_tools.crewai_enterprise_tools import CrewaiEnterpriseTools
|
||||
from .csv_search_tool.csv_search_tool import CSVSearchTool
|
||||
from .dalle_tool.dalle_tool import DallETool
|
||||
|
||||
62
src/crewai_tools/tools/couchbase_tool/README.md
Normal file
62
src/crewai_tools/tools/couchbase_tool/README.md
Normal file
@@ -0,0 +1,62 @@
|
||||
# CouchbaseFTSVectorSearchTool
|
||||
## Description
|
||||
Couchbase is a NoSQL database with vector search capabilities. Users can store and query vector embeddings. You can learn more about Couchbase vector search here: https://docs.couchbase.com/cloud/vector-search/vector-search.html
|
||||
|
||||
This tool is specifically crafted for performing semantic search using Couchbase. Use this tool to find semantically similar docs to a given query.
|
||||
|
||||
## Installation
|
||||
Install the crewai_tools package by executing the following command in your terminal:
|
||||
|
||||
```shell
|
||||
uv pip install 'crewai[tools]'
|
||||
```
|
||||
|
||||
## Setup
|
||||
Before instantiating the tool, you need a Couchbase cluster.
|
||||
- Create a cluster on [Couchbase Capella](https://docs.couchbase.com/cloud/get-started/create-account.html), Couchbase's cloud database solution.
|
||||
- Create a [local Couchbase server](https://docs.couchbase.com/server/current/getting-started/start-here.html).
|
||||
|
||||
You will need to create a bucket, scope and collection on the cluster. Then, [follow this guide](https://docs.couchbase.com/python-sdk/current/hello-world/start-using-sdk.html) to create a Couchbase Cluster object and load documents into your collection.
|
||||
|
||||
Follow the docs below to create a vector search index on Couchbase.
|
||||
- [Create a vector search index on Couchbase Capella.](https://docs.couchbase.com/cloud/vector-search/create-vector-search-index-ui.html)
|
||||
- [Create a vector search index on your local Couchbase server.](https://docs.couchbase.com/server/current/vector-search/create-vector-search-index-ui.html)
|
||||
|
||||
Ensure that the `Dimension` field in the index matches the embedding model. For example, OpenAI's `text-embedding-3-small` model has an embedding dimension of 1536 dimensions, and so the `Dimension` field must be 1536 in the index.
|
||||
|
||||
## Example
|
||||
To utilize the CouchbaseFTSVectorSearchTool for different use cases, follow these examples:
|
||||
|
||||
```python
|
||||
from crewai_tools import CouchbaseFTSVectorSearchTool
|
||||
|
||||
# Instantiate a Couchbase Cluster object from the Couchbase SDK
|
||||
|
||||
tool = CouchbaseFTSVectorSearchTool(
|
||||
cluster=cluster,
|
||||
collection_name="collection",
|
||||
scope_name="scope",
|
||||
bucket_name="bucket",
|
||||
index_name="index",
|
||||
embedding_function=embed_fn
|
||||
)
|
||||
|
||||
# Adding the tool to an agent
|
||||
rag_agent = Agent(
|
||||
name="rag_agent",
|
||||
role="You are a helpful assistant that can answer questions with the help of the CouchbaseFTSVectorSearchTool.",
|
||||
llm="gpt-4o-mini",
|
||||
tools=[tool],
|
||||
)
|
||||
```
|
||||
|
||||
## Arguments
|
||||
- `cluster`: An initialized Couchbase `Cluster` instance.
|
||||
- `bucket_name`: The name of the Couchbase bucket.
|
||||
- `scope_name`: The name of the scope within the bucket.
|
||||
- `collection_name`: The name of the collection within the scope.
|
||||
- `index_name`: The name of the search index (vector index).
|
||||
- `embedding_function`: A function that takes a string and returns its embedding (list of floats).
|
||||
- `embedding_key`: Name of the field in the search index storing the vector. (Optional, defaults to 'embedding')
|
||||
- `scoped_index`: Whether the index is scoped (True) or cluster-level (False). (Optional, defaults to True)
|
||||
- `limit`: The maximum number of search results to return. (Optional, defaults to 3)
|
||||
241
src/crewai_tools/tools/couchbase_tool/couchbase_tool.py
Normal file
241
src/crewai_tools/tools/couchbase_tool/couchbase_tool.py
Normal file
@@ -0,0 +1,241 @@
|
||||
import json
|
||||
import os
|
||||
from typing import Any, Optional, Type, List, Dict, Callable
|
||||
|
||||
try:
|
||||
import couchbase.search as search
|
||||
from couchbase.cluster import Cluster
|
||||
from couchbase.options import SearchOptions
|
||||
from couchbase.vector_search import VectorQuery, VectorSearch
|
||||
|
||||
COUCHBASE_AVAILABLE = True
|
||||
except ImportError:
|
||||
COUCHBASE_AVAILABLE = False
|
||||
search = Any
|
||||
Cluster = Any
|
||||
SearchOptions = Any
|
||||
VectorQuery = Any
|
||||
VectorSearch = Any
|
||||
|
||||
from crewai.tools import BaseTool
|
||||
from pydantic import BaseModel, Field, SkipValidation
|
||||
|
||||
|
||||
class CouchbaseToolSchema(BaseModel):
|
||||
"""Input for CouchbaseTool."""
|
||||
|
||||
query: str = Field(
|
||||
...,
|
||||
description="The query to search retrieve relevant information from the Couchbase database. Pass only the query, not the question.",
|
||||
)
|
||||
|
||||
class CouchbaseFTSVectorSearchTool(BaseTool):
|
||||
"""Tool to search the Couchbase database"""
|
||||
|
||||
model_config = {"arbitrary_types_allowed": True}
|
||||
name: str = "CouchbaseFTSVectorSearchTool"
|
||||
description: str = "A tool to search the Couchbase database for relevant information on internal documents."
|
||||
args_schema: Type[BaseModel] = CouchbaseToolSchema
|
||||
cluster: SkipValidation[Optional[Cluster]] = None
|
||||
collection_name: Optional[str] = None,
|
||||
scope_name: Optional[str] = None,
|
||||
bucket_name: Optional[str] = None,
|
||||
index_name: Optional[str] = None,
|
||||
embedding_key: Optional[str] = Field(
|
||||
default="embedding",
|
||||
description="Name of the field in the search index that stores the vector"
|
||||
)
|
||||
scoped_index: Optional[bool] = Field(
|
||||
default=True,
|
||||
description="Specify whether the index is scoped. Is True by default."
|
||||
),
|
||||
limit: Optional[int] = Field(default=3)
|
||||
embedding_function: SkipValidation[Callable[[str], List[float]]] = Field(
|
||||
default=None,
|
||||
description="A function that takes a string and returns a list of floats. This is used to embed the query before searching the database."
|
||||
)
|
||||
|
||||
def _check_bucket_exists(self) -> bool:
|
||||
"""Check if the bucket exists in the linked Couchbase cluster"""
|
||||
bucket_manager = self.cluster.buckets()
|
||||
try:
|
||||
bucket_manager.get_bucket(self.bucket_name)
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def _check_scope_and_collection_exists(self) -> bool:
|
||||
"""Check if the scope and collection exists in the linked Couchbase bucket
|
||||
Raises a ValueError if either is not found"""
|
||||
scope_collection_map: Dict[str, Any] = {}
|
||||
|
||||
# Get a list of all scopes in the bucket
|
||||
for scope in self._bucket.collections().get_all_scopes():
|
||||
scope_collection_map[scope.name] = []
|
||||
|
||||
# Get a list of all the collections in the scope
|
||||
for collection in scope.collections:
|
||||
scope_collection_map[scope.name].append(collection.name)
|
||||
|
||||
# Check if the scope exists
|
||||
if self.scope_name not in scope_collection_map.keys():
|
||||
raise ValueError(
|
||||
f"Scope {self.scope_name} not found in Couchbase "
|
||||
f"bucket {self.bucket_name}"
|
||||
)
|
||||
|
||||
# Check if the collection exists in the scope
|
||||
if self.collection_name not in scope_collection_map[self.scope_name]:
|
||||
raise ValueError(
|
||||
f"Collection {self.collection_name} not found in scope "
|
||||
f"{self.scope_name} in Couchbase bucket {self.bucket_name}"
|
||||
)
|
||||
|
||||
return True
|
||||
|
||||
def _check_index_exists(self) -> bool:
|
||||
"""Check if the Search index exists in the linked Couchbase cluster
|
||||
Raises a ValueError if the index does not exist"""
|
||||
if self.scoped_index:
|
||||
all_indexes = [
|
||||
index.name for index in self._scope.search_indexes().get_all_indexes()
|
||||
]
|
||||
if self.index_name not in all_indexes:
|
||||
raise ValueError(
|
||||
f"Index {self.index_name} does not exist. "
|
||||
" Please create the index before searching."
|
||||
)
|
||||
else:
|
||||
all_indexes = [
|
||||
index.name for index in self.cluster.search_indexes().get_all_indexes()
|
||||
]
|
||||
if self.index_name not in all_indexes:
|
||||
raise ValueError(
|
||||
f"Index {self.index_name} does not exist. "
|
||||
" Please create the index before searching."
|
||||
)
|
||||
|
||||
return True
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
"""Initialize the CouchbaseFTSVectorSearchTool.
|
||||
|
||||
Args:
|
||||
**kwargs: Keyword arguments to pass to the BaseTool constructor and
|
||||
to configure the Couchbase connection and search parameters.
|
||||
Requires 'cluster', 'bucket_name', 'scope_name',
|
||||
'collection_name', 'index_name', and 'embedding_function'.
|
||||
|
||||
Raises:
|
||||
ValueError: If required parameters are missing, the Couchbase cluster
|
||||
cannot be reached, or the specified bucket, scope,
|
||||
collection, or index does not exist.
|
||||
"""
|
||||
super().__init__(**kwargs)
|
||||
if COUCHBASE_AVAILABLE:
|
||||
try:
|
||||
if not self.cluster:
|
||||
raise ValueError("Cluster instance must be provided")
|
||||
|
||||
if not self.bucket_name:
|
||||
raise ValueError("Bucket name must be provided")
|
||||
|
||||
if not self.scope_name:
|
||||
raise ValueError("Scope name must be provided")
|
||||
|
||||
if not self.collection_name:
|
||||
raise ValueError("Collection name must be provided")
|
||||
|
||||
if not self.index_name:
|
||||
raise ValueError("Index name must be provided")
|
||||
|
||||
if not self.embedding_function:
|
||||
raise ValueError("Embedding function must be provided")
|
||||
|
||||
self._bucket = self.cluster.bucket(self.bucket_name)
|
||||
self._scope = self._bucket.scope(self.scope_name)
|
||||
self._collection = self._scope.collection(self.collection_name)
|
||||
except Exception as e:
|
||||
raise ValueError(
|
||||
"Error connecting to couchbase. "
|
||||
"Please check the connection and credentials"
|
||||
) from e
|
||||
|
||||
# check if bucket exists
|
||||
if not self._check_bucket_exists():
|
||||
raise ValueError(
|
||||
f"Bucket {self.bucket_name} does not exist. "
|
||||
" Please create the bucket before searching."
|
||||
)
|
||||
|
||||
self._check_scope_and_collection_exists()
|
||||
self._check_index_exists()
|
||||
else:
|
||||
import click
|
||||
|
||||
if click.confirm(
|
||||
"The 'couchbase' package is required to use the CouchbaseFTSVectorSearchTool. "
|
||||
"Would you like to install it?"
|
||||
):
|
||||
import subprocess
|
||||
|
||||
subprocess.run(["uv", "add", "couchbase"], check=True)
|
||||
else:
|
||||
raise ImportError(
|
||||
"The 'couchbase' package is required to use the CouchbaseFTSVectorSearchTool. "
|
||||
"Please install it with: uv add couchbase"
|
||||
)
|
||||
|
||||
def _run(self, query: str) -> str:
|
||||
"""Execute a vector search query against the Couchbase index.
|
||||
|
||||
Args:
|
||||
query: The search query string.
|
||||
|
||||
Returns:
|
||||
A JSON string containing the search results.
|
||||
|
||||
Raises:
|
||||
ValueError: If the search query fails or returns results without fields.
|
||||
"""
|
||||
query_embedding = self.embedding_function(query)
|
||||
fields = ["*"]
|
||||
|
||||
search_req = search.SearchRequest.create(
|
||||
VectorSearch.from_vector_query(
|
||||
VectorQuery(
|
||||
self.embedding_key,
|
||||
query_embedding,
|
||||
self.limit
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
try:
|
||||
if self.scoped_index:
|
||||
search_iter = self._scope.search(
|
||||
self.index_name,
|
||||
search_req,
|
||||
SearchOptions(
|
||||
limit=self.limit,
|
||||
fields=fields,
|
||||
)
|
||||
)
|
||||
else:
|
||||
search_iter = self.cluster.search(
|
||||
self.index_name,
|
||||
search_req,
|
||||
SearchOptions(
|
||||
limit=self.limit,
|
||||
fields=fields
|
||||
)
|
||||
)
|
||||
|
||||
json_response = []
|
||||
|
||||
for row in search_iter.rows():
|
||||
json_response.append(row.fields)
|
||||
except Exception as e:
|
||||
return f"Search failed with error: {e}"
|
||||
|
||||
return json.dumps(json_response, indent=2)
|
||||
365
tests/tools/couchbase_tool_test.py
Normal file
365
tests/tools/couchbase_tool_test.py
Normal file
@@ -0,0 +1,365 @@
|
||||
import pytest
|
||||
from unittest.mock import MagicMock, patch, ANY
|
||||
|
||||
# Mock the couchbase library before importing the tool
|
||||
# This prevents ImportErrors if couchbase isn't installed in the test environment
|
||||
mock_couchbase = MagicMock()
|
||||
mock_couchbase.search = MagicMock()
|
||||
mock_couchbase.cluster = MagicMock()
|
||||
mock_couchbase.options = MagicMock()
|
||||
mock_couchbase.vector_search = MagicMock()
|
||||
|
||||
# Simulate the structure needed for checks
|
||||
mock_couchbase.cluster.Cluster = MagicMock()
|
||||
mock_couchbase.options.SearchOptions = MagicMock()
|
||||
mock_couchbase.vector_search.VectorQuery = MagicMock()
|
||||
mock_couchbase.vector_search.VectorSearch = MagicMock()
|
||||
mock_couchbase.search.SearchRequest = MagicMock() # Mock the class itself
|
||||
mock_couchbase.search.SearchRequest.create = MagicMock() # Mock the class method
|
||||
|
||||
# Add necessary exception types if needed for testing error handling
|
||||
class MockCouchbaseException(Exception):
|
||||
pass
|
||||
mock_couchbase.exceptions = MagicMock()
|
||||
mock_couchbase.exceptions.BucketNotFoundException = MockCouchbaseException
|
||||
mock_couchbase.exceptions.ScopeNotFoundException = MockCouchbaseException
|
||||
mock_couchbase.exceptions.CollectionNotFoundException = MockCouchbaseException
|
||||
mock_couchbase.exceptions.IndexNotFoundException = MockCouchbaseException
|
||||
|
||||
|
||||
import sys
|
||||
sys.modules['couchbase'] = mock_couchbase
|
||||
sys.modules['couchbase.search'] = mock_couchbase.search
|
||||
sys.modules['couchbase.cluster'] = mock_couchbase.cluster
|
||||
sys.modules['couchbase.options'] = mock_couchbase.options
|
||||
sys.modules['couchbase.vector_search'] = mock_couchbase.vector_search
|
||||
sys.modules['couchbase.exceptions'] = mock_couchbase.exceptions
|
||||
|
||||
# Now import the tool
|
||||
from crewai_tools.tools.couchbase_tool.couchbase_tool import CouchbaseFTSVectorSearchTool
|
||||
|
||||
# --- Test Fixtures ---
|
||||
@pytest.fixture(autouse=True)
|
||||
def reset_global_mocks():
|
||||
"""Reset call counts for globally defined mocks before each test."""
|
||||
# Reset the specific mock causing the issue
|
||||
mock_couchbase.vector_search.VectorQuery.reset_mock()
|
||||
# It's good practice to also reset other related global mocks
|
||||
# that might be called in your tests to prevent similar issues:
|
||||
mock_couchbase.vector_search.VectorSearch.from_vector_query.reset_mock()
|
||||
mock_couchbase.search.SearchRequest.create.reset_mock()
|
||||
|
||||
# Additional fixture to handle import pollution in full test suite
|
||||
@pytest.fixture(autouse=True)
|
||||
def ensure_couchbase_mocks():
|
||||
"""Ensure that couchbase imports are properly mocked even when other tests have run first."""
|
||||
# This fixture ensures our mocks are in place regardless of import order
|
||||
original_modules = {}
|
||||
|
||||
# Store any existing modules
|
||||
for module_name in ['couchbase', 'couchbase.search', 'couchbase.cluster', 'couchbase.options', 'couchbase.vector_search', 'couchbase.exceptions']:
|
||||
if module_name in sys.modules:
|
||||
original_modules[module_name] = sys.modules[module_name]
|
||||
|
||||
# Ensure our mocks are active
|
||||
sys.modules['couchbase'] = mock_couchbase
|
||||
sys.modules['couchbase.search'] = mock_couchbase.search
|
||||
sys.modules['couchbase.cluster'] = mock_couchbase.cluster
|
||||
sys.modules['couchbase.options'] = mock_couchbase.options
|
||||
sys.modules['couchbase.vector_search'] = mock_couchbase.vector_search
|
||||
sys.modules['couchbase.exceptions'] = mock_couchbase.exceptions
|
||||
|
||||
yield
|
||||
|
||||
# Restore original modules if they existed
|
||||
for module_name, original_module in original_modules.items():
|
||||
if original_module is not None:
|
||||
sys.modules[module_name] = original_module
|
||||
|
||||
@pytest.fixture
|
||||
def mock_cluster():
|
||||
cluster = MagicMock()
|
||||
bucket_manager = MagicMock()
|
||||
search_index_manager = MagicMock()
|
||||
bucket = MagicMock()
|
||||
scope = MagicMock()
|
||||
collection = MagicMock()
|
||||
scope_search_index_manager = MagicMock()
|
||||
|
||||
# Setup mock return values for checks
|
||||
cluster.buckets.return_value = bucket_manager
|
||||
cluster.search_indexes.return_value = search_index_manager
|
||||
cluster.bucket.return_value = bucket
|
||||
bucket.scope.return_value = scope
|
||||
scope.collection.return_value = collection
|
||||
scope.search_indexes.return_value = scope_search_index_manager
|
||||
|
||||
# Mock bucket existence check
|
||||
bucket_manager.get_bucket.return_value = True
|
||||
|
||||
# Mock scope/collection existence check
|
||||
mock_scope_spec = MagicMock()
|
||||
mock_scope_spec.name = "test_scope"
|
||||
mock_collection_spec = MagicMock()
|
||||
mock_collection_spec.name = "test_collection"
|
||||
mock_scope_spec.collections = [mock_collection_spec]
|
||||
bucket.collections.return_value.get_all_scopes.return_value = [mock_scope_spec]
|
||||
|
||||
# Mock index existence check
|
||||
mock_index_def = MagicMock()
|
||||
mock_index_def.name = "test_index"
|
||||
scope_search_index_manager.get_all_indexes.return_value = [mock_index_def]
|
||||
search_index_manager.get_all_indexes.return_value = [mock_index_def]
|
||||
|
||||
return cluster
|
||||
|
||||
@pytest.fixture
|
||||
def mock_embedding_function():
|
||||
# Simple mock embedding function
|
||||
# return lambda query: [0.1] * 10 # Example embedding vector
|
||||
return MagicMock(return_value=[0.1] * 10)
|
||||
|
||||
@pytest.fixture
|
||||
def tool_config(mock_cluster, mock_embedding_function):
|
||||
return {
|
||||
"cluster": mock_cluster,
|
||||
"bucket_name": "test_bucket",
|
||||
"scope_name": "test_scope",
|
||||
"collection_name": "test_collection",
|
||||
"index_name": "test_index",
|
||||
"embedding_function": mock_embedding_function,
|
||||
"limit": 5,
|
||||
"embedding_key": "test_embedding",
|
||||
"scoped_index": True
|
||||
}
|
||||
|
||||
@pytest.fixture
|
||||
def couchbase_tool(tool_config):
|
||||
# Patch COUCHBASE_AVAILABLE to True for these tests
|
||||
with patch('crewai_tools.tools.couchbase_tool.couchbase_tool.COUCHBASE_AVAILABLE', True):
|
||||
tool = CouchbaseFTSVectorSearchTool(**tool_config)
|
||||
return tool
|
||||
|
||||
@pytest.fixture
|
||||
def mock_search_iter():
|
||||
mock_iter = MagicMock()
|
||||
# Simulate search results with a 'fields' attribute
|
||||
mock_row1 = MagicMock()
|
||||
mock_row1.fields = {"id": "doc1", "text": "content 1", "test_embedding": [0.1]*10}
|
||||
mock_row2 = MagicMock()
|
||||
mock_row2.fields = {"id": "doc2", "text": "content 2", "test_embedding": [0.2]*10}
|
||||
mock_iter.rows.return_value = [mock_row1, mock_row2]
|
||||
return mock_iter
|
||||
|
||||
# --- Test Cases ---
|
||||
|
||||
def test_initialization_success(couchbase_tool, tool_config):
|
||||
"""Test successful initialization with valid config."""
|
||||
assert couchbase_tool.cluster == tool_config["cluster"]
|
||||
assert couchbase_tool.bucket_name == "test_bucket"
|
||||
assert couchbase_tool.scope_name == "test_scope"
|
||||
assert couchbase_tool.collection_name == "test_collection"
|
||||
assert couchbase_tool.index_name == "test_index"
|
||||
assert couchbase_tool.embedding_function is not None
|
||||
assert couchbase_tool.limit == 5
|
||||
assert couchbase_tool.embedding_key == "test_embedding"
|
||||
assert couchbase_tool.scoped_index == True
|
||||
|
||||
# Check if helper methods were called during init (via mocks in fixture)
|
||||
couchbase_tool.cluster.buckets().get_bucket.assert_called_once_with("test_bucket")
|
||||
couchbase_tool.cluster.bucket().collections().get_all_scopes.assert_called_once()
|
||||
couchbase_tool.cluster.bucket().scope().search_indexes().get_all_indexes.assert_called_once()
|
||||
|
||||
def test_initialization_missing_required_args(mock_cluster, mock_embedding_function):
|
||||
"""Test initialization fails when required arguments are missing."""
|
||||
base_config = {
|
||||
"cluster": mock_cluster, "bucket_name": "b", "scope_name": "s",
|
||||
"collection_name": "c", "index_name": "i", "embedding_function": mock_embedding_function
|
||||
}
|
||||
required_keys = base_config.keys()
|
||||
|
||||
with patch('crewai_tools.tools.couchbase_tool.couchbase_tool.COUCHBASE_AVAILABLE', True):
|
||||
for key in required_keys:
|
||||
incomplete_config = base_config.copy()
|
||||
del incomplete_config[key]
|
||||
with pytest.raises(ValueError):
|
||||
CouchbaseFTSVectorSearchTool(**incomplete_config)
|
||||
|
||||
def test_initialization_couchbase_unavailable():
|
||||
"""Test behavior when couchbase library is not available."""
|
||||
with patch('crewai_tools.tools.couchbase_tool.couchbase_tool.COUCHBASE_AVAILABLE', False):
|
||||
with patch('click.confirm', return_value=False) as mock_confirm:
|
||||
with pytest.raises(ImportError, match="The 'couchbase' package is required"):
|
||||
CouchbaseFTSVectorSearchTool(cluster=MagicMock(), bucket_name="b", scope_name="s",
|
||||
collection_name="c", index_name="i", embedding_function=MagicMock())
|
||||
mock_confirm.assert_called_once() # Ensure user was prompted
|
||||
|
||||
def test_run_success_scoped_index(couchbase_tool, mock_search_iter, tool_config, mock_embedding_function):
|
||||
"""Test successful _run execution with a scoped index."""
|
||||
query = "find relevant documents"
|
||||
# expected_embedding = mock_embedding_function(query)
|
||||
|
||||
# Mock the scope search method
|
||||
couchbase_tool._scope.search = MagicMock(return_value=mock_search_iter)
|
||||
# Mock the VectorQuery/VectorSearch/SearchRequest creation using runtime patching
|
||||
with patch('crewai_tools.tools.couchbase_tool.couchbase_tool.VectorQuery') as mock_vq, \
|
||||
patch('crewai_tools.tools.couchbase_tool.couchbase_tool.VectorSearch') as mock_vs, \
|
||||
patch('crewai_tools.tools.couchbase_tool.couchbase_tool.search.SearchRequest') as mock_sr, \
|
||||
patch('crewai_tools.tools.couchbase_tool.couchbase_tool.SearchOptions') as mock_so:
|
||||
|
||||
# Set up the mock objects and their return values
|
||||
mock_vector_query = MagicMock()
|
||||
mock_vector_search = MagicMock()
|
||||
mock_search_req = MagicMock()
|
||||
mock_search_options = MagicMock()
|
||||
|
||||
mock_vq.return_value = mock_vector_query
|
||||
mock_vs.from_vector_query.return_value = mock_vector_search
|
||||
mock_sr.create.return_value = mock_search_req
|
||||
mock_so.return_value = mock_search_options
|
||||
|
||||
result = couchbase_tool._run(query=query)
|
||||
|
||||
# Check embedding function call
|
||||
tool_config['embedding_function'].assert_called_once_with(query)
|
||||
|
||||
# Check VectorQuery call
|
||||
mock_vq.assert_called_once_with(
|
||||
tool_config['embedding_key'], mock_embedding_function.return_value, tool_config['limit']
|
||||
)
|
||||
# Check VectorSearch call
|
||||
mock_vs.from_vector_query.assert_called_once_with(mock_vector_query)
|
||||
# Check SearchRequest creation
|
||||
mock_sr.create.assert_called_once_with(mock_vector_search)
|
||||
# Check SearchOptions creation
|
||||
mock_so.assert_called_once_with(limit=tool_config['limit'], fields=["*"])
|
||||
|
||||
# Check that scope search was called correctly
|
||||
couchbase_tool._scope.search.assert_called_once_with(
|
||||
tool_config['index_name'],
|
||||
mock_search_req,
|
||||
mock_search_options
|
||||
)
|
||||
|
||||
# Check cluster search was NOT called
|
||||
couchbase_tool.cluster.search.assert_not_called()
|
||||
|
||||
# Check result format (simple check for JSON structure)
|
||||
assert '"id": "doc1"' in result
|
||||
assert '"id": "doc2"' in result
|
||||
assert result.startswith('[') # Should be valid JSON after concatenation
|
||||
|
||||
def test_run_success_global_index(tool_config, mock_search_iter, mock_embedding_function):
|
||||
"""Test successful _run execution with a global (non-scoped) index."""
|
||||
tool_config['scoped_index'] = False
|
||||
with patch('crewai_tools.tools.couchbase_tool.couchbase_tool.COUCHBASE_AVAILABLE', True):
|
||||
couchbase_tool = CouchbaseFTSVectorSearchTool(**tool_config)
|
||||
|
||||
query = "find global documents"
|
||||
# expected_embedding = mock_embedding_function(query)
|
||||
|
||||
# Mock the cluster search method
|
||||
couchbase_tool.cluster.search = MagicMock(return_value=mock_search_iter)
|
||||
# Mock the VectorQuery/VectorSearch/SearchRequest creation using runtime patching
|
||||
with patch('crewai_tools.tools.couchbase_tool.couchbase_tool.VectorQuery') as mock_vq, \
|
||||
patch('crewai_tools.tools.couchbase_tool.couchbase_tool.VectorSearch') as mock_vs, \
|
||||
patch('crewai_tools.tools.couchbase_tool.couchbase_tool.search.SearchRequest') as mock_sr, \
|
||||
patch('crewai_tools.tools.couchbase_tool.couchbase_tool.SearchOptions') as mock_so:
|
||||
|
||||
# Set up the mock objects and their return values
|
||||
mock_vector_query = MagicMock()
|
||||
mock_vector_search = MagicMock()
|
||||
mock_search_req = MagicMock()
|
||||
mock_search_options = MagicMock()
|
||||
|
||||
mock_vq.return_value = mock_vector_query
|
||||
mock_vs.from_vector_query.return_value = mock_vector_search
|
||||
mock_sr.create.return_value = mock_search_req
|
||||
mock_so.return_value = mock_search_options
|
||||
|
||||
result = couchbase_tool._run(query=query)
|
||||
|
||||
# Check embedding function call
|
||||
tool_config['embedding_function'].assert_called_once_with(query)
|
||||
|
||||
# Check VectorQuery/Search call
|
||||
mock_vq.assert_called_once_with(
|
||||
tool_config['embedding_key'], mock_embedding_function.return_value, tool_config['limit']
|
||||
)
|
||||
mock_sr.create.assert_called_once_with(mock_vector_search)
|
||||
# Check SearchOptions creation
|
||||
mock_so.assert_called_once_with(limit=tool_config['limit'], fields=["*"])
|
||||
|
||||
# Check that cluster search was called correctly
|
||||
couchbase_tool.cluster.search.assert_called_once_with(
|
||||
tool_config['index_name'],
|
||||
mock_search_req,
|
||||
mock_search_options
|
||||
)
|
||||
|
||||
# Check scope search was NOT called
|
||||
couchbase_tool._scope.search.assert_not_called()
|
||||
|
||||
# Check result format
|
||||
assert '"id": "doc1"' in result
|
||||
assert '"id": "doc2"' in result
|
||||
|
||||
def test_check_bucket_exists_fail(tool_config):
|
||||
"""Test check for bucket non-existence."""
|
||||
mock_cluster = tool_config['cluster']
|
||||
mock_cluster.buckets().get_bucket.side_effect = mock_couchbase.exceptions.BucketNotFoundException("Bucket not found")
|
||||
|
||||
with patch('crewai_tools.tools.couchbase_tool.couchbase_tool.COUCHBASE_AVAILABLE', True):
|
||||
with pytest.raises(ValueError, match="Bucket test_bucket does not exist."):
|
||||
CouchbaseFTSVectorSearchTool(**tool_config)
|
||||
|
||||
|
||||
def test_check_scope_exists_fail(tool_config):
|
||||
"""Test check for scope non-existence."""
|
||||
mock_cluster = tool_config['cluster']
|
||||
# Simulate scope not being in the list returned
|
||||
mock_scope_spec = MagicMock()
|
||||
mock_scope_spec.name = "wrong_scope"
|
||||
mock_cluster.bucket().collections().get_all_scopes.return_value = [mock_scope_spec]
|
||||
|
||||
with patch('crewai_tools.tools.couchbase_tool.couchbase_tool.COUCHBASE_AVAILABLE', True):
|
||||
with pytest.raises(ValueError, match="Scope test_scope not found"):
|
||||
CouchbaseFTSVectorSearchTool(**tool_config)
|
||||
|
||||
|
||||
def test_check_collection_exists_fail(tool_config):
|
||||
"""Test check for collection non-existence."""
|
||||
mock_cluster = tool_config['cluster']
|
||||
# Simulate collection not being in the scope's list
|
||||
mock_scope_spec = MagicMock()
|
||||
mock_scope_spec.name = "test_scope"
|
||||
mock_collection_spec = MagicMock()
|
||||
mock_collection_spec.name = "wrong_collection"
|
||||
mock_scope_spec.collections = [mock_collection_spec] # Only has wrong collection
|
||||
mock_cluster.bucket().collections().get_all_scopes.return_value = [mock_scope_spec]
|
||||
|
||||
with patch('crewai_tools.tools.couchbase_tool.couchbase_tool.COUCHBASE_AVAILABLE', True):
|
||||
with pytest.raises(ValueError, match="Collection test_collection not found"):
|
||||
CouchbaseFTSVectorSearchTool(**tool_config)
|
||||
|
||||
def test_check_index_exists_fail_scoped(tool_config):
|
||||
"""Test check for scoped index non-existence."""
|
||||
mock_cluster = tool_config['cluster']
|
||||
# Simulate index not being in the list returned by scope manager
|
||||
mock_cluster.bucket().scope().search_indexes().get_all_indexes.return_value = []
|
||||
|
||||
with patch('crewai_tools.tools.couchbase_tool.couchbase_tool.COUCHBASE_AVAILABLE', True):
|
||||
with pytest.raises(ValueError, match="Index test_index does not exist"):
|
||||
CouchbaseFTSVectorSearchTool(**tool_config)
|
||||
|
||||
|
||||
def test_check_index_exists_fail_global(tool_config):
|
||||
"""Test check for global index non-existence."""
|
||||
tool_config['scoped_index'] = False
|
||||
mock_cluster = tool_config['cluster']
|
||||
# Simulate index not being in the list returned by cluster manager
|
||||
mock_cluster.search_indexes().get_all_indexes.return_value = []
|
||||
|
||||
with patch('crewai_tools.tools.couchbase_tool.couchbase_tool.COUCHBASE_AVAILABLE', True):
|
||||
with pytest.raises(ValueError, match="Index test_index does not exist"):
|
||||
CouchbaseFTSVectorSearchTool(**tool_config)
|
||||
Reference in New Issue
Block a user