Compare commits

...

10 Commits

Author SHA1 Message Date
Thiago Moretto
861560aa93 Adds Task key to the output to back ref to it on callbacks/webhooks 2025-02-27 16:31:25 -03:00
Tony Kipkemboi
86825e1769 docs: add Qdrant vector search tool documentation (#2184)
Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2025-02-27 13:54:44 -05:00
Brandon Hancock (bhancock_ai)
7afc531fbb Improve hierarchical docs (#2244) 2025-02-27 13:38:21 -05:00
Brandon Hancock (bhancock_ai)
ed0490112b explain how to use event listener (#2245) 2025-02-27 13:32:16 -05:00
Brandon Hancock (bhancock_ai)
66c66e3d84 Update docs (#2226) 2025-02-26 15:21:36 -05:00
Brandon Hancock (bhancock_ai)
b9b625a70d Improve extract thought (#2223)
Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
2025-02-26 14:51:46 -05:00
Brandon Hancock (bhancock_ai)
b58253cacc Support multiple router calls and address issue #2175 (#2231)
Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
2025-02-26 13:42:17 -05:00
Brandon Hancock (bhancock_ai)
fbf8732784 Fix type issue (#2224)
Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
2025-02-26 13:27:41 -05:00
Brandon Hancock (bhancock_ai)
8fedbe49cb Add support for python 3.10 (#2230) 2025-02-26 13:24:31 -05:00
Lorenze Jay
1e8ee247ca feat: Enhance agent knowledge setup with optional crew embedder (#2232)
- Modify `Agent` class to add `set_knowledge` method
- Allow setting embedder from crew-level configuration
- Remove `_set_knowledge` method from initialization
- Update `Crew` class to set agent knowledge during agent setup
- Add default implementation in `BaseAgent` for compatibility
2025-02-26 12:10:43 -05:00
20 changed files with 932 additions and 81 deletions

View File

@@ -136,17 +136,21 @@ crewai test -n 5 -m gpt-3.5-turbo
### 8. Run
Run the crew.
Run the crew or flow.
```shell Terminal
crewai run
```
<Note>
Starting from version 0.103.0, the `crewai run` command can be used to run both standard crews and flows. For flows, it automatically detects the type from pyproject.toml and runs the appropriate command. This is now the recommended way to run both crews and flows.
</Note>
<Note>
Make sure to run these commands from the directory where your CrewAI project is set up.
Some commands may require additional configuration or setup within your project structure.
</Note>
### 9. Chat
Starting in version `0.98.0`, when you run the `crewai chat` command, you start an interactive session with your crew. The AI assistant will guide you by asking for necessary inputs to execute the crew. Once all inputs are provided, the crew will execute its tasks.
@@ -175,7 +179,6 @@ def crew(self) -> Crew:
```
</Note>
### 10. API Keys
When running ```crewai create crew``` command, the CLI will first show you the top 5 most common LLM providers and ask you to select one.

View File

@@ -0,0 +1,349 @@
---
title: 'Event Listeners'
description: 'Tap into CrewAI events to build custom integrations and monitoring'
---
# Event Listeners
CrewAI provides a powerful event system that allows you to listen for and react to various events that occur during the execution of your Crew. This feature enables you to build custom integrations, monitoring solutions, logging systems, or any other functionality that needs to be triggered based on CrewAI's internal events.
## How It Works
CrewAI uses an event bus architecture to emit events throughout the execution lifecycle. The event system is built on the following components:
1. **CrewAIEventsBus**: A singleton event bus that manages event registration and emission
2. **CrewEvent**: Base class for all events in the system
3. **BaseEventListener**: Abstract base class for creating custom event listeners
When specific actions occur in CrewAI (like a Crew starting execution, an Agent completing a task, or a tool being used), the system emits corresponding events. You can register handlers for these events to execute custom code when they occur.
## Creating a Custom Event Listener
To create a custom event listener, you need to:
1. Create a class that inherits from `BaseEventListener`
2. Implement the `setup_listeners` method
3. Register handlers for the events you're interested in
4. Create an instance of your listener in the appropriate file
Here's a simple example of a custom event listener class:
```python
from crewai.utilities.events import (
CrewKickoffStartedEvent,
CrewKickoffCompletedEvent,
AgentExecutionCompletedEvent,
)
from crewai.utilities.events.base_event_listener import BaseEventListener
class MyCustomListener(BaseEventListener):
def __init__(self):
super().__init__()
def setup_listeners(self, crewai_event_bus):
@crewai_event_bus.on(CrewKickoffStartedEvent)
def on_crew_started(source, event):
print(f"Crew '{event.crew_name}' has started execution!")
@crewai_event_bus.on(CrewKickoffCompletedEvent)
def on_crew_completed(source, event):
print(f"Crew '{event.crew_name}' has completed execution!")
print(f"Output: {event.output}")
@crewai_event_bus.on(AgentExecutionCompletedEvent)
def on_agent_execution_completed(source, event):
print(f"Agent '{event.agent.role}' completed task")
print(f"Output: {event.output}")
```
## Properly Registering Your Listener
Simply defining your listener class isn't enough. You need to create an instance of it and ensure it's imported in your application. This ensures that:
1. The event handlers are registered with the event bus
2. The listener instance remains in memory (not garbage collected)
3. The listener is active when events are emitted
### Option 1: Import and Instantiate in Your Crew or Flow Implementation
The most important thing is to create an instance of your listener in the file where your Crew or Flow is defined and executed:
#### For Crew-based Applications
Create and import your listener at the top of your Crew implementation file:
```python
# In your crew.py file
from crewai import Agent, Crew, Task
from my_listeners import MyCustomListener
# Create an instance of your listener
my_listener = MyCustomListener()
class MyCustomCrew:
# Your crew implementation...
def crew(self):
return Crew(
agents=[...],
tasks=[...],
# ...
)
```
#### For Flow-based Applications
Create and import your listener at the top of your Flow implementation file:
```python
# In your main.py or flow.py file
from crewai.flow import Flow, listen, start
from my_listeners import MyCustomListener
# Create an instance of your listener
my_listener = MyCustomListener()
class MyCustomFlow(Flow):
# Your flow implementation...
@start()
def first_step(self):
# ...
```
This ensures that your listener is loaded and active when your Crew or Flow is executed.
### Option 2: Create a Package for Your Listeners
For a more structured approach, especially if you have multiple listeners:
1. Create a package for your listeners:
```
my_project/
├── listeners/
│ ├── __init__.py
│ ├── my_custom_listener.py
│ └── another_listener.py
```
2. In `my_custom_listener.py`, define your listener class and create an instance:
```python
# my_custom_listener.py
from crewai.utilities.events.base_event_listener import BaseEventListener
# ... import events ...
class MyCustomListener(BaseEventListener):
# ... implementation ...
# Create an instance of your listener
my_custom_listener = MyCustomListener()
```
3. In `__init__.py`, import the listener instances to ensure they're loaded:
```python
# __init__.py
from .my_custom_listener import my_custom_listener
from .another_listener import another_listener
# Optionally export them if you need to access them elsewhere
__all__ = ['my_custom_listener', 'another_listener']
```
4. Import your listeners package in your Crew or Flow file:
```python
# In your crew.py or flow.py file
import my_project.listeners # This loads all your listeners
class MyCustomCrew:
# Your crew implementation...
```
This is exactly how CrewAI's built-in `agentops_listener` is registered. In the CrewAI codebase, you'll find:
```python
# src/crewai/utilities/events/third_party/__init__.py
from .agentops_listener import agentops_listener
```
This ensures the `agentops_listener` is loaded when the `crewai.utilities.events` package is imported.
## Available Event Types
CrewAI provides a wide range of events that you can listen for:
### Crew Events
- **CrewKickoffStartedEvent**: Emitted when a Crew starts execution
- **CrewKickoffCompletedEvent**: Emitted when a Crew completes execution
- **CrewKickoffFailedEvent**: Emitted when a Crew fails to complete execution
- **CrewTestStartedEvent**: Emitted when a Crew starts testing
- **CrewTestCompletedEvent**: Emitted when a Crew completes testing
- **CrewTestFailedEvent**: Emitted when a Crew fails to complete testing
- **CrewTrainStartedEvent**: Emitted when a Crew starts training
- **CrewTrainCompletedEvent**: Emitted when a Crew completes training
- **CrewTrainFailedEvent**: Emitted when a Crew fails to complete training
### Agent Events
- **AgentExecutionStartedEvent**: Emitted when an Agent starts executing a task
- **AgentExecutionCompletedEvent**: Emitted when an Agent completes executing a task
- **AgentExecutionErrorEvent**: Emitted when an Agent encounters an error during execution
### Task Events
- **TaskStartedEvent**: Emitted when a Task starts execution
- **TaskCompletedEvent**: Emitted when a Task completes execution
- **TaskFailedEvent**: Emitted when a Task fails to complete execution
- **TaskEvaluationEvent**: Emitted when a Task is evaluated
### Tool Usage Events
- **ToolUsageStartedEvent**: Emitted when a tool execution is started
- **ToolUsageFinishedEvent**: Emitted when a tool execution is completed
- **ToolUsageErrorEvent**: Emitted when a tool execution encounters an error
- **ToolValidateInputErrorEvent**: Emitted when a tool input validation encounters an error
- **ToolExecutionErrorEvent**: Emitted when a tool execution encounters an error
- **ToolSelectionErrorEvent**: Emitted when there's an error selecting a tool
### Flow Events
- **FlowCreatedEvent**: Emitted when a Flow is created
- **FlowStartedEvent**: Emitted when a Flow starts execution
- **FlowFinishedEvent**: Emitted when a Flow completes execution
- **FlowPlotEvent**: Emitted when a Flow is plotted
- **MethodExecutionStartedEvent**: Emitted when a Flow method starts execution
- **MethodExecutionFinishedEvent**: Emitted when a Flow method completes execution
- **MethodExecutionFailedEvent**: Emitted when a Flow method fails to complete execution
### LLM Events
- **LLMCallStartedEvent**: Emitted when an LLM call starts
- **LLMCallCompletedEvent**: Emitted when an LLM call completes
- **LLMCallFailedEvent**: Emitted when an LLM call fails
## Event Handler Structure
Each event handler receives two parameters:
1. **source**: The object that emitted the event
2. **event**: The event instance, containing event-specific data
The structure of the event object depends on the event type, but all events inherit from `CrewEvent` and include:
- **timestamp**: The time when the event was emitted
- **type**: A string identifier for the event type
Additional fields vary by event type. For example, `CrewKickoffCompletedEvent` includes `crew_name` and `output` fields.
## Real-World Example: Integration with AgentOps
CrewAI includes an example of a third-party integration with [AgentOps](https://github.com/AgentOps-AI/agentops), a monitoring and observability platform for AI agents. Here's how it's implemented:
```python
from typing import Optional
from crewai.utilities.events import (
CrewKickoffCompletedEvent,
ToolUsageErrorEvent,
ToolUsageStartedEvent,
)
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events.crew_events import CrewKickoffStartedEvent
from crewai.utilities.events.task_events import TaskEvaluationEvent
try:
import agentops
AGENTOPS_INSTALLED = True
except ImportError:
AGENTOPS_INSTALLED = False
class AgentOpsListener(BaseEventListener):
tool_event: Optional["agentops.ToolEvent"] = None
session: Optional["agentops.Session"] = None
def __init__(self):
super().__init__()
def setup_listeners(self, crewai_event_bus):
if not AGENTOPS_INSTALLED:
return
@crewai_event_bus.on(CrewKickoffStartedEvent)
def on_crew_kickoff_started(source, event: CrewKickoffStartedEvent):
self.session = agentops.init()
for agent in source.agents:
if self.session:
self.session.create_agent(
name=agent.role,
agent_id=str(agent.id),
)
@crewai_event_bus.on(CrewKickoffCompletedEvent)
def on_crew_kickoff_completed(source, event: CrewKickoffCompletedEvent):
if self.session:
self.session.end_session(
end_state="Success",
end_state_reason="Finished Execution",
)
@crewai_event_bus.on(ToolUsageStartedEvent)
def on_tool_usage_started(source, event: ToolUsageStartedEvent):
self.tool_event = agentops.ToolEvent(name=event.tool_name)
if self.session:
self.session.record(self.tool_event)
@crewai_event_bus.on(ToolUsageErrorEvent)
def on_tool_usage_error(source, event: ToolUsageErrorEvent):
agentops.ErrorEvent(exception=event.error, trigger_event=self.tool_event)
```
This listener initializes an AgentOps session when a Crew starts, registers agents with AgentOps, tracks tool usage, and ends the session when the Crew completes.
The AgentOps listener is registered in CrewAI's event system through the import in `src/crewai/utilities/events/third_party/__init__.py`:
```python
from .agentops_listener import agentops_listener
```
This ensures the `agentops_listener` is loaded when the `crewai.utilities.events` package is imported.
## Advanced Usage: Scoped Handlers
For temporary event handling (useful for testing or specific operations), you can use the `scoped_handlers` context manager:
```python
from crewai.utilities.events import crewai_event_bus, CrewKickoffStartedEvent
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(CrewKickoffStartedEvent)
def temp_handler(source, event):
print("This handler only exists within this context")
# Do something that emits events
# Outside the context, the temporary handler is removed
```
## Use Cases
Event listeners can be used for a variety of purposes:
1. **Logging and Monitoring**: Track the execution of your Crew and log important events
2. **Analytics**: Collect data about your Crew's performance and behavior
3. **Debugging**: Set up temporary listeners to debug specific issues
4. **Integration**: Connect CrewAI with external systems like monitoring platforms, databases, or notification services
5. **Custom Behavior**: Trigger custom actions based on specific events
## Best Practices
1. **Keep Handlers Light**: Event handlers should be lightweight and avoid blocking operations
2. **Error Handling**: Include proper error handling in your event handlers to prevent exceptions from affecting the main execution
3. **Cleanup**: If your listener allocates resources, ensure they're properly cleaned up
4. **Selective Listening**: Only listen for events you actually need to handle
5. **Testing**: Test your event listeners in isolation to ensure they behave as expected
By leveraging CrewAI's event system, you can extend its functionality and integrate it seamlessly with your existing infrastructure.

View File

@@ -150,12 +150,12 @@ final_output = flow.kickoff()
print("---- Final Output ----")
print(final_output)
````
```
```text Output
---- Final Output ----
Second method received: Output from first_method
````
```
</CodeGroup>
@@ -738,3 +738,34 @@ Also, check out our YouTube video on how to use flows in CrewAI below!
referrerpolicy="strict-origin-when-cross-origin"
allowfullscreen
></iframe>
## Running Flows
There are two ways to run a flow:
### Using the Flow API
You can run a flow programmatically by creating an instance of your flow class and calling the `kickoff()` method:
```python
flow = ExampleFlow()
result = flow.kickoff()
```
### Using the CLI
Starting from version 0.103.0, you can run flows using the `crewai run` command:
```shell
crewai run
```
This command automatically detects if your project is a flow (based on the `type = "flow"` setting in your pyproject.toml) and runs it accordingly. This is the recommended way to run flows from the command line.
For backward compatibility, you can also use:
```shell
crewai flow kickoff
```
However, the `crewai run` command is now the preferred method as it works for both crews and flows.

View File

@@ -48,7 +48,6 @@ Define a crew with a designated manager and establish a clear chain of command.
</Tip>
```python Code
from langchain_openai import ChatOpenAI
from crewai import Crew, Process, Agent
# Agents are defined with attributes for backstory, cache, and verbose mode
@@ -56,38 +55,51 @@ researcher = Agent(
role='Researcher',
goal='Conduct in-depth analysis',
backstory='Experienced data analyst with a knack for uncovering hidden trends.',
cache=True,
verbose=False,
# tools=[] # This can be optionally specified; defaults to an empty list
use_system_prompt=True, # Enable or disable system prompts for this agent
max_rpm=30, # Limit on the number of requests per minute
max_iter=5 # Maximum number of iterations for a final answer
)
writer = Agent(
role='Writer',
goal='Create engaging content',
backstory='Creative writer passionate about storytelling in technical domains.',
cache=True,
verbose=False,
# tools=[] # Optionally specify tools; defaults to an empty list
use_system_prompt=True, # Enable or disable system prompts for this agent
max_rpm=30, # Limit on the number of requests per minute
max_iter=5 # Maximum number of iterations for a final answer
)
# Establishing the crew with a hierarchical process and additional configurations
project_crew = Crew(
tasks=[...], # Tasks to be delegated and executed under the manager's supervision
agents=[researcher, writer],
manager_llm=ChatOpenAI(temperature=0, model="gpt-4"), # Mandatory if manager_agent is not set
process=Process.hierarchical, # Specifies the hierarchical management approach
respect_context_window=True, # Enable respect of the context window for tasks
memory=True, # Enable memory usage for enhanced task execution
manager_agent=None, # Optional: explicitly set a specific agent as manager instead of the manager_llm
planning=True, # Enable planning feature for pre-execution strategy
manager_llm="gpt-4o", # Specify which LLM the manager should use
process=Process.hierarchical,
planning=True,
)
```
### Using a Custom Manager Agent
Alternatively, you can create a custom manager agent with specific attributes tailored to your project's management needs. This gives you more control over the manager's behavior and capabilities.
```python
# Define a custom manager agent
manager = Agent(
role="Project Manager",
goal="Efficiently manage the crew and ensure high-quality task completion",
backstory="You're an experienced project manager, skilled in overseeing complex projects and guiding teams to success.",
allow_delegation=True,
)
# Use the custom manager in your crew
project_crew = Crew(
tasks=[...],
agents=[researcher, writer],
manager_agent=manager, # Use your custom manager agent
process=Process.hierarchical,
planning=True,
)
```
<Tip>
For more details on creating and customizing a manager agent, check out the [Custom Manager Agent documentation](https://docs.crewai.com/how-to/custom-manager-agent#custom-manager-agent).
</Tip>
### Workflow in Action
1. **Task Assignment**: The manager assigns tasks strategically, considering each agent's capabilities and available tools.
@@ -97,4 +109,4 @@ project_crew = Crew(
## Conclusion
Adopting the hierarchical process in CrewAI, with the correct configurations and understanding of the system's capabilities, facilitates an organized and efficient approach to project management.
Utilize the advanced features and customizations to tailor the workflow to your specific needs, ensuring optimal task execution and project success.
Utilize the advanced features and customizations to tailor the workflow to your specific needs, ensuring optimal task execution and project success.

View File

@@ -139,6 +139,7 @@
"tools/nl2sqltool",
"tools/pdfsearchtool",
"tools/pgsearchtool",
"tools/qdrantvectorsearchtool",
"tools/scrapewebsitetool",
"tools/seleniumscrapingtool",
"tools/spidertool",

View File

@@ -0,0 +1,271 @@
---
title: 'Qdrant Vector Search Tool'
description: 'Semantic search capabilities for CrewAI agents using Qdrant vector database'
icon: magnifying-glass-plus
---
# `QdrantVectorSearchTool`
The Qdrant Vector Search Tool enables semantic search capabilities in your CrewAI agents by leveraging [Qdrant](https://qdrant.tech/), a vector similarity search engine. This tool allows your agents to search through documents stored in a Qdrant collection using semantic similarity.
## Installation
Install the required packages:
```bash
uv pip install 'crewai[tools] qdrant-client'
```
## Basic Usage
Here's a minimal example of how to use the tool:
```python
from crewai import Agent
from crewai_tools import QdrantVectorSearchTool
# Initialize the tool
qdrant_tool = QdrantVectorSearchTool(
qdrant_url="your_qdrant_url",
qdrant_api_key="your_qdrant_api_key",
collection_name="your_collection"
)
# Create an agent that uses the tool
agent = Agent(
role="Research Assistant",
goal="Find relevant information in documents",
tools=[qdrant_tool]
)
# The tool will automatically use OpenAI embeddings
# and return the 3 most relevant results with scores > 0.35
```
## Complete Working Example
Here's a complete example showing how to:
1. Extract text from a PDF
2. Generate embeddings using OpenAI
3. Store in Qdrant
4. Create a CrewAI agentic RAG workflow for semantic search
```python
import os
import uuid
import pdfplumber
from openai import OpenAI
from dotenv import load_dotenv
from crewai import Agent, Task, Crew, Process, LLM
from crewai_tools import QdrantVectorSearchTool
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct, Distance, VectorParams
# Load environment variables
load_dotenv()
# Initialize OpenAI client
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
# Extract text from PDF
def extract_text_from_pdf(pdf_path):
text = []
with pdfplumber.open(pdf_path) as pdf:
for page in pdf.pages:
page_text = page.extract_text()
if page_text:
text.append(page_text.strip())
return text
# Generate OpenAI embeddings
def get_openai_embedding(text):
response = client.embeddings.create(
input=text,
model="text-embedding-3-small"
)
return response.data[0].embedding
# Store text and embeddings in Qdrant
def load_pdf_to_qdrant(pdf_path, qdrant, collection_name):
# Extract text from PDF
text_chunks = extract_text_from_pdf(pdf_path)
# Create Qdrant collection
if qdrant.collection_exists(collection_name):
qdrant.delete_collection(collection_name)
qdrant.create_collection(
collection_name=collection_name,
vectors_config=VectorParams(size=1536, distance=Distance.COSINE)
)
# Store embeddings
points = []
for chunk in text_chunks:
embedding = get_openai_embedding(chunk)
points.append(PointStruct(
id=str(uuid.uuid4()),
vector=embedding,
payload={"text": chunk}
))
qdrant.upsert(collection_name=collection_name, points=points)
# Initialize Qdrant client and load data
qdrant = QdrantClient(
url=os.getenv("QDRANT_URL"),
api_key=os.getenv("QDRANT_API_KEY")
)
collection_name = "example_collection"
pdf_path = "path/to/your/document.pdf"
load_pdf_to_qdrant(pdf_path, qdrant, collection_name)
# Initialize Qdrant search tool
qdrant_tool = QdrantVectorSearchTool(
qdrant_url=os.getenv("QDRANT_URL"),
qdrant_api_key=os.getenv("QDRANT_API_KEY"),
collection_name=collection_name,
limit=3,
score_threshold=0.35
)
# Create CrewAI agents
search_agent = Agent(
role="Senior Semantic Search Agent",
goal="Find and analyze documents based on semantic search",
backstory="""You are an expert research assistant who can find relevant
information using semantic search in a Qdrant database.""",
tools=[qdrant_tool],
verbose=True
)
answer_agent = Agent(
role="Senior Answer Assistant",
goal="Generate answers to questions based on the context provided",
backstory="""You are an expert answer assistant who can generate
answers to questions based on the context provided.""",
tools=[qdrant_tool],
verbose=True
)
# Define tasks
search_task = Task(
description="""Search for relevant documents about the {query}.
Your final answer should include:
- The relevant information found
- The similarity scores of the results
- The metadata of the relevant documents""",
agent=search_agent
)
answer_task = Task(
description="""Given the context and metadata of relevant documents,
generate a final answer based on the context.""",
agent=answer_agent
)
# Run CrewAI workflow
crew = Crew(
agents=[search_agent, answer_agent],
tasks=[search_task, answer_task],
process=Process.sequential,
verbose=True
)
result = crew.kickoff(
inputs={"query": "What is the role of X in the document?"}
)
print(result)
```
## Tool Parameters
### Required Parameters
- `qdrant_url` (str): The URL of your Qdrant server
- `qdrant_api_key` (str): API key for authentication with Qdrant
- `collection_name` (str): Name of the Qdrant collection to search
### Optional Parameters
- `limit` (int): Maximum number of results to return (default: 3)
- `score_threshold` (float): Minimum similarity score threshold (default: 0.35)
- `custom_embedding_fn` (Callable[[str], list[float]]): Custom function for text vectorization
## Search Parameters
The tool accepts these parameters in its schema:
- `query` (str): The search query to find similar documents
- `filter_by` (str, optional): Metadata field to filter on
- `filter_value` (str, optional): Value to filter by
## Return Format
The tool returns results in JSON format:
```json
[
{
"metadata": {
// Any metadata stored with the document
},
"context": "The actual text content of the document",
"distance": 0.95 // Similarity score
}
]
```
## Default Embedding
By default, the tool uses OpenAI's `text-embedding-3-small` model for vectorization. This requires:
- OpenAI API key set in environment: `OPENAI_API_KEY`
## Custom Embeddings
Instead of using the default embedding model, you might want to use your own embedding function in cases where you:
1. Want to use a different embedding model (e.g., Cohere, HuggingFace, Ollama models)
2. Need to reduce costs by using open-source embedding models
3. Have specific requirements for vector dimensions or embedding quality
4. Want to use domain-specific embeddings (e.g., for medical or legal text)
Here's an example using a HuggingFace model:
```python
from transformers import AutoTokenizer, AutoModel
import torch
# Load model and tokenizer
tokenizer = AutoTokenizer.from_pretrained('sentence-transformers/all-MiniLM-L6-v2')
model = AutoModel.from_pretrained('sentence-transformers/all-MiniLM-L6-v2')
def custom_embeddings(text: str) -> list[float]:
# Tokenize and get model outputs
inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True)
outputs = model(**inputs)
# Use mean pooling to get text embedding
embeddings = outputs.last_hidden_state.mean(dim=1)
# Convert to list of floats and return
return embeddings[0].tolist()
# Use custom embeddings with the tool
tool = QdrantVectorSearchTool(
qdrant_url="your_url",
qdrant_api_key="your_key",
collection_name="your_collection",
custom_embedding_fn=custom_embeddings # Pass your custom function
)
```
## Error Handling
The tool handles these specific errors:
- Raises ImportError if `qdrant-client` is not installed (with option to auto-install)
- Raises ValueError if `QDRANT_URL` is not set
- Prompts to install `qdrant-client` if missing using `uv add qdrant-client`
## Environment Variables
Required environment variables:
```bash
export QDRANT_URL="your_qdrant_url" # If not provided in constructor
export QDRANT_API_KEY="your_api_key" # If not provided in constructor
export OPENAI_API_KEY="your_openai_key" # If using default embeddings

View File

@@ -114,7 +114,6 @@ class Agent(BaseAgent):
@model_validator(mode="after")
def post_init_setup(self):
self._set_knowledge()
self.agent_ops_agent_name = self.role
self.llm = create_llm(self.llm)
@@ -134,8 +133,11 @@ class Agent(BaseAgent):
self.cache_handler = CacheHandler()
self.set_cache_handler(self.cache_handler)
def _set_knowledge(self):
def set_knowledge(self, crew_embedder: Optional[Dict[str, Any]] = None):
try:
if self.embedder is None and crew_embedder:
self.embedder = crew_embedder
if self.knowledge_sources:
full_pattern = re.compile(r"[^a-zA-Z0-9\-_\r\n]|(\.\.)")
knowledge_agent_name = f"{re.sub(full_pattern, '_', self.role)}"

View File

@@ -351,3 +351,6 @@ class BaseAgent(ABC, BaseModel):
if not self._rpm_controller:
self._rpm_controller = rpm_controller
self.create_agent_executor()
def set_knowledge(self, crew_embedder: Optional[Dict[str, Any]] = None):
pass

View File

@@ -124,14 +124,15 @@ class CrewAgentParser:
)
def _extract_thought(self, text: str) -> str:
regex = r"(.*?)(?:\n\nAction|\n\nFinal Answer)"
thought_match = re.search(regex, text, re.DOTALL)
if thought_match:
thought = thought_match.group(1).strip()
# Remove any triple backticks from the thought string
thought = thought.replace("```", "").strip()
return thought
return ""
thought_index = text.find("\n\nAction")
if thought_index == -1:
thought_index = text.find("\n\nFinal Answer")
if thought_index == -1:
return ""
thought = text[:thought_index].strip()
# Remove any triple backticks from the thought string
thought = thought.replace("```", "").strip()
return thought
def _clean_action(self, text: str) -> str:
"""Clean action string by removing non-essential formatting characters."""

View File

@@ -203,7 +203,6 @@ def install(context):
@crewai.command()
def run():
"""Run the Crew."""
click.echo("Running the Crew")
run_crew()

View File

@@ -1,4 +1,6 @@
import subprocess
from enum import Enum
from typing import List, Optional
import click
from packaging import version
@@ -7,16 +9,24 @@ from crewai.cli.utils import read_toml
from crewai.cli.version import get_crewai_version
class CrewType(Enum):
STANDARD = "standard"
FLOW = "flow"
def run_crew() -> None:
"""
Run the crew by running a command in the UV environment.
Run the crew or flow by running a command in the UV environment.
Starting from version 0.103.0, this command can be used to run both
standard crews and flows. For flows, it detects the type from pyproject.toml
and automatically runs the appropriate command.
"""
command = ["uv", "run", "run_crew"]
crewai_version = get_crewai_version()
min_required_version = "0.71.0"
pyproject_data = read_toml()
# Check for legacy poetry configuration
if pyproject_data.get("tool", {}).get("poetry") and (
version.parse(crewai_version) < version.parse(min_required_version)
):
@@ -26,18 +36,54 @@ def run_crew() -> None:
fg="red",
)
# Determine crew type
is_flow = pyproject_data.get("tool", {}).get("crewai", {}).get("type") == "flow"
crew_type = CrewType.FLOW if is_flow else CrewType.STANDARD
# Display appropriate message
click.echo(f"Running the {'Flow' if is_flow else 'Crew'}")
# Execute the appropriate command
execute_command(crew_type)
def execute_command(crew_type: CrewType) -> None:
"""
Execute the appropriate command based on crew type.
Args:
crew_type: The type of crew to run
"""
command = ["uv", "run", "kickoff" if crew_type == CrewType.FLOW else "run_crew"]
try:
subprocess.run(command, capture_output=False, text=True, check=True)
except subprocess.CalledProcessError as e:
click.echo(f"An error occurred while running the crew: {e}", err=True)
click.echo(e.output, err=True, nl=True)
if pyproject_data.get("tool", {}).get("poetry"):
click.secho(
"It's possible that you are using an old version of crewAI that uses poetry, please run `crewai update` to update your pyproject.toml to use uv.",
fg="yellow",
)
handle_error(e, crew_type)
except Exception as e:
click.echo(f"An unexpected error occurred: {e}", err=True)
def handle_error(error: subprocess.CalledProcessError, crew_type: CrewType) -> None:
"""
Handle subprocess errors with appropriate messaging.
Args:
error: The subprocess error that occurred
crew_type: The type of crew that was being run
"""
entity_type = "flow" if crew_type == CrewType.FLOW else "crew"
click.echo(f"An error occurred while running the {entity_type}: {error}", err=True)
if error.output:
click.echo(error.output, err=True, nl=True)
pyproject_data = read_toml()
if pyproject_data.get("tool", {}).get("poetry"):
click.secho(
"It's possible that you are using an old version of crewAI that uses poetry, "
"please run `crewai update` to update your pyproject.toml to use uv.",
fg="yellow",
)

View File

@@ -30,13 +30,13 @@ crewai install
## Running the Project
To kickstart your crew of AI agents and begin task execution, run this from the root folder of your project:
To kickstart your flow and begin execution, run this from the root folder of your project:
```bash
crewai run
```
This command initializes the {{name}} Crew, assembling the agents and assigning them tasks as defined in your configuration.
This command initializes the {{name}} Flow as defined in your configuration.
This example, unmodified, will run the create a `report.md` file with the output of a research on LLMs in the root folder.

View File

@@ -600,6 +600,7 @@ class Crew(BaseModel):
agent.i18n = i18n
# type: ignore[attr-defined] # Argument 1 to "_interpolate_inputs" of "Crew" has incompatible type "dict[str, Any] | None"; expected "dict[str, Any]"
agent.crew = self # type: ignore[attr-defined]
agent.set_knowledge(crew_embedder=self.embedder)
# TODO: Create an AgentFunctionCalling protocol for future refactoring
if not agent.function_calling_llm: # type: ignore # "BaseAgent" has no attribute "function_calling_llm"
agent.function_calling_llm = self.function_calling_llm # type: ignore # "BaseAgent" has no attribute "function_calling_llm"

View File

@@ -894,35 +894,45 @@ class Flow(Generic[T], metaclass=FlowMeta):
Notes
-----
- Routers are executed sequentially to maintain flow control
- Each router's result becomes the new trigger_method
- Each router's result becomes a new trigger_method
- Normal listeners are executed in parallel for efficiency
- Listeners can receive the trigger method's result as a parameter
"""
# First, handle routers repeatedly until no router triggers anymore
router_results = []
current_trigger = trigger_method
while True:
routers_triggered = self._find_triggered_methods(
trigger_method, router_only=True
current_trigger, router_only=True
)
if not routers_triggered:
break
for router_name in routers_triggered:
await self._execute_single_listener(router_name, result)
# After executing router, the router's result is the path
# The last router executed sets the trigger_method
# The router result is the last element in self._method_outputs
trigger_method = self._method_outputs[-1]
router_result = self._method_outputs[-1]
if router_result: # Only add non-None results
router_results.append(router_result)
current_trigger = (
router_result # Update for next iteration of router chain
)
# Now that no more routers are triggered by current trigger_method,
# execute normal listeners
listeners_triggered = self._find_triggered_methods(
trigger_method, router_only=False
)
if listeners_triggered:
tasks = [
self._execute_single_listener(listener_name, result)
for listener_name in listeners_triggered
]
await asyncio.gather(*tasks)
# Now execute normal listeners for all router results and the original trigger
all_triggers = [trigger_method] + router_results
for current_trigger in all_triggers:
if current_trigger: # Skip None results
listeners_triggered = self._find_triggered_methods(
current_trigger, router_only=False
)
if listeners_triggered:
tasks = [
self._execute_single_listener(listener_name, result)
for listener_name in listeners_triggered
]
await asyncio.gather(*tasks)
def _find_triggered_methods(
self, trigger_method: str, router_only: bool

View File

@@ -4,7 +4,7 @@ SQLite-based implementation of flow state persistence.
import json
import sqlite3
from datetime import datetime
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Optional, Union
@@ -34,6 +34,7 @@ class SQLiteFlowPersistence(FlowPersistence):
ValueError: If db_path is invalid
"""
from crewai.utilities.paths import db_storage_path
# Get path from argument or default location
path = db_path or str(Path(db_storage_path()) / "flow_states.db")
@@ -46,7 +47,8 @@ class SQLiteFlowPersistence(FlowPersistence):
def init_db(self) -> None:
"""Create the necessary tables if they don't exist."""
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
conn.execute(
"""
CREATE TABLE IF NOT EXISTS flow_states (
id INTEGER PRIMARY KEY AUTOINCREMENT,
flow_uuid TEXT NOT NULL,
@@ -54,12 +56,15 @@ class SQLiteFlowPersistence(FlowPersistence):
timestamp DATETIME NOT NULL,
state_json TEXT NOT NULL
)
""")
"""
)
# Add index for faster UUID lookups
conn.execute("""
conn.execute(
"""
CREATE INDEX IF NOT EXISTS idx_flow_states_uuid
ON flow_states(flow_uuid)
""")
"""
)
def save_state(
self,
@@ -85,19 +90,22 @@ class SQLiteFlowPersistence(FlowPersistence):
)
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
conn.execute(
"""
INSERT INTO flow_states (
flow_uuid,
method_name,
timestamp,
state_json
) VALUES (?, ?, ?, ?)
""", (
flow_uuid,
method_name,
datetime.utcnow().isoformat(),
json.dumps(state_dict),
))
""",
(
flow_uuid,
method_name,
datetime.now(timezone.utc).isoformat(),
json.dumps(state_dict),
),
)
def load_state(self, flow_uuid: str) -> Optional[Dict[str, Any]]:
"""Load the most recent state for a given flow UUID.
@@ -109,13 +117,16 @@ class SQLiteFlowPersistence(FlowPersistence):
The most recent state as a dictionary, or None if no state exists
"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute("""
cursor = conn.execute(
"""
SELECT state_json
FROM flow_states
WHERE flow_uuid = ?
ORDER BY id DESC
LIMIT 1
""", (flow_uuid,))
""",
(flow_uuid,),
)
row = cursor.fetchone()
if row:

View File

@@ -373,6 +373,7 @@ class Task(BaseModel):
pydantic_output, json_output = self._export_output(result)
task_output = TaskOutput(
key=self.key,
name=self.name,
description=self.description,
expected_output=self.expected_output,

View File

@@ -9,6 +9,7 @@ from crewai.tasks.output_format import OutputFormat
class TaskOutput(BaseModel):
"""Class that represents the result of a task."""
key: Optional[str] = Field(description="Key of the task", default=None)
description: str = Field(description="Description of the task")
name: Optional[str] = Field(description="Name of the task", default=None)
expected_output: Optional[str] = Field(

View File

@@ -30,8 +30,14 @@ class TokenCalcHandler(CustomLogger):
if hasattr(usage, "prompt_tokens"):
self.token_cost_process.sum_prompt_tokens(usage.prompt_tokens)
if hasattr(usage, "completion_tokens"):
self.token_cost_process.sum_completion_tokens(usage.completion_tokens)
if hasattr(usage, "prompt_tokens_details") and usage.prompt_tokens_details:
self.token_cost_process.sum_completion_tokens(
usage.completion_tokens
)
if (
hasattr(usage, "prompt_tokens_details")
and usage.prompt_tokens_details
and usage.prompt_tokens_details.cached_tokens
):
self.token_cost_process.sum_cached_prompt_tokens(
usage.prompt_tokens_details.cached_tokens
)

View File

@@ -654,3 +654,104 @@ def test_flow_plotting():
assert isinstance(received_events[0], FlowPlotEvent)
assert received_events[0].flow_name == "StatelessFlow"
assert isinstance(received_events[0].timestamp, datetime)
def test_multiple_routers_from_same_trigger():
"""Test that multiple routers triggered by the same method all activate their listeners."""
execution_order = []
class MultiRouterFlow(Flow):
def __init__(self):
super().__init__()
# Set diagnosed conditions to trigger all routers
self.state["diagnosed_conditions"] = "DHA" # Contains D, H, and A
@start()
def scan_medical(self):
execution_order.append("scan_medical")
return "scan_complete"
@router(scan_medical)
def diagnose_conditions(self):
execution_order.append("diagnose_conditions")
return "diagnosis_complete"
@router(diagnose_conditions)
def diabetes_router(self):
execution_order.append("diabetes_router")
if "D" in self.state["diagnosed_conditions"]:
return "diabetes"
return None
@listen("diabetes")
def diabetes_analysis(self):
execution_order.append("diabetes_analysis")
return "diabetes_analysis_complete"
@router(diagnose_conditions)
def hypertension_router(self):
execution_order.append("hypertension_router")
if "H" in self.state["diagnosed_conditions"]:
return "hypertension"
return None
@listen("hypertension")
def hypertension_analysis(self):
execution_order.append("hypertension_analysis")
return "hypertension_analysis_complete"
@router(diagnose_conditions)
def anemia_router(self):
execution_order.append("anemia_router")
if "A" in self.state["diagnosed_conditions"]:
return "anemia"
return None
@listen("anemia")
def anemia_analysis(self):
execution_order.append("anemia_analysis")
return "anemia_analysis_complete"
flow = MultiRouterFlow()
flow.kickoff()
# Verify all methods were called
assert "scan_medical" in execution_order
assert "diagnose_conditions" in execution_order
# Verify all routers were called
assert "diabetes_router" in execution_order
assert "hypertension_router" in execution_order
assert "anemia_router" in execution_order
# Verify all listeners were called - this is the key test for the fix
assert "diabetes_analysis" in execution_order
assert "hypertension_analysis" in execution_order
assert "anemia_analysis" in execution_order
# Verify execution order constraints
assert execution_order.index("diagnose_conditions") > execution_order.index(
"scan_medical"
)
# All routers should execute after diagnose_conditions
assert execution_order.index("diabetes_router") > execution_order.index(
"diagnose_conditions"
)
assert execution_order.index("hypertension_router") > execution_order.index(
"diagnose_conditions"
)
assert execution_order.index("anemia_router") > execution_order.index(
"diagnose_conditions"
)
# All analyses should execute after their respective routers
assert execution_order.index("diabetes_analysis") > execution_order.index(
"diabetes_router"
)
assert execution_order.index("hypertension_analysis") > execution_order.index(
"hypertension_router"
)
assert execution_order.index("anemia_analysis") > execution_order.index(
"anemia_router"
)

View File

@@ -111,6 +111,7 @@ def test_task_callback():
task.execute_sync(agent=researcher)
task_completed.assert_called_once_with(task.output)
assert task.output.key == task.key
assert task.output.description == task.description
assert task.output.expected_output == task.expected_output
assert task.output.name == task.name
@@ -149,6 +150,7 @@ def test_task_callback_returns_task_output():
assert isinstance(callback_data, str)
output_dict = json.loads(callback_data)
expected_output = {
"key": task.key,
"description": task.description,
"raw": "exported_ok",
"pydantic": None,