Compare commits

..

1 Commits

Author SHA1 Message Date
Brandon Hancock
b94d99918f explain how to use event listener 2025-02-27 10:15:52 -05:00
9 changed files with 32 additions and 428 deletions

View File

@@ -48,6 +48,7 @@ Define a crew with a designated manager and establish a clear chain of command.
</Tip>
```python Code
from langchain_openai import ChatOpenAI
from crewai import Crew, Process, Agent
# Agents are defined with attributes for backstory, cache, and verbose mode
@@ -55,51 +56,38 @@ researcher = Agent(
role='Researcher',
goal='Conduct in-depth analysis',
backstory='Experienced data analyst with a knack for uncovering hidden trends.',
cache=True,
verbose=False,
# tools=[] # This can be optionally specified; defaults to an empty list
use_system_prompt=True, # Enable or disable system prompts for this agent
max_rpm=30, # Limit on the number of requests per minute
max_iter=5 # Maximum number of iterations for a final answer
)
writer = Agent(
role='Writer',
goal='Create engaging content',
backstory='Creative writer passionate about storytelling in technical domains.',
cache=True,
verbose=False,
# tools=[] # Optionally specify tools; defaults to an empty list
use_system_prompt=True, # Enable or disable system prompts for this agent
max_rpm=30, # Limit on the number of requests per minute
max_iter=5 # Maximum number of iterations for a final answer
)
# Establishing the crew with a hierarchical process and additional configurations
project_crew = Crew(
tasks=[...], # Tasks to be delegated and executed under the manager's supervision
agents=[researcher, writer],
manager_llm="gpt-4o", # Specify which LLM the manager should use
process=Process.hierarchical,
planning=True,
manager_llm=ChatOpenAI(temperature=0, model="gpt-4"), # Mandatory if manager_agent is not set
process=Process.hierarchical, # Specifies the hierarchical management approach
respect_context_window=True, # Enable respect of the context window for tasks
memory=True, # Enable memory usage for enhanced task execution
manager_agent=None, # Optional: explicitly set a specific agent as manager instead of the manager_llm
planning=True, # Enable planning feature for pre-execution strategy
)
```
### Using a Custom Manager Agent
Alternatively, you can create a custom manager agent with specific attributes tailored to your project's management needs. This gives you more control over the manager's behavior and capabilities.
```python
# Define a custom manager agent
manager = Agent(
role="Project Manager",
goal="Efficiently manage the crew and ensure high-quality task completion",
backstory="You're an experienced project manager, skilled in overseeing complex projects and guiding teams to success.",
allow_delegation=True,
)
# Use the custom manager in your crew
project_crew = Crew(
tasks=[...],
agents=[researcher, writer],
manager_agent=manager, # Use your custom manager agent
process=Process.hierarchical,
planning=True,
)
```
<Tip>
For more details on creating and customizing a manager agent, check out the [Custom Manager Agent documentation](https://docs.crewai.com/how-to/custom-manager-agent#custom-manager-agent).
</Tip>
### Workflow in Action
1. **Task Assignment**: The manager assigns tasks strategically, considering each agent's capabilities and available tools.
@@ -109,4 +97,4 @@ project_crew = Crew(
## Conclusion
Adopting the hierarchical process in CrewAI, with the correct configurations and understanding of the system's capabilities, facilitates an organized and efficient approach to project management.
Utilize the advanced features and customizations to tailor the workflow to your specific needs, ensuring optimal task execution and project success.
Utilize the advanced features and customizations to tailor the workflow to your specific needs, ensuring optimal task execution and project success.

View File

@@ -139,7 +139,6 @@
"tools/nl2sqltool",
"tools/pdfsearchtool",
"tools/pgsearchtool",
"tools/qdrantvectorsearchtool",
"tools/scrapewebsitetool",
"tools/seleniumscrapingtool",
"tools/spidertool",

View File

@@ -1,271 +0,0 @@
---
title: 'Qdrant Vector Search Tool'
description: 'Semantic search capabilities for CrewAI agents using Qdrant vector database'
icon: magnifying-glass-plus
---
# `QdrantVectorSearchTool`
The Qdrant Vector Search Tool enables semantic search capabilities in your CrewAI agents by leveraging [Qdrant](https://qdrant.tech/), a vector similarity search engine. This tool allows your agents to search through documents stored in a Qdrant collection using semantic similarity.
## Installation
Install the required packages:
```bash
uv pip install 'crewai[tools]' qdrant-client
```
## Basic Usage
Here's a minimal example of how to use the tool:
```python
from crewai import Agent
from crewai_tools import QdrantVectorSearchTool
# Initialize the tool
qdrant_tool = QdrantVectorSearchTool(
qdrant_url="your_qdrant_url",
qdrant_api_key="your_qdrant_api_key",
collection_name="your_collection"
)
# Create an agent that uses the tool
agent = Agent(
    role="Research Assistant",
    goal="Find relevant information in documents",
    # Agent needs a backstory in addition to role and goal; without it the
    # example fails validation when the agent is constructed.
    backstory="You are a research assistant who retrieves relevant passages from a Qdrant collection.",
    tools=[qdrant_tool],
)
# The tool will automatically use OpenAI embeddings
# and return the 3 most relevant results with scores > 0.35
```
## Complete Working Example
Here's a complete example showing how to:
1. Extract text from a PDF
2. Generate embeddings using OpenAI
3. Store in Qdrant
4. Create a CrewAI agentic RAG workflow for semantic search
```python
import os
import uuid
import pdfplumber
from openai import OpenAI
from dotenv import load_dotenv
from crewai import Agent, Task, Crew, Process, LLM
from crewai_tools import QdrantVectorSearchTool
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct, Distance, VectorParams
# Load environment variables
load_dotenv()
# Initialize OpenAI client
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
# Extract text from PDF
def extract_text_from_pdf(pdf_path):
    """Return the stripped text of each page of the PDF that yields text.

    Args:
        pdf_path: Path of the PDF file, passed straight to ``pdfplumber.open``.

    Returns:
        A list with one stripped string per page, in page order. Pages whose
        ``extract_text()`` result is empty or ``None`` are left out.
    """
    with pdfplumber.open(pdf_path) as document:
        # Pull the raw text while the file is still open.
        raw_pages = [page.extract_text() for page in document.pages]
    return [content.strip() for content in raw_pages if content]
# Generate OpenAI embeddings
def get_openai_embedding(text):
    """Embed ``text`` with the ``text-embedding-3-small`` model.

    Uses the module-level OpenAI ``client`` and returns the embedding of the
    first entry in the response data.
    """
    embedding_response = client.embeddings.create(
        model="text-embedding-3-small",
        input=text,
    )
    first_entry = embedding_response.data[0]
    return first_entry.embedding
# Store text and embeddings in Qdrant
def load_pdf_to_qdrant(pdf_path, qdrant, collection_name):
    """Rebuild a Qdrant collection from the text of a PDF.

    WARNING: if ``collection_name`` already exists it is deleted and recreated,
    so any points previously stored in it are lost.

    Args:
        pdf_path: Path of the PDF to ingest.
        qdrant: Qdrant client used for all collection and point operations.
        collection_name: Name of the collection to (re)create and fill.
    """
    # Extract text from PDF (one chunk per page that has text)
    text_chunks = extract_text_from_pdf(pdf_path)

    # Start from an empty collection so repeated runs do not duplicate points
    if qdrant.collection_exists(collection_name):
        qdrant.delete_collection(collection_name)
    qdrant.create_collection(
        collection_name=collection_name,
        # The vector size must equal the length of the embeddings returned by
        # get_openai_embedding.
        vectors_config=VectorParams(size=1536, distance=Distance.COSINE),
    )

    # One point per chunk; the chunk text is kept in the payload
    points = [
        PointStruct(
            id=str(uuid.uuid4()),
            vector=get_openai_embedding(chunk),
            payload={"text": chunk},
        )
        for chunk in text_chunks
    ]

    # A PDF with no extractable text produces no points. Skip the upsert in
    # that case instead of sending an empty update request
    # (NOTE(review): Qdrant is expected to reject an empty points list - confirm).
    if points:
        qdrant.upsert(collection_name=collection_name, points=points)
# Initialize Qdrant client and load data
qdrant = QdrantClient(
url=os.getenv("QDRANT_URL"),
api_key=os.getenv("QDRANT_API_KEY")
)
collection_name = "example_collection"
pdf_path = "path/to/your/document.pdf"
load_pdf_to_qdrant(pdf_path, qdrant, collection_name)
# Initialize Qdrant search tool
qdrant_tool = QdrantVectorSearchTool(
qdrant_url=os.getenv("QDRANT_URL"),
qdrant_api_key=os.getenv("QDRANT_API_KEY"),
collection_name=collection_name,
limit=3,
score_threshold=0.35
)
# Create CrewAI agents
search_agent = Agent(
role="Senior Semantic Search Agent",
goal="Find and analyze documents based on semantic search",
backstory="""You are an expert research assistant who can find relevant
information using semantic search in a Qdrant database.""",
tools=[qdrant_tool],
verbose=True
)
answer_agent = Agent(
role="Senior Answer Assistant",
goal="Generate answers to questions based on the context provided",
backstory="""You are an expert answer assistant who can generate
answers to questions based on the context provided.""",
tools=[qdrant_tool],
verbose=True
)
# Define tasks
# Each Task needs an expected_output alongside its description; without it the
# example fails validation when the task is constructed.
search_task = Task(
    description="""Search for relevant documents about the {query}.
Your final answer should include:
- The relevant information found
- The similarity scores of the results
- The metadata of the relevant documents""",
    expected_output="The most relevant passages for the query, each with its similarity score and metadata.",
    agent=search_agent
)
answer_task = Task(
    description="""Given the context and metadata of relevant documents,
generate a final answer based on the context.""",
    expected_output="A concise answer to the query that is grounded in the retrieved context.",
    agent=answer_agent
)
# Run CrewAI workflow
crew = Crew(
agents=[search_agent, answer_agent],
tasks=[search_task, answer_task],
process=Process.sequential,
verbose=True
)
result = crew.kickoff(
inputs={"query": "What is the role of X in the document?"}
)
print(result)
```
## Tool Parameters
### Required Parameters
- `qdrant_url` (str): The URL of your Qdrant server
- `qdrant_api_key` (str): API key for authentication with Qdrant
- `collection_name` (str): Name of the Qdrant collection to search
### Optional Parameters
- `limit` (int): Maximum number of results to return (default: 3)
- `score_threshold` (float): Minimum similarity score threshold (default: 0.35)
- `custom_embedding_fn` (Callable[[str], list[float]]): Custom function for text vectorization
## Search Parameters
The tool accepts these parameters in its schema:
- `query` (str): The search query to find similar documents
- `filter_by` (str, optional): Metadata field to filter on
- `filter_value` (str, optional): Value to filter by
## Return Format
The tool returns results in JSON format:
```json
[
{
"metadata": {
// Any metadata stored with the document
},
"context": "The actual text content of the document",
"distance": 0.95 // Similarity score
}
]
```
## Default Embedding
By default, the tool uses OpenAI's `text-embedding-3-small` model for vectorization. This requires:
- OpenAI API key set in environment: `OPENAI_API_KEY`
## Custom Embeddings
Instead of using the default embedding model, you might want to use your own embedding function in cases where you:
1. Want to use a different embedding model (e.g., Cohere, HuggingFace, Ollama models)
2. Need to reduce costs by using open-source embedding models
3. Have specific requirements for vector dimensions or embedding quality
4. Want to use domain-specific embeddings (e.g., for medical or legal text)
Here's an example using a HuggingFace model:
```python
from transformers import AutoTokenizer, AutoModel
import torch
# Load model and tokenizer
tokenizer = AutoTokenizer.from_pretrained('sentence-transformers/all-MiniLM-L6-v2')
model = AutoModel.from_pretrained('sentence-transformers/all-MiniLM-L6-v2')
def custom_embeddings(text: str) -> list[float]:
    """Embed ``text`` by mean-pooling the model's last hidden state.

    Relies on the module-level ``tokenizer`` and ``model`` loaded above.

    Args:
        text: The string to embed.

    Returns:
        The pooled embedding of ``text`` as a list of floats.
    """
    # Tokenize and get model outputs
    inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True)
    # This is inference only: disable autograd so no gradient graph is built
    # and kept alive for every query.
    with torch.no_grad():
        outputs = model(**inputs)
    # Use mean pooling over the token dimension to get one vector per input
    embeddings = outputs.last_hidden_state.mean(dim=1)
    # Convert the single row to a list of floats and return
    return embeddings[0].tolist()
# Use custom embeddings with the tool
tool = QdrantVectorSearchTool(
qdrant_url="your_url",
qdrant_api_key="your_key",
collection_name="your_collection",
custom_embedding_fn=custom_embeddings # Pass your custom function
)
```
## Error Handling
The tool handles these specific errors:
- Raises ImportError if `qdrant-client` is not installed (with option to auto-install)
- Raises ValueError if `QDRANT_URL` is not set
- Prompts to install `qdrant-client` if missing using `uv add qdrant-client`
## Environment Variables
The tool reads the following environment variables; each one is only needed in the case noted beside it:
```bash
export QDRANT_URL="your_qdrant_url" # If not provided in constructor
export QDRANT_API_KEY="your_api_key" # If not provided in constructor
export OPENAI_API_KEY="your_openai_key" # If using default embeddings
```

View File

@@ -249,7 +249,6 @@ class Agent(BaseAgent):
"tool_names": self.agent_executor.tools_names,
"tools": self.agent_executor.tools_description,
"ask_for_human_input": task.human_input,
"max_dialogue_rounds": task.max_dialogue_rounds,
}
)["output"]
except Exception as e:

View File

@@ -94,20 +94,10 @@ class CrewAgentExecutorMixin:
print(f"Failed to add to long term memory: {e}")
pass
def _ask_human_input(self, final_answer: str, current_round: int = 1, max_rounds: int = 10) -> str:
"""Prompt human input with mode-appropriate messaging.
Args:
final_answer: The final answer from the agent
current_round: The current dialogue round (default: 1)
max_rounds: Maximum number of dialogue rounds (default: 10)
Returns:
str: The user's feedback
"""
round_info = f"\033[1m\033[93mRound {current_round}/{max_rounds}\033[00m"
def _ask_human_input(self, final_answer: str) -> str:
"""Prompt human input with mode-appropriate messaging."""
self._printer.print(
content=f"\033[1m\033[95m ## Result {round_info}:\033[00m \033[92m{final_answer}\033[00m"
content=f"\033[1m\033[95m ## Final Result:\033[00m \033[92m{final_answer}\033[00m"
)
# Training mode prompt (single iteration)
@@ -123,7 +113,7 @@ class CrewAgentExecutorMixin:
else:
prompt = (
"\n\n=====\n"
f"## HUMAN FEEDBACK (Round {current_round}/{max_rounds}): Provide feedback on the Result and Agent's actions.\n"
"## HUMAN FEEDBACK: Provide feedback on the Final Result and Agent's actions.\n"
"Please follow these guidelines:\n"
" - If you are happy with the result, simply hit Enter without typing anything.\n"
" - Otherwise, provide specific improvement requests.\n"

View File

@@ -103,7 +103,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self._show_start_logs()
self.ask_for_human_input = bool(inputs.get("ask_for_human_input", False))
max_rounds = int(inputs.get("max_dialogue_rounds", 10))
try:
formatted_answer = self._invoke_loop()
@@ -122,7 +121,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
raise e
if self.ask_for_human_input:
formatted_answer = self._handle_human_feedback(formatted_answer, max_rounds)
formatted_answer = self._handle_human_feedback(formatted_answer)
self._create_short_term_memory(formatted_answer)
self._create_long_term_memory(formatted_answer)
@@ -525,22 +524,21 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
prompt = prompt.rstrip()
return {"role": role, "content": prompt}
def _handle_human_feedback(self, formatted_answer: AgentFinish, max_rounds: int = 10) -> AgentFinish:
def _handle_human_feedback(self, formatted_answer: AgentFinish) -> AgentFinish:
"""Handle human feedback with different flows for training vs regular use.
Args:
formatted_answer: The initial AgentFinish result to get feedback on
max_rounds: Maximum number of dialogue rounds (default: 10)
Returns:
AgentFinish: The final answer after processing feedback
"""
human_feedback = self._ask_human_input(formatted_answer.output, 1, max_rounds)
human_feedback = self._ask_human_input(formatted_answer.output)
if self._is_training_mode():
return self._handle_training_feedback(formatted_answer, human_feedback)
return self._handle_regular_feedback(formatted_answer, human_feedback, max_rounds)
return self._handle_regular_feedback(formatted_answer, human_feedback)
def _is_training_mode(self) -> bool:
"""Check if crew is in training mode."""
@@ -562,33 +560,19 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
return improved_answer
def _handle_regular_feedback(
self, current_answer: AgentFinish, initial_feedback: str, max_rounds: int = 10
self, current_answer: AgentFinish, initial_feedback: str
) -> AgentFinish:
"""Process feedback for regular use with potential multiple iterations.
Args:
current_answer: The initial AgentFinish result to get feedback on
initial_feedback: The initial feedback from the user
max_rounds: Maximum number of dialogue rounds (default: 10)
Returns:
AgentFinish: The final answer after processing feedback
"""
if max_rounds < 1:
raise ValueError("max_rounds must be positive")
"""Process feedback for regular use with potential multiple iterations."""
feedback = initial_feedback
answer = current_answer
current_round = 1
while self.ask_for_human_input and current_round <= max_rounds:
while self.ask_for_human_input:
# If the user provides a blank response, assume they are happy with the result
if feedback.strip() == "":
self.ask_for_human_input = False
else:
answer = self._process_feedback_iteration(feedback)
feedback = self._ask_human_input(answer.output, current_round, max_rounds)
current_round += 1
feedback = self._ask_human_input(answer.output)
return answer

View File

@@ -125,12 +125,6 @@ class Task(BaseModel):
description="Whether the task should have a human review the final answer of the agent",
default=False,
)
max_dialogue_rounds: int = Field(
default=10,
description="Maximum number of dialogue rounds for human input",
ge=1, # Ensures positive integer
examples=[5, 10, 15],
)
converter_cls: Optional[Type[Converter]] = Field(
description="A converter class used to export structured output",
default=None,

View File

@@ -1206,7 +1206,6 @@ def test_agent_max_retry_limit():
"tool_names": "",
"tools": "",
"ask_for_human_input": True,
"max_dialogue_rounds": 10,
}
),
mock.call(
@@ -1215,7 +1214,6 @@ def test_agent_max_retry_limit():
"tool_names": "",
"tools": "",
"ask_for_human_input": True,
"max_dialogue_rounds": 10,
}
),
]

View File

@@ -1,77 +0,0 @@
import unittest
from unittest.mock import MagicMock, patch
from langchain_core.agents import AgentFinish
from crewai.task import Task
class TestMultiRoundDialogue(unittest.TestCase):
    """Test the multi-round dialogue functionality."""

    def test_task_max_dialogue_rounds_default(self):
        """Test that Task has a default max_dialogue_rounds of 10."""
        # Create a task without specifying max_dialogue_rounds
        task = Task(
            description="Test task",
            expected_output="Test output",
            human_input=True,
        )
        # Verify the default value
        self.assertEqual(task.max_dialogue_rounds, 10)

    def test_task_max_dialogue_rounds_custom(self):
        """Test that Task accepts a custom max_dialogue_rounds."""
        # Create a task with custom max_dialogue_rounds
        task = Task(
            description="Test task",
            expected_output="Test output",
            human_input=True,
            max_dialogue_rounds=5,
        )
        # Verify the custom value
        self.assertEqual(task.max_dialogue_rounds, 5)

    def test_task_max_dialogue_rounds_validation(self):
        """Test that Task validates max_dialogue_rounds as a positive integer."""
        # Construction itself must fail, so the result is deliberately not bound
        with self.assertRaises(ValueError):
            Task(
                description="Test task",
                expected_output="Test output",
                human_input=True,
                max_dialogue_rounds=0,
            )

    def test_handle_regular_feedback_rounds(self):
        """Test that _handle_regular_feedback correctly handles multiple rounds."""
        from crewai.agents.crew_agent_executor import CrewAgentExecutor

        # Create a simple mock executor; the method under test is called
        # unbound with this mock standing in for ``self``
        executor = MagicMock()
        executor.ask_for_human_input = True
        executor._ask_human_input = MagicMock(side_effect=["Feedback", ""])
        executor._process_feedback_iteration = MagicMock(return_value=MagicMock())
        # Create a sample initial answer
        initial_answer = MagicMock()

        # Call the method directly
        CrewAgentExecutor._handle_regular_feedback(
            executor,
            initial_answer,
            "Initial feedback",
            max_rounds=3,
        )

        # The human is prompted twice: once after "Initial feedback" is
        # processed (returns "Feedback") and once after "Feedback" is
        # processed (returns "", which ends the loop)
        self.assertEqual(executor._ask_human_input.call_count, 2)
        # Only the two non-empty feedbacks ("Initial feedback" and "Feedback")
        # are processed; the blank reply is not
        self.assertEqual(executor._process_feedback_iteration.call_count, 2)
if __name__ == "__main__":
unittest.main()