feat: Add modular contextual AI tools with async functionality (#431)

* Add contextual AI tools with async support

* Fix package version issues and update README

* Rename contextual tools to contextualai and update contents

* Update tools init for contextualai tools

* feat: Resolved no module found error for nest_asyncio

* Updated nest_asyncio import

---------

Co-authored-by: QJ <qj@QJs-MacBook-Pro.local>
Co-authored-by: Qile-Jiang <qile.jiang@contextual.ai>
This commit is contained in:
Jinash Rouniyar
2025-08-28 08:56:53 -04:00
committed by GitHub
parent 1f581fa9ac
commit cb84d2ddfa
10 changed files with 590 additions and 0 deletions

View File

@@ -19,6 +19,10 @@ from .tools import (
CodeDocsSearchTool,
CodeInterpreterTool,
ComposioTool,
ContextualAIQueryTool,
ContextualAICreateAgentTool,
ContextualAIParseTool,
ContextualAIRerankTool,
CouchbaseFTSVectorSearchTool,
CrewaiEnterpriseTools,
CSVSearchTool,

View File

@@ -6,6 +6,10 @@ from .browserbase_load_tool.browserbase_load_tool import BrowserbaseLoadTool
from .code_docs_search_tool.code_docs_search_tool import CodeDocsSearchTool
from .code_interpreter_tool.code_interpreter_tool import CodeInterpreterTool
from .composio_tool.composio_tool import ComposioTool
from .contextualai_query_tool.contextual_query_tool import ContextualAIQueryTool
from .contextualai_create_agent_tool.contextual_create_agent_tool import ContextualAICreateAgentTool
from .contextualai_parse_tool.contextual_parse_tool import ContextualAIParseTool
from .contextualai_rerank_tool.contextual_rerank_tool import ContextualAIRerankTool
from .couchbase_tool.couchbase_tool import CouchbaseFTSVectorSearchTool
from .crewai_enterprise_tools.crewai_enterprise_tools import CrewaiEnterpriseTools
from .csv_search_tool.csv_search_tool import CSVSearchTool

View File

@@ -0,0 +1,58 @@
# ContextualAICreateAgentTool
## Description
This tool is designed to integrate Contextual AI's enterprise-grade RAG agents with CrewAI. This tool enables you to create a new Contextual RAG agent. It uploads your documents to create a datastore and returns the Contextual agent ID and datastore ID.
## Installation
To incorporate this tool into your project, follow the installation instructions below:
```
pip install 'crewai[tools]' contextual-client
```
**Note**: You'll need a Contextual AI API key. Sign up at [app.contextual.ai](https://app.contextual.ai) to get your free API key.
## Example
```python
from crewai_tools import ContextualAICreateAgentTool
# Initialize the tool
tool = ContextualAICreateAgentTool(api_key="your_api_key_here")
# Create agent with documents
result = tool._run(
agent_name="Financial Analysis Agent",
agent_description="Agent for analyzing financial documents",
datastore_name="Financial Reports",
document_paths=["/path/to/report1.pdf", "/path/to/report2.pdf"],
)
print(result)
```
## Parameters
- `api_key`: Your Contextual AI API key
- `agent_name`: Name for the new agent
- `agent_description`: Description of the agent's purpose
- `datastore_name`: Name for the document datastore
- `document_paths`: List of file paths to upload
Example result:
```
Successfully created agent 'Research Analyst' with ID: {created_agent_ID} and datastore ID: {created_datastore_ID}. Uploaded 5 documents.
```
You can use `ContextualAIQueryTool` with the returned IDs to query the knowledge base and retrieve relevant information from your documents.
## Key Features
- **Complete Pipeline Setup**: Creates datastore, uploads documents, and configures agent in one operation
- **Document Processing**: Leverages Contextual AI's powerful parser to ingest complex PDFs and documents
- **Vector Storage**: Use Contextual AI's datastore for large document collections
## Use Cases
- Set up new RAG agents from scratch with complete automation
- Upload and organize document collections into structured datastores
- Create specialized domain agents for legal, financial, technical, or research workflows
For more detailed information about Contextual AI's capabilities, visit the [official documentation](https://docs.contextual.ai).

View File

@@ -0,0 +1,71 @@
from typing import Any, Optional, Type, List
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
import os
class ContextualAICreateAgentSchema(BaseModel):
"""Schema for contextual create agent tool."""
agent_name: str = Field(..., description="Name for the new agent")
agent_description: str = Field(..., description="Description for the new agent")
datastore_name: str = Field(..., description="Name for the new datastore")
document_paths: List[str] = Field(..., description="List of file paths to upload")
class ContextualAICreateAgentTool(BaseTool):
"""Tool to create Contextual AI RAG agents with documents."""
name: str = "Contextual AI Create Agent Tool"
description: str = "Create a new Contextual AI RAG agent with documents and datastore"
args_schema: Type[BaseModel] = ContextualAICreateAgentSchema
api_key: str
contextual_client: Any = None
package_dependencies: List[str] = ["contextual-client"]
def __init__(self, **kwargs):
super().__init__(**kwargs)
try:
from contextual import ContextualAI
self.contextual_client = ContextualAI(api_key=self.api_key)
except ImportError:
raise ImportError(
"contextual-client package is required. Install it with: pip install contextual-client"
)
def _run(
self,
agent_name: str,
agent_description: str,
datastore_name: str,
document_paths: List[str]
) -> str:
"""Create a complete RAG pipeline with documents."""
try:
import os
# Create datastore
datastore = self.contextual_client.datastores.create(name=datastore_name)
datastore_id = datastore.id
# Upload documents
document_ids = []
for doc_path in document_paths:
if not os.path.exists(doc_path):
raise FileNotFoundError(f"Document not found: {doc_path}")
with open(doc_path, 'rb') as f:
ingestion_result = self.contextual_client.datastores.documents.ingest(datastore_id, file=f)
document_ids.append(ingestion_result.id)
# Create agent
agent = self.contextual_client.agents.create(
name=agent_name,
description=agent_description,
datastore_ids=[datastore_id]
)
return f"Successfully created agent '{agent_name}' with ID: {agent.id} and datastore ID: {datastore_id}. Uploaded {len(document_ids)} documents."
except Exception as e:
return f"Failed to create agent with documents: {str(e)}"

View File

@@ -0,0 +1,68 @@
# ContextualAIParseTool
## Description
This tool is designed to integrate Contextual AI's enterprise-grade document parsing capabilities with CrewAI, enabling you to leverage advanced AI-powered document understanding for complex layouts, tables, and figures. Use this tool to extract structured content from your documents using Contextual AI's powerful document parser.
## Installation
To incorporate this tool into your project, follow the installation instructions below:
```
pip install 'crewai[tools]' contextual-client
```
**Note**: You'll need a Contextual AI API key. Sign up at [app.contextual.ai](https://app.contextual.ai) to get your free API key.
## Example
```python
from crewai_tools import ContextualAIParseTool
tool = ContextualAIParseTool(api_key="your_api_key_here")
result = tool._run(
file_path="/path/to/document.pdf",
parse_mode="standard",
page_range="0-5",
output_types=["markdown-per-page"]
)
print(result)
```
The result will show the parsed contents of your document. For example:
```
{
"file_name": "attention_is_all_you_need.pdf",
"status": "completed",
"pages": [
{
"index": 0,
"markdown": "Provided proper attribution ...
},
{
"index": 1,
"markdown": "## 1 Introduction ...
},
...
]
}
```
## Parameters
- `api_key`: Your Contextual AI API key
- `file_path`: Path to document to parse
- `parse_mode`: Parsing mode (default: "standard")
- `figure_caption_mode`: Figure caption handling (default: "concise")
- `enable_document_hierarchy`: Enable hierarchy detection (default: True)
- `page_range`: Pages to parse (e.g., "0-5", None for all)
- `output_types`: Output formats (default: ["markdown-per-page"])
## Key Features
- **Advanced Document Understanding**: Handles complex PDF layouts, tables, and multi-column documents
- **Figure and Table Extraction**: Intelligent extraction of figures, charts, and tabular data
- **Page Range Selection**: Parse specific pages or entire documents
## Use Cases
- Extract structured content from complex PDFs and research papers
- Parse financial reports, legal documents, and technical manuals
- Convert documents to markdown for further processing in RAG pipelines
For more detailed information about Contextual AI's capabilities, visit the [official documentation](https://docs.contextual.ai).

View File

@@ -0,0 +1,92 @@
from typing import Any, Optional, Type, List
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
class ContextualAIParseSchema(BaseModel):
"""Schema for contextual parse tool."""
file_path: str = Field(..., description="Path to the document to parse")
parse_mode: str = Field(default="standard", description="Parsing mode")
figure_caption_mode: str = Field(default="concise", description="Figure caption mode")
enable_document_hierarchy: bool = Field(default=True, description="Enable document hierarchy")
page_range: Optional[str] = Field(default=None, description="Page range to parse (e.g., '0-5')")
output_types: List[str] = Field(default=["markdown-per-page"], description="List of output types")
class ContextualAIParseTool(BaseTool):
"""Tool to parse documents using Contextual AI's parser."""
name: str = "Contextual AI Document Parser"
description: str = "Parse documents using Contextual AI's advanced document parser"
args_schema: Type[BaseModel] = ContextualAIParseSchema
api_key: str
package_dependencies: List[str] = ["contextual-client"]
def _run(
self,
file_path: str,
parse_mode: str = "standard",
figure_caption_mode: str = "concise",
enable_document_hierarchy: bool = True,
page_range: Optional[str] = None,
output_types: List[str] = ["markdown-per-page"]
) -> str:
"""Parse a document using Contextual AI's parser."""
try:
import requests
import json
import os
from time import sleep
if not os.path.exists(file_path):
raise FileNotFoundError(f"Document not found: {file_path}")
base_url = "https://api.contextual.ai/v1"
headers = {
"accept": "application/json",
"authorization": f"Bearer {self.api_key}"
}
# Submit parse job
url = f"{base_url}/parse"
config = {
"parse_mode": parse_mode,
"figure_caption_mode": figure_caption_mode,
"enable_document_hierarchy": enable_document_hierarchy,
}
if page_range:
config["page_range"] = page_range
with open(file_path, "rb") as fp:
file = {"raw_file": fp}
result = requests.post(url, headers=headers, data=config, files=file)
response = json.loads(result.text)
job_id = response['job_id']
# Monitor job status
status_url = f"{base_url}/parse/jobs/{job_id}/status"
while True:
result = requests.get(status_url, headers=headers)
parse_response = json.loads(result.text)['status']
if parse_response == "completed":
break
elif parse_response == "failed":
raise RuntimeError("Document parsing failed")
sleep(5)
# Get parse results
results_url = f"{base_url}/parse/jobs/{job_id}/results"
result = requests.get(
results_url,
headers=headers,
params={"output_types": ",".join(output_types)},
)
return json.dumps(json.loads(result.text), indent=2)
except Exception as e:
return f"Failed to parse document: {str(e)}"

View File

@@ -0,0 +1,54 @@
# ContextualAIQueryTool
## Description
This tool is designed to integrate Contextual AI's enterprise-grade RAG agents with CrewAI. Run this tool to query existing Contextual AI RAG agents that have been pre-configured with documents and knowledge bases.
## Installation
To incorporate this tool into your project, follow the installation instructions below:
```shell
pip install 'crewai[tools]' contextual-client
```
**Note**: You'll need a Contextual AI API key. Sign up at [app.contextual.ai](https://app.contextual.ai) to get your free API key.
## Example
Make sure you have already created a Contextual agent and ingested documents into the datastore before using this tool.
```python
from crewai_tools import ContextualAIQueryTool
# Initialize the tool
tool = ContextualAIQueryTool(api_key="your_api_key_here")
# Query the agent with IDs
result = tool._run(
query="What are the key findings in the financial report?",
agent_id="your_agent_id_here",
datastore_id="your_datastore_id_here" # Optional: for document readiness checking
)
print(result)
```
The result will contain the generated answer to the user's query.
## Parameters
**Initialization:**
- `api_key`: Your Contextual AI API key
**Query (_run method):**
- `query`: The question or query to send to the agent
- `agent_id`: ID of the existing Contextual AI agent to query (required)
- `datastore_id`: Optional datastore ID for document readiness verification (if not provided, document status checking is disabled with a warning)
## Key Features
- **Document Readiness Checking**: Automatically waits for documents to be processed before querying
- **Grounded Responses**: Built-in grounding ensures factual, source-attributed answers
## Use Cases
- Query pre-configured RAG agents with document collections
- Access enterprise knowledge bases through user queries
- Build specialized domain experts with access to curated documents
For more detailed information about Contextual AI's capabilities, visit the [official documentation](https://docs.contextual.ai).

View File

@@ -0,0 +1,99 @@
from typing import Any, Optional, Type, List
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
import asyncio
import requests
import os
class ContextualAIQuerySchema(BaseModel):
"""Schema for contextual query tool."""
query: str = Field(..., description="Query to send to the Contextual AI agent.")
agent_id: str = Field(..., description="ID of the Contextual AI agent to query")
datastore_id: Optional[str] = Field(None, description="Optional datastore ID for document readiness verification")
class ContextualAIQueryTool(BaseTool):
"""Tool to query Contextual AI RAG agents."""
name: str = "Contextual AI Query Tool"
description: str = "Use this tool to query a Contextual AI RAG agent with access to your documents"
args_schema: Type[BaseModel] = ContextualAIQuerySchema
api_key: str
contextual_client: Any = None
package_dependencies: List[str] = ["contextual-client"]
def __init__(self, **kwargs):
super().__init__(**kwargs)
try:
from contextual import ContextualAI
self.contextual_client = ContextualAI(api_key=self.api_key)
except ImportError:
raise ImportError(
"contextual-client package is required. Install it with: pip install contextual-client"
)
def _check_documents_ready(self, datastore_id: str) -> bool:
"""Synchronous check if all documents are ready."""
url = f"https://api.contextual.ai/v1/datastores/{datastore_id}/documents"
headers = {"Authorization": f"Bearer {self.api_key}"}
response = requests.get(url, headers=headers)
if response.status_code == 200:
data = response.json()
documents = data.get('documents', [])
return not any(doc.get('status') in ('processing', 'pending') for doc in documents)
return True
async def _wait_for_documents_async(self, datastore_id: str, max_attempts: int = 20, interval: float = 30.0) -> bool:
"""Asynchronously poll until documents are ready, exiting early if possible."""
for attempt in range(max_attempts):
ready = await asyncio.to_thread(self._check_documents_ready, datastore_id)
if ready:
return True
await asyncio.sleep(interval)
print("Processing documents ...")
return True # give up but don't fail hard
def _run(self, query: str, agent_id: str, datastore_id: Optional[str] = None) -> str:
if not agent_id:
raise ValueError("Agent ID is required to query the Contextual AI agent")
if datastore_id:
ready = self._check_documents_ready(datastore_id)
if not ready:
try:
# If no running event loop, use asyncio.run
loop = asyncio.get_running_loop()
except RuntimeError:
loop = None
if loop and loop.is_running():
# Already inside an event loop
try:
import nest_asyncio
nest_asyncio.apply(loop)
loop.run_until_complete(self._wait_for_documents_async(datastore_id))
except Exception as e:
print(f"Failed to apply nest_asyncio: {str(e)}")
else:
asyncio.run(self._wait_for_documents_async(datastore_id))
else:
print("Warning: No datastore_id provided. Document status checking disabled.")
try:
response = self.contextual_client.agents.query.create(
agent_id=agent_id,
messages=[{"role": "user", "content": query}]
)
if hasattr(response, 'content'):
return response.content
elif hasattr(response, 'message'):
return response.message.content if hasattr(response.message, 'content') else str(response.message)
elif hasattr(response, 'messages') and len(response.messages) > 0:
last_message = response.messages[-1]
return last_message.content if hasattr(last_message, 'content') else str(last_message)
else:
return str(response)
except Exception as e:
return f"Error querying Contextual AI agent: {str(e)}"

View File

@@ -0,0 +1,72 @@
# ContextualAIRerankTool
## Description
This tool is designed to integrate Contextual AI's enterprise-grade instruction-following reranker with CrewAI, enabling you to intelligently reorder documents based on relevance and custom criteria. Use this tool to enhance search result quality and document retrieval for RAG systems using Contextual AI's reranking models that understand context and follow specific instructions for optimal document ordering.
## Installation
To incorporate this tool into your project, follow the installation instructions below:
```shell
pip install 'crewai[tools]' contextual-client
```
**Note**: You'll need a Contextual AI API key. Sign up at [app.contextual.ai](https://app.contextual.ai) to get your free API key.
## Example
```python
from crewai_tools import ContextualAIRerankTool
tool = ContextualAIRerankTool(api_key="your_api_key_here")
result = tool._run(
query="financial performance and revenue metrics",
documents=[
"Q1 report content with revenue data",
"Q2 report content with growth metrics",
"News article about market trends"
],
instruction="Prioritize documents with specific financial metrics and quantitative data"
)
print(result)
```
The result will contain the document ranking. For example:
```
Rerank Result:
{
"results": [
{
"index": 1,
"relevance_score": 0.88227631
},
{
"index": 0,
"relevance_score": 0.61159354
},
{
"index": 2,
"relevance_score": 0.28579462
}
]
}
```
## Parameters
- `api_key`: Your Contextual AI API key
- `query`: Search query for reranking
- `documents`: List of document texts to rerank
- `instruction`: Optional reranking instruction for custom criteria
- `metadata`: Optional metadata for each document
- `model`: Reranker model (default: "ctxl-rerank-en-v1-instruct")
## Key Features
- **Instruction-Following Reranking**: Follows custom instructions for domain-specific document ordering
- **Metadata Integration**: Incorporates document metadata for enhanced ranking decisions
## Use Cases
- Improve search result relevance in document collections
- Reorder documents by custom business criteria (recency, authority, relevance)
- Filter and prioritize documents for research and analysis workflows
For more detailed information about Contextual AI's capabilities, visit the [official documentation](https://docs.contextual.ai).

View File

@@ -0,0 +1,68 @@
from typing import Any, Optional, Type, List
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
class ContextualAIRerankSchema(BaseModel):
"""Schema for contextual rerank tool."""
query: str = Field(..., description="The search query to rerank documents against")
documents: List[str] = Field(..., description="List of document texts to rerank")
instruction: Optional[str] = Field(default=None, description="Optional instruction for reranking behavior")
metadata: Optional[List[str]] = Field(default=None, description="Optional metadata for each document")
model: str = Field(default="ctxl-rerank-en-v1-instruct", description="Reranker model to use")
class ContextualAIRerankTool(BaseTool):
"""Tool to rerank documents using Contextual AI's instruction-following reranker."""
name: str = "Contextual AI Document Reranker"
description: str = "Rerank documents using Contextual AI's instruction-following reranker"
args_schema: Type[BaseModel] = ContextualAIRerankSchema
api_key: str
package_dependencies: List[str] = ["contextual-client"]
def _run(
self,
query: str,
documents: List[str],
instruction: Optional[str] = None,
metadata: Optional[List[str]] = None,
model: str = "ctxl-rerank-en-v1-instruct"
) -> str:
"""Rerank documents using Contextual AI's instruction-following reranker."""
try:
import requests
import json
base_url = "https://api.contextual.ai/v1"
headers = {
"accept": "application/json",
"content-type": "application/json",
"authorization": f"Bearer {self.api_key}"
}
payload = {
"query": query,
"documents": documents,
"model": model
}
if instruction:
payload["instruction"] = instruction
if metadata:
if len(metadata) != len(documents):
raise ValueError("Metadata list must have the same length as documents list")
payload["metadata"] = metadata
rerank_url = f"{base_url}/rerank"
result = requests.post(rerank_url, json=payload, headers=headers)
if result.status_code != 200:
raise RuntimeError(f"Reranker API returned status {result.status_code}: {result.text}")
return json.dumps(result.json(), indent=2)
except Exception as e:
return f"Failed to rerank documents: {str(e)}"