feat: Adding SingleStoreSearchTool (#349)

* initial commit

* Add actual SinglesStore Search Tool implementation

* add the implementation

* update readme

* add tool's description

* add tests

* fix tests

* review comments

* remove schema from public exports

* fix test failure

* revert tools.specs.json

* added dependencies and env vars descriptions
This commit is contained in:
Volodymyr Tkachuk
2025-08-07 16:51:37 +03:00
committed by GitHub
parent 4daf18256d
commit d00c9764fc
6 changed files with 1084 additions and 10 deletions

View File

@@ -42,10 +42,10 @@ from .tools import (
MultiOnTool,
MySQLSearchTool,
NL2SQLTool,
OxylabsUniversalScraperTool,
OxylabsGoogleSearchScraperTool,
OxylabsAmazonProductScraperTool,
OxylabsAmazonSearchScraperTool,
OxylabsGoogleSearchScraperTool,
OxylabsUniversalScraperTool,
PatronusEvalTool,
PatronusLocalEvaluatorTool,
PatronusPredefinedCriteriaEvalTool,
@@ -68,6 +68,7 @@ from .tools import (
SerplyScholarSearchTool,
SerplyWebpageToMarkdownTool,
SerplyWebSearchTool,
SingleStoreSearchTool,
SnowflakeConfig,
SnowflakeSearchTool,
SpiderTool,

View File

@@ -39,18 +39,18 @@ from .mongodb_vector_search_tool import (
from .multion_tool.multion_tool import MultiOnTool
from .mysql_search_tool.mysql_search_tool import MySQLSearchTool
from .nl2sql.nl2sql_tool import NL2SQLTool
from .oxylabs_universal_scraper_tool.oxylabs_universal_scraper_tool import (
OxylabsUniversalScraperTool,
)
from .oxylabs_google_search_scraper_tool.oxylabs_google_search_scraper_tool import (
OxylabsGoogleSearchScraperTool,
)
from .oxylabs_amazon_product_scraper_tool.oxylabs_amazon_product_scraper_tool import (
OxylabsAmazonProductScraperTool,
)
from .oxylabs_amazon_search_scraper_tool.oxylabs_amazon_search_scraper_tool import (
OxylabsAmazonSearchScraperTool,
)
from .oxylabs_google_search_scraper_tool.oxylabs_google_search_scraper_tool import (
OxylabsGoogleSearchScraperTool,
)
from .oxylabs_universal_scraper_tool.oxylabs_universal_scraper_tool import (
OxylabsUniversalScraperTool,
)
from .patronus_eval_tool import (
PatronusEvalTool,
PatronusLocalEvaluatorTool,
@@ -75,12 +75,15 @@ from .selenium_scraping_tool.selenium_scraping_tool import SeleniumScrapingTool
from .serpapi_tool.serpapi_google_search_tool import SerpApiGoogleSearchTool
from .serpapi_tool.serpapi_google_shopping_tool import SerpApiGoogleShoppingTool
from .serper_dev_tool.serper_dev_tool import SerperDevTool
from .serper_scrape_website_tool.serper_scrape_website_tool import SerperScrapeWebsiteTool
from .serper_scrape_website_tool.serper_scrape_website_tool import (
SerperScrapeWebsiteTool,
)
from .serply_api_tool.serply_job_search_tool import SerplyJobSearchTool
from .serply_api_tool.serply_news_search_tool import SerplyNewsSearchTool
from .serply_api_tool.serply_scholar_search_tool import SerplyScholarSearchTool
from .serply_api_tool.serply_web_search_tool import SerplyWebSearchTool
from .serply_api_tool.serply_webpage_to_markdown_tool import SerplyWebpageToMarkdownTool
from .singlestore_search_tool import SingleStoreSearchTool
from .snowflake_search_tool import (
SnowflakeConfig,
SnowflakeSearchTool,
@@ -88,9 +91,9 @@ from .snowflake_search_tool import (
)
from .spider_tool.spider_tool import SpiderTool
from .stagehand_tool.stagehand_tool import StagehandTool
from .txt_search_tool.txt_search_tool import TXTSearchTool
from .tavily_extractor_tool.tavily_extractor_tool import TavilyExtractorTool
from .tavily_search_tool.tavily_search_tool import TavilySearchTool
from .txt_search_tool.txt_search_tool import TXTSearchTool
from .vision_tool.vision_tool import VisionTool
from .weaviate_tool.vector_search import WeaviateVectorSearchTool
from .website_search.website_search_tool import WebsiteSearchTool

View File

@@ -0,0 +1,299 @@
# SingleStoreSearchTool
## Description
The SingleStoreSearchTool is designed to facilitate semantic searches and SQL queries within SingleStore database tables. This tool provides a secure interface for executing SELECT and SHOW queries against SingleStore databases, with built-in connection pooling for optimal performance. It supports various connection methods and allows you to work with specific table subsets within your database.
## Installation
To install the `crewai_tools` package with SingleStore support, execute the following command:
```shell
pip install 'crewai[tools]'
```
Or install with the SingleStore extra for the latest dependencies:
```shell
uv sync --extra singlestore
```
Or install the required dependencies manually:
```shell
pip install singlestoredb>=1.12.4 SQLAlchemy>=2.0.40
```
## Features
- 🔒 **Secure Query Execution**: Only SELECT and SHOW queries are allowed for security
- 🚀 **Connection Pooling**: Built-in connection pooling for optimal performance
- 📊 **Table Subset Support**: Work with specific tables or all tables in the database
- 🔧 **Flexible Configuration**: Multiple connection methods supported
- 🛡️ **SSL/TLS Support**: Comprehensive SSL configuration options
-**Efficient Resource Management**: Automatic connection lifecycle management
## Basic Usage
### Simple Connection
```python
from crewai_tools import SingleStoreSearchTool
# Basic connection using host/user/password
tool = SingleStoreSearchTool(
host='localhost',
user='your_username',
password='your_password',
database='your_database',
port=3306
)
# Execute a search query
result = tool._run("SELECT * FROM employees WHERE department = 'Engineering' LIMIT 10")
print(result)
```
### Working with Specific Tables
```python
# Initialize tool for specific tables only
tool = SingleStoreSearchTool(
tables=['employees', 'departments'], # Only work with these tables
host='your_host',
user='your_username',
password='your_password',
database='your_database'
)
```
## Complete CrewAI Integration Example
Here's a complete example showing how to use the SingleStoreSearchTool with CrewAI agents and tasks:
```python
from crewai import Agent, Task, Crew
from crewai_tools import SingleStoreSearchTool
# Initialize the SingleStore search tool
singlestore_tool = SingleStoreSearchTool(
tables=["products", "sales", "customers"], # Specify the tables you want to search
host="localhost",
port=3306,
user="root",
password="pass",
database="crewai",
)
# Create an agent that uses this tool
data_analyst = Agent(
role="Business Analyst",
goal="Analyze and answer business questions using SQL data",
backstory="Expert in interpreting business needs and transforming them into data queries.",
tools=[singlestore_tool],
verbose=True,
embedder={
"provider": "ollama",
"config": {
"model": "nomic-embed-text",
},
},
)
# Define a task
task = Task(
description="List the top 2 customers by total sales amount.",
agent=data_analyst,
expected_output="A ranked list of top 2 customers that have the highest total sales amount, including their names and total sales figures.",
)
# Run the crew
crew = Crew(tasks=[task], verbose=True)
result = crew.kickoff()
```
### Advanced CrewAI Example with Multiple Agents
```python
from crewai import Agent, Task, Crew
from crewai_tools import SingleStoreSearchTool
# Initialize the tool with connection URL
singlestore_tool = SingleStoreSearchTool(
host="user:password@localhost:3306/ecommerce_db",
tables=["orders", "products", "customers", "order_items"]
)
# Data Analyst Agent
data_analyst = Agent(
role="Senior Data Analyst",
goal="Extract insights from database queries and provide data-driven recommendations",
backstory="You are an experienced data analyst with expertise in SQL and business intelligence.",
tools=[singlestore_tool],
verbose=True
)
# Business Intelligence Agent
bi_specialist = Agent(
role="Business Intelligence Specialist",
goal="Transform data insights into actionable business recommendations",
backstory="You specialize in translating complex data analysis into clear business strategies.",
verbose=True
)
# Define multiple tasks
data_extraction_task = Task(
description="""
Analyze the sales data to find:
1. Top 5 best-selling products by quantity
2. Monthly sales trends for the last 6 months
3. Customer segments by purchase frequency
""",
agent=data_analyst,
expected_output="Detailed SQL query results with sales analysis including product rankings, trends, and customer segments."
)
insights_task = Task(
description="""
Based on the sales data analysis, provide business recommendations for:
1. Inventory management for top products
2. Marketing strategies for different customer segments
3. Sales forecasting insights
""",
agent=bi_specialist,
expected_output="Strategic business recommendations with actionable insights based on the data analysis.",
context=[data_extraction_task]
)
# Create and run the crew
analytics_crew = Crew(
agents=[data_analyst, bi_specialist],
tasks=[data_extraction_task, insights_task],
verbose=True
)
result = analytics_crew.kickoff()
```
## Connection Methods
SingleStore supports multiple connection methods. Choose the one that best fits your environment:
### 1. Standard Connection
```python
tool = SingleStoreSearchTool(
host='your_host',
user='your_username',
password='your_password',
database='your_database',
port=3306
)
```
### 2. Connection URL (Recommended)
You can use a complete connection URL in the `host` parameter for simplified configuration:
```python
# Using connection URL in host parameter
tool = SingleStoreSearchTool(
host='user:password@localhost:3306/database_name'
)
# Or for SingleStore Cloud
tool = SingleStoreSearchTool(
host='user:password@your_cloud_host:3333/database_name?ssl_disabled=false'
)
```
### 3. Environment Variable Configuration
Set the `SINGLESTOREDB_URL` environment variable and initialize the tool without any connection arguments:
```bash
# Set the environment variable
export SINGLESTOREDB_URL="singlestoredb://user:password@localhost:3306/database_name"
# Or for cloud connections
export SINGLESTOREDB_URL="singlestoredb://user:password@your_cloud_host:3333/database_name?ssl_disabled=false"
```
```python
# No connection arguments needed when using environment variable
tool = SingleStoreSearchTool()
# Or specify only table subset
tool = SingleStoreSearchTool(tables=['employees', 'departments'])
```
### 4. Connection with SSL
```python
tool = SingleStoreSearchTool(
host='your_host',
user='your_username',
password='your_password',
database='your_database',
ssl_ca='/path/to/ca-cert.pem',
ssl_cert='/path/to/client-cert.pem',
ssl_key='/path/to/client-key.pem'
)
```
### 5. Advanced Configuration
```python
tool = SingleStoreSearchTool(
host='your_host',
user='your_username',
password='your_password',
database='your_database',
# Connection pool settings
pool_size=10,
max_overflow=20,
timeout=60,
# Advanced options
charset='utf8mb4',
autocommit=True,
connect_timeout=30,
results_format='tuple',
# Custom connection attributes
conn_attrs={
'program_name': 'MyApp',
'custom_attr': 'value'
}
)
```
## Configuration Parameters
### Basic Connection Parameters
- `host`: Database host address or complete connection URL
- `user`: Database username
- `password`: Database password
- `port`: Database port (default: 3306)
- `database`: Database name
- `tables`: List of specific tables to work with (optional)
### Connection Pool Parameters
- `pool_size`: Maximum number of connections in the pool (default: 5)
- `max_overflow`: Maximum overflow connections beyond pool_size (default: 10)
- `timeout`: Connection timeout in seconds (default: 30)
### SSL/TLS Parameters
- `ssl_key`: Path to client private key file
- `ssl_cert`: Path to client certificate file
- `ssl_ca`: Path to certificate authority file
- `ssl_disabled`: Disable SSL (default: None)
- `ssl_verify_cert`: Verify server certificate
- `ssl_verify_identity`: Verify server identity
### Advanced Parameters
- `charset`: Character set for the connection
- `autocommit`: Enable autocommit mode
- `connect_timeout`: Connection timeout in seconds
- `results_format`: Format for query results ('tuple', 'dict', etc.)
- `vector_data_format`: Format for vector data ('binary', 'json')
- `parse_json`: Parse JSON columns automatically
For more detailed connection options and advanced configurations, refer to the [SingleStore Python SDK documentation](https://singlestoredb-python.labs.singlestore.com/getting-started.html).

View File

@@ -0,0 +1,6 @@
from .singlestore_search_tool import SingleStoreSearchTool, SingleStoreSearchToolSchema
__all__ = [
"SingleStoreSearchTool",
"SingleStoreSearchToolSchema",
]

View File

@@ -0,0 +1,429 @@
from typing import Any, Callable, Dict, List, Optional, Type
from crewai.tools import BaseTool, EnvVar
from pydantic import BaseModel, Field
try:
from singlestoredb import connect
from sqlalchemy.pool import QueuePool
SINGLSTORE_AVAILABLE = True
except ImportError:
SINGLSTORE_AVAILABLE = False
class SingleStoreSearchToolSchema(BaseModel):
"""Input schema for SingleStoreSearchTool.
This schema defines the expected input format for the search tool,
ensuring that only valid SELECT and SHOW queries are accepted.
"""
search_query: str = Field(
...,
description=(
"Mandatory semantic search query you want to use to search the database's content. "
"Only SELECT and SHOW queries are supported."
),
)
class SingleStoreSearchTool(BaseTool):
"""A tool for performing semantic searches on SingleStore database tables.
This tool provides a safe interface for executing SELECT and SHOW queries
against a SingleStore database with connection pooling for optimal performance.
"""
name: str = "Search a database's table(s) content"
description: str = (
"A tool that can be used to semantic search a query from a database."
)
args_schema: Type[BaseModel] = SingleStoreSearchToolSchema
package_dependencies: List[str] = ["singlestoredb", "SQLAlchemy"]
env_vars: List[EnvVar] = [
EnvVar(
name="SINGLESTOREDB_URL",
description="A comprehensive URL string that can encapsulate host, port,"
" username, password, and database information, often used in environments"
" like SingleStore notebooks or specific frameworks."
" For example: 'me:p455w0rd@s2-host.com/my_db'",
required=False,
default=None,
),
EnvVar(
name="SINGLESTOREDB_HOST",
description="Specifies the hostname, IP address, or URL of"
" the SingleStoreDB workspace or cluster",
required=False,
default=None,
),
EnvVar(
name="SINGLESTOREDB_PORT",
description="Defines the port number on which the"
" SingleStoreDB server is listening",
required=False,
default=None,
),
EnvVar(
name="SINGLESTOREDB_USER",
description="Specifies the database user name",
required=False,
default=None,
),
EnvVar(
name="SINGLESTOREDB_PASSWORD",
description="Specifies the database user password",
required=False,
default=None,
),
EnvVar(
name="SINGLESTOREDB_DATABASE",
description="Name of the database to connect to",
required=False,
default=None,
),
EnvVar(
name="SINGLESTOREDB_SSL_KEY",
description="File containing SSL key",
required=False,
default=None,
),
EnvVar(
name="SINGLESTOREDB_SSL_CERT",
description="File containing SSL certificate",
required=False,
default=None,
),
EnvVar(
name="SINGLESTOREDB_SSL_CA",
description="File containing SSL certificate authority",
required=False,
default=None,
),
EnvVar(
name="SINGLESTOREDB_CONNECT_TIMEOUT",
description="The timeout for connecting to the database in seconds",
required=False,
default=None,
),
]
connection_args: dict = {}
connection_pool: Optional[Any] = None
def __init__(
self,
tables: List[str] = [],
# Basic connection parameters
host: Optional[str] = None,
user: Optional[str] = None,
password: Optional[str] = None,
port: Optional[int] = None,
database: Optional[str] = None,
driver: Optional[str] = None,
# Connection behavior options
pure_python: Optional[bool] = None,
local_infile: Optional[bool] = None,
charset: Optional[str] = None,
# SSL/TLS configuration
ssl_key: Optional[str] = None,
ssl_cert: Optional[str] = None,
ssl_ca: Optional[str] = None,
ssl_disabled: Optional[bool] = None,
ssl_cipher: Optional[str] = None,
ssl_verify_cert: Optional[bool] = None,
tls_sni_servername: Optional[str] = None,
ssl_verify_identity: Optional[bool] = None,
# Advanced connection options
conv: Optional[Dict[int, Callable[..., Any]]] = None,
credential_type: Optional[str] = None,
autocommit: Optional[bool] = None,
# Result formatting options
results_type: Optional[str] = None,
buffered: Optional[bool] = None,
results_format: Optional[str] = None,
program_name: Optional[str] = None,
conn_attrs: Optional[Dict[str, str]] = {},
# Query execution options
multi_statements: Optional[bool] = None,
client_found_rows: Optional[bool] = None,
connect_timeout: Optional[int] = None,
# Data type handling
nan_as_null: Optional[bool] = None,
inf_as_null: Optional[bool] = None,
encoding_errors: Optional[str] = None,
track_env: Optional[bool] = None,
enable_extended_data_types: Optional[bool] = None,
vector_data_format: Optional[str] = None,
parse_json: Optional[bool] = None,
# Connection pool configuration
pool_size: Optional[int] = 5,
max_overflow: Optional[int] = 10,
timeout: Optional[float] = 30,
**kwargs,
):
"""Initialize the SingleStore search tool.
Args:
tables: List of table names to work with. If empty, all tables will be used.
host: Database host address
user: Database username
password: Database password
port: Database port number
database: Database name
pool_size: Maximum number of connections in the pool
max_overflow: Maximum overflow connections beyond pool_size
timeout: Connection timeout in seconds
**kwargs: Additional arguments passed to the parent class
"""
if not SINGLSTORE_AVAILABLE:
import click
if click.confirm(
"You are missing the 'singlestore' package. Would you like to install it?"
):
import subprocess
try:
subprocess.run(
["uv", "add", "crewai-tools[singlestore]"], check=True
)
except subprocess.CalledProcessError:
raise ImportError("Failed to install singlestore package")
else:
raise ImportError(
"`singlestore` package not found, please run `uv add crewai-tools[singlestore]`"
)
# Set the data type for the parent class
kwargs["data_type"] = "singlestore"
super().__init__(**kwargs)
# Build connection arguments dictionary with sensible defaults
self.connection_args = {
# Basic connection parameters
"host": host,
"user": user,
"password": password,
"port": port,
"database": database,
"driver": driver,
# Connection behavior
"pure_python": pure_python,
"local_infile": local_infile,
"charset": charset,
# SSL/TLS settings
"ssl_key": ssl_key,
"ssl_cert": ssl_cert,
"ssl_ca": ssl_ca,
"ssl_disabled": ssl_disabled,
"ssl_cipher": ssl_cipher,
"ssl_verify_cert": ssl_verify_cert,
"tls_sni_servername": tls_sni_servername,
"ssl_verify_identity": ssl_verify_identity,
# Advanced options
"conv": conv or {},
"credential_type": credential_type,
"autocommit": autocommit,
# Result formatting
"results_type": results_type,
"buffered": buffered,
"results_format": results_format,
"program_name": program_name,
"conn_attrs": conn_attrs or {},
# Query execution
"multi_statements": multi_statements,
"client_found_rows": client_found_rows,
"connect_timeout": connect_timeout or 10, # Default: 10 seconds
# Data type handling with defaults
"nan_as_null": nan_as_null or False,
"inf_as_null": inf_as_null or False,
"encoding_errors": encoding_errors or "strict",
"track_env": track_env or False,
"enable_extended_data_types": enable_extended_data_types or False,
"vector_data_format": vector_data_format or "binary",
"parse_json": parse_json or True,
}
# Ensure connection attributes are properly initialized
if "conn_attrs" not in self.connection_args or not self.connection_args.get(
"conn_attrs"
):
self.connection_args["conn_attrs"] = dict()
# Add tool identification to connection attributes
self.connection_args["conn_attrs"][
"_connector_name"
] = "crewAI SingleStore Tool"
self.connection_args["conn_attrs"]["_connector_version"] = "1.0"
# Initialize connection pool for efficient connection management
self.connection_pool = QueuePool(
creator=self._create_connection,
pool_size=pool_size,
max_overflow=max_overflow,
timeout=timeout,
)
# Validate database schema and initialize table information
self._initialize_tables(tables)
def _initialize_tables(self, tables: List[str]) -> None:
"""Initialize and validate the tables that this tool will work with.
Args:
tables: List of table names to validate and use
Raises:
ValueError: If no tables exist or specified tables don't exist
"""
conn = self._get_connection()
try:
with conn.cursor() as cursor:
# Get all existing tables in the database
cursor.execute("SHOW TABLES")
existing_tables = {table[0] for table in cursor.fetchall()}
# Validate that the database has tables
if not existing_tables or len(existing_tables) == 0:
raise ValueError(
"No tables found in the database. "
"Please ensure the database is initialized with the required tables."
)
# Use all tables if none specified
if not tables or len(tables) == 0:
tables = existing_tables
# Build table definitions for description
table_definitions = []
for table in tables:
if table not in existing_tables:
raise ValueError(
f"Table {table} does not exist in the database. "
f"Please ensure the table is created."
)
# Get column information for each table
cursor.execute(f"SHOW COLUMNS FROM {table}")
columns = cursor.fetchall()
column_info = ", ".join(f"{row[0]} {row[1]}" for row in columns)
table_definitions.append(f"{table}({column_info})")
finally:
# Ensure the connection is returned to the pool
conn.close()
# Update the tool description with actual table information
self.description = (
f"A tool that can be used to semantic search a query from a SingleStore "
f"database's {', '.join(table_definitions)} table(s) content."
)
self._generate_description()
def _get_connection(self) -> Optional[Any]:
"""Get a connection from the connection pool.
Returns:
Connection: A SingleStore database connection
Raises:
Exception: If connection cannot be established
"""
try:
conn = self.connection_pool.connect()
return conn
except Exception:
# Re-raise the exception to be handled by the caller
raise
def _create_connection(self) -> Optional[Any]:
"""Create a new SingleStore connection.
This method is used by the connection pool to create new connections
when needed.
Returns:
Connection: A new SingleStore database connection
Raises:
Exception: If connection cannot be created
"""
try:
conn = connect(**self.connection_args)
return conn
except Exception:
# Re-raise the exception to be handled by the caller
raise
def _validate_query(self, search_query: str) -> tuple[bool, str]:
"""Validate the search query to ensure it's safe to execute.
Only SELECT and SHOW statements are allowed for security reasons.
Args:
search_query: The SQL query to validate
Returns:
tuple: (is_valid: bool, message: str)
"""
# Check if the input is a string
if not isinstance(search_query, str):
return False, "Search query must be a string."
# Remove leading/trailing whitespace and convert to lowercase for checking
query_lower = search_query.strip().lower()
# Allow only SELECT and SHOW statements
if not (query_lower.startswith("select") or query_lower.startswith("show")):
return (
False,
"Only SELECT and SHOW queries are supported for security reasons.",
)
return True, "Valid query"
def _run(self, search_query: str) -> Any:
"""Execute the search query against the SingleStore database.
Args:
search_query: The SQL query to execute
**kwargs: Additional keyword arguments (unused)
Returns:
str: Formatted search results or error message
"""
# Validate the query before execution
valid, message = self._validate_query(search_query)
if not valid:
return f"Invalid search query: {message}"
# Execute the query using a connection from the pool
conn = self._get_connection()
try:
with conn.cursor() as cursor:
try:
# Execute the validated search query
cursor.execute(search_query)
results = cursor.fetchall()
# Handle empty results
if not results:
return "No results found."
# Format the results for readable output
formatted_results = "\n".join(
[", ".join([str(item) for item in row]) for row in results]
)
return f"Search Results:\n{formatted_results}"
except Exception as e:
return f"Error executing search query: {e}"
finally:
# Ensure the connection is returned to the pool
conn.close()

View File

@@ -0,0 +1,336 @@
import os
from typing import Generator
import pytest
from singlestoredb import connect
from singlestoredb.server import docker
from crewai_tools import SingleStoreSearchTool
from crewai_tools.tools.singlestore_search_tool import SingleStoreSearchToolSchema
@pytest.fixture(scope="session")
def docker_server_url() -> Generator[str, None, None]:
"""Start a SingleStore Docker server for tests."""
try:
sdb = docker.start(license="")
conn = sdb.connect()
curr = conn.cursor()
curr.execute("CREATE DATABASE test_crewai")
curr.close()
conn.close()
yield sdb.connection_url
sdb.stop()
except Exception as e:
pytest.skip(f"Could not start SingleStore Docker container: {e}")
@pytest.fixture(scope="function")
def clean_db_url(docker_server_url) -> Generator[str, None, None]:
"""Provide a clean database URL and clean up tables after test."""
yield docker_server_url
try:
conn = connect(host=docker_server_url, database="test_crewai")
curr = conn.cursor()
curr.execute("SHOW TABLES")
results = curr.fetchall()
for result in results:
curr.execute(f"DROP TABLE {result[0]}")
curr.close()
conn.close()
except Exception:
# Ignore cleanup errors
pass
@pytest.fixture
def sample_table_setup(clean_db_url):
"""Set up sample tables for testing."""
conn = connect(host=clean_db_url, database="test_crewai")
curr = conn.cursor()
# Create sample tables
curr.execute(
"""
CREATE TABLE employees (
id INT PRIMARY KEY,
name VARCHAR(100),
department VARCHAR(50),
salary DECIMAL(10,2)
)
"""
)
curr.execute(
"""
CREATE TABLE departments (
id INT PRIMARY KEY,
name VARCHAR(100),
budget DECIMAL(12,2)
)
"""
)
# Insert sample data
curr.execute(
"""
INSERT INTO employees VALUES
(1, 'Alice Smith', 'Engineering', 75000.00),
(2, 'Bob Johnson', 'Marketing', 65000.00),
(3, 'Carol Davis', 'Engineering', 80000.00)
"""
)
curr.execute(
"""
INSERT INTO departments VALUES
(1, 'Engineering', 500000.00),
(2, 'Marketing', 300000.00)
"""
)
curr.close()
conn.close()
return clean_db_url
class TestSingleStoreSearchTool:
"""Test suite for SingleStoreSearchTool."""
def test_tool_creation_with_connection_params(self, sample_table_setup):
"""Test tool creation with individual connection parameters."""
# Parse URL components for individual parameters
url_parts = sample_table_setup.split("@")[1].split(":")
host = url_parts[0]
port = int(url_parts[1].split("/")[0])
user = "root"
password = sample_table_setup.split("@")[0].split(":")[2]
tool = SingleStoreSearchTool(
tables=[],
host=host,
port=port,
user=user,
password=password,
database="test_crewai",
)
assert tool.name == "Search a database's table(s) content"
assert "SingleStore" in tool.description
assert (
"employees(id int(11), name varchar(100), department varchar(50), salary decimal(10,2))"
in tool.description.lower()
)
assert (
"departments(id int(11), name varchar(100), budget decimal(12,2))"
in tool.description.lower()
)
assert tool.args_schema == SingleStoreSearchToolSchema
assert tool.connection_pool is not None
def test_tool_creation_with_connection_url(self, sample_table_setup):
"""Test tool creation with connection URL."""
tool = SingleStoreSearchTool(host=f"{sample_table_setup}/test_crewai")
assert tool.name == "Search a database's table(s) content"
assert tool.connection_pool is not None
def test_tool_creation_with_specific_tables(self, sample_table_setup):
"""Test tool creation with specific table list."""
tool = SingleStoreSearchTool(
tables=["employees"],
host=sample_table_setup,
database="test_crewai",
)
# Check that description includes specific tables
assert "employees" in tool.description
assert "departments" not in tool.description
def test_tool_creation_with_nonexistent_table(self, sample_table_setup):
"""Test tool creation fails with non-existent table."""
with pytest.raises(ValueError, match="Table nonexistent does not exist"):
SingleStoreSearchTool(
tables=["employees", "nonexistent"],
host=sample_table_setup,
database="test_crewai",
)
def test_tool_creation_with_empty_database(self, clean_db_url):
"""Test tool creation fails with empty database."""
with pytest.raises(ValueError, match="No tables found in the database"):
SingleStoreSearchTool(host=clean_db_url, database="test_crewai")
def test_description_generation(self, sample_table_setup):
"""Test that tool description is properly generated with table info."""
tool = SingleStoreSearchTool(host=sample_table_setup, database="test_crewai")
# Check description contains table definitions
assert "employees(" in tool.description
assert "departments(" in tool.description
assert "id int" in tool.description.lower()
assert "name varchar" in tool.description.lower()
def test_query_validation_select_allowed(self, sample_table_setup):
"""Test that SELECT queries are allowed."""
os.environ["SINGLESTOREDB_URL"] = sample_table_setup
tool = SingleStoreSearchTool(database="test_crewai")
valid, message = tool._validate_query("SELECT * FROM employees")
assert valid is True
assert message == "Valid query"
def test_query_validation_show_allowed(self, sample_table_setup):
"""Test that SHOW queries are allowed."""
tool = SingleStoreSearchTool(host=sample_table_setup, database="test_crewai")
valid, message = tool._validate_query("SHOW TABLES")
assert valid is True
assert message == "Valid query"
def test_query_validation_case_insensitive(self, sample_table_setup):
"""Test that query validation is case insensitive."""
tool = SingleStoreSearchTool(host=sample_table_setup, database="test_crewai")
valid, _ = tool._validate_query("select * from employees")
assert valid is True
valid, _ = tool._validate_query("SHOW tables")
assert valid is True
def test_query_validation_insert_denied(self, sample_table_setup):
"""Test that INSERT queries are denied."""
tool = SingleStoreSearchTool(host=sample_table_setup, database="test_crewai")
valid, message = tool._validate_query(
"INSERT INTO employees VALUES (4, 'Test', 'Test', 1000)"
)
assert valid is False
assert "Only SELECT and SHOW queries are supported" in message
def test_query_validation_update_denied(self, sample_table_setup):
"""Test that UPDATE queries are denied."""
tool = SingleStoreSearchTool(host=sample_table_setup, database="test_crewai")
valid, message = tool._validate_query("UPDATE employees SET salary = 90000")
assert valid is False
assert "Only SELECT and SHOW queries are supported" in message
def test_query_validation_delete_denied(self, sample_table_setup):
"""Test that DELETE queries are denied."""
tool = SingleStoreSearchTool(host=sample_table_setup, database="test_crewai")
valid, message = tool._validate_query("DELETE FROM employees WHERE id = 1")
assert valid is False
assert "Only SELECT and SHOW queries are supported" in message
def test_query_validation_non_string(self, sample_table_setup):
"""Test that non-string queries are rejected."""
tool = SingleStoreSearchTool(host=sample_table_setup, database="test_crewai")
valid, message = tool._validate_query(123)
assert valid is False
assert "Search query must be a string" in message
def test_run_select_query(self, sample_table_setup):
"""Test executing a SELECT query."""
tool = SingleStoreSearchTool(host=sample_table_setup, database="test_crewai")
result = tool._run("SELECT * FROM employees ORDER BY id")
assert "Search Results:" in result
assert "Alice Smith" in result
assert "Bob Johnson" in result
assert "Carol Davis" in result
def test_run_filtered_query(self, sample_table_setup):
"""Test executing a filtered SELECT query."""
tool = SingleStoreSearchTool(host=sample_table_setup, database="test_crewai")
result = tool._run(
"SELECT name FROM employees WHERE department = 'Engineering'"
)
assert "Search Results:" in result
assert "Alice Smith" in result
assert "Carol Davis" in result
assert "Bob Johnson" not in result
def test_run_show_query(self, sample_table_setup):
"""Test executing a SHOW query."""
tool = SingleStoreSearchTool(host=sample_table_setup, database="test_crewai")
result = tool._run("SHOW TABLES")
assert "Search Results:" in result
assert "employees" in result
assert "departments" in result
def test_run_empty_result(self, sample_table_setup):
"""Test executing a query that returns no results."""
tool = SingleStoreSearchTool(host=sample_table_setup, database="test_crewai")
result = tool._run("SELECT * FROM employees WHERE department = 'NonExistent'")
assert result == "No results found."
def test_run_invalid_query_syntax(self, sample_table_setup):
"""Test executing a query with invalid syntax."""
tool = SingleStoreSearchTool(host=sample_table_setup, database="test_crewai")
result = tool._run("SELECT * FORM employees") # Intentional typo
assert "Error executing search query:" in result
def test_run_denied_query(self, sample_table_setup):
"""Test that denied queries return appropriate error message."""
tool = SingleStoreSearchTool(host=sample_table_setup, database="test_crewai")
result = tool._run("DELETE FROM employees")
assert "Invalid search query:" in result
assert "Only SELECT and SHOW queries are supported" in result
def test_connection_pool_usage(self, sample_table_setup):
"""Test that connection pooling works correctly."""
tool = SingleStoreSearchTool(
host=sample_table_setup,
database="test_crewai",
pool_size=2,
)
# Execute multiple queries to test pool usage
results = []
for _ in range(5):
result = tool._run("SELECT COUNT(*) FROM employees")
results.append(result)
# All queries should succeed
for result in results:
assert "Search Results:" in result
assert "3" in result # Count of employees
def test_tool_schema_validation(self):
"""Test that the tool schema validation works correctly."""
# Valid input
valid_input = SingleStoreSearchToolSchema(search_query="SELECT * FROM test")
assert valid_input.search_query == "SELECT * FROM test"
# Test that description is present
schema_dict = SingleStoreSearchToolSchema.model_json_schema()
assert "search_query" in schema_dict["properties"]
assert "description" in schema_dict["properties"]["search_query"]
def test_connection_error_handling(self):
"""Test handling of connection errors."""
with pytest.raises(Exception):
# This should fail due to invalid connection parameters
SingleStoreSearchTool(
host="invalid_host",
port=9999,
user="invalid_user",
password="invalid_password",
database="invalid_db",
)