Compare commits


3 Commits

Author                          SHA1         Message                                                    Date
Brandon Hancock (bhancock_ai)   c507451ecf   Merge branch 'main' into feat/improve-hierarchical-docs   2025-02-27 13:34:27 -05:00
Brandon Hancock (bhancock_ai)   48078f169f   Merge branch 'main' into feat/improve-hierarchical-docs   2025-02-27 09:07:07 -05:00
Brandon Hancock                 cdc5b62d57   Improve hierarchical docs                                  2025-02-27 09:05:19 -05:00
5 changed files with 3 additions and 451 deletions

View File

@@ -139,7 +139,6 @@
"tools/nl2sqltool",
"tools/pdfsearchtool",
"tools/pgsearchtool",
"tools/qdrantvectorsearchtool",
"tools/scrapewebsitetool",
"tools/seleniumscrapingtool",
"tools/spidertool",

View File

@@ -1,271 +0,0 @@
---
title: 'Qdrant Vector Search Tool'
description: 'Semantic search capabilities for CrewAI agents using Qdrant vector database'
icon: magnifying-glass-plus
---
# `QdrantVectorSearchTool`
The Qdrant Vector Search Tool enables semantic search capabilities in your CrewAI agents by leveraging [Qdrant](https://qdrant.tech/), a vector similarity search engine. This tool allows your agents to search through documents stored in a Qdrant collection using semantic similarity.
## Installation
Install the required packages:
```bash
uv pip install 'crewai[tools]' qdrant-client
```
## Basic Usage
Here's a minimal example of how to use the tool:
```python
from crewai import Agent
from crewai_tools import QdrantVectorSearchTool

# Initialize the tool
qdrant_tool = QdrantVectorSearchTool(
    qdrant_url="your_qdrant_url",
    qdrant_api_key="your_qdrant_api_key",
    collection_name="your_collection"
)

# Create an agent that uses the tool
agent = Agent(
    role="Research Assistant",
    goal="Find relevant information in documents",
    tools=[qdrant_tool]
)

# The tool will automatically use OpenAI embeddings
# and return the 3 most relevant results with scores > 0.35
```
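If you want to sanity-check the connection outside of an agent, you can also call the tool directly. This is a minimal sketch, assuming the standard tool `run()` entry point and the `query` argument from the tool's schema (see Search Parameters below):
```python
# Hypothetical direct invocation for quick testing; assumes the tool
# exposes the usual run() entry point with a `query` string argument.
results = qdrant_tool.run(query="your search query")
print(results)  # JSON string of matching documents
```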
## Complete Working Example
Here's a complete example showing how to:
1. Extract text from a PDF
2. Generate embeddings using OpenAI
3. Store in Qdrant
4. Create a CrewAI agentic RAG workflow for semantic search
```python
import os
import uuid

import pdfplumber
from openai import OpenAI
from dotenv import load_dotenv

from crewai import Agent, Task, Crew, Process, LLM
from crewai_tools import QdrantVectorSearchTool
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct, Distance, VectorParams

# Load environment variables
load_dotenv()

# Initialize OpenAI client
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

# Extract text from PDF
def extract_text_from_pdf(pdf_path):
    text = []
    with pdfplumber.open(pdf_path) as pdf:
        for page in pdf.pages:
            page_text = page.extract_text()
            if page_text:
                text.append(page_text.strip())
    return text

# Generate OpenAI embeddings
def get_openai_embedding(text):
    response = client.embeddings.create(
        input=text,
        model="text-embedding-3-small"
    )
    return response.data[0].embedding

# Store text and embeddings in Qdrant
def load_pdf_to_qdrant(pdf_path, qdrant, collection_name):
    # Extract text from PDF
    text_chunks = extract_text_from_pdf(pdf_path)

    # Recreate the Qdrant collection from scratch
    if qdrant.collection_exists(collection_name):
        qdrant.delete_collection(collection_name)
    qdrant.create_collection(
        collection_name=collection_name,
        vectors_config=VectorParams(size=1536, distance=Distance.COSINE)
    )

    # Store embeddings
    points = []
    for chunk in text_chunks:
        embedding = get_openai_embedding(chunk)
        points.append(PointStruct(
            id=str(uuid.uuid4()),
            vector=embedding,
            payload={"text": chunk}
        ))
    qdrant.upsert(collection_name=collection_name, points=points)

# Initialize Qdrant client and load data
qdrant = QdrantClient(
    url=os.getenv("QDRANT_URL"),
    api_key=os.getenv("QDRANT_API_KEY")
)
collection_name = "example_collection"
pdf_path = "path/to/your/document.pdf"
load_pdf_to_qdrant(pdf_path, qdrant, collection_name)

# Initialize Qdrant search tool
qdrant_tool = QdrantVectorSearchTool(
    qdrant_url=os.getenv("QDRANT_URL"),
    qdrant_api_key=os.getenv("QDRANT_API_KEY"),
    collection_name=collection_name,
    limit=3,
    score_threshold=0.35
)

# Create CrewAI agents
search_agent = Agent(
    role="Senior Semantic Search Agent",
    goal="Find and analyze documents based on semantic search",
    backstory="""You are an expert research assistant who can find relevant
    information using semantic search in a Qdrant database.""",
    tools=[qdrant_tool],
    verbose=True
)

answer_agent = Agent(
    role="Senior Answer Assistant",
    goal="Generate answers to questions based on the context provided",
    backstory="""You are an expert answer assistant who can generate
    answers to questions based on the context provided.""",
    tools=[qdrant_tool],
    verbose=True
)

# Define tasks
search_task = Task(
    description="""Search for relevant documents about the {query}.
    Your final answer should include:
    - The relevant information found
    - The similarity scores of the results
    - The metadata of the relevant documents""",
    expected_output="The relevant passages with their similarity scores and metadata.",  # Task requires an expected_output
    agent=search_agent
)

answer_task = Task(
    description="""Given the context and metadata of relevant documents,
    generate a final answer based on the context.""",
    expected_output="A concise answer grounded in the retrieved context.",  # Task requires an expected_output
    agent=answer_agent
)

# Run CrewAI workflow
crew = Crew(
    agents=[search_agent, answer_agent],
    tasks=[search_task, answer_task],
    process=Process.sequential,
    verbose=True
)

result = crew.kickoff(
    inputs={"query": "What is the role of X in the document?"}
)
print(result)
```
## Tool Parameters
### Required Parameters
- `qdrant_url` (str): The URL of your Qdrant server
- `qdrant_api_key` (str): API key for authentication with Qdrant
- `collection_name` (str): Name of the Qdrant collection to search
### Optional Parameters
- `limit` (int): Maximum number of results to return (default: 3)
- `score_threshold` (float): Minimum similarity score threshold (default: 0.35)
- `custom_embedding_fn` (Callable[[str], list[float]]): Custom function for text vectorization
## Search Parameters
The tool accepts these parameters in its schema:
- `query` (str): The search query to find similar documents
- `filter_by` (str, optional): Metadata field to filter on
- `filter_value` (str, optional): Value to filter by
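For example, a filtered lookup restricted to a single metadata field might look like the sketch below; the `source` payload key and file name are hypothetical, for illustration only:
```python
# Hypothetical filtered search: only match chunks whose metadata
# field "source" equals "annual_report.pdf" (both names are examples).
results = qdrant_tool.run(
    query="revenue growth drivers",
    filter_by="source",
    filter_value="annual_report.pdf"
)
```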
## Return Format
The tool returns results in JSON format:
```json
[
{
"metadata": {
// Any metadata stored with the document
},
"context": "The actual text content of the document",
"distance": 0.95 // Similarity score
}
]
```
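Because the results arrive as a JSON string, your own code (or a downstream task) can deserialize them. A minimal sketch, assuming the list-of-objects shape shown above:
```python
import json

# Parse the tool's JSON output and print a score plus snippet per hit,
# assuming the [{"metadata": ..., "context": ..., "distance": ...}] shape.
raw = qdrant_tool.run(query="your search query")
for hit in json.loads(raw):
    print(f"{hit['distance']:.2f}  {hit['context'][:80]}")
```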
## Default Embedding
By default, the tool uses OpenAI's `text-embedding-3-small` model for vectorization. This requires:
- OpenAI API key set in environment: `OPENAI_API_KEY`
## Custom Embeddings
Instead of using the default embedding model, you might want to use your own embedding function in cases where you:
1. Want to use a different embedding model (e.g., Cohere, HuggingFace, Ollama models)
2. Need to reduce costs by using open-source embedding models
3. Have specific requirements for vector dimensions or embedding quality
4. Want to use domain-specific embeddings (e.g., for medical or legal text)
Here's an example using a HuggingFace model:
```python
import torch
from transformers import AutoTokenizer, AutoModel

# Load model and tokenizer
tokenizer = AutoTokenizer.from_pretrained('sentence-transformers/all-MiniLM-L6-v2')
model = AutoModel.from_pretrained('sentence-transformers/all-MiniLM-L6-v2')

def custom_embeddings(text: str) -> list[float]:
    # Tokenize and get model outputs
    inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True)
    with torch.no_grad():  # inference only, no gradients needed
        outputs = model(**inputs)

    # Use mean pooling to get text embedding
    embeddings = outputs.last_hidden_state.mean(dim=1)

    # Convert to list of floats and return
    return embeddings[0].tolist()

# Use custom embeddings with the tool
tool = QdrantVectorSearchTool(
    qdrant_url="your_url",
    qdrant_api_key="your_key",
    collection_name="your_collection",
    custom_embedding_fn=custom_embeddings  # Pass your custom function
)
```
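Note that the embedding dimension must match the collection's vector size: `all-MiniLM-L6-v2` produces 384-dimensional vectors, whereas the earlier example created a 1536-dimensional collection for OpenAI embeddings. A minimal sketch of creating a matching collection, assuming the same `qdrant` client as above and a hypothetical collection name:
```python
from qdrant_client.models import Distance, VectorParams

# all-MiniLM-L6-v2 outputs 384-dimensional vectors, so the collection
# must be created with size=384 rather than the 1536 used for OpenAI.
qdrant.create_collection(
    collection_name="minilm_collection",  # hypothetical name
    vectors_config=VectorParams(size=384, distance=Distance.COSINE)
)
```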
## Error Handling
The tool handles these specific errors:
- Raises `ImportError` if `qdrant-client` is not installed, and offers to auto-install it via `uv add qdrant-client`
- Raises `ValueError` if `QDRANT_URL` is not set
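Since a missing URL surfaces at construction time, it can be guarded with an ordinary `try`/`except`; a hedged sketch:
```python
# If neither the qdrant_url argument nor the QDRANT_URL environment
# variable is set, construction is documented to raise a ValueError.
try:
    tool = QdrantVectorSearchTool(collection_name="your_collection")
except ValueError as exc:
    print(f"Qdrant is not configured: {exc}")
```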
## Environment Variables
Required environment variables:
```bash
export QDRANT_URL="your_qdrant_url"      # If not provided in constructor
export QDRANT_API_KEY="your_api_key"     # If not provided in constructor
export OPENAI_API_KEY="your_openai_key"  # If using default embeddings
```

View File

@@ -1098,19 +1098,8 @@ class Crew(BaseModel):
         return required_inputs

-    def copy(self) -> 'Crew':
-        """Create a deep copy of the Crew instance.
-
-        This method creates a new Crew instance with copies of all agents, tasks,
-        and other attributes. It handles special cases for certain attributes that
-        require custom copying logic, such as agents, tasks, and the manager_agent.
-
-        Returns:
-            Crew: A new Crew instance with copied components.
-
-        Raises:
-            RuntimeError: If there is an error during the copying process.
-        """
+    def copy(self):
+        """Create a deep copy of the Crew."""
         exclude = {
             "id",
@@ -1126,20 +1115,11 @@ class Crew(BaseModel):
             "tasks",
             "knowledge_sources",
             "knowledge",
-            "manager_agent",
         }

         cloned_agents = [agent.copy() for agent in self.agents]
-
-        # Copy manager_agent if it exists
-        cloned_manager_agent = None
-        try:
-            if self.manager_agent is not None:
-                cloned_manager_agent = self.manager_agent.copy()
-        except Exception as e:
-            self._logger.log("warning", f"Failed to copy manager_agent: {e}")
-
-        task_mapping: Dict[str, Task] = {}
+        task_mapping = {}

         cloned_tasks = []
         existing_knowledge_sources = shallow_copy(self.knowledge_sources)
@@ -1170,7 +1150,6 @@ class Crew(BaseModel):
             tasks=cloned_tasks,
             knowledge_sources=existing_knowledge_sources,
             knowledge=existing_knowledge,
-            manager_agent=cloned_manager_agent,
         )

         return copied_crew

View File

@@ -1,8 +0,0 @@
{
  "foo": ["document1", "document2"],
  "empty": [],
  "special_chars": ["doc#1", "doc@2"],
  "nested": {
    "documents": ["nested1", "nested2"]
  }
}

View File

@@ -1,147 +0,0 @@
import json
import os
from unittest.mock import MagicMock, patch

import pytest

from crewai import Agent, Crew, Process, Task
from crewai.crews.crew_output import CrewOutput


class TestManagerAgentKickoffForEach:
    """
    Test suite for manager agent functionality with kickoff_for_each.

    This test class verifies that using a manager agent with kickoff_for_each
    doesn't raise validation errors, specifically addressing issue #2260.
    """

    @pytest.fixture
    def setup_crew(self):
        """Set up a crew with a manager agent for testing."""
        # Define agents
        researcher = Agent(
            role="Researcher",
            goal="Conduct thorough research and analysis on AI and AI agents",
            backstory="You're an expert researcher, specialized in technology, software engineering, AI, and startups. You work as a freelancer and are currently researching for a new client.",
            allow_delegation=False
        )

        writer = Agent(
            role="Senior Writer",
            goal="Create compelling content about AI and AI agents",
            backstory="You're a senior writer, specialized in technology, software engineering, AI, and startups. You work as a freelancer and are currently writing content for a new client.",
            allow_delegation=False
        )

        # Define task
        task = Task(
            description="Generate a list of 5 interesting ideas for an article, then write one captivating paragraph for each idea that showcases the potential of a full article on this topic. Return the list of ideas with their paragraphs and your notes.",
            expected_output="5 bullet points, each with a paragraph and accompanying notes.",
        )

        # Define manager agent
        manager = Agent(
            role="Project Manager",
            goal="Efficiently manage the crew and ensure high-quality task completion",
            backstory="You're an experienced project manager, skilled in overseeing complex projects and guiding teams to success. Your role is to coordinate the efforts of the crew members, ensuring that each task is completed on time and to the highest standard.",
            allow_delegation=True
        )

        # Instantiate crew with a custom manager
        crew = Crew(
            agents=[researcher, writer],
            tasks=[task],
            manager_agent=manager,
            process=Process.hierarchical,
            verbose=True
        )

        return {
            "crew": crew,
            "researcher": researcher,
            "writer": writer,
            "manager": manager,
            "task": task
        }

    @pytest.fixture
    def test_data(self):
        """Load test data from JSON file."""
        try:
            test_data_path = os.path.join(os.path.dirname(__file__), "test_data", "test_kickoff_for_each.json")
            with open(test_data_path) as f:
                return json.load(f)
        except FileNotFoundError:
            pytest.skip("Test data file not found")
        except json.JSONDecodeError:
            pytest.skip("Invalid test data format")

    def test_crew_copy_with_manager(self, setup_crew):
        """Test that copying a crew with a manager agent works correctly."""
        crew = setup_crew["crew"]

        # Create a copy of the crew to test that no validation errors occur
        try:
            crew_copy = crew.copy()

            # Check that the manager_agent was properly copied
            assert crew_copy.manager_agent is not None
            assert crew_copy.manager_agent.id != crew.manager_agent.id
            assert crew_copy.manager_agent.role == crew.manager_agent.role
            assert crew_copy.manager_agent.goal == crew.manager_agent.goal
            assert crew_copy.manager_agent.backstory == crew.manager_agent.backstory
        except Exception as e:
            pytest.fail(f"Crew copy with manager_agent raised an exception: {e}")

    def test_kickoff_for_each_validation(self, setup_crew, test_data):
        """Test that kickoff_for_each doesn't raise validation errors."""
        crew = setup_crew["crew"]

        # Test that kickoff_for_each doesn't raise validation errors.
        # We'll patch the kickoff method to avoid actual LLM calls.
        with patch.object(Crew, 'kickoff', return_value=CrewOutput(final_output="Test output", task_outputs={})):
            try:
                outputs = crew.kickoff_for_each(inputs=[
                    {"document": document} for document in test_data["foo"]
                ])
                assert len(outputs) == len(test_data["foo"])
            except Exception as e:
                if "validation error" in str(e).lower():
                    pytest.fail(f"kickoff_for_each raised validation errors: {e}")
                else:
                    # Other errors are fine for this test; we're only checking for validation errors
                    pass

    def test_manager_agent_error_handling(self, setup_crew, monkeypatch):
        """Test error handling when copying a manager agent."""
        # Instead of trying to test the full copy method, we'll just test the specific
        # part that handles manager_agent copying with a try-except block.

        # Create a logger mock to verify the warning is logged
        mock_logger = MagicMock()

        # Create a test manager agent that raises an exception when copied
        class MockManagerAgent:
            def copy(self):
                raise Exception("Test exception")

        # Create a simple test function that mimics the manager_agent copying logic
        def test_copy_with_error_handling():
            manager_agent = MockManagerAgent()
            cloned_manager_agent = None
            try:
                if manager_agent is not None:
                    cloned_manager_agent = manager_agent.copy()
            except Exception as e:
                mock_logger.log("warning", f"Failed to copy manager_agent: {e}")
            return cloned_manager_agent

        # Call the test function
        result = test_copy_with_error_handling()

        # Verify that the manager_agent is None after the exception
        assert result is None

        # Verify that the warning was logged
        mock_logger.log.assert_called_once_with("warning", "Failed to copy manager_agent: Test exception")