Compare commits

...

17 Commits

Author SHA1 Message Date
Devin AI
6b354201cb Fix lint issues: suppress unused chromadb import warnings and update remaining external memory test
Co-Authored-By: João <joao@crewai.com>
2025-05-30 09:45:39 +00:00
Devin AI
5566f4716b Update external memory tests to handle optional ChromaDB dependency
Co-Authored-By: João <joao@crewai.com>
2025-05-30 09:38:50 +00:00
Devin AI
ea5f6b592c Update memory tests to handle optional ChromaDB dependency
Co-Authored-By: João <joao@crewai.com>
2025-05-30 09:37:44 +00:00
Devin AI
a0057afe45 Fix Collection type annotation and test mocking issues
Co-Authored-By: João <joao@crewai.com>
2025-05-30 09:27:06 +00:00
Devin AI
ebcb6c6f90 Fix CI failures: restore missing error classes and resolve type issues
Co-Authored-By: João <joao@crewai.com>
2025-05-30 09:19:30 +00:00
Devin AI
c7e83a7529 Update optional dependencies tests
Co-Authored-By: João <joao@crewai.com>
2025-05-30 09:10:46 +00:00
Devin AI
f0b1cc23f4 Add ChromaDBRequiredError class and improve error handling
Co-Authored-By: João <joao@crewai.com>
2025-05-30 09:10:35 +00:00
Devin AI
7b129fc847 Fix #2919: Make chromadb an optional dependency to resolve package conflicts
Co-Authored-By: João <joao@crewai.com>
2025-05-30 08:56:35 +00:00
Lucas Gomide
55ed91e313 feat: log usage tools when called by LLM (#2916)
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
* feat: log usage tools when called by LLM

* feat: print llm tool usage in console
2025-05-29 14:34:34 -04:00
Mark McDonald
e676c83d7f docs: Adds Gemini example to OpenAI-compat section (#2915)
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
2025-05-29 09:52:32 -04:00
Tony Kipkemboi
844d142f2e docs: docs restructuring and community analytics implementation (#2913)
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
* docs: Fix major memory system documentation issues - Remove misleading deprecation warnings, fix confusing comments, clearly separate three memory approaches, provide accurate examples that match implementation

* fix: Correct broken image paths in README - Update crewai_logo.png and asset.png paths to point to docs/images/ directory instead of docs/ directly

* docs: Add system prompt transparency and customization guide - Add 'Understanding Default System Instructions' section to address black-box concerns - Document what CrewAI automatically injects into prompts - Provide code examples to inspect complete system prompts - Show 3 methods to override default instructions - Include observability integration examples with Langfuse - Add best practices for production prompt management

* docs: Fix implementation accuracy issues in memory documentation - Fix Ollama embedding URL parameter and remove unsupported Cohere input_type parameter

* docs: Reference observability docs instead of showing specific tool examples

* docs: Reorganize knowledge documentation for better developer experience - Move quickstart examples right after overview for immediate hands-on experience - Create logical learning progression: basics → configuration → advanced → troubleshooting - Add comprehensive agent vs crew knowledge guide with working examples - Consolidate debugging and troubleshooting in dedicated section - Organize best practices by topic in accordion format - Improve content flow from simple concepts to advanced features - Ensure all examples are grounded in actual codebase implementation

* docs: enhance custom LLM documentation with comprehensive examples and accurate imports

* docs: reorganize observability tools into dedicated section with comprehensive overview and improved navigation

* docs: rename how-to section to learn and add comprehensive overview page

* docs: finalize documentation reorganization and update navigation labels

* docs: enhance README with comprehensive badges, navigation links, and getting started video

* Add Common Room tracking to documentation - Script will track all documentation page views - Follows Mintlify custom JS implementation pattern - Enables comprehensive docs usage insights

* docs: move human-in-the-loop guide to enterprise section and update navigation - Move human-in-the-loop.mdx from learn to enterprise/guides - Update docs.json navigation to reflect new organization
2025-05-28 10:53:55 -04:00
Lorenze Jay
bcc694348e chore: Bump version to 0.121.1 in project files and update dependencies (#2912)
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
2025-05-27 10:46:20 -07:00
Tony Kipkemboi
dfc4255f2f docs: Add transparency features for prompts and memory systems (#2902)
* docs: Fix major memory system documentation issues - Remove misleading deprecation warnings, fix confusing comments, clearly separate three memory approaches, provide accurate examples that match implementation

* fix: Correct broken image paths in README - Update crewai_logo.png and asset.png paths to point to docs/images/ directory instead of docs/ directly

* docs: Add system prompt transparency and customization guide - Add 'Understanding Default System Instructions' section to address black-box concerns - Document what CrewAI automatically injects into prompts - Provide code examples to inspect complete system prompts - Show 3 methods to override default instructions - Include observability integration examples with Langfuse - Add best practices for production prompt management

* docs: Fix implementation accuracy issues in memory documentation - Fix Ollama embedding URL parameter and remove unsupported Cohere input_type parameter

* docs: Reference observability docs instead of showing specific tool examples

* docs: Reorganize knowledge documentation for better developer experience - Move quickstart examples right after overview for immediate hands-on experience - Create logical learning progression: basics → configuration → advanced → troubleshooting - Add comprehensive agent vs crew knowledge guide with working examples - Consolidate debugging and troubleshooting in dedicated section - Organize best practices by topic in accordion format - Improve content flow from simple concepts to advanced features - Ensure all examples are grounded in actual codebase implementation

* docs: enhance custom LLM documentation with comprehensive examples and accurate imports

* docs: reorganize observability tools into dedicated section with comprehensive overview and improved navigation

* docs: rename how-to section to learn and add comprehensive overview page

* docs: finalize documentation reorganization and update navigation labels

* docs: enhance README with comprehensive badges, navigation links, and getting started video
2025-05-27 10:08:40 -07:00
João Moura
4e0ce9adfe fixing
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
2025-05-27 00:33:50 -07:00
João Moura
42dacb2862 remove unnecesary imrpots 2025-05-27 00:33:50 -07:00
devin-ai-integration[bot]
22db4aae81 Add usage limit feature to BaseTool class (#2904)
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
* Add usage limit feature to BaseTool class

- Add max_usage_count and current_usage_count attributes to BaseTool
- Implement usage limit checking in ToolUsage._use method
- Add comprehensive tests for usage limit functionality
- Maintain backward compatibility with None default for unlimited usage

Co-Authored-By: Joe Moura <joao@crewai.com>

* Fix CI failures and address code review feedback

- Add max_usage_count/current_usage_count to CrewStructuredTool
- Add input validation for positive max_usage_count
- Add reset_usage_count method to BaseTool
- Extract usage limit check into separate method
- Add comprehensive edge case tests
- Add proper type hints throughout
- Fix linting issues

Co-Authored-By: Joe Moura <joao@crewai.com>

---------

Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-authored-by: Joe Moura <joao@crewai.com>
2025-05-26 08:53:10 -07:00
João Moura
7fe193866d improviong reasoning prompt
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
2025-05-25 15:24:59 -07:00
70 changed files with 3583 additions and 1404 deletions

View File

@@ -1,27 +1,70 @@
<div align="center">
<p align="center">
<a href="https://github.com/crewAIInc/crewAI">
<img src="docs/images/crewai_logo.png" width="600px" alt="Open source Multi-AI Agent orchestration framework">
</a>
</p>
<p align="center" style="display: flex; justify-content: center; gap: 20px; align-items: center;">
<a href="https://trendshift.io/repositories/11239" target="_blank">
<img src="https://trendshift.io/api/badge/repositories/11239" alt="crewAIInc%2FcrewAI | Trendshift" style="width: 250px; height: 55px;" width="250" height="55"/>
</a>
</p>
![Logo of CrewAI](./docs/images/crewai_logo.png)
<p align="center">
<a href="https://crewai.com">Homepage</a>
·
<a href="https://docs.crewai.com">Docs</a>
·
<a href="https://app.crewai.com">Start Cloud Trial</a>
·
<a href="https://blog.crewai.com">Blog</a>
·
<a href="https://community.crewai.com">Forum</a>
</p>
</div>
<p align="center">
<a href="https://github.com/crewAIInc/crewAI">
<img src="https://img.shields.io/github/stars/crewAIInc/crewAI" alt="GitHub Repo stars">
</a>
<a href="https://github.com/crewAIInc/crewAI/network/members">
<img src="https://img.shields.io/github/forks/crewAIInc/crewAI" alt="GitHub forks">
</a>
<a href="https://github.com/crewAIInc/crewAI/issues">
<img src="https://img.shields.io/github/issues/crewAIInc/crewAI" alt="GitHub issues">
</a>
<a href="https://github.com/crewAIInc/crewAI/pulls">
<img src="https://img.shields.io/github/issues-pr/crewAIInc/crewAI" alt="GitHub pull requests">
</a>
<a href="https://opensource.org/licenses/MIT">
<img src="https://img.shields.io/badge/License-MIT-green.svg" alt="License: MIT">
</a>
</p>
<p align="center">
<a href="https://pypi.org/project/crewai/">
<img src="https://img.shields.io/pypi/v/crewai" alt="PyPI version">
</a>
<a href="https://pypi.org/project/crewai/">
<img src="https://img.shields.io/pypi/dm/crewai" alt="PyPI downloads">
</a>
<a href="https://twitter.com/crewAIInc">
<img src="https://img.shields.io/twitter/follow/crewAIInc?style=social" alt="Twitter Follow">
</a>
</p>
### Fast and Flexible Multi-Agent Automation Framework
CrewAI is a lean, lightning-fast Python framework built entirely from
scratch—completely **independent of LangChain or other agent frameworks**.
It empowers developers with both high-level simplicity and precise low-level
control, ideal for creating autonomous AI agents tailored to any scenario.
> CrewAI is a lean, lightning-fast Python framework built entirely from scratch—completely **independent of LangChain or other agent frameworks**.
> It empowers developers with both high-level simplicity and precise low-level control, ideal for creating autonomous AI agents tailored to any scenario.
- **CrewAI Crews**: Optimize for autonomy and collaborative intelligence.
- **CrewAI Flows**: Enable granular, event-driven control, single LLM calls for precise task orchestration and supports Crews natively
With over 100,000 developers certified through our community courses at
[learn.crewai.com](https://learn.crewai.com), CrewAI is rapidly becoming the
With over 100,000 developers certified through our community courses at [learn.crewai.com](https://learn.crewai.com), CrewAI is rapidly becoming the
standard for enterprise-ready AI automation.
# CrewAI Enterprise Suite
CrewAI Enterprise Suite is a comprehensive bundle tailored for organizations
that require secure, scalable, and easy-to-manage agent-driven automation.
CrewAI Enterprise Suite is a comprehensive bundle tailored for organizations that require secure, scalable, and easy-to-manage agent-driven automation.
You can try one part of the suite the [Crew Control Plane for free](https://app.crewai.com)
@@ -35,21 +78,9 @@ You can try one part of the suite the [Crew Control Plane for free](https://app.
- **24/7 Support**: Dedicated enterprise support to ensure uninterrupted operation and quick resolution of issues.
- **On-premise and Cloud Deployment Options**: Deploy CrewAI Enterprise on-premise or in the cloud, depending on your security and compliance requirements.
CrewAI Enterprise is designed for enterprises seeking a powerful,
reliable solution to transform complex business processes into efficient,
CrewAI Enterprise is designed for enterprises seeking a powerful, reliable solution to transform complex business processes into efficient,
intelligent automations.
<h3>
[Homepage](https://www.crewai.com/) | [Documentation](https://docs.crewai.com/) | [Chat with Docs](https://chatg.pt/DWjSBZn) | [Discourse](https://community.crewai.com)
</h3>
[![GitHub Repo stars](https://img.shields.io/github/stars/joaomdmoura/crewAI)](https://github.com/crewAIInc/crewAI)
[![License: MIT](https://img.shields.io/badge/License-MIT-green.svg)](https://opensource.org/licenses/MIT)
</div>
## Table of contents
- [Why CrewAI?](#why-crewai)
@@ -88,7 +119,12 @@ CrewAI empowers developers and enterprises to confidently build intelligent auto
## Getting Started
### Learning Resources
Setup and run your first CrewAI agents by following this tutorial.
[![CrewAI Getting Started Tutorial](https://img.youtube.com/vi/-kSOTtYzgEw/hqdefault.jpg)](https://www.youtube.com/watch?v=-kSOTtYzgEw "CrewAI Getting Started Tutorial")
###
Learning Resources
Learn CrewAI through our comprehensive courses:

View File

@@ -0,0 +1,18 @@
(function() {
if (typeof window === 'undefined') return;
if (typeof window.signals !== 'undefined') return;
var script = document.createElement('script');
script.src = 'https://cdn.cr-relay.com/v1/site/883520f4-c431-44be-80e7-e123a1ee7a2b/signals.js';
script.async = true;
window.signals = Object.assign(
[],
['page', 'identify', 'form'].reduce(function (acc, method){
acc[method] = function () {
signals.push([method, arguments]);
return signals;
};
return acc;
}, {})
);
document.head.appendChild(script);
})();

File diff suppressed because it is too large Load Diff

View File

@@ -46,22 +46,96 @@ crew = Crew(
- **Storage Location**: Platform-specific location via `appdirs` package
- **Custom Storage Directory**: Set `CREWAI_STORAGE_DIR` environment variable
### Custom Embedder Configuration
## Storage Location Transparency
<Info>
**Understanding Storage Locations**: CrewAI uses platform-specific directories to store memory and knowledge files following OS conventions. Understanding these locations helps with production deployments, backups, and debugging.
</Info>
### Where CrewAI Stores Files
By default, CrewAI uses the `appdirs` library to determine storage locations following platform conventions. Here's exactly where your files are stored:
#### Default Storage Locations by Platform
**macOS:**
```
~/Library/Application Support/CrewAI/{project_name}/
├── knowledge/ # Knowledge base ChromaDB files
├── short_term_memory/ # Short-term memory ChromaDB files
├── long_term_memory/ # Long-term memory ChromaDB files
├── entities/ # Entity memory ChromaDB files
└── long_term_memory_storage.db # SQLite database
```
**Linux:**
```
~/.local/share/CrewAI/{project_name}/
├── knowledge/
├── short_term_memory/
├── long_term_memory/
├── entities/
└── long_term_memory_storage.db
```
**Windows:**
```
C:\Users\{username}\AppData\Local\CrewAI\{project_name}\
├── knowledge\
├── short_term_memory\
├── long_term_memory\
├── entities\
└── long_term_memory_storage.db
```
### Finding Your Storage Location
To see exactly where CrewAI is storing files on your system:
```python
from crewai.utilities.paths import db_storage_path
import os
# Get the base storage path
storage_path = db_storage_path()
print(f"CrewAI storage location: {storage_path}")
# List all CrewAI storage directories
if os.path.exists(storage_path):
print("\nStored files and directories:")
for item in os.listdir(storage_path):
item_path = os.path.join(storage_path, item)
if os.path.isdir(item_path):
print(f"📁 {item}/")
# Show ChromaDB collections
if os.path.exists(item_path):
for subitem in os.listdir(item_path):
print(f" └── {subitem}")
else:
print(f"📄 {item}")
else:
print("No CrewAI storage directory found yet.")
```
### Controlling Storage Locations
#### Option 1: Environment Variable (Recommended)
```python
import os
from crewai import Crew
# Set custom storage location
os.environ["CREWAI_STORAGE_DIR"] = "./my_project_storage"
# All memory and knowledge will now be stored in ./my_project_storage/
crew = Crew(
agents=[...],
tasks=[...],
memory=True,
embedder={
"provider": "openai",
"config": {
"model": "text-embedding-3-small"
}
}
memory=True
)
```
### Custom Storage Paths
#### Option 2: Custom Storage Paths
```python
import os
from crewai import Crew
@@ -69,16 +143,547 @@ from crewai.memory import LongTermMemory
from crewai.memory.storage.ltm_sqlite_storage import LTMSQLiteStorage
# Configure custom storage location
custom_storage_path = "./storage"
os.makedirs(custom_storage_path, exist_ok=True)
crew = Crew(
memory=True,
long_term_memory=LongTermMemory(
storage=LTMSQLiteStorage(
db_path=os.getenv("CREWAI_STORAGE_DIR", "./storage") + "/memory.db"
db_path=f"{custom_storage_path}/memory.db"
)
)
)
```
#### Option 3: Project-Specific Storage
```python
import os
from pathlib import Path
# Store in project directory
project_root = Path(__file__).parent
storage_dir = project_root / "crewai_storage"
os.environ["CREWAI_STORAGE_DIR"] = str(storage_dir)
# Now all storage will be in your project directory
```
### Embedding Provider Defaults
<Info>
**Default Embedding Provider**: CrewAI defaults to OpenAI embeddings for consistency and reliability. You can easily customize this to match your LLM provider or use local embeddings.
</Info>
#### Understanding Default Behavior
```python
# When using Claude as your LLM...
from crewai import Agent, LLM
agent = Agent(
role="Analyst",
goal="Analyze data",
backstory="Expert analyst",
llm=LLM(provider="anthropic", model="claude-3-sonnet") # Using Claude
)
# CrewAI will use OpenAI embeddings by default for consistency
# You can easily customize this to match your preferred provider
```
#### Customizing Embedding Providers
```python
from crewai import Crew
# Option 1: Match your LLM provider
crew = Crew(
agents=[agent],
tasks=[task],
memory=True,
embedder={
"provider": "anthropic", # Match your LLM provider
"config": {
"api_key": "your-anthropic-key",
"model": "text-embedding-3-small"
}
}
)
# Option 2: Use local embeddings (no external API calls)
crew = Crew(
agents=[agent],
tasks=[task],
memory=True,
embedder={
"provider": "ollama",
"config": {"model": "mxbai-embed-large"}
}
)
```
### Debugging Storage Issues
#### Check Storage Permissions
```python
import os
from crewai.utilities.paths import db_storage_path
storage_path = db_storage_path()
print(f"Storage path: {storage_path}")
print(f"Path exists: {os.path.exists(storage_path)}")
print(f"Is writable: {os.access(storage_path, os.W_OK) if os.path.exists(storage_path) else 'Path does not exist'}")
# Create with proper permissions
if not os.path.exists(storage_path):
os.makedirs(storage_path, mode=0o755, exist_ok=True)
print(f"Created storage directory: {storage_path}")
```
#### Inspect ChromaDB Collections
```python
import chromadb
from crewai.utilities.paths import db_storage_path
# Connect to CrewAI's ChromaDB
storage_path = db_storage_path()
chroma_path = os.path.join(storage_path, "knowledge")
if os.path.exists(chroma_path):
client = chromadb.PersistentClient(path=chroma_path)
collections = client.list_collections()
print("ChromaDB Collections:")
for collection in collections:
print(f" - {collection.name}: {collection.count()} documents")
else:
print("No ChromaDB storage found")
```
#### Reset Storage (Debugging)
```python
from crewai import Crew
# Reset all memory storage
crew = Crew(agents=[...], tasks=[...], memory=True)
# Reset specific memory types
crew.reset_memories(command_type='short') # Short-term memory
crew.reset_memories(command_type='long') # Long-term memory
crew.reset_memories(command_type='entity') # Entity memory
crew.reset_memories(command_type='knowledge') # Knowledge storage
```
### Production Best Practices
1. **Set `CREWAI_STORAGE_DIR`** to a known location in production for better control
2. **Choose explicit embedding providers** to match your LLM setup
3. **Monitor storage directory size** for large-scale deployments
4. **Include storage directories** in your backup strategy
5. **Set appropriate file permissions** (0o755 for directories, 0o644 for files)
6. **Use project-relative paths** for containerized deployments
### Common Storage Issues
**"ChromaDB permission denied" errors:**
```bash
# Fix permissions
chmod -R 755 ~/.local/share/CrewAI/
```
**"Database is locked" errors:**
```python
# Ensure only one CrewAI instance accesses storage
import fcntl
import os
storage_path = db_storage_path()
lock_file = os.path.join(storage_path, ".crewai.lock")
with open(lock_file, 'w') as f:
fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
# Your CrewAI code here
```
**Storage not persisting between runs:**
```python
# Verify storage location is consistent
import os
print("CREWAI_STORAGE_DIR:", os.getenv("CREWAI_STORAGE_DIR"))
print("Current working directory:", os.getcwd())
print("Computed storage path:", db_storage_path())
```
## Custom Embedder Configuration
CrewAI supports multiple embedding providers to give you flexibility in choosing the best option for your use case. Here's a comprehensive guide to configuring different embedding providers for your memory system.
### Why Choose Different Embedding Providers?
- **Cost Optimization**: Local embeddings (Ollama) are free after initial setup
- **Privacy**: Keep your data local with Ollama or use your preferred cloud provider
- **Performance**: Some models work better for specific domains or languages
- **Consistency**: Match your embedding provider with your LLM provider
- **Compliance**: Meet specific regulatory or organizational requirements
### OpenAI Embeddings (Default)
OpenAI provides reliable, high-quality embeddings that work well for most use cases.
```python
from crewai import Crew
# Basic OpenAI configuration (uses environment OPENAI_API_KEY)
crew = Crew(
agents=[...],
tasks=[...],
memory=True,
embedder={
"provider": "openai",
"config": {
"model": "text-embedding-3-small" # or "text-embedding-3-large"
}
}
)
# Advanced OpenAI configuration
crew = Crew(
memory=True,
embedder={
"provider": "openai",
"config": {
"api_key": "your-openai-api-key", # Optional: override env var
"model": "text-embedding-3-large",
"dimensions": 1536, # Optional: reduce dimensions for smaller storage
"organization_id": "your-org-id" # Optional: for organization accounts
}
}
)
```
### Azure OpenAI Embeddings
For enterprise users with Azure OpenAI deployments.
```python
crew = Crew(
memory=True,
embedder={
"provider": "openai", # Use openai provider for Azure
"config": {
"api_key": "your-azure-api-key",
"api_base": "https://your-resource.openai.azure.com/",
"api_type": "azure",
"api_version": "2023-05-15",
"model": "text-embedding-3-small",
"deployment_id": "your-deployment-name" # Azure deployment name
}
}
)
```
### Google AI Embeddings
Use Google's text embedding models for integration with Google Cloud services.
```python
crew = Crew(
memory=True,
embedder={
"provider": "google",
"config": {
"api_key": "your-google-api-key",
"model": "text-embedding-004" # or "text-embedding-preview-0409"
}
}
)
```
### Vertex AI Embeddings
For Google Cloud users with Vertex AI access.
```python
crew = Crew(
memory=True,
embedder={
"provider": "vertexai",
"config": {
"project_id": "your-gcp-project-id",
"region": "us-central1", # or your preferred region
"api_key": "your-service-account-key",
"model_name": "textembedding-gecko"
}
}
)
```
### Ollama Embeddings (Local)
Run embeddings locally for privacy and cost savings.
```python
# First, install and run Ollama locally, then pull an embedding model:
# ollama pull mxbai-embed-large
crew = Crew(
memory=True,
embedder={
"provider": "ollama",
"config": {
"model": "mxbai-embed-large", # or "nomic-embed-text"
"url": "http://localhost:11434/api/embeddings" # Default Ollama URL
}
}
)
# For custom Ollama installations
crew = Crew(
memory=True,
embedder={
"provider": "ollama",
"config": {
"model": "mxbai-embed-large",
"url": "http://your-ollama-server:11434/api/embeddings"
}
}
)
```
### Cohere Embeddings
Use Cohere's embedding models for multilingual support.
```python
crew = Crew(
memory=True,
embedder={
"provider": "cohere",
"config": {
"api_key": "your-cohere-api-key",
"model": "embed-english-v3.0" # or "embed-multilingual-v3.0"
}
}
)
```
### VoyageAI Embeddings
High-performance embeddings optimized for retrieval tasks.
```python
crew = Crew(
memory=True,
embedder={
"provider": "voyageai",
"config": {
"api_key": "your-voyage-api-key",
"model": "voyage-large-2", # or "voyage-code-2" for code
"input_type": "document" # or "query"
}
}
)
```
### AWS Bedrock Embeddings
For AWS users with Bedrock access.
```python
crew = Crew(
memory=True,
embedder={
"provider": "bedrock",
"config": {
"aws_access_key_id": "your-access-key",
"aws_secret_access_key": "your-secret-key",
"region_name": "us-east-1",
"model": "amazon.titan-embed-text-v1"
}
}
)
```
### Hugging Face Embeddings
Use open-source models from Hugging Face.
```python
crew = Crew(
memory=True,
embedder={
"provider": "huggingface",
"config": {
"api_key": "your-hf-token", # Optional for public models
"model": "sentence-transformers/all-MiniLM-L6-v2",
"api_url": "https://api-inference.huggingface.co" # or your custom endpoint
}
}
)
```
### IBM Watson Embeddings
For IBM Cloud users.
```python
crew = Crew(
memory=True,
embedder={
"provider": "watson",
"config": {
"api_key": "your-watson-api-key",
"url": "your-watson-instance-url",
"model": "ibm/slate-125m-english-rtrvr"
}
}
)
```
### Choosing the Right Embedding Provider
| Provider | Best For | Pros | Cons |
|:---------|:----------|:------|:------|
| **OpenAI** | General use, reliability | High quality, well-tested | Cost, requires API key |
| **Ollama** | Privacy, cost savings | Free, local, private | Requires local setup |
| **Google AI** | Google ecosystem | Good performance | Requires Google account |
| **Azure OpenAI** | Enterprise, compliance | Enterprise features | Complex setup |
| **Cohere** | Multilingual content | Great language support | Specialized use case |
| **VoyageAI** | Retrieval tasks | Optimized for search | Newer provider |
### Environment Variable Configuration
For security, store API keys in environment variables:
```python
import os
# Set environment variables
os.environ["OPENAI_API_KEY"] = "your-openai-key"
os.environ["GOOGLE_API_KEY"] = "your-google-key"
os.environ["COHERE_API_KEY"] = "your-cohere-key"
# Use without exposing keys in code
crew = Crew(
memory=True,
embedder={
"provider": "openai",
"config": {
"model": "text-embedding-3-small"
# API key automatically loaded from environment
}
}
)
```
### Testing Different Embedding Providers
Compare embedding providers for your specific use case:
```python
from crewai import Crew
from crewai.utilities.paths import db_storage_path
# Test different providers with the same data
providers_to_test = [
{
"name": "OpenAI",
"config": {
"provider": "openai",
"config": {"model": "text-embedding-3-small"}
}
},
{
"name": "Ollama",
"config": {
"provider": "ollama",
"config": {"model": "mxbai-embed-large"}
}
}
]
for provider in providers_to_test:
print(f"\nTesting {provider['name']} embeddings...")
# Create crew with specific embedder
crew = Crew(
agents=[...],
tasks=[...],
memory=True,
embedder=provider['config']
)
# Run your test and measure performance
result = crew.kickoff()
print(f"{provider['name']} completed successfully")
```
### Troubleshooting Embedding Issues
**Model not found errors:**
```python
# Verify model availability
from crewai.utilities.embedding_configurator import EmbeddingConfigurator
configurator = EmbeddingConfigurator()
try:
embedder = configurator.configure_embedder({
"provider": "ollama",
"config": {"model": "mxbai-embed-large"}
})
print("Embedder configured successfully")
except Exception as e:
print(f"Configuration error: {e}")
```
**API key issues:**
```python
import os
# Check if API keys are set
required_keys = ["OPENAI_API_KEY", "GOOGLE_API_KEY", "COHERE_API_KEY"]
for key in required_keys:
if os.getenv(key):
print(f"✅ {key} is set")
else:
print(f"❌ {key} is not set")
```
**Performance comparison:**
```python
import time
def test_embedding_performance(embedder_config, test_text="This is a test document"):
start_time = time.time()
crew = Crew(
agents=[...],
tasks=[...],
memory=True,
embedder=embedder_config
)
# Simulate memory operation
crew.kickoff()
end_time = time.time()
return end_time - start_time
# Compare performance
openai_time = test_embedding_performance({
"provider": "openai",
"config": {"model": "text-embedding-3-small"}
})
ollama_time = test_embedding_performance({
"provider": "ollama",
"config": {"model": "mxbai-embed-large"}
})
print(f"OpenAI: {openai_time:.2f}s")
print(f"Ollama: {ollama_time:.2f}s")
```
## 2. User Memory with Mem0 (Legacy)
<Warning>

View File

@@ -164,8 +164,7 @@
"tools/ai-ml/llamaindextool",
"tools/ai-ml/langchaintool",
"tools/ai-ml/ragtool",
"tools/ai-ml/codeinterpretertool",
"tools/ai-ml/patronustools"
"tools/ai-ml/codeinterpretertool"
]
},
{
@@ -190,40 +189,42 @@
]
},
{
"group": "Agent Monitoring & Observability",
"group": "Observability",
"pages": [
"how-to/agentops-observability",
"how-to/arize-phoenix-observability",
"how-to/langfuse-observability",
"how-to/langtrace-observability",
"how-to/mlflow-observability",
"how-to/openlit-observability",
"how-to/opik-observability",
"how-to/portkey-observability",
"how-to/weave-integration"
"observability/overview",
"observability/agentops",
"observability/arize-phoenix",
"observability/langfuse",
"observability/langtrace",
"observability/mlflow",
"observability/openlit",
"observability/opik",
"observability/patronus-evaluation",
"observability/portkey",
"observability/weave"
]
},
{
"group": "Learn",
"pages": [
"how-to/conditional-tasks",
"how-to/coding-agents",
"how-to/create-custom-tools",
"how-to/custom-llm",
"how-to/custom-manager-agent",
"how-to/customizing-agents",
"how-to/dalle-image-generation",
"how-to/force-tool-output-as-result",
"how-to/hierarchical-process",
"how-to/human-in-the-loop",
"how-to/human-input-on-execution",
"how-to/kickoff-async",
"how-to/kickoff-for-each",
"how-to/llm-connections",
"how-to/multimodal-agents",
"how-to/replay-tasks-from-latest-crew-kickoff",
"how-to/sequential-process",
"how-to/using-annotations"
"learn/overview",
"learn/conditional-tasks",
"learn/coding-agents",
"learn/create-custom-tools",
"learn/custom-llm",
"learn/custom-manager-agent",
"learn/customizing-agents",
"learn/dalle-image-generation",
"learn/force-tool-output-as-result",
"learn/hierarchical-process",
"learn/human-input-on-execution",
"learn/kickoff-async",
"learn/kickoff-for-each",
"learn/llm-connections",
"learn/multimodal-agents",
"learn/replay-tasks-from-latest-crew-kickoff",
"learn/sequential-process",
"learn/using-annotations"
]
},
{
@@ -267,6 +268,7 @@
"enterprise/guides/slack-trigger",
"enterprise/guides/team-management",
"enterprise/guides/webhook-automation",
"enterprise/guides/human-in-the-loop",
"enterprise/guides/zapier-trigger"
]
},
@@ -352,7 +354,7 @@
"navbar": {
"links": [
{
"label": "Start Free Trial",
"label": "Start Cloud Trial",
"href": "https://app.crewai.com"
}
],

View File

@@ -0,0 +1,78 @@
---
title: "HITL Workflows"
description: "Learn how to implement Human-In-The-Loop workflows in CrewAI for enhanced decision-making"
icon: "user-check"
---
Human-In-The-Loop (HITL) is a powerful approach that combines artificial intelligence with human expertise to enhance decision-making and improve task outcomes. This guide shows you how to implement HITL within CrewAI.
## Setting Up HITL Workflows
<Steps>
<Step title="Configure Your Task">
Set up your task with human input enabled:
<Frame>
<img src="/images/enterprise/crew-human-input.png" alt="Crew Human Input" />
</Frame>
</Step>
<Step title="Provide Webhook URL">
When kicking off your crew, include a webhook URL for human input:
<Frame>
<img src="/images/enterprise/crew-webhook-url.png" alt="Crew Webhook URL" />
</Frame>
</Step>
<Step title="Receive Webhook Notification">
Once the crew completes the task requiring human input, you'll receive a webhook notification containing:
- **Execution ID**
- **Task ID**
- **Task output**
</Step>
<Step title="Review Task Output">
The system will pause in the `Pending Human Input` state. Review the task output carefully.
</Step>
<Step title="Submit Human Feedback">
Call the resume endpoint of your crew with the following information:
<Frame>
<img src="/images/enterprise/crew-resume-endpoint.png" alt="Crew Resume Endpoint" />
</Frame>
<Warning>
**Feedback Impact on Task Execution**:
It's crucial to exercise care when providing feedback, as the entire feedback content will be incorporated as additional context for further task executions.
</Warning>
This means:
- All information in your feedback becomes part of the task's context.
- Irrelevant details may negatively influence it.
- Concise, relevant feedback helps maintain task focus and efficiency.
- Always review your feedback carefully before submission to ensure it contains only pertinent information that will positively guide the task's execution.
</Step>
<Step title="Handle Negative Feedback">
If you provide negative feedback:
- The crew will retry the task with added context from your feedback.
- You'll receive another webhook notification for further review.
- Repeat steps 4-6 until satisfied.
</Step>
<Step title="Execution Continuation">
When you submit positive feedback, the execution will proceed to the next steps.
</Step>
</Steps>
## Best Practices
- **Be Specific**: Provide clear, actionable feedback that directly addresses the task at hand
- **Stay Relevant**: Only include information that will help improve the task execution
- **Be Timely**: Respond to HITL prompts promptly to avoid workflow delays
- **Review Carefully**: Double-check your feedback before submitting to ensure accuracy
## Common Use Cases
HITL workflows are particularly valuable for:
- Quality assurance and validation
- Complex decision-making scenarios
- Sensitive or high-stakes operations
- Creative tasks requiring human judgment
- Compliance and regulatory reviews

View File

@@ -6,7 +6,7 @@ icon: message-pen
## Why Customize Prompts?
Although CrewAI's default prompts work well for many scenarios, low-level customization opens the door to significantly more flexible and powerful agent behavior. Heres why you might want to take advantage of this deeper control:
Although CrewAI's default prompts work well for many scenarios, low-level customization opens the door to significantly more flexible and powerful agent behavior. Here's why you might want to take advantage of this deeper control:
1. **Optimize for specific LLMs** Different models (such as GPT-4, Claude, or Llama) thrive with prompt formats tailored to their unique architectures.
2. **Change the language** Build agents that operate exclusively in languages beyond English, handling nuances with precision.
@@ -20,13 +20,174 @@ This guide explores how to tap into CrewAI's prompts at a lower level, giving yo
Under the hood, CrewAI employs a modular prompt system that you can customize extensively:
- **Agent templates** Govern each agents approach to their assigned role.
- **Agent templates** Govern each agent's approach to their assigned role.
- **Prompt slices** Control specialized behaviors such as tasks, tool usage, and output structure.
- **Error handling** Direct how agents respond to failures, exceptions, or timeouts.
- **Tool-specific prompts** Define detailed instructions for how tools are invoked or utilized.
Check out the [original prompt templates in CrewAI's repository](https://github.com/crewAIInc/crewAI/blob/main/src/crewai/translations/en.json) to see how these elements are organized. From there, you can override or adapt them as needed to unlock advanced behaviors.
## Understanding Default System Instructions
<Warning>
**Production Transparency Issue**: CrewAI automatically injects default instructions into your prompts that you might not be aware of. This section explains what's happening under the hood and how to gain full control.
</Warning>
When you define an agent with `role`, `goal`, and `backstory`, CrewAI automatically adds additional system instructions that control formatting and behavior. Understanding these default injections is crucial for production systems where you need full prompt transparency.
### What CrewAI Automatically Injects
Based on your agent configuration, CrewAI adds different default instructions:
#### For Agents Without Tools
```text
"I MUST use these formats, my job depends on it!"
```
#### For Agents With Tools
```text
"IMPORTANT: Use the following format in your response:
Thought: you should always think about what to do
Action: the action to take, only one name of [tool_names]
Action Input: the input to the action, just a simple JSON object...
```
#### For Structured Outputs (JSON/Pydantic)
```text
"Ensure your final answer contains only the content in the following format: {output_format}
Ensure the final output does not include any code block markers like ```json or ```python."
```
### Viewing the Complete System Prompt
To see exactly what prompt is being sent to your LLM, you can inspect the generated prompt:
```python
from crewai import Agent, Crew, Task
from crewai.utilities.prompts import Prompts
# Create your agent
agent = Agent(
role="Data Analyst",
goal="Analyze data and provide insights",
backstory="You are an expert data analyst with 10 years of experience.",
verbose=True
)
# Create a sample task
task = Task(
description="Analyze the sales data and identify trends",
expected_output="A detailed analysis with key insights and trends",
agent=agent
)
# Create the prompt generator
prompt_generator = Prompts(
agent=agent,
has_tools=len(agent.tools) > 0,
use_system_prompt=agent.use_system_prompt
)
# Generate and inspect the actual prompt
generated_prompt = prompt_generator.task_execution()
# Print the complete system prompt that will be sent to the LLM
if "system" in generated_prompt:
print("=== SYSTEM PROMPT ===")
print(generated_prompt["system"])
print("\n=== USER PROMPT ===")
print(generated_prompt["user"])
else:
print("=== COMPLETE PROMPT ===")
print(generated_prompt["prompt"])
# You can also see how the task description gets formatted
print("\n=== TASK CONTEXT ===")
print(f"Task Description: {task.description}")
print(f"Expected Output: {task.expected_output}")
```
### Overriding Default Instructions
You have several options to gain full control over the prompts:
#### Option 1: Custom Templates (Recommended)
```python
from crewai import Agent
# Define your own system template without default instructions
custom_system_template = """You are {role}. {backstory}
Your goal is: {goal}
Respond naturally and conversationally. Focus on providing helpful, accurate information."""
custom_prompt_template = """Task: {input}
Please complete this task thoughtfully."""
agent = Agent(
role="Research Assistant",
goal="Help users find accurate information",
backstory="You are a helpful research assistant.",
system_template=custom_system_template,
prompt_template=custom_prompt_template,
use_system_prompt=True # Use separate system/user messages
)
```
#### Option 2: Custom Prompt File
Create a `custom_prompts.json` file to override specific prompt slices:
```json
{
"slices": {
"no_tools": "\nProvide your best answer in a natural, conversational way.",
"tools": "\nYou have access to these tools: {tools}\n\nUse them when helpful, but respond naturally.",
"formatted_task_instructions": "Format your response as: {output_format}"
}
}
```
Then use it in your crew:
```python
crew = Crew(
agents=[agent],
tasks=[task],
prompt_file="custom_prompts.json",
verbose=True
)
```
#### Option 3: Disable System Prompts for o1 Models
```python
agent = Agent(
role="Analyst",
goal="Analyze data",
backstory="Expert analyst",
use_system_prompt=False # Disables system prompt separation
)
```
### Debugging with Observability Tools
For production transparency, integrate with observability platforms to monitor all prompts and LLM interactions. This allows you to see exactly what prompts (including default instructions) are being sent to your LLMs.
See our [Observability documentation](/how-to/observability) for detailed integration guides with various platforms including Langfuse, MLflow, Weights & Biases, and custom logging solutions.
### Best Practices for Production
1. **Always inspect generated prompts** before deploying to production
2. **Use custom templates** when you need full control over prompt content
3. **Integrate observability tools** for ongoing prompt monitoring (see [Observability docs](/how-to/observability))
4. **Test with different LLMs** as default instructions may work differently across models
5. **Document your prompt customizations** for team transparency
<Tip>
The default instructions exist to ensure consistent agent behavior, but they can interfere with domain-specific requirements. Use the customization options above to maintain full control over your agent's behavior in production systems.
</Tip>
## Best Practices for Managing Prompt Files
When engaging in low-level prompt customization, follow these guidelines to keep things organized and maintainable:
@@ -44,7 +205,7 @@ One straightforward approach is to create a JSON file for the prompts you want t
1. Craft a JSON file with your updated prompt slices.
2. Reference that file via the `prompt_file` parameter in your Crew.
CrewAI then merges your customizations with the defaults, so you dont have to redefine every prompt. Heres how:
CrewAI then merges your customizations with the defaults, so you don't have to redefine every prompt. Here's how:
### Example: Basic Prompt Customization
@@ -93,14 +254,14 @@ With these few edits, you gain low-level control over how your agents communicat
## Optimizing for Specific Models
Different models thrive on differently structured prompts. Making deeper adjustments can significantly boost performance by aligning your prompts with a models nuances.
Different models thrive on differently structured prompts. Making deeper adjustments can significantly boost performance by aligning your prompts with a model's nuances.
### Example: Llama 3.3 Prompting Template
For instance, when dealing with Metas Llama 3.3, deeper-level customization may reflect the recommended structure described at:
For instance, when dealing with Meta's Llama 3.3, deeper-level customization may reflect the recommended structure described at:
https://www.llama.com/docs/model-cards-and-prompt-formats/llama3_1/#prompt-template
Heres an example to highlight how you might fine-tune an Agent to leverage Llama 3.3 in code:
Here's an example to highlight how you might fine-tune an Agent to leverage Llama 3.3 in code:
```python
from crewai import Agent, Crew, Task, Process
@@ -148,8 +309,8 @@ Through this deeper configuration, you can exercise comprehensive, low-level con
## Conclusion
Low-level prompt customization in CrewAI opens the door to super custom, complex use cases. By establishing well-organized prompt files (or direct inline templates), you can accommodate various models, languages, and specialized domains. This level of flexibility ensures you can craft precisely the AI behavior you need, all while knowing CrewAI still provides reliable defaults when you dont override them.
Low-level prompt customization in CrewAI opens the door to super custom, complex use cases. By establishing well-organized prompt files (or direct inline templates), you can accommodate various models, languages, and specialized domains. This level of flexibility ensures you can craft precisely the AI behavior you need, all while knowing CrewAI still provides reliable defaults when you don't override them.
<Check>
You now have the foundation for advanced prompt customizations in CrewAI. Whether youre adapting for model-specific structures or domain-specific constraints, this low-level approach lets you shape agent interactions in highly specialized ways.
You now have the foundation for advanced prompt customizations in CrewAI. Whether you're adapting for model-specific structures or domain-specific constraints, this low-level approach lets you shape agent interactions in highly specialized ways.
</Check>

View File

@@ -1,646 +0,0 @@
---
title: Custom LLM Implementation
description: Learn how to create custom LLM implementations in CrewAI.
icon: code
---
## Custom LLM Implementations
CrewAI now supports custom LLM implementations through the `BaseLLM` abstract base class. This allows you to create your own LLM implementations that don't rely on litellm's authentication mechanism.
To create a custom LLM implementation, you need to:
1. Inherit from the `BaseLLM` abstract base class
2. Implement the required methods:
- `call()`: The main method to call the LLM with messages
- `supports_function_calling()`: Whether the LLM supports function calling
- `supports_stop_words()`: Whether the LLM supports stop words
- `get_context_window_size()`: The context window size of the LLM
## Example: Basic Custom LLM
```python
from crewai import BaseLLM
from typing import Any, Dict, List, Optional, Union
class CustomLLM(BaseLLM):
def __init__(self, api_key: str, endpoint: str):
super().__init__() # Initialize the base class to set default attributes
if not api_key or not isinstance(api_key, str):
raise ValueError("Invalid API key: must be a non-empty string")
if not endpoint or not isinstance(endpoint, str):
raise ValueError("Invalid endpoint URL: must be a non-empty string")
self.api_key = api_key
self.endpoint = endpoint
self.stop = [] # You can customize stop words if needed
def call(
self,
messages: Union[str, List[Dict[str, str]]],
tools: Optional[List[dict]] = None,
callbacks: Optional[List[Any]] = None,
available_functions: Optional[Dict[str, Any]] = None,
) -> Union[str, Any]:
"""Call the LLM with the given messages.
Args:
messages: Input messages for the LLM.
tools: Optional list of tool schemas for function calling.
callbacks: Optional list of callback functions.
available_functions: Optional dict mapping function names to callables.
Returns:
Either a text response from the LLM or the result of a tool function call.
Raises:
TimeoutError: If the LLM request times out.
RuntimeError: If the LLM request fails for other reasons.
ValueError: If the response format is invalid.
"""
# Implement your own logic to call the LLM
# For example, using requests:
import requests
try:
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
# Convert string message to proper format if needed
if isinstance(messages, str):
messages = [{"role": "user", "content": messages}]
data = {
"messages": messages,
"tools": tools
}
response = requests.post(
self.endpoint,
headers=headers,
json=data,
timeout=30 # Set a reasonable timeout
)
response.raise_for_status() # Raise an exception for HTTP errors
return response.json()["choices"][0]["message"]["content"]
except requests.Timeout:
raise TimeoutError("LLM request timed out")
except requests.RequestException as e:
raise RuntimeError(f"LLM request failed: {str(e)}")
except (KeyError, IndexError, ValueError) as e:
raise ValueError(f"Invalid response format: {str(e)}")
def supports_function_calling(self) -> bool:
"""Check if the LLM supports function calling.
Returns:
True if the LLM supports function calling, False otherwise.
"""
# Return True if your LLM supports function calling
return True
def supports_stop_words(self) -> bool:
"""Check if the LLM supports stop words.
Returns:
True if the LLM supports stop words, False otherwise.
"""
# Return True if your LLM supports stop words
return True
def get_context_window_size(self) -> int:
"""Get the context window size of the LLM.
Returns:
The context window size as an integer.
"""
# Return the context window size of your LLM
return 8192
```
## Error Handling Best Practices
When implementing custom LLMs, it's important to handle errors properly to ensure robustness and reliability. Here are some best practices:
### 1. Implement Try-Except Blocks for API Calls
Always wrap API calls in try-except blocks to handle different types of errors:
```python
def call(
self,
messages: Union[str, List[Dict[str, str]]],
tools: Optional[List[dict]] = None,
callbacks: Optional[List[Any]] = None,
available_functions: Optional[Dict[str, Any]] = None,
) -> Union[str, Any]:
try:
# API call implementation
response = requests.post(
self.endpoint,
headers=self.headers,
json=self.prepare_payload(messages),
timeout=30 # Set a reasonable timeout
)
response.raise_for_status() # Raise an exception for HTTP errors
return response.json()["choices"][0]["message"]["content"]
except requests.Timeout:
raise TimeoutError("LLM request timed out")
except requests.RequestException as e:
raise RuntimeError(f"LLM request failed: {str(e)}")
except (KeyError, IndexError, ValueError) as e:
raise ValueError(f"Invalid response format: {str(e)}")
```
### 2. Implement Retry Logic for Transient Failures
For transient failures like network issues or rate limiting, implement retry logic with exponential backoff:
```python
def call(
self,
messages: Union[str, List[Dict[str, str]]],
tools: Optional[List[dict]] = None,
callbacks: Optional[List[Any]] = None,
available_functions: Optional[Dict[str, Any]] = None,
) -> Union[str, Any]:
import time
max_retries = 3
retry_delay = 1 # seconds
for attempt in range(max_retries):
try:
response = requests.post(
self.endpoint,
headers=self.headers,
json=self.prepare_payload(messages),
timeout=30
)
response.raise_for_status()
return response.json()["choices"][0]["message"]["content"]
except (requests.Timeout, requests.ConnectionError) as e:
if attempt < max_retries - 1:
time.sleep(retry_delay * (2 ** attempt)) # Exponential backoff
continue
raise TimeoutError(f"LLM request failed after {max_retries} attempts: {str(e)}")
except requests.RequestException as e:
raise RuntimeError(f"LLM request failed: {str(e)}")
```
### 3. Validate Input Parameters
Always validate input parameters to prevent runtime errors:
```python
def __init__(self, api_key: str, endpoint: str):
super().__init__()
if not api_key or not isinstance(api_key, str):
raise ValueError("Invalid API key: must be a non-empty string")
if not endpoint or not isinstance(endpoint, str):
raise ValueError("Invalid endpoint URL: must be a non-empty string")
self.api_key = api_key
self.endpoint = endpoint
```
### 4. Handle Authentication Errors Gracefully
Provide clear error messages for authentication failures:
```python
def call(
self,
messages: Union[str, List[Dict[str, str]]],
tools: Optional[List[dict]] = None,
callbacks: Optional[List[Any]] = None,
available_functions: Optional[Dict[str, Any]] = None,
) -> Union[str, Any]:
try:
response = requests.post(self.endpoint, headers=self.headers, json=data)
if response.status_code == 401:
raise ValueError("Authentication failed: Invalid API key or token")
elif response.status_code == 403:
raise ValueError("Authorization failed: Insufficient permissions")
response.raise_for_status()
# Process response
except Exception as e:
# Handle error
raise
```
## Example: JWT-based Authentication
For services that use JWT-based authentication instead of API keys, you can implement a custom LLM like this:
```python
from crewai import BaseLLM, Agent, Task
from typing import Any, Dict, List, Optional, Union
class JWTAuthLLM(BaseLLM):
def __init__(self, jwt_token: str, endpoint: str):
super().__init__() # Initialize the base class to set default attributes
if not jwt_token or not isinstance(jwt_token, str):
raise ValueError("Invalid JWT token: must be a non-empty string")
if not endpoint or not isinstance(endpoint, str):
raise ValueError("Invalid endpoint URL: must be a non-empty string")
self.jwt_token = jwt_token
self.endpoint = endpoint
self.stop = [] # You can customize stop words if needed
def call(
self,
messages: Union[str, List[Dict[str, str]]],
tools: Optional[List[dict]] = None,
callbacks: Optional[List[Any]] = None,
available_functions: Optional[Dict[str, Any]] = None,
) -> Union[str, Any]:
"""Call the LLM with JWT authentication.
Args:
messages: Input messages for the LLM.
tools: Optional list of tool schemas for function calling.
callbacks: Optional list of callback functions.
available_functions: Optional dict mapping function names to callables.
Returns:
Either a text response from the LLM or the result of a tool function call.
Raises:
TimeoutError: If the LLM request times out.
RuntimeError: If the LLM request fails for other reasons.
ValueError: If the response format is invalid.
"""
# Implement your own logic to call the LLM with JWT authentication
import requests
try:
headers = {
"Authorization": f"Bearer {self.jwt_token}",
"Content-Type": "application/json"
}
# Convert string message to proper format if needed
if isinstance(messages, str):
messages = [{"role": "user", "content": messages}]
data = {
"messages": messages,
"tools": tools
}
response = requests.post(
self.endpoint,
headers=headers,
json=data,
timeout=30 # Set a reasonable timeout
)
if response.status_code == 401:
raise ValueError("Authentication failed: Invalid JWT token")
elif response.status_code == 403:
raise ValueError("Authorization failed: Insufficient permissions")
response.raise_for_status() # Raise an exception for HTTP errors
return response.json()["choices"][0]["message"]["content"]
except requests.Timeout:
raise TimeoutError("LLM request timed out")
except requests.RequestException as e:
raise RuntimeError(f"LLM request failed: {str(e)}")
except (KeyError, IndexError, ValueError) as e:
raise ValueError(f"Invalid response format: {str(e)}")
def supports_function_calling(self) -> bool:
"""Check if the LLM supports function calling.
Returns:
True if the LLM supports function calling, False otherwise.
"""
return True
def supports_stop_words(self) -> bool:
"""Check if the LLM supports stop words.
Returns:
True if the LLM supports stop words, False otherwise.
"""
return True
def get_context_window_size(self) -> int:
"""Get the context window size of the LLM.
Returns:
The context window size as an integer.
"""
return 8192
```
## Troubleshooting
Here are some common issues you might encounter when implementing custom LLMs and how to resolve them:
### 1. Authentication Failures
**Symptoms**: 401 Unauthorized or 403 Forbidden errors
**Solutions**:
- Verify that your API key or JWT token is valid and not expired
- Check that you're using the correct authentication header format
- Ensure that your token has the necessary permissions
### 2. Timeout Issues
**Symptoms**: Requests taking too long or timing out
**Solutions**:
- Implement timeout handling as shown in the examples
- Use retry logic with exponential backoff
- Consider using a more reliable network connection
### 3. Response Parsing Errors
**Symptoms**: KeyError, IndexError, or ValueError when processing responses
**Solutions**:
- Validate the response format before accessing nested fields
- Implement proper error handling for malformed responses
- Check the API documentation for the expected response format
### 4. Rate Limiting
**Symptoms**: 429 Too Many Requests errors
**Solutions**:
- Implement rate limiting in your custom LLM
- Add exponential backoff for retries
- Consider using a token bucket algorithm for more precise rate control
## Advanced Features
### Logging
Adding logging to your custom LLM can help with debugging and monitoring:
```python
import logging
from typing import Any, Dict, List, Optional, Union
class LoggingLLM(BaseLLM):
def __init__(self, api_key: str, endpoint: str):
super().__init__()
self.api_key = api_key
self.endpoint = endpoint
self.logger = logging.getLogger("crewai.llm.custom")
def call(
self,
messages: Union[str, List[Dict[str, str]]],
tools: Optional[List[dict]] = None,
callbacks: Optional[List[Any]] = None,
available_functions: Optional[Dict[str, Any]] = None,
) -> Union[str, Any]:
self.logger.info(f"Calling LLM with {len(messages) if isinstance(messages, list) else 1} messages")
try:
# API call implementation
response = self._make_api_call(messages, tools)
self.logger.debug(f"LLM response received: {response[:100]}...")
return response
except Exception as e:
self.logger.error(f"LLM call failed: {str(e)}")
raise
```
### Rate Limiting
Implementing rate limiting can help avoid overwhelming the LLM API:
```python
import time
from typing import Any, Dict, List, Optional, Union
class RateLimitedLLM(BaseLLM):
def __init__(
self,
api_key: str,
endpoint: str,
requests_per_minute: int = 60
):
super().__init__()
self.api_key = api_key
self.endpoint = endpoint
self.requests_per_minute = requests_per_minute
self.request_times: List[float] = []
def call(
self,
messages: Union[str, List[Dict[str, str]]],
tools: Optional[List[dict]] = None,
callbacks: Optional[List[Any]] = None,
available_functions: Optional[Dict[str, Any]] = None,
) -> Union[str, Any]:
self._enforce_rate_limit()
# Record this request time
self.request_times.append(time.time())
# Make the actual API call
return self._make_api_call(messages, tools)
def _enforce_rate_limit(self) -> None:
"""Enforce the rate limit by waiting if necessary."""
now = time.time()
# Remove request times older than 1 minute
self.request_times = [t for t in self.request_times if now - t < 60]
if len(self.request_times) >= self.requests_per_minute:
# Calculate how long to wait
oldest_request = min(self.request_times)
wait_time = 60 - (now - oldest_request)
if wait_time > 0:
time.sleep(wait_time)
```
### Metrics Collection
Collecting metrics can help you monitor your LLM usage:
```python
import time
from typing import Any, Dict, List, Optional, Union
class MetricsCollectingLLM(BaseLLM):
def __init__(self, api_key: str, endpoint: str):
super().__init__()
self.api_key = api_key
self.endpoint = endpoint
self.metrics: Dict[str, Any] = {
"total_calls": 0,
"total_tokens": 0,
"errors": 0,
"latency": []
}
def call(
self,
messages: Union[str, List[Dict[str, str]]],
tools: Optional[List[dict]] = None,
callbacks: Optional[List[Any]] = None,
available_functions: Optional[Dict[str, Any]] = None,
) -> Union[str, Any]:
start_time = time.time()
self.metrics["total_calls"] += 1
try:
response = self._make_api_call(messages, tools)
# Estimate tokens (simplified)
if isinstance(messages, str):
token_estimate = len(messages) // 4
else:
token_estimate = sum(len(m.get("content", "")) // 4 for m in messages)
self.metrics["total_tokens"] += token_estimate
return response
except Exception as e:
self.metrics["errors"] += 1
raise
finally:
latency = time.time() - start_time
self.metrics["latency"].append(latency)
def get_metrics(self) -> Dict[str, Any]:
"""Return the collected metrics."""
avg_latency = sum(self.metrics["latency"]) / len(self.metrics["latency"]) if self.metrics["latency"] else 0
return {
**self.metrics,
"avg_latency": avg_latency
}
```
## Advanced Usage: Function Calling
If your LLM supports function calling, you can implement the function calling logic in your custom LLM:
```python
import json
from typing import Any, Dict, List, Optional, Union
def call(
self,
messages: Union[str, List[Dict[str, str]]],
tools: Optional[List[dict]] = None,
callbacks: Optional[List[Any]] = None,
available_functions: Optional[Dict[str, Any]] = None,
) -> Union[str, Any]:
import requests
try:
headers = {
"Authorization": f"Bearer {self.jwt_token}",
"Content-Type": "application/json"
}
# Convert string message to proper format if needed
if isinstance(messages, str):
messages = [{"role": "user", "content": messages}]
data = {
"messages": messages,
"tools": tools
}
response = requests.post(
self.endpoint,
headers=headers,
json=data,
timeout=30
)
response.raise_for_status()
response_data = response.json()
# Check if the LLM wants to call a function
if response_data["choices"][0]["message"].get("tool_calls"):
tool_calls = response_data["choices"][0]["message"]["tool_calls"]
# Process each tool call
for tool_call in tool_calls:
function_name = tool_call["function"]["name"]
function_args = json.loads(tool_call["function"]["arguments"])
if available_functions and function_name in available_functions:
function_to_call = available_functions[function_name]
function_response = function_to_call(**function_args)
# Add the function response to the messages
messages.append({
"role": "tool",
"tool_call_id": tool_call["id"],
"name": function_name,
"content": str(function_response)
})
# Call the LLM again with the updated messages
return self.call(messages, tools, callbacks, available_functions)
# Return the text response if no function call
return response_data["choices"][0]["message"]["content"]
except requests.Timeout:
raise TimeoutError("LLM request timed out")
except requests.RequestException as e:
raise RuntimeError(f"LLM request failed: {str(e)}")
except (KeyError, IndexError, ValueError) as e:
raise ValueError(f"Invalid response format: {str(e)}")
```
## Using Your Custom LLM with CrewAI
Once you've implemented your custom LLM, you can use it with CrewAI agents and crews:
```python
from crewai import Agent, Task, Crew
from typing import Dict, Any
# Create your custom LLM instance
jwt_llm = JWTAuthLLM(
jwt_token="your.jwt.token",
endpoint="https://your-llm-endpoint.com/v1/chat/completions"
)
# Use it with an agent
agent = Agent(
role="Research Assistant",
goal="Find information on a topic",
backstory="You are a research assistant tasked with finding information.",
llm=jwt_llm,
)
# Create a task for the agent
task = Task(
description="Research the benefits of exercise",
agent=agent,
expected_output="A summary of the benefits of exercise",
)
# Execute the task
result = agent.execute_task(task)
print(result)
# Or use it with a crew
crew = Crew(
agents=[agent],
tasks=[task],
manager_llm=jwt_llm, # Use your custom LLM for the manager
)
# Run the crew
result = crew.kickoff()
print(result)
```
## Implementing Your Own Authentication Mechanism
The `BaseLLM` class allows you to implement any authentication mechanism you need, not just JWT or API keys. You can use:
- OAuth tokens
- Client certificates
- Custom headers
- Session-based authentication
- Any other authentication method required by your LLM provider
Simply implement the appropriate authentication logic in your custom LLM class.

350
docs/learn/custom-llm.mdx Normal file
View File

@@ -0,0 +1,350 @@
---
title: Custom LLM Implementation
description: Learn how to create custom LLM implementations in CrewAI.
icon: code
---
## Overview
CrewAI supports custom LLM implementations through the `BaseLLM` abstract base class. This allows you to integrate any LLM provider that doesn't have built-in support in LiteLLM, or implement custom authentication mechanisms.
## Quick Start
Here's a minimal custom LLM implementation:
```python
from crewai import BaseLLM
from typing import Any, Dict, List, Optional, Union
import requests
class CustomLLM(BaseLLM):
def __init__(self, model: str, api_key: str, endpoint: str, temperature: Optional[float] = None):
# IMPORTANT: Call super().__init__() with required parameters
super().__init__(model=model, temperature=temperature)
self.api_key = api_key
self.endpoint = endpoint
def call(
self,
messages: Union[str, List[Dict[str, str]]],
tools: Optional[List[dict]] = None,
callbacks: Optional[List[Any]] = None,
available_functions: Optional[Dict[str, Any]] = None,
) -> Union[str, Any]:
"""Call the LLM with the given messages."""
# Convert string to message format if needed
if isinstance(messages, str):
messages = [{"role": "user", "content": messages}]
# Prepare request
payload = {
"model": self.model,
"messages": messages,
"temperature": self.temperature,
}
# Add tools if provided and supported
if tools and self.supports_function_calling():
payload["tools"] = tools
# Make API call
response = requests.post(
self.endpoint,
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
},
json=payload,
timeout=30
)
response.raise_for_status()
result = response.json()
return result["choices"][0]["message"]["content"]
def supports_function_calling(self) -> bool:
"""Override if your LLM supports function calling."""
return True # Change to False if your LLM doesn't support tools
def get_context_window_size(self) -> int:
"""Return the context window size of your LLM."""
return 8192 # Adjust based on your model's actual context window
```
## Using Your Custom LLM
```python
from crewai import Agent, Task, Crew
# Assuming you have the CustomLLM class defined above
# Create your custom LLM
custom_llm = CustomLLM(
model="my-custom-model",
api_key="your-api-key",
endpoint="https://api.example.com/v1/chat/completions",
temperature=0.7
)
# Use with an agent
agent = Agent(
role="Research Assistant",
goal="Find and analyze information",
backstory="You are a research assistant.",
llm=custom_llm
)
# Create and execute tasks
task = Task(
description="Research the latest developments in AI",
expected_output="A comprehensive summary",
agent=agent
)
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
```
## Required Methods
### Constructor: `__init__()`
**Critical**: You must call `super().__init__(model, temperature)` with the required parameters:
```python
def __init__(self, model: str, api_key: str, temperature: Optional[float] = None):
# REQUIRED: Call parent constructor with model and temperature
super().__init__(model=model, temperature=temperature)
# Your custom initialization
self.api_key = api_key
```
### Abstract Method: `call()`
The `call()` method is the heart of your LLM implementation. It must:
- Accept messages (string or list of dicts with 'role' and 'content')
- Return a string response
- Handle tools and function calling if supported
- Raise appropriate exceptions for errors
### Optional Methods
```python
def supports_function_calling(self) -> bool:
"""Return True if your LLM supports function calling."""
return True # Default is True
def supports_stop_words(self) -> bool:
"""Return True if your LLM supports stop sequences."""
return True # Default is True
def get_context_window_size(self) -> int:
"""Return the context window size."""
return 4096 # Default is 4096
```
## Common Patterns
### Error Handling
```python
import requests
def call(self, messages, tools=None, callbacks=None, available_functions=None):
try:
response = requests.post(
self.endpoint,
headers={"Authorization": f"Bearer {self.api_key}"},
json=payload,
timeout=30
)
response.raise_for_status()
return response.json()["choices"][0]["message"]["content"]
except requests.Timeout:
raise TimeoutError("LLM request timed out")
except requests.RequestException as e:
raise RuntimeError(f"LLM request failed: {str(e)}")
except (KeyError, IndexError) as e:
raise ValueError(f"Invalid response format: {str(e)}")
```
### Custom Authentication
```python
from crewai import BaseLLM
from typing import Optional
class CustomAuthLLM(BaseLLM):
def __init__(self, model: str, auth_token: str, endpoint: str, temperature: Optional[float] = None):
super().__init__(model=model, temperature=temperature)
self.auth_token = auth_token
self.endpoint = endpoint
def call(self, messages, tools=None, callbacks=None, available_functions=None):
headers = {
"Authorization": f"Custom {self.auth_token}", # Custom auth format
"Content-Type": "application/json"
}
# Rest of implementation...
```
### Stop Words Support
CrewAI automatically adds `"\nObservation:"` as a stop word to control agent behavior. If your LLM supports stop words:
```python
def call(self, messages, tools=None, callbacks=None, available_functions=None):
payload = {
"model": self.model,
"messages": messages,
"stop": self.stop # Include stop words in API call
}
# Make API call...
def supports_stop_words(self) -> bool:
return True # Your LLM supports stop sequences
```
If your LLM doesn't support stop words natively:
```python
def call(self, messages, tools=None, callbacks=None, available_functions=None):
response = self._make_api_call(messages, tools)
content = response["choices"][0]["message"]["content"]
# Manually truncate at stop words
if self.stop:
for stop_word in self.stop:
if stop_word in content:
content = content.split(stop_word)[0]
break
return content
def supports_stop_words(self) -> bool:
return False # Tell CrewAI we handle stop words manually
```
## Function Calling
If your LLM supports function calling, implement the complete flow:
```python
import json
def call(self, messages, tools=None, callbacks=None, available_functions=None):
# Convert string to message format
if isinstance(messages, str):
messages = [{"role": "user", "content": messages}]
# Make API call
response = self._make_api_call(messages, tools)
message = response["choices"][0]["message"]
# Check for function calls
if "tool_calls" in message and available_functions:
return self._handle_function_calls(
message["tool_calls"], messages, tools, available_functions
)
return message["content"]
def _handle_function_calls(self, tool_calls, messages, tools, available_functions):
"""Handle function calling with proper message flow."""
for tool_call in tool_calls:
function_name = tool_call["function"]["name"]
if function_name in available_functions:
# Parse and execute function
function_args = json.loads(tool_call["function"]["arguments"])
function_result = available_functions[function_name](**function_args)
# Add function call and result to message history
messages.append({
"role": "assistant",
"content": None,
"tool_calls": [tool_call]
})
messages.append({
"role": "tool",
"tool_call_id": tool_call["id"],
"name": function_name,
"content": str(function_result)
})
# Call LLM again with updated context
return self.call(messages, tools, None, available_functions)
return "Function call failed"
```
## Troubleshooting
### Common Issues
**Constructor Errors**
```python
# ❌ Wrong - missing required parameters
def __init__(self, api_key: str):
super().__init__()
# ✅ Correct
def __init__(self, model: str, api_key: str, temperature: Optional[float] = None):
super().__init__(model=model, temperature=temperature)
```
**Function Calling Not Working**
- Ensure `supports_function_calling()` returns `True`
- Check that you handle `tool_calls` in the response
- Verify `available_functions` parameter is used correctly
**Authentication Failures**
- Verify API key format and permissions
- Check authentication header format
- Ensure endpoint URLs are correct
**Response Parsing Errors**
- Validate response structure before accessing nested fields
- Handle cases where content might be None
- Add proper error handling for malformed responses
## Testing Your Custom LLM
```python
from crewai import Agent, Task, Crew
def test_custom_llm():
llm = CustomLLM(
model="test-model",
api_key="test-key",
endpoint="https://api.test.com"
)
# Test basic call
result = llm.call("Hello, world!")
assert isinstance(result, str)
assert len(result) > 0
# Test with CrewAI agent
agent = Agent(
role="Test Agent",
goal="Test custom LLM",
backstory="A test agent.",
llm=llm
)
task = Task(
description="Say hello",
expected_output="A greeting",
agent=agent
)
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
assert "hello" in result.raw.lower()
```
This guide covers the essentials of implementing custom LLMs in CrewAI.

View File

@@ -9,7 +9,7 @@ icon: brain-circuit
CrewAI uses LiteLLM to connect to a wide variety of Language Models (LLMs). This integration provides extensive versatility, allowing you to use models from numerous providers with a simple, unified interface.
<Note>
By default, CrewAI uses the `gpt-4o-mini` model. This is determined by the `OPENAI_MODEL_NAME` environment variable, which defaults to "gpt-4o-mini" if not set.
By default, CrewAI uses the `gpt-4o-mini` model. This is determined by the `OPENAI_MODEL_NAME` environment variable, which defaults to "gpt-4o-mini" if not set.
You can easily configure your agents to use a different model or provider as described in this guide.
</Note>
@@ -117,18 +117,27 @@ You can connect to OpenAI-compatible LLMs using either environment variables or
<Tabs>
<Tab title="Using Environment Variables">
<CodeGroup>
```python Code
```python Generic
import os
os.environ["OPENAI_API_KEY"] = "your-api-key"
os.environ["OPENAI_API_BASE"] = "https://api.your-provider.com/v1"
os.environ["OPENAI_MODEL_NAME"] = "your-model-name"
```
```python Google
import os
# Example using Gemini's OpenAI-compatible API.
os.environ["OPENAI_API_KEY"] = "your-gemini-key" # Should start with AIza...
os.environ["OPENAI_API_BASE"] = "https://generativelanguage.googleapis.com/v1beta/openai/"
os.environ["OPENAI_MODEL_NAME"] = "openai/gemini-2.0-flash" # Add your Gemini model here, under openai/
```
</CodeGroup>
</Tab>
<Tab title="Using LLM Class Attributes">
<CodeGroup>
```python Code
```python Generic
llm = LLM(
model="custom-model-name",
api_key="your-api-key",
@@ -136,6 +145,16 @@ You can connect to OpenAI-compatible LLMs using either environment variables or
)
agent = Agent(llm=llm, ...)
```
```python Google
# Example using Gemini's OpenAI-compatible API
llm = LLM(
model="openai/gemini-2.0-flash",
base_url="https://generativelanguage.googleapis.com/v1beta/openai/",
api_key="your-gemini-key", # Should start with AIza...
)
agent = Agent(llm=llm, ...)
```
</CodeGroup>
</Tab>
</Tabs>
@@ -169,7 +188,7 @@ For local models like those provided by Ollama:
You can change the base API URL for any LLM provider by setting the `base_url` parameter:
```python Code
```python Code
llm = LLM(
model="custom-model-name",
base_url="https://api.your-provider.com/v1",

158
docs/learn/overview.mdx Normal file
View File

@@ -0,0 +1,158 @@
---
title: "Overview"
description: "Learn how to build, customize, and optimize your CrewAI applications with comprehensive guides and tutorials"
icon: "face-smile"
---
## Learn CrewAI
This section provides comprehensive guides and tutorials to help you master CrewAI, from basic concepts to advanced techniques. Whether you're just getting started or looking to optimize your existing implementations, these resources will guide you through every aspect of building powerful AI agent workflows.
## Getting Started Guides
### Core Concepts
<CardGroup cols={2}>
<Card title="Sequential Process" icon="list-ol" href="/learn/sequential-process">
Learn how to execute tasks in a sequential order for structured workflows.
</Card>
<Card title="Hierarchical Process" icon="sitemap" href="/learn/hierarchical-process">
Implement hierarchical task execution with manager agents overseeing workflows.
</Card>
<Card title="Conditional Tasks" icon="code-branch" href="/learn/conditional-tasks">
Create dynamic workflows with conditional task execution based on outcomes.
</Card>
<Card title="Async Kickoff" icon="bolt" href="/learn/kickoff-async">
Execute crews asynchronously for improved performance and concurrency.
</Card>
</CardGroup>
### Agent Development
<CardGroup cols={2}>
<Card title="Customizing Agents" icon="user-gear" href="/learn/customizing-agents">
Learn how to customize agent behavior, roles, and capabilities.
</Card>
<Card title="Coding Agents" icon="code" href="/learn/coding-agents">
Build agents that can write, execute, and debug code automatically.
</Card>
<Card title="Multimodal Agents" icon="images" href="/learn/multimodal-agents">
Create agents that can process text, images, and other media types.
</Card>
<Card title="Custom Manager Agent" icon="user-tie" href="/learn/custom-manager-agent">
Implement custom manager agents for complex hierarchical workflows.
</Card>
</CardGroup>
## Advanced Features
### Workflow Control
<CardGroup cols={2}>
<Card title="Human in the Loop" icon="user-check" href="/learn/human-in-the-loop">
Integrate human oversight and intervention into agent workflows.
</Card>
<Card title="Human Input on Execution" icon="hand-paper" href="/learn/human-input-on-execution">
Allow human input during task execution for dynamic decision making.
</Card>
<Card title="Replay Tasks" icon="rotate-left" href="/learn/replay-tasks-from-latest-crew-kickoff">
Replay and resume tasks from previous crew executions.
</Card>
<Card title="Kickoff for Each" icon="repeat" href="/learn/kickoff-for-each">
Execute crews multiple times with different inputs efficiently.
</Card>
</CardGroup>
### Customization & Integration
<CardGroup cols={2}>
<Card title="Custom LLM" icon="brain" href="/learn/custom-llm">
Integrate custom language models and providers with CrewAI.
</Card>
<Card title="LLM Connections" icon="link" href="/learn/llm-connections">
Configure and manage connections to various LLM providers.
</Card>
<Card title="Create Custom Tools" icon="wrench" href="/learn/create-custom-tools">
Build custom tools to extend agent capabilities.
</Card>
<Card title="Using Annotations" icon="at" href="/learn/using-annotations">
Use Python annotations for cleaner, more maintainable code.
</Card>
</CardGroup>
## Specialized Applications
### Content & Media
<CardGroup cols={2}>
<Card title="DALL-E Image Generation" icon="image" href="/learn/dalle-image-generation">
Generate images using DALL-E integration with your agents.
</Card>
<Card title="Bring Your Own Agent" icon="user-plus" href="/learn/bring-your-own-agent">
Integrate existing agents and models into CrewAI workflows.
</Card>
</CardGroup>
### Tool Management
<CardGroup cols={2}>
<Card title="Force Tool Output as Result" icon="hammer" href="/learn/force-tool-output-as-result">
Configure tools to return their output directly as task results.
</Card>
</CardGroup>
## Learning Path Recommendations
### For Beginners
1. Start with **Sequential Process** to understand basic workflow execution
2. Learn **Customizing Agents** to create effective agent configurations
3. Explore **Create Custom Tools** to extend functionality
4. Try **Human in the Loop** for interactive workflows
### For Intermediate Users
1. Master **Hierarchical Process** for complex multi-agent systems
2. Implement **Conditional Tasks** for dynamic workflows
3. Use **Async Kickoff** for performance optimization
4. Integrate **Custom LLM** for specialized models
### For Advanced Users
1. Build **Multimodal Agents** for complex media processing
2. Create **Custom Manager Agents** for sophisticated orchestration
3. Implement **Bring Your Own Agent** for hybrid systems
4. Use **Replay Tasks** for robust error recovery
## Best Practices
### Development
- **Start Simple**: Begin with basic sequential workflows before adding complexity
- **Test Incrementally**: Test each component before integrating into larger systems
- **Use Annotations**: Leverage Python annotations for cleaner, more maintainable code
- **Custom Tools**: Build reusable tools that can be shared across different agents
### Production
- **Error Handling**: Implement robust error handling and recovery mechanisms
- **Performance**: Use async execution and optimize LLM calls for better performance
- **Monitoring**: Integrate observability tools to track agent performance
- **Human Oversight**: Include human checkpoints for critical decisions
### Optimization
- **Resource Management**: Monitor and optimize token usage and API costs
- **Workflow Design**: Design workflows that minimize unnecessary LLM calls
- **Tool Efficiency**: Create efficient tools that provide maximum value with minimal overhead
- **Iterative Improvement**: Use feedback and metrics to continuously improve agent performance
## Getting Help
- **Documentation**: Each guide includes detailed examples and explanations
- **Community**: Join the [CrewAI Forum](https://community.crewai.com) for discussions and support
- **Examples**: Check the Examples section for complete working implementations
- **Support**: Contact [support@crewai.com](mailto:support@crewai.com) for technical assistance
Start with the guides that match your current needs and gradually explore more advanced topics as you become comfortable with the fundamentals.

View File

@@ -0,0 +1,118 @@
---
title: "Overview"
description: "Monitor, evaluate, and optimize your CrewAI agents with comprehensive observability tools"
icon: "face-smile"
---
## Observability for CrewAI
Observability is crucial for understanding how your CrewAI agents perform, identifying bottlenecks, and ensuring reliable operation in production environments. This section covers various tools and platforms that provide monitoring, evaluation, and optimization capabilities for your agent workflows.
## Why Observability Matters
- **Performance Monitoring**: Track agent execution times, token usage, and resource consumption
- **Quality Assurance**: Evaluate output quality and consistency across different scenarios
- **Debugging**: Identify and resolve issues in agent behavior and task execution
- **Cost Management**: Monitor LLM API usage and associated costs
- **Continuous Improvement**: Gather insights to optimize agent performance over time
## Available Observability Tools
### Monitoring & Tracing Platforms
<CardGroup cols={2}>
<Card title="AgentOps" icon="paperclip" href="/observability/agentops">
Session replays, metrics, and monitoring for agent development and production.
</Card>
<Card title="OpenLIT" icon="magnifying-glass-chart" href="/observability/openlit">
OpenTelemetry-native monitoring with cost tracking and performance analytics.
</Card>
<Card title="MLflow" icon="bars-staggered" href="/observability/mlflow">
Machine learning lifecycle management with tracing and evaluation capabilities.
</Card>
<Card title="Langfuse" icon="link" href="/observability/langfuse">
LLM engineering platform with detailed tracing and analytics.
</Card>
<Card title="Langtrace" icon="chart-line" href="/observability/langtrace">
Open-source observability for LLMs and agent frameworks.
</Card>
<Card title="Arize Phoenix" icon="meteor" href="/observability/arize-phoenix">
AI observability platform for monitoring and troubleshooting.
</Card>
<Card title="Portkey" icon="key" href="/observability/portkey">
AI gateway with comprehensive monitoring and reliability features.
</Card>
<Card title="Opik" icon="meteor" href="/observability/opik">
Debug, evaluate, and monitor LLM applications with comprehensive tracing.
</Card>
<Card title="Weave" icon="network-wired" href="/observability/weave">
Weights & Biases platform for tracking and evaluating AI applications.
</Card>
</CardGroup>
### Evaluation & Quality Assurance
<CardGroup cols={2}>
<Card title="Patronus AI" icon="shield-check" href="/observability/patronus-evaluation">
Comprehensive evaluation platform for LLM outputs and agent behaviors.
</Card>
</CardGroup>
## Key Observability Metrics
### Performance Metrics
- **Execution Time**: How long agents take to complete tasks
- **Token Usage**: Input/output tokens consumed by LLM calls
- **API Latency**: Response times from external services
- **Success Rate**: Percentage of successfully completed tasks
### Quality Metrics
- **Output Accuracy**: Correctness of agent responses
- **Consistency**: Reliability across similar inputs
- **Relevance**: How well outputs match expected results
- **Safety**: Compliance with content policies and guidelines
### Cost Metrics
- **API Costs**: Expenses from LLM provider usage
- **Resource Utilization**: Compute and memory consumption
- **Cost per Task**: Economic efficiency of agent operations
- **Budget Tracking**: Monitoring against spending limits
## Getting Started
1. **Choose Your Tools**: Select observability platforms that match your needs
2. **Instrument Your Code**: Add monitoring to your CrewAI applications
3. **Set Up Dashboards**: Configure visualizations for key metrics
4. **Define Alerts**: Create notifications for important events
5. **Establish Baselines**: Measure initial performance for comparison
6. **Iterate and Improve**: Use insights to optimize your agents
## Best Practices
### Development Phase
- Use detailed tracing to understand agent behavior
- Implement evaluation metrics early in development
- Monitor resource usage during testing
- Set up automated quality checks
### Production Phase
- Implement comprehensive monitoring and alerting
- Track performance trends over time
- Monitor for anomalies and degradation
- Maintain cost visibility and control
### Continuous Improvement
- Regular performance reviews and optimization
- A/B testing of different agent configurations
- Feedback loops for quality improvement
- Documentation of lessons learned
Choose the observability tools that best fit your use case, infrastructure, and monitoring requirements to ensure your CrewAI agents perform reliably and efficiently.

View File

@@ -1,16 +1,26 @@
---
title: Patronus Evaluation Tools
description: The Patronus evaluation tools enable CrewAI agents to evaluate and score model inputs and outputs using the Patronus AI platform.
icon: check
title: Patronus AI Evaluation
description: Monitor and evaluate CrewAI agent performance using Patronus AI's comprehensive evaluation platform for LLM outputs and agent behaviors.
icon: shield-check
---
# `Patronus Evaluation Tools`
# Patronus AI Evaluation
## Description
## Overview
The [Patronus evaluation tools](https://patronus.ai) are designed to enable CrewAI agents to evaluate and score model inputs and outputs using the Patronus AI platform. These tools provide different levels of control over the evaluation process, from allowing agents to select the most appropriate evaluator and criteria to using predefined criteria or custom local evaluators.
[Patronus AI](https://patronus.ai) provides comprehensive evaluation and monitoring capabilities for CrewAI agents, enabling you to assess model outputs, agent behaviors, and overall system performance. This integration allows you to implement continuous evaluation workflows that help maintain quality and reliability in production environments.
There are three main Patronus evaluation tools:
## Key Features
- **Automated Evaluation**: Real-time assessment of agent outputs and behaviors
- **Custom Criteria**: Define specific evaluation criteria tailored to your use cases
- **Performance Monitoring**: Track agent performance metrics over time
- **Quality Assurance**: Ensure consistent output quality across different scenarios
- **Safety & Compliance**: Monitor for potential issues and policy violations
## Evaluation Tools
Patronus provides three main evaluation tools for different use cases:
1. **PatronusEvalTool**: Allows agents to select the most appropriate evaluator and criteria for the evaluation task.
2. **PatronusPredefinedCriteriaEvalTool**: Uses predefined evaluator and criteria specified by the user.

View File

@@ -37,9 +37,7 @@ These tools integrate with AI and machine learning services to enhance your agen
Execute Python code and perform data analysis.
</Card>
<Card title="Patronus Tools" icon="shield" href="/tools/ai-ml/patronustools">
AI safety and content moderation capabilities.
</Card>
</CardGroup>
## **Common Use Cases**

View File

@@ -1,6 +1,6 @@
[project]
name = "crewai"
version = "0.121.0"
version = "0.121.1"
description = "Cutting-edge framework for orchestrating role-playing, autonomous AI agents. By fostering collaborative intelligence, CrewAI empowers agents to work together seamlessly, tackling complex tasks."
readme = "README.md"
requires-python = ">=3.10,<3.13"
@@ -21,7 +21,6 @@ dependencies = [
"opentelemetry-sdk>=1.30.0",
"opentelemetry-exporter-otlp-proto-http>=1.30.0",
# Data Handling
"chromadb>=0.5.23",
"openpyxl>=3.1.5",
"pyvis>=0.3.2",
# Authentication and Security
@@ -49,6 +48,9 @@ tools = ["crewai-tools~=0.45.0"]
embeddings = [
"tiktoken~=0.7.0"
]
storage = [
"chromadb>=0.5.23"
]
agentops = ["agentops>=0.3.0"]
fastembed = ["fastembed>=0.4.1"]
pdfplumber = [

View File

@@ -18,7 +18,7 @@ warnings.filterwarnings(
category=UserWarning,
module="pydantic.main",
)
__version__ = "0.121.0"
__version__ = "0.121.1"
__all__ = [
"Agent",
"Crew",

View File

@@ -1,5 +1,3 @@
import json
import re
from typing import Any, Callable, Dict, List, Optional, Union
from crewai.agents.agent_builder.base_agent import BaseAgent

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.13"
dependencies = [
"crewai[tools]>=0.121.0,<1.0.0"
"crewai[tools]>=0.121.1,<1.0.0"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.13"
dependencies = [
"crewai[tools]>=0.121.0,<1.0.0",
"crewai[tools]>=0.121.1,<1.0.0",
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "Power up your crews with {{folder_name}}"
readme = "README.md"
requires-python = ">=3.10,<3.13"
dependencies = [
"crewai[tools]>=0.121.0"
"crewai[tools]>=0.121.1"
]
[tool.crewai]

View File

@@ -6,16 +6,25 @@ import os
import shutil
from typing import Any, Dict, List, Optional, Union
import chromadb
import chromadb.errors
from chromadb.api import ClientAPI
from chromadb.api.types import OneOrMany
from chromadb.config import Settings
try:
import chromadb
import chromadb.errors
from chromadb.api import ClientAPI
from chromadb.api.types import OneOrMany
from chromadb.config import Settings
HAS_CHROMADB = True
except ImportError:
chromadb = None # type: ignore
ClientAPI = Any # type: ignore
OneOrMany = Any # type: ignore
Settings = Any # type: ignore
HAS_CHROMADB = False
from crewai.knowledge.storage.base_knowledge_storage import BaseKnowledgeStorage
from crewai.utilities import EmbeddingConfigurator
from crewai.utilities.chromadb import sanitize_collection_name
from crewai.utilities.constants import KNOWLEDGE_DIRECTORY
from crewai.utilities.errors import ChromaDBRequiredError
from crewai.utilities.logger import Logger
from crewai.utilities.paths import db_storage_path
@@ -43,7 +52,7 @@ class KnowledgeStorage(BaseKnowledgeStorage):
search efficiency.
"""
collection: Optional[chromadb.Collection] = None
collection: Optional[Any] = None # type: ignore
collection_name: Optional[str] = "knowledge"
app: Optional[ClientAPI] = None
@@ -62,6 +71,9 @@ class KnowledgeStorage(BaseKnowledgeStorage):
filter: Optional[dict] = None,
score_threshold: float = 0.35,
) -> List[Dict[str, Any]]:
if not HAS_CHROMADB:
raise ChromaDBRequiredError("knowledge storage")
with suppress_logging():
if self.collection:
fetched = self.collection.query(
@@ -84,48 +96,63 @@ class KnowledgeStorage(BaseKnowledgeStorage):
raise Exception("Collection not initialized")
def initialize_knowledge_storage(self):
if not HAS_CHROMADB:
raise ChromaDBRequiredError("knowledge storage")
base_path = os.path.join(db_storage_path(), "knowledge")
chroma_client = chromadb.PersistentClient(
path=base_path,
settings=Settings(allow_reset=True),
)
self.app = chroma_client
try:
collection_name = (
f"knowledge_{self.collection_name}"
if self.collection_name
else "knowledge"
)
if self.app:
self.collection = self.app.get_or_create_collection(
name=sanitize_collection_name(collection_name),
embedding_function=self.embedder,
)
else:
raise Exception("Vector Database Client not initialized")
except Exception:
raise Exception("Failed to create or get collection")
def reset(self):
base_path = os.path.join(db_storage_path(), KNOWLEDGE_DIRECTORY)
if not self.app:
self.app = chromadb.PersistentClient(
chroma_client = chromadb.PersistentClient(
path=base_path,
settings=Settings(allow_reset=True),
)
self.app.reset()
shutil.rmtree(base_path)
self.app = None
self.collection = None
self.app = chroma_client
try:
collection_name = (
f"knowledge_{self.collection_name}"
if self.collection_name
else "knowledge"
)
if self.app:
self.collection = self.app.get_or_create_collection(
name=sanitize_collection_name(collection_name),
embedding_function=self.embedder,
)
else:
raise Exception("Vector Database Client not initialized")
except Exception:
raise Exception("Failed to create or get collection")
except ImportError:
raise ChromaDBRequiredError("knowledge storage")
def reset(self):
if not HAS_CHROMADB:
raise ChromaDBRequiredError("knowledge storage")
base_path = os.path.join(db_storage_path(), KNOWLEDGE_DIRECTORY)
try:
if not self.app:
self.app = chromadb.PersistentClient(
path=base_path,
settings=Settings(allow_reset=True),
)
self.app.reset()
shutil.rmtree(base_path)
self.app = None
self.collection = None
except ImportError:
raise ChromaDBRequiredError("knowledge storage")
def save(
self,
documents: List[str],
metadata: Optional[Union[Dict[str, Any], List[Dict[str, Any]]]] = None,
):
if not HAS_CHROMADB:
raise ChromaDBRequiredError("knowledge storage")
if not self.collection:
raise Exception("Collection not initialized")
@@ -156,7 +183,7 @@ class KnowledgeStorage(BaseKnowledgeStorage):
filtered_ids.append(doc_id)
# If we have no metadata at all, set it to None
final_metadata: Optional[OneOrMany[chromadb.Metadata]] = (
final_metadata: Optional[OneOrMany[Any]] = (
None if all(m is None for m in filtered_metadata) else filtered_metadata
)
@@ -165,29 +192,38 @@ class KnowledgeStorage(BaseKnowledgeStorage):
metadatas=final_metadata,
ids=filtered_ids,
)
except chromadb.errors.InvalidDimensionException as e:
Logger(verbose=True).log(
"error",
"Embedding dimension mismatch. This usually happens when mixing different embedding models. Try resetting the collection using `crewai reset-memories -a`",
"red",
)
raise ValueError(
"Embedding dimension mismatch. Make sure you're using the same embedding model "
"across all operations with this collection."
"Try resetting the collection using `crewai reset-memories -a`"
) from e
except ImportError:
raise ChromaDBRequiredError("knowledge storage")
except Exception as e:
Logger(verbose=True).log("error", f"Failed to upsert documents: {e}", "red")
raise
if HAS_CHROMADB and isinstance(e, chromadb.errors.InvalidDimensionException):
Logger(verbose=True).log(
"error",
"Embedding dimension mismatch. This usually happens when mixing different embedding models. Try resetting the collection using `crewai reset-memories -a`",
"red",
)
raise ValueError(
"Embedding dimension mismatch. Make sure you're using the same embedding model "
"across all operations with this collection."
"Try resetting the collection using `crewai reset-memories -a`"
) from e
else:
Logger(verbose=True).log("error", f"Failed to upsert documents: {e}", "red")
raise
def _create_default_embedding_function(self):
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
if not HAS_CHROMADB:
raise ChromaDBRequiredError("knowledge storage")
try:
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
return OpenAIEmbeddingFunction(
api_key=os.getenv("OPENAI_API_KEY"), model_name="text-embedding-3-small"
)
return OpenAIEmbeddingFunction(
api_key=os.getenv("OPENAI_API_KEY"), model_name="text-embedding-3-small"
)
except ImportError:
raise ChromaDBRequiredError("knowledge storage")
def _set_embedder_config(self, embedder: Optional[Dict[str, Any]] = None) -> None:
"""Set the embedding configuration for the knowledge storage.

View File

@@ -5,7 +5,7 @@ import sys
import threading
import warnings
from collections import defaultdict
from contextlib import contextmanager, redirect_stderr, redirect_stdout
from contextlib import contextmanager
from typing import (
Any,
DefaultDict,
@@ -18,7 +18,7 @@ from typing import (
Union,
cast,
)
from datetime import datetime
from dotenv import load_dotenv
from litellm.types.utils import ChatCompletionDeltaToolCall
from pydantic import BaseModel, Field
@@ -30,6 +30,11 @@ from crewai.utilities.events.llm_events import (
LLMCallType,
LLMStreamChunkEvent,
)
from crewai.utilities.events.tool_usage_events import (
ToolUsageStartedEvent,
ToolUsageFinishedEvent,
ToolUsageErrorEvent,
)
with warnings.catch_warnings():
warnings.simplefilter("ignore", UserWarning)
@@ -67,33 +72,72 @@ class FilteredStream(io.TextIOBase):
self._lock = threading.Lock()
with self._lock:
# Filter out extraneous messages from LiteLLM
lower_s = s.lower()
# Skip common noisy LiteLLM banners and any other lines that contain "litellm"
if (
"Give Feedback / Get Help: https://github.com/BerriAI/litellm/issues/new"
in s
or "LiteLLM.Info: If you need to debug this error, use `litellm._turn_on_debug()`"
in s
"give feedback / get help" in lower_s
or "litellm.info:" in lower_s
or "litellm" in lower_s
or "Consider using a smaller input or implementing a text splitting strategy" in lower_s
):
return 0
return self._original_stream.write(s)
def flush(self):
with self._lock:
return self._original_stream.flush()
def __getattr__(self, name):
"""Delegate attribute access to the wrapped original stream.
This ensures compatibility with libraries (e.g., Rich) that rely on
attributes such as `encoding`, `isatty`, `buffer`, etc., which may not
be explicitly defined on this proxy class.
"""
return getattr(self._original_stream, name)
# Delegate common properties/methods explicitly so they aren't shadowed by
# the TextIOBase defaults (e.g., .encoding returns None by default, which
# confuses Rich). These explicit pass-throughs ensure the wrapped Console
# still sees a fully-featured stream.
@property
def encoding(self):
return getattr(self._original_stream, "encoding", "utf-8")
def isatty(self):
return self._original_stream.isatty()
def fileno(self):
return self._original_stream.fileno()
def writable(self):
return True
# Apply the filtered stream globally so that any subsequent writes containing the filtered
# keywords (e.g., "litellm") are hidden from terminal output. We guard against double
# wrapping to ensure idempotency in environments where this module might be reloaded.
if not isinstance(sys.stdout, FilteredStream):
sys.stdout = FilteredStream(sys.stdout)
if not isinstance(sys.stderr, FilteredStream):
sys.stderr = FilteredStream(sys.stderr)
LLM_CONTEXT_WINDOW_SIZES = {
# openai
"gpt-4": 8192,
"gpt-4o": 128000,
"gpt-4o-mini": 128000,
"gpt-4o-mini": 200000,
"gpt-4-turbo": 128000,
"gpt-4.1": 1047576, # Based on official docs
"gpt-4.1-mini-2025-04-14": 1047576,
"gpt-4.1-nano-2025-04-14": 1047576,
"o1-preview": 128000,
"o1-mini": 128000,
"o3-mini": 200000, # Based on official o3-mini specifications
"o3-mini": 200000,
"o4-mini": 200000,
# gemini
"gemini-2.0-flash": 1048576,
"gemini-2.0-flash-thinking-exp-01-21": 32768,
@@ -208,7 +252,7 @@ LLM_CONTEXT_WINDOW_SIZES = {
}
DEFAULT_CONTEXT_WINDOW_SIZE = 8192
CONTEXT_WINDOW_USAGE_RATIO = 0.75
CONTEXT_WINDOW_USAGE_RATIO = 0.85
@contextmanager
@@ -219,12 +263,7 @@ def suppress_warnings():
"ignore", message="open_text is deprecated*", category=DeprecationWarning
)
# Redirect stdout and stderr
with (
redirect_stdout(FilteredStream(sys.stdout)),
redirect_stderr(FilteredStream(sys.stderr)),
):
yield
yield
class Delta(TypedDict):
@@ -799,7 +838,26 @@ class LLM(BaseLLM):
fn = available_functions[function_name]
# --- 3.2) Execute function
assert hasattr(crewai_event_bus, "emit")
started_at = datetime.now()
crewai_event_bus.emit(
self,
event=ToolUsageStartedEvent(
tool_name=function_name,
tool_args=function_args,
),
)
result = fn(**function_args)
crewai_event_bus.emit(
self,
event=ToolUsageFinishedEvent(
output=result,
tool_name=function_name,
tool_args=function_args,
started_at=started_at,
finished_at=datetime.now(),
),
)
# --- 3.3) Emit success event
self._handle_emit_call_events(result, LLMCallType.TOOL_CALL)
@@ -815,6 +873,14 @@ class LLM(BaseLLM):
self,
event=LLMCallFailedEvent(error=f"Tool execution error: {str(e)}"),
)
crewai_event_bus.emit(
self,
event=ToolUsageErrorEvent(
tool_name=function_name,
tool_args=function_args,
error=f"Tool execution error: {str(e)}"
),
)
return None
def call(

View File

@@ -6,11 +6,17 @@ import shutil
import uuid
from typing import Any, Dict, List, Optional
from chromadb.api import ClientAPI
try:
from chromadb.api import ClientAPI
HAS_CHROMADB = True
except ImportError:
ClientAPI = Any # type: ignore
HAS_CHROMADB = False
from crewai.memory.storage.base_rag_storage import BaseRAGStorage
from crewai.utilities import EmbeddingConfigurator
from crewai.utilities.constants import MAX_FILE_NAME_LENGTH
from crewai.utilities.errors import ChromaDBRequiredError
from crewai.utilities.paths import db_storage_path
@@ -60,26 +66,32 @@ class RAGStorage(BaseRAGStorage):
self.embedder_config = configurator.configure_embedder(self.embedder_config)
def _initialize_app(self):
import chromadb
from chromadb.config import Settings
self._set_embedder_config()
chroma_client = chromadb.PersistentClient(
path=self.path if self.path else self.storage_file_name,
settings=Settings(allow_reset=self.allow_reset),
)
self.app = chroma_client
if not HAS_CHROMADB:
raise ChromaDBRequiredError("memory storage")
try:
self.collection = self.app.get_collection(
name=self.type, embedding_function=self.embedder_config
)
except Exception:
self.collection = self.app.create_collection(
name=self.type, embedding_function=self.embedder_config
import chromadb
from chromadb.config import Settings
self._set_embedder_config()
chroma_client = chromadb.PersistentClient(
path=self.path if self.path else self.storage_file_name,
settings=Settings(allow_reset=self.allow_reset),
)
self.app = chroma_client
try:
self.collection = self.app.get_collection(
name=self.type, embedding_function=self.embedder_config
)
except Exception:
self.collection = self.app.create_collection(
name=self.type, embedding_function=self.embedder_config
)
except ImportError:
raise ChromaDBRequiredError("memory storage")
def _sanitize_role(self, role: str) -> str:
"""
Sanitizes agent roles to ensure valid directory names.
@@ -165,10 +177,16 @@ class RAGStorage(BaseRAGStorage):
)
def _create_default_embedding_function(self):
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
if not HAS_CHROMADB:
raise ChromaDBRequiredError("memory storage")
try:
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
return OpenAIEmbeddingFunction(
api_key=os.getenv("OPENAI_API_KEY"), model_name="text-embedding-3-small"
)
return OpenAIEmbeddingFunction(
api_key=os.getenv("OPENAI_API_KEY"), model_name="text-embedding-3-small"
)
except ImportError:
raise ChromaDBRequiredError("memory storage")

View File

@@ -1,5 +1,4 @@
import asyncio
import warnings
from abc import ABC, abstractmethod
from inspect import signature
from typing import Any, Callable, Type, get_args, get_origin
@@ -36,6 +35,10 @@ class BaseTool(BaseModel, ABC):
"""Function that will be used to determine if the tool should be cached, should return a boolean. If None, the tool will be cached."""
result_as_answer: bool = False
"""Flag to check if the tool should be the final agent answer."""
max_usage_count: int | None = None
"""Maximum number of times this tool can be used. None means unlimited usage."""
current_usage_count: int = 0
"""Current number of times this tool has been used."""
@field_validator("args_schema", mode="before")
@classmethod
@@ -54,6 +57,13 @@ class BaseTool(BaseModel, ABC):
},
},
)
@field_validator("max_usage_count", mode="before")
@classmethod
def validate_max_usage_count(cls, v: int | None) -> int | None:
if v is not None and v <= 0:
raise ValueError("max_usage_count must be a positive integer")
return v
def model_post_init(self, __context: Any) -> None:
self._generate_description()
@@ -70,9 +80,15 @@ class BaseTool(BaseModel, ABC):
# If _run is async, we safely run it
if asyncio.iscoroutine(result):
return asyncio.run(result)
result = asyncio.run(result)
self.current_usage_count += 1
return result
def reset_usage_count(self) -> None:
"""Reset the current usage count to zero."""
self.current_usage_count = 0
@abstractmethod
def _run(
@@ -91,6 +107,8 @@ class BaseTool(BaseModel, ABC):
args_schema=self.args_schema,
func=self._run,
result_as_answer=self.result_as_answer,
max_usage_count=self.max_usage_count,
current_usage_count=self.current_usage_count,
)
@classmethod
@@ -251,13 +269,14 @@ def to_langchain(
return [t.to_structured_tool() if isinstance(t, BaseTool) else t for t in tools]
def tool(*args, result_as_answer=False):
def tool(*args, result_as_answer: bool = False, max_usage_count: int | None = None) -> Callable:
"""
Decorator to create a tool from a function.
Args:
*args: Positional arguments, either the function to decorate or the tool name.
result_as_answer: Flag to indicate if the tool result should be used as the final agent answer.
max_usage_count: Maximum number of times this tool can be used. None means unlimited usage.
"""
def _make_with_name(tool_name: str) -> Callable:
@@ -284,6 +303,8 @@ def tool(*args, result_as_answer=False):
func=f,
args_schema=args_schema,
result_as_answer=result_as_answer,
max_usage_count=max_usage_count,
current_usage_count=0,
)
return _make_tool

View File

@@ -23,6 +23,8 @@ class CrewStructuredTool:
args_schema: type[BaseModel],
func: Callable[..., Any],
result_as_answer: bool = False,
max_usage_count: int | None = None,
current_usage_count: int = 0,
) -> None:
"""Initialize the structured tool.
@@ -32,6 +34,8 @@ class CrewStructuredTool:
args_schema: The pydantic model for the tool's arguments
func: The function to run when the tool is called
result_as_answer: Whether to return the output directly
max_usage_count: Maximum number of times this tool can be used. None means unlimited usage.
current_usage_count: Current number of times this tool has been used.
"""
self.name = name
self.description = description
@@ -39,6 +43,8 @@ class CrewStructuredTool:
self.func = func
self._logger = Logger()
self.result_as_answer = result_as_answer
self.max_usage_count = max_usage_count
self.current_usage_count = current_usage_count
# Validate the function signature matches the schema
self._validate_function_signature()

View File

@@ -200,6 +200,17 @@ class ToolUsage:
None,
)
usage_limit_error = self._check_usage_limit(available_tool, tool.name)
if usage_limit_error:
try:
result = usage_limit_error
self._telemetry.tool_usage_error(llm=self.function_calling_llm)
result = self._format_result(result=result)
return result
except Exception:
if self.task:
self.task.increment_tools_errors()
if result is None:
try:
if calling.tool_name in [
@@ -300,6 +311,14 @@ class ToolUsage:
if self.agent and hasattr(self.agent, "tools_results"):
self.agent.tools_results.append(data)
if available_tool and hasattr(available_tool, 'current_usage_count'):
available_tool.current_usage_count += 1
if hasattr(available_tool, 'max_usage_count') and available_tool.max_usage_count is not None:
self._printer.print(
content=f"Tool '{available_tool.name}' usage: {available_tool.current_usage_count}/{available_tool.max_usage_count}",
color="blue"
)
return result
def _format_result(self, result: Any) -> str:
@@ -331,6 +350,24 @@ class ToolUsage:
calling.arguments == last_tool_usage.arguments
)
return False
def _check_usage_limit(self, tool: Any, tool_name: str) -> str | None:
"""Check if tool has reached its usage limit.
Args:
tool: The tool to check
tool_name: The name of the tool (used for error message)
Returns:
Error message if limit reached, None otherwise
"""
if (
hasattr(tool, 'max_usage_count')
and tool.max_usage_count is not None
and tool.current_usage_count >= tool.max_usage_count
):
return f"Tool '{tool_name}' has reached its usage limit of {tool.max_usage_count} times and cannot be used anymore."
return None
def _select_tool(self, tool_name: str) -> Any:
order_tools = sorted(

View File

@@ -55,7 +55,7 @@
"reasoning": {
"initial_plan": "You are {role}, a professional with the following background: {backstory}\n\nYour primary goal is: {goal}\n\nAs {role}, you are creating a strategic plan for a task that requires your expertise and unique perspective.",
"refine_plan": "You are {role}, a professional with the following background: {backstory}\n\nYour primary goal is: {goal}\n\nAs {role}, you are refining a strategic plan for a task that requires your expertise and unique perspective.",
"create_plan_prompt": "You are {role} with this background: {backstory}\n\nYour primary goal is: {goal}\n\nYou have been assigned the following task:\n{description}\n\nExpected output:\n{expected_output}\n\nAvailable tools: {tools}\n\nBefore executing this task, create a detailed plan that leverages your expertise as {role} and outlines:\n1. Your understanding of the task from your professional perspective\n2. The key steps you'll take to complete it, drawing on your background and skills\n3. How you'll approach any challenges that might arise, considering your expertise\n4. How you'll strategically use the available tools based on your experience\n5. The expected outcome and how it aligns with your goal\n\nAfter creating your plan, assess whether you feel ready to execute the task.\nConclude with one of these statements:\n- \"READY: I am ready to execute the task.\"\n- \"NOT READY: I need to refine my plan because [specific reason].\"",
"refine_plan_prompt": "You are {role} with this background: {backstory}\n\nYour primary goal is: {goal}\n\nYou created the following plan for this task:\n{current_plan}\n\nHowever, you indicated that you're not ready to execute the task yet.\n\nPlease refine your plan further, drawing on your expertise as {role} to address any gaps or uncertainties.\n\nAfter refining your plan, assess whether you feel ready to execute the task.\nConclude with one of these statements:\n- \"READY: I am ready to execute the task.\"\n- \"NOT READY: I need to refine my plan further because [specific reason].\""
"create_plan_prompt": "You are {role} with this background: {backstory}\n\nYour primary goal is: {goal}\n\nYou have been assigned the following task:\n{description}\n\nExpected output:\n{expected_output}\n\nAvailable tools: {tools}\n\nBefore executing this task, create a detailed plan that leverages your expertise as {role} and outlines:\n1. Your understanding of the task from your professional perspective\n2. The key steps you'll take to complete it, drawing on your background and skills\n3. How you'll approach any challenges that might arise, considering your expertise\n4. How you'll strategically use the available tools based on your experience, exactly what tools to use and how to use them\n5. The expected outcome and how it aligns with your goal\n\nAfter creating your plan, assess whether you feel ready to execute the task or if you could do better.\nConclude with one of these statements:\n- \"READY: I am ready to execute the task.\"\n- \"NOT READY: I need to refine my plan because [specific reason].\"",
"refine_plan_prompt": "You are {role} with this background: {backstory}\n\nYour primary goal is: {goal}\n\nYou created the following plan for this task:\n{current_plan}\n\nHowever, you indicated that you're not ready to execute the task yet.\n\nPlease refine your plan further, drawing on your expertise as {role} to address any gaps or uncertainties. As you refine your plan, be specific about which available tools you will use, how you will use them, and why they are the best choices for each step. Clearly outline your tool usage strategy as part of your improved plan.\n\nAfter refining your plan, assess whether you feel ready to execute the task.\nConclude with one of these statements:\n- \"READY: I am ready to execute the task.\"\n- \"NOT READY: I need to refine my plan further because [specific reason].\""
}
}

View File

@@ -44,7 +44,7 @@ def render_text_description_and_args(
tools: Sequence[Union[CrewStructuredTool, BaseTool]],
) -> str:
"""Render the tool name, description, and args in plain text.
search: This tool is used for search, args: {"query": {"type": "string"}}
calculator: This tool is used for math, \
args: {"expression": {"type": "string"}}
@@ -309,7 +309,7 @@ def handle_context_length(
"""
if respect_context_window:
printer.print(
content="Context length exceeded. Summarizing content to fit the model context window.",
content="Context length exceeded. Summarizing content to fit the model context window. Might take a while...",
color="yellow",
)
summarize_messages(messages, llm, callbacks, i18n)
@@ -337,15 +337,22 @@ def summarize_messages(
callbacks: List of callbacks for LLM
i18n: I18N instance for messages
"""
messages_string = " ".join([message["content"] for message in messages])
messages_groups = []
for message in messages:
content = message["content"]
cut_size = llm.get_context_window_size()
for i in range(0, len(content), cut_size):
messages_groups.append({"content": content[i : i + cut_size]})
cut_size = llm.get_context_window_size()
for i in range(0, len(messages_string), cut_size):
messages_groups.append({"content": messages_string[i : i + cut_size]})
summarized_contents = []
for group in messages_groups:
total_groups = len(messages_groups)
for idx, group in enumerate(messages_groups, 1):
Printer().print(
content=f"Summarizing {idx}/{total_groups}...",
color="yellow",
)
summary = llm.call(
[
format_message_for_llm(

View File

@@ -1,8 +1,29 @@
import os
from typing import Any, Dict, Optional, cast
from typing import Any, Dict, Optional, cast, Protocol, Sequence, TYPE_CHECKING, TypeVar, List, Union
from chromadb import Documents, EmbeddingFunction, Embeddings
from chromadb.api.types import validate_embedding_function
from crewai.utilities.errors import ChromaDBRequiredError
if TYPE_CHECKING:
from numpy import ndarray
from numpy import dtype, floating, signedinteger, unsignedinteger
try:
from chromadb import Documents, EmbeddingFunction, Embeddings
from chromadb.api.types import validate_embedding_function
HAS_CHROMADB = True
except ImportError:
HAS_CHROMADB = False
Documents = List[str] # type: ignore
Embeddings = List[List[float]] # type: ignore
class EmbeddingFunction(Protocol): # type: ignore
"""Protocol for embedding functions when ChromaDB is not available."""
def __call__(self, input: List[str]) -> List[List[float]]: ...
def validate_embedding_function(func: Any) -> None: # type: ignore
"""Stub for validate_embedding_function when ChromaDB is not available."""
pass
class EmbeddingConfigurator:
@@ -26,6 +47,9 @@ class EmbeddingConfigurator:
embedder_config: Optional[Dict[str, Any]] = None,
) -> EmbeddingFunction:
"""Configures and returns an embedding function based on the provided config."""
if not HAS_CHROMADB:
raise ChromaDBRequiredError("embedding functionality")
if embedder_config is None:
return self._create_default_embedding_function()
@@ -47,129 +71,189 @@ class EmbeddingConfigurator:
@staticmethod
def _create_default_embedding_function():
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
if not HAS_CHROMADB:
raise ChromaDBRequiredError("embedding functionality")
try:
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
return OpenAIEmbeddingFunction(
api_key=os.getenv("OPENAI_API_KEY"), model_name="text-embedding-3-small"
)
return OpenAIEmbeddingFunction(
api_key=os.getenv("OPENAI_API_KEY"), model_name="text-embedding-3-small"
)
except ImportError:
raise ChromaDBRequiredError("embedding functionality")
@staticmethod
def _configure_openai(config, model_name):
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
if not HAS_CHROMADB:
raise ChromaDBRequiredError("embedding functionality")
try:
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
return OpenAIEmbeddingFunction(
api_key=config.get("api_key") or os.getenv("OPENAI_API_KEY"),
model_name=model_name,
api_base=config.get("api_base", None),
api_type=config.get("api_type", None),
api_version=config.get("api_version", None),
default_headers=config.get("default_headers", None),
dimensions=config.get("dimensions", None),
deployment_id=config.get("deployment_id", None),
organization_id=config.get("organization_id", None),
)
return OpenAIEmbeddingFunction(
api_key=config.get("api_key") or os.getenv("OPENAI_API_KEY"),
model_name=model_name,
api_base=config.get("api_base", None),
api_type=config.get("api_type", None),
api_version=config.get("api_version", None),
default_headers=config.get("default_headers", None),
dimensions=config.get("dimensions", None),
deployment_id=config.get("deployment_id", None),
organization_id=config.get("organization_id", None),
)
except ImportError:
raise ChromaDBRequiredError("embedding functionality")
@staticmethod
def _configure_azure(config, model_name):
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
if not HAS_CHROMADB:
raise ChromaDBRequiredError("embedding functionality")
try:
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
return OpenAIEmbeddingFunction(
api_key=config.get("api_key"),
api_base=config.get("api_base"),
api_type=config.get("api_type", "azure"),
api_version=config.get("api_version"),
model_name=model_name,
default_headers=config.get("default_headers"),
dimensions=config.get("dimensions"),
deployment_id=config.get("deployment_id"),
organization_id=config.get("organization_id"),
)
return OpenAIEmbeddingFunction(
api_key=config.get("api_key"),
api_base=config.get("api_base"),
api_type=config.get("api_type", "azure"),
api_version=config.get("api_version"),
model_name=model_name,
default_headers=config.get("default_headers"),
dimensions=config.get("dimensions"),
deployment_id=config.get("deployment_id"),
organization_id=config.get("organization_id"),
)
except ImportError:
raise ChromaDBRequiredError("embedding functionality")
@staticmethod
def _configure_ollama(config, model_name):
from chromadb.utils.embedding_functions.ollama_embedding_function import (
OllamaEmbeddingFunction,
)
if not HAS_CHROMADB:
raise ChromaDBRequiredError("embedding functionality")
try:
from chromadb.utils.embedding_functions.ollama_embedding_function import (
OllamaEmbeddingFunction,
)
return OllamaEmbeddingFunction(
url=config.get("url", "http://localhost:11434/api/embeddings"),
model_name=model_name,
)
return OllamaEmbeddingFunction(
url=config.get("url", "http://localhost:11434/api/embeddings"),
model_name=model_name,
)
except ImportError:
raise ChromaDBRequiredError("embedding functionality")
@staticmethod
def _configure_vertexai(config, model_name):
from chromadb.utils.embedding_functions.google_embedding_function import (
GoogleVertexEmbeddingFunction,
)
if not HAS_CHROMADB:
raise ChromaDBRequiredError("embedding functionality")
try:
from chromadb.utils.embedding_functions.google_embedding_function import (
GoogleVertexEmbeddingFunction,
)
return GoogleVertexEmbeddingFunction(
model_name=model_name,
api_key=config.get("api_key"),
project_id=config.get("project_id"),
region=config.get("region"),
)
return GoogleVertexEmbeddingFunction(
model_name=model_name,
api_key=config.get("api_key"),
project_id=config.get("project_id"),
region=config.get("region"),
)
except ImportError:
raise ChromaDBRequiredError("embedding functionality")
@staticmethod
def _configure_google(config, model_name):
from chromadb.utils.embedding_functions.google_embedding_function import (
GoogleGenerativeAiEmbeddingFunction,
)
if not HAS_CHROMADB:
raise ChromaDBRequiredError("embedding functionality")
try:
from chromadb.utils.embedding_functions.google_embedding_function import (
GoogleGenerativeAiEmbeddingFunction,
)
return GoogleGenerativeAiEmbeddingFunction(
model_name=model_name,
api_key=config.get("api_key"),
task_type=config.get("task_type"),
)
return GoogleGenerativeAiEmbeddingFunction(
model_name=model_name,
api_key=config.get("api_key"),
task_type=config.get("task_type"),
)
except ImportError:
raise ChromaDBRequiredError("embedding functionality")
@staticmethod
def _configure_cohere(config, model_name):
from chromadb.utils.embedding_functions.cohere_embedding_function import (
CohereEmbeddingFunction,
)
if not HAS_CHROMADB:
raise ChromaDBRequiredError("embedding functionality")
try:
from chromadb.utils.embedding_functions.cohere_embedding_function import (
CohereEmbeddingFunction,
)
return CohereEmbeddingFunction(
model_name=model_name,
api_key=config.get("api_key"),
)
return CohereEmbeddingFunction(
model_name=model_name,
api_key=config.get("api_key"),
)
except ImportError:
raise ChromaDBRequiredError("embedding functionality")
@staticmethod
def _configure_voyageai(config, model_name):
from chromadb.utils.embedding_functions.voyageai_embedding_function import (
VoyageAIEmbeddingFunction,
)
if not HAS_CHROMADB:
raise ChromaDBRequiredError("embedding functionality")
try:
from chromadb.utils.embedding_functions.voyageai_embedding_function import (
VoyageAIEmbeddingFunction,
)
return VoyageAIEmbeddingFunction(
model_name=model_name,
api_key=config.get("api_key"),
)
return VoyageAIEmbeddingFunction(
model_name=model_name,
api_key=config.get("api_key"),
)
except ImportError:
raise ChromaDBRequiredError("embedding functionality")
@staticmethod
def _configure_bedrock(config, model_name):
from chromadb.utils.embedding_functions.amazon_bedrock_embedding_function import (
AmazonBedrockEmbeddingFunction,
)
if not HAS_CHROMADB:
raise ChromaDBRequiredError("embedding functionality")
try:
from chromadb.utils.embedding_functions.amazon_bedrock_embedding_function import (
AmazonBedrockEmbeddingFunction,
)
# Allow custom model_name override with backwards compatibility
kwargs = {"session": config.get("session")}
if model_name is not None:
kwargs["model_name"] = model_name
return AmazonBedrockEmbeddingFunction(**kwargs)
# Allow custom model_name override with backwards compatibility
kwargs = {"session": config.get("session")}
if model_name is not None:
kwargs["model_name"] = model_name
return AmazonBedrockEmbeddingFunction(**kwargs)
except ImportError:
raise ChromaDBRequiredError("embedding functionality")
@staticmethod
def _configure_huggingface(config, model_name):
from chromadb.utils.embedding_functions.huggingface_embedding_function import (
HuggingFaceEmbeddingServer,
)
if not HAS_CHROMADB:
raise ChromaDBRequiredError("embedding functionality")
try:
from chromadb.utils.embedding_functions.huggingface_embedding_function import (
HuggingFaceEmbeddingServer,
)
return HuggingFaceEmbeddingServer(
url=config.get("api_url"),
)
return HuggingFaceEmbeddingServer(
url=config.get("api_url"),
)
except ImportError:
raise ChromaDBRequiredError("embedding functionality")
@staticmethod
def _configure_watson(config, model_name):
@@ -182,7 +266,7 @@ class EmbeddingConfigurator:
"IBM Watson dependencies are not installed. Please install them to use Watson embedding."
) from e
class WatsonEmbeddingFunction(EmbeddingFunction):
class WatsonEmbeddingFunction:
def __call__(self, input: Documents) -> Embeddings:
if isinstance(input, str):
input = [input]
@@ -212,6 +296,9 @@ class EmbeddingConfigurator:
@staticmethod
def _configure_custom(config):
if not HAS_CHROMADB:
raise ChromaDBRequiredError("embedding functionality")
custom_embedder = config.get("embedder")
if isinstance(custom_embedder, EmbeddingFunction):
try:

View File

@@ -0,0 +1,62 @@
"""Custom error classes for CrewAI."""
from typing import Optional
class ChromaDBRequiredError(ImportError):
"""Error raised when ChromaDB is required but not installed."""
def __init__(self, feature: str):
"""Initialize the error with a specific feature name.
Args:
feature: The name of the feature that requires ChromaDB.
"""
message = (
f"ChromaDB is required for {feature} features. "
"Please install it with 'pip install crewai[storage]'"
)
super().__init__(message)
class DatabaseOperationError(Exception):
"""Base exception class for database operation errors."""
def __init__(self, message: str, original_error: Optional[Exception] = None):
"""Initialize the database operation error.
Args:
message: The error message to display
original_error: The original exception that caused this error, if any
"""
super().__init__(message)
self.original_error = original_error
class DatabaseError:
"""Standardized error message templates for database operations."""
INIT_ERROR: str = "Database initialization error: {}"
SAVE_ERROR: str = "Error saving task outputs: {}"
UPDATE_ERROR: str = "Error updating task outputs: {}"
LOAD_ERROR: str = "Error loading task outputs: {}"
DELETE_ERROR: str = "Error deleting task outputs: {}"
@classmethod
def format_error(cls, template: str, error: Exception) -> str:
"""Format an error message with the given template and error.
Args:
template: The error message template to use
error: The exception to format into the template
Returns:
The formatted error message
"""
return template.format(str(error))
class AgentRepositoryError(Exception):
"""Exception raised when an agent repository is not found."""
...

View File

@@ -2,7 +2,7 @@ from io import StringIO
from typing import Any, Dict
from pydantic import Field, PrivateAttr
from crewai.llm import LLM
from crewai.task import Task
from crewai.telemetry.telemetry import Telemetry
from crewai.utilities import Logger
@@ -283,27 +283,43 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(ToolUsageStartedEvent)
def on_tool_usage_started(source, event: ToolUsageStartedEvent):
self.formatter.handle_tool_usage_started(
self.formatter.current_agent_branch,
event.tool_name,
if isinstance(source, LLM):
self.formatter.handle_llm_tool_usage_started(
event.tool_name,
)
else:
self.formatter.handle_tool_usage_started(
self.formatter.current_agent_branch,
event.tool_name,
self.formatter.current_crew_tree,
)
@crewai_event_bus.on(ToolUsageFinishedEvent)
def on_tool_usage_finished(source, event: ToolUsageFinishedEvent):
self.formatter.handle_tool_usage_finished(
self.formatter.current_tool_branch,
event.tool_name,
self.formatter.current_crew_tree,
)
if isinstance(source, LLM):
self.formatter.handle_llm_tool_usage_finished(
event.tool_name,
)
else:
self.formatter.handle_tool_usage_finished(
self.formatter.current_tool_branch,
event.tool_name,
self.formatter.current_crew_tree,
)
@crewai_event_bus.on(ToolUsageErrorEvent)
def on_tool_usage_error(source, event: ToolUsageErrorEvent):
self.formatter.handle_tool_usage_error(
self.formatter.current_tool_branch,
event.tool_name,
event.error,
self.formatter.current_crew_tree,
if isinstance(source, LLM):
self.formatter.handle_llm_tool_usage_error(
event.tool_name,
event.error,
)
else:
self.formatter.handle_tool_usage_error(
self.formatter.current_tool_branch,
event.tool_name,
event.error,
self.formatter.current_crew_tree,
)
# ----------- LLM EVENTS -----------

View File

@@ -7,11 +7,11 @@ from .base_events import BaseEvent
class ToolUsageEvent(BaseEvent):
"""Base event for tool usage tracking"""
agent_key: str
agent_role: str
agent_key: Optional[str] = None
agent_role: Optional[str] = None
tool_name: str
tool_args: Dict[str, Any] | str
tool_class: str
tool_class: Optional[str] = None
run_attempts: int | None = None
delegations: int | None = None
agent: Optional[Any] = None

View File

@@ -17,6 +17,7 @@ class ConsoleFormatter:
current_lite_agent_branch: Optional[Tree] = None
tool_usage_counts: Dict[str, int] = {}
current_reasoning_branch: Optional[Tree] = None # Track reasoning status
current_llm_tool_tree: Optional[Tree] = None
def __init__(self, verbose: bool = False):
self.console = Console(width=None)
@@ -426,6 +427,51 @@ class ConsoleFormatter:
self.print()
return method_branch
def get_llm_tree(self, tool_name: str):
text = Text()
text.append(f"🔧 Using {tool_name} from LLM available_function", style="yellow")
tree = self.current_flow_tree or self.current_crew_tree
if tree:
tree.add(text)
return tree or Tree(text)
def handle_llm_tool_usage_started(
self,
tool_name: str,
):
tree = self.get_llm_tree(tool_name)
self.add_tree_node(tree, "🔄 Tool Usage Started", "green")
self.print(tree)
self.print()
return tree
def handle_llm_tool_usage_finished(
self,
tool_name: str,
):
tree = self.get_llm_tree(tool_name)
self.add_tree_node(tree, "✅ Tool Usage Completed", "green")
self.print(tree)
self.print()
def handle_llm_tool_usage_error(
self,
tool_name: str,
error: str,
):
tree = self.get_llm_tree(tool_name)
self.add_tree_node(tree, "❌ Tool Usage Failed", "red")
self.print(tree)
self.print()
error_content = self.create_status_content(
"Tool Usage Failed", tool_name, "red", Error=error
)
self.print_panel(error_content, "Tool Error", "red")
def handle_tool_usage_started(
self,
agent_branch: Optional[Tree],

View File

@@ -0,0 +1,143 @@
interactions:
- request:
body: '{"messages": [{"role": "user", "content": "What is the weather in New York?"}],
"model": "gpt-4o", "stop": [], "stream": true, "stream_options": {"include_usage":
true}, "tools": [{"type": "function", "function": {"name": "get_weather", "description":
"Get the current weather in a given location", "parameters": {"type": "object",
"properties": {"location": {"type": "string", "description": "The city and state,
e.g. San Francisco, CA"}}, "required": ["location"]}}}]}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate
connection:
- keep-alive
content-length:
- '470'
content-type:
- application/json
cookie:
- _cfuvid=3UeEmz_rnmsoZxrVUv32u35gJOi766GDWNe5_RTjiPk-1736537376739-0.0.1.1-604800000
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.68.2
x-stainless-arch:
- arm64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- MacOS
x-stainless-package-version:
- 1.68.2
x-stainless-raw-response:
- 'true'
x-stainless-read-timeout:
- '600.0'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.9
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: 'data: {"id":"chatcmpl-BcY6NFDeu4HFOAIarpwSNAUEMuPTg","object":"chat.completion.chunk","created":1748527251,"model":"gpt-4o-2024-08-06","service_tier":"default","system_fingerprint":"fp_07871e2ad8","choices":[{"index":0,"delta":{"role":"assistant","content":null,"tool_calls":[{"index":0,"id":"call_UkMsNK0RTJ1nlT19WqgLJYV9","type":"function","function":{"name":"get_weather","arguments":""}}],"refusal":null},"logprobs":null,"finish_reason":null}],"usage":null}
data: {"id":"chatcmpl-BcY6NFDeu4HFOAIarpwSNAUEMuPTg","object":"chat.completion.chunk","created":1748527251,"model":"gpt-4o-2024-08-06","service_tier":"default","system_fingerprint":"fp_07871e2ad8","choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"function":{"arguments":"{\""}}]},"logprobs":null,"finish_reason":null}],"usage":null}
data: {"id":"chatcmpl-BcY6NFDeu4HFOAIarpwSNAUEMuPTg","object":"chat.completion.chunk","created":1748527251,"model":"gpt-4o-2024-08-06","service_tier":"default","system_fingerprint":"fp_07871e2ad8","choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"function":{"arguments":"location"}}]},"logprobs":null,"finish_reason":null}],"usage":null}
data: {"id":"chatcmpl-BcY6NFDeu4HFOAIarpwSNAUEMuPTg","object":"chat.completion.chunk","created":1748527251,"model":"gpt-4o-2024-08-06","service_tier":"default","system_fingerprint":"fp_07871e2ad8","choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"function":{"arguments":"\":\""}}]},"logprobs":null,"finish_reason":null}],"usage":null}
data: {"id":"chatcmpl-BcY6NFDeu4HFOAIarpwSNAUEMuPTg","object":"chat.completion.chunk","created":1748527251,"model":"gpt-4o-2024-08-06","service_tier":"default","system_fingerprint":"fp_07871e2ad8","choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"function":{"arguments":"New"}}]},"logprobs":null,"finish_reason":null}],"usage":null}
data: {"id":"chatcmpl-BcY6NFDeu4HFOAIarpwSNAUEMuPTg","object":"chat.completion.chunk","created":1748527251,"model":"gpt-4o-2024-08-06","service_tier":"default","system_fingerprint":"fp_07871e2ad8","choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"function":{"arguments":"
York"}}]},"logprobs":null,"finish_reason":null}],"usage":null}
data: {"id":"chatcmpl-BcY6NFDeu4HFOAIarpwSNAUEMuPTg","object":"chat.completion.chunk","created":1748527251,"model":"gpt-4o-2024-08-06","service_tier":"default","system_fingerprint":"fp_07871e2ad8","choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"function":{"arguments":","}}]},"logprobs":null,"finish_reason":null}],"usage":null}
data: {"id":"chatcmpl-BcY6NFDeu4HFOAIarpwSNAUEMuPTg","object":"chat.completion.chunk","created":1748527251,"model":"gpt-4o-2024-08-06","service_tier":"default","system_fingerprint":"fp_07871e2ad8","choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"function":{"arguments":"
NY"}}]},"logprobs":null,"finish_reason":null}],"usage":null}
data: {"id":"chatcmpl-BcY6NFDeu4HFOAIarpwSNAUEMuPTg","object":"chat.completion.chunk","created":1748527251,"model":"gpt-4o-2024-08-06","service_tier":"default","system_fingerprint":"fp_07871e2ad8","choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"function":{"arguments":"\"}"}}]},"logprobs":null,"finish_reason":null}],"usage":null}
data: {"id":"chatcmpl-BcY6NFDeu4HFOAIarpwSNAUEMuPTg","object":"chat.completion.chunk","created":1748527251,"model":"gpt-4o-2024-08-06","service_tier":"default","system_fingerprint":"fp_07871e2ad8","choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"tool_calls"}],"usage":null}
data: {"id":"chatcmpl-BcY6NFDeu4HFOAIarpwSNAUEMuPTg","object":"chat.completion.chunk","created":1748527251,"model":"gpt-4o-2024-08-06","service_tier":"default","system_fingerprint":"fp_07871e2ad8","choices":[],"usage":{"prompt_tokens":68,"completion_tokens":17,"total_tokens":85,"prompt_tokens_details":{"cached_tokens":0,"audio_tokens":0},"completion_tokens_details":{"reasoning_tokens":0,"audio_tokens":0,"accepted_prediction_tokens":0,"rejected_prediction_tokens":0}}}
data: [DONE]
'
headers:
CF-RAY:
- 947685373af8a435-GRU
Connection:
- keep-alive
Content-Type:
- text/event-stream; charset=utf-8
Date:
- Thu, 29 May 2025 14:00:51 GMT
Server:
- cloudflare
Set-Cookie:
- __cf_bm=fFoq7oCHLgmljA4hsHWxTGHMEWJ.0t1XTuDptZPPkOc-1748527251-1.0.1.1-PP3Hd7XzA4AQFn0JQWjuQdhFwey0Pj9maUWKfFG16Bkl69Uk65A8XKN73UbsvO327TruwxameKb_m_HDePCR.YN0TZlE8Pu45WsA9shDwKY;
path=/; expires=Thu, 29-May-25 14:30:51 GMT; domain=.api.openai.com; HttpOnly;
Secure; SameSite=None
- _cfuvid=ut1CVX5GOYnv03fiV2Dsv7cm5soJmwgSutkPAEuVXWg-1748527251565-0.0.1.1-604800000;
path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- nosniff
access-control-expose-headers:
- X-Request-ID
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- crewai-iuxna1
openai-processing-ms:
- '332'
openai-version:
- '2020-10-01'
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
x-envoy-upstream-service-time:
- '334'
x-ratelimit-limit-requests:
- '10000'
x-ratelimit-limit-tokens:
- '30000000'
x-ratelimit-remaining-requests:
- '9999'
x-ratelimit-remaining-tokens:
- '29999989'
x-ratelimit-reset-requests:
- 6ms
x-ratelimit-reset-tokens:
- 0s
x-request-id:
- req_1dc91fc964a8d23ee023693400e5c181
status:
code: 200
message: OK
version: 1

View File

@@ -202,4 +202,63 @@ interactions:
- req_366bcd7dfe94e2a2b5640fd9bb1c5a6b
http_version: HTTP/1.1
status_code: 200
- request:
body: !!binary |
CtcMCiQKIgoMc2VydmljZS5uYW1lEhIKEGNyZXdBSS10ZWxlbWV0cnkSrgwKEgoQY3Jld2FpLnRl
bGVtZXRyeRKUCAoQu3w5ZNCcMWutYN9ACENEihIIIWUtKzKLQXoqDENyZXcgQ3JlYXRlZDABOcjc
jv4SBEQYQWg/lv4SBEQYShsKDmNyZXdhaV92ZXJzaW9uEgkKBzAuMTIwLjFKGgoOcHl0aG9uX3Zl
cnNpb24SCAoGMy4xMi45Si4KCGNyZXdfa2V5EiIKIDY5NDY1NGEzMThmNzE5ODgzYzA2ZjhlNmQ5
YTc1NDlmSjEKB2NyZXdfaWQSJgokMjI4NzU3NTAtYjIwMC00MTI4LWJmYjUtYTFmNTFjNDhlNDk5
ShwKDGNyZXdfcHJvY2VzcxIMCgpzZXF1ZW50aWFsShEKC2NyZXdfbWVtb3J5EgIQAEoaChRjcmV3
X251bWJlcl9vZl90YXNrcxICGAFKGwoVY3Jld19udW1iZXJfb2ZfYWdlbnRzEgIYAUo6ChBjcmV3
X2ZpbmdlcnByaW50EiYKJDBhZGQxM2U2LTBhYWQtNDUyNS1iYTE0LWZhMDUzZGM2ZjE0ZUo7Chtj
cmV3X2ZpbmdlcnByaW50X2NyZWF0ZWRfYXQSHAoaMjAyNS0wNS0yOVQxMDo1NzoxNC45NTE4MTlK
zAIKC2NyZXdfYWdlbnRzErwCCrkCW3sia2V5IjogIjU1ODY5YmNiMTYzMjNlNzEyOWQyNTIzNjJj
ODU1ZGE2IiwgImlkIjogIjJiY2UyZTE0LWIyN2UtNDM1MC1iZmIyLWE1YTNkMTRmYTJhMCIsICJy
b2xlIjogIlNheSBIaSIsICJ2ZXJib3NlPyI6IGZhbHNlLCAibWF4X2l0ZXIiOiAyNSwgIm1heF9y
cG0iOiBudWxsLCAiZnVuY3Rpb25fY2FsbGluZ19sbG0iOiAiIiwgImxsbSI6ICJ0ZXN0LW1vZGVs
IiwgImRlbGVnYXRpb25fZW5hYmxlZD8iOiBmYWxzZSwgImFsbG93X2NvZGVfZXhlY3V0aW9uPyI6
IGZhbHNlLCAibWF4X3JldHJ5X2xpbWl0IjogMiwgInRvb2xzX25hbWVzIjogW119XUr7AQoKY3Jl
d190YXNrcxLsAQrpAVt7ImtleSI6ICJkZTI5NDBmMDZhZDhhNDE2YzI4Y2MwZjI2MTBmMTgwYiIs
ICJpZCI6ICJiM2MyMzNkZC1kNDk2LTQ1YjQtYWFkMy1kYzYyZGI3ZjJiZWEiLCAiYXN5bmNfZXhl
Y3V0aW9uPyI6IGZhbHNlLCAiaHVtYW5faW5wdXQ/IjogZmFsc2UsICJhZ2VudF9yb2xlIjogIlNh
eSBIaSIsICJhZ2VudF9rZXkiOiAiNTU4NjliY2IxNjMyM2U3MTI5ZDI1MjM2MmM4NTVkYTYiLCAi
dG9vbHNfbmFtZXMiOiBbXX1degIYAYUBAAEAABKABAoQaW1V2ASOUN5hjxpKH5WT+BIIe6lsRrYF
84MqDFRhc2sgQ3JlYXRlZDABOfA/rv4SBEQYQSC1rv4SBEQYSi4KCGNyZXdfa2V5EiIKIDY5NDY1
NGEzMThmNzE5ODgzYzA2ZjhlNmQ5YTc1NDlmSjEKB2NyZXdfaWQSJgokMjI4NzU3NTAtYjIwMC00
MTI4LWJmYjUtYTFmNTFjNDhlNDk5Si4KCHRhc2tfa2V5EiIKIGRlMjk0MGYwNmFkOGE0MTZjMjhj
YzBmMjYxMGYxODBiSjEKB3Rhc2tfaWQSJgokYjNjMjMzZGQtZDQ5Ni00NWI0LWFhZDMtZGM2MmRi
N2YyYmVhSjoKEGNyZXdfZmluZ2VycHJpbnQSJgokMGFkZDEzZTYtMGFhZC00NTI1LWJhMTQtZmEw
NTNkYzZmMTRlSjoKEHRhc2tfZmluZ2VycHJpbnQSJgokZGVlNDA1YjgtMTkxNC00N2NkLTlkMTgt
ZTdmZDA0NjFkOGE4SjsKG3Rhc2tfZmluZ2VycHJpbnRfY3JlYXRlZF9hdBIcChoyMDI1LTA1LTI5
VDEwOjU3OjE0Ljk1MTc4M0o7ChFhZ2VudF9maW5nZXJwcmludBImCiRiNWQ0NGNlMS00NGRjLTQ0
YzYtYTU1YS0xODZhM2QxZmU2YjJ6AhgBhQEAAQAA
headers:
Accept:
- '*/*'
Accept-Encoding:
- gzip, deflate
Connection:
- keep-alive
Content-Length:
- '1626'
Content-Type:
- application/x-protobuf
User-Agent:
- OTel-OTLP-Exporter-Python/1.31.1
method: POST
uri: https://telemetry.crewai.com:4319/v1/traces
response:
body:
string: "\n\0"
headers:
Content-Length:
- '2'
Content-Type:
- application/x-protobuf
Date:
- Thu, 29 May 2025 13:57:17 GMT
status:
code: 200
message: OK
version: 1

View File

@@ -2384,6 +2384,16 @@ def test_multiple_conditional_tasks(researcher, writer):
@pytest.mark.vcr(filter_headers=["authorization"])
def test_using_contextual_memory():
from unittest.mock import patch
# Check if ChromaDB is available
try:
import chromadb # noqa: F401
HAS_CHROMADB = True
except ImportError:
HAS_CHROMADB = False
if not HAS_CHROMADB:
pytest.skip("ChromaDB is required for this test")
math_researcher = Agent(
role="Researcher",
@@ -2412,6 +2422,16 @@ def test_using_contextual_memory():
@pytest.mark.vcr(filter_headers=["authorization"])
def test_using_contextual_memory_with_long_term_memory():
from unittest.mock import patch
# Check if ChromaDB is available
try:
import chromadb # noqa: F401
HAS_CHROMADB = True
except ImportError:
HAS_CHROMADB = False
if not HAS_CHROMADB:
pytest.skip("ChromaDB is required for this test")
math_researcher = Agent(
role="Researcher",
@@ -2441,6 +2461,16 @@ def test_using_contextual_memory_with_long_term_memory():
@pytest.mark.vcr(filter_headers=["authorization"])
def test_warning_long_term_memory_without_entity_memory():
from unittest.mock import patch
# Check if ChromaDB is available
try:
import chromadb # noqa: F401
HAS_CHROMADB = True
except ImportError:
HAS_CHROMADB = False
if not HAS_CHROMADB:
pytest.skip("ChromaDB is required for this test")
math_researcher = Agent(
role="Researcher",
@@ -2478,6 +2508,16 @@ def test_warning_long_term_memory_without_entity_memory():
@pytest.mark.vcr(filter_headers=["authorization"])
def test_long_term_memory_with_memory_flag():
from unittest.mock import patch
# Check if ChromaDB is available
try:
import chromadb # noqa: F401
HAS_CHROMADB = True
except ImportError:
HAS_CHROMADB = False
if not HAS_CHROMADB:
pytest.skip("ChromaDB is required for this test")
math_researcher = Agent(
role="Researcher",
@@ -2513,6 +2553,16 @@ def test_long_term_memory_with_memory_flag():
@pytest.mark.vcr(filter_headers=["authorization"])
def test_using_contextual_memory_with_short_term_memory():
from unittest.mock import patch
# Check if ChromaDB is available
try:
import chromadb # noqa: F401
HAS_CHROMADB = True
except ImportError:
HAS_CHROMADB = False
if not HAS_CHROMADB:
pytest.skip("ChromaDB is required for this test")
math_researcher = Agent(
role="Researcher",
@@ -2542,6 +2592,16 @@ def test_using_contextual_memory_with_short_term_memory():
@pytest.mark.vcr(filter_headers=["authorization"])
def test_disabled_memory_using_contextual_memory():
from unittest.mock import patch
# Check if ChromaDB is available
try:
import chromadb # noqa: F401
HAS_CHROMADB = True
except ImportError:
HAS_CHROMADB = False
if not HAS_CHROMADB:
pytest.skip("ChromaDB is required for this test")
math_researcher = Agent(
role="Researcher",

View File

@@ -2,7 +2,6 @@ import os
from time import sleep
from unittest.mock import MagicMock, patch
import litellm
import pytest
from pydantic import BaseModel
@@ -11,7 +10,11 @@ from crewai.llm import CONTEXT_WINDOW_USAGE_RATIO, LLM
from crewai.utilities.events import (
LLMCallCompletedEvent,
LLMStreamChunkEvent,
ToolUsageStartedEvent,
ToolUsageFinishedEvent,
ToolUsageErrorEvent,
)
from crewai.utilities.token_counter_callback import TokenCalcHandler
@@ -222,7 +225,7 @@ def test_get_custom_llm_provider_gemini():
def test_get_custom_llm_provider_openai():
llm = LLM(model="gpt-4")
assert llm._get_custom_llm_provider() == None
assert llm._get_custom_llm_provider() is None
def test_validate_call_params_supported():
@@ -511,12 +514,18 @@ def assert_event_count(
expected_completed_tool_call: int = 0,
expected_stream_chunk: int = 0,
expected_completed_llm_call: int = 0,
expected_tool_usage_started: int = 0,
expected_tool_usage_finished: int = 0,
expected_tool_usage_error: int = 0,
expected_final_chunk_result: str = "",
):
event_count = {
"completed_tool_call": 0,
"stream_chunk": 0,
"completed_llm_call": 0,
"tool_usage_started": 0,
"tool_usage_finished": 0,
"tool_usage_error": 0,
}
final_chunk_result = ""
for _call in mock_emit.call_args_list:
@@ -535,12 +544,21 @@ def assert_event_count(
and event.call_type.value == "llm_call"
):
event_count["completed_llm_call"] += 1
elif isinstance(event, ToolUsageStartedEvent):
event_count["tool_usage_started"] += 1
elif isinstance(event, ToolUsageFinishedEvent):
event_count["tool_usage_finished"] += 1
elif isinstance(event, ToolUsageErrorEvent):
event_count["tool_usage_error"] += 1
else:
continue
assert event_count["completed_tool_call"] == expected_completed_tool_call
assert event_count["stream_chunk"] == expected_stream_chunk
assert event_count["completed_llm_call"] == expected_completed_llm_call
assert event_count["tool_usage_started"] == expected_tool_usage_started
assert event_count["tool_usage_finished"] == expected_tool_usage_finished
assert event_count["tool_usage_error"] == expected_tool_usage_error
assert final_chunk_result == expected_final_chunk_result
@@ -574,6 +592,34 @@ def test_handle_streaming_tool_calls(get_weather_tool_schema, mock_emit):
expected_completed_tool_call=1,
expected_stream_chunk=10,
expected_completed_llm_call=1,
expected_tool_usage_started=1,
expected_tool_usage_finished=1,
expected_final_chunk_result=expected_final_chunk_result,
)
@pytest.mark.vcr(filter_headers=["authorization"])
def test_handle_streaming_tool_calls_with_error(get_weather_tool_schema, mock_emit):
def get_weather_error(location):
raise Exception("Error")
llm = LLM(model="openai/gpt-4o", stream=True)
response = llm.call(
messages=[
{"role": "user", "content": "What is the weather in New York?"},
],
tools=[get_weather_tool_schema],
available_functions={
"get_weather": get_weather_error
},
)
assert response == ""
expected_final_chunk_result = '{"location":"New York, NY"}'
assert_event_count(
mock_emit=mock_emit,
expected_stream_chunk=9,
expected_completed_llm_call=1,
expected_tool_usage_started=1,
expected_tool_usage_error=1,
expected_final_chunk_result=expected_final_chunk_result,
)

View File

@@ -169,6 +169,15 @@ def test_crew_external_memory_reset(mem_type, crew_with_external_memory):
def test_crew_external_memory_save_with_memory_flag(
mem_method, crew_with_external_memory
):
try:
import chromadb # noqa: F401
HAS_CHROMADB = True
except ImportError:
HAS_CHROMADB = False
if not HAS_CHROMADB:
pytest.skip("ChromaDB is required for this test")
with patch(
f"crewai.memory.external.external_memory.ExternalMemory.{mem_method}"
) as mock_method:
@@ -181,6 +190,15 @@ def test_crew_external_memory_save_with_memory_flag(
def test_crew_external_memory_save_using_crew_without_memory_flag(
mem_method, crew_with_external_memory_without_memory_flag
):
try:
import chromadb # noqa: F401
HAS_CHROMADB = True
except ImportError:
HAS_CHROMADB = False
if not HAS_CHROMADB:
pytest.skip("ChromaDB is required for this test")
with patch(
f"crewai.memory.external.external_memory.ExternalMemory.{mem_method}"
) as mock_method:

View File

@@ -7,10 +7,28 @@ from crewai.memory.long_term.long_term_memory_item import LongTermMemoryItem
@pytest.fixture
def long_term_memory():
"""Fixture to create a LongTermMemory instance"""
try:
import chromadb # noqa: F401
HAS_CHROMADB = True
except ImportError:
HAS_CHROMADB = False
if not HAS_CHROMADB:
pytest.skip("ChromaDB is required for this test")
return LongTermMemory()
def test_save_and_search(long_term_memory):
try:
import chromadb # noqa: F401
HAS_CHROMADB = True
except ImportError:
HAS_CHROMADB = False
if not HAS_CHROMADB:
pytest.skip("ChromaDB is required for this test")
memory = LongTermMemoryItem(
agent="test_agent",
task="test_task",

View File

@@ -12,6 +12,15 @@ from crewai.task import Task
@pytest.fixture
def short_term_memory():
"""Fixture to create a ShortTermMemory instance"""
try:
import chromadb # noqa: F401
HAS_CHROMADB = True
except ImportError:
HAS_CHROMADB = False
if not HAS_CHROMADB:
pytest.skip("ChromaDB is required for this test")
agent = Agent(
role="Researcher",
goal="Search relevant data and provide results",
@@ -29,6 +38,15 @@ def short_term_memory():
def test_save_and_search(short_term_memory):
try:
import chromadb # noqa: F401
HAS_CHROMADB = True
except ImportError:
HAS_CHROMADB = False
if not HAS_CHROMADB:
pytest.skip("ChromaDB is required for this test")
memory = ShortTermMemoryItem(
data="""test value test value test value test value test value test value
test value test value test value test value test value test value

View File

@@ -0,0 +1,80 @@
import pytest
import importlib
import sys
from unittest.mock import patch
from crewai.utilities.errors import ChromaDBRequiredError
def test_import_without_chromadb():
"""Test that crewai can be imported without chromadb."""
with patch.dict(sys.modules, {"chromadb": None, "chromadb.errors": None, "chromadb.api": None, "chromadb.config": None}):
modules_to_reload = [
"crewai.memory.storage.rag_storage",
"crewai.knowledge.storage.knowledge_storage",
"crewai.utilities.embedding_configurator"
]
for module in modules_to_reload:
if module in sys.modules:
importlib.reload(sys.modules[module])
from crewai import Agent, Task, Crew, Process
agent = Agent(role="Test Agent", goal="Test Goal", backstory="Test Backstory")
task = Task(description="Test Task", agent=agent)
_ = Crew(agents=[agent], tasks=[task], process=Process.sequential)
def test_memory_storage_without_chromadb():
"""Test that memory storage raises appropriate error when chromadb is not available."""
with patch.dict(sys.modules, {"chromadb": None, "chromadb.errors": None, "chromadb.api": None, "chromadb.config": None}):
if "crewai.memory.storage.rag_storage" in sys.modules:
importlib.reload(sys.modules["crewai.memory.storage.rag_storage"])
from crewai.memory.storage.rag_storage import RAGStorage, HAS_CHROMADB
assert not HAS_CHROMADB
with pytest.raises(ChromaDBRequiredError) as excinfo:
storage = RAGStorage("memory", allow_reset=True, crew=None)
assert "ChromaDB is required for memory storage" in str(excinfo.value)
def test_knowledge_storage_without_chromadb():
"""Test that knowledge storage raises appropriate error when chromadb is not available."""
with patch.dict(sys.modules, {"chromadb": None, "chromadb.errors": None, "chromadb.api": None, "chromadb.config": None}):
modules_to_reload = [
"crewai.knowledge.storage.knowledge_storage",
"crewai.utilities.embedding_configurator"
]
for module in modules_to_reload:
if module in sys.modules:
importlib.reload(sys.modules[module])
from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage, HAS_CHROMADB
assert not HAS_CHROMADB
with pytest.raises(ChromaDBRequiredError) as excinfo:
storage = KnowledgeStorage()
storage.initialize_knowledge_storage()
assert "ChromaDB is required for knowledge storage" in str(excinfo.value)
def test_embedding_configurator_without_chromadb():
"""Test that embedding configurator raises appropriate error when chromadb is not available."""
with patch.dict(sys.modules, {"chromadb": None, "chromadb.errors": None, "chromadb.api": None, "chromadb.config": None}):
if "crewai.utilities.embedding_configurator" in sys.modules:
importlib.reload(sys.modules["crewai.utilities.embedding_configurator"])
from crewai.utilities.embedding_configurator import EmbeddingConfigurator, HAS_CHROMADB
assert not HAS_CHROMADB
with pytest.raises(ChromaDBRequiredError) as excinfo:
configurator = EmbeddingConfigurator()
configurator.configure_embedder()
assert "ChromaDB is required for embedding functionality" in str(excinfo.value)

View File

@@ -0,0 +1,151 @@
import pytest
from unittest.mock import MagicMock
from crewai.tools import BaseTool, tool
from crewai.tools.tool_usage import ToolUsage
def test_tool_usage_limit():
"""Test that tools respect usage limits."""
class LimitedTool(BaseTool):
name: str = "Limited Tool"
description: str = "A tool with usage limits for testing"
max_usage_count: int = 2
def _run(self, input_text: str) -> str:
return f"Processed {input_text}"
tool = LimitedTool()
result1 = tool.run(input_text="test1")
assert result1 == "Processed test1"
assert tool.current_usage_count == 1
result2 = tool.run(input_text="test2")
assert result2 == "Processed test2"
assert tool.current_usage_count == 2
def test_unlimited_tool_usage():
"""Test that tools without usage limits work normally."""
class UnlimitedTool(BaseTool):
name: str = "Unlimited Tool"
description: str = "A tool without usage limits"
def _run(self, input_text: str) -> str:
return f"Processed {input_text}"
tool = UnlimitedTool()
for i in range(5):
result = tool.run(input_text=f"test{i}")
assert result == f"Processed test{i}"
assert tool.current_usage_count == i + 1
def test_tool_decorator_with_usage_limit():
"""Test usage limit with @tool decorator."""
@tool("Test Tool", max_usage_count=3)
def test_tool(input_text: str) -> str:
"""A test tool."""
return f"Result: {input_text}"
assert test_tool.max_usage_count == 3
assert test_tool.current_usage_count == 0
result = test_tool.run(input_text="test")
assert result == "Result: test"
assert test_tool.current_usage_count == 1
def test_default_unlimited_usage():
"""Test that tools have unlimited usage by default."""
@tool("Default Tool")
def default_tool(input_text: str) -> str:
"""A default tool."""
return f"Result: {input_text}"
assert default_tool.max_usage_count is None
assert default_tool.current_usage_count == 0
def test_invalid_usage_limit():
"""Test that negative usage limits raise ValueError."""
class ValidTool(BaseTool):
name: str = "Valid Tool"
description: str = "A tool with valid usage limit"
def _run(self, input_text: str) -> str:
return f"Processed {input_text}"
with pytest.raises(ValueError, match="max_usage_count must be a positive integer"):
ValidTool(max_usage_count=-1)
def test_reset_usage_count():
"""Test that reset_usage_count method works correctly."""
class LimitedTool(BaseTool):
name: str = "Limited Tool"
description: str = "A tool with usage limits for testing"
max_usage_count: int = 3
def _run(self, input_text: str) -> str:
return f"Processed {input_text}"
tool = LimitedTool()
tool.run(input_text="test1")
tool.run(input_text="test2")
assert tool.current_usage_count == 2
tool.reset_usage_count()
assert tool.current_usage_count == 0
result = tool.run(input_text="test3")
assert result == "Processed test3"
assert tool.current_usage_count == 1
def test_tool_usage_with_toolusage_class():
"""Test that ToolUsage class correctly enforces usage limits."""
class LimitedTool(BaseTool):
name: str = "Limited Tool"
description: str = "A tool with usage limits for testing"
max_usage_count: int = 2
def _run(self, input_text: str) -> str:
return f"Processed {input_text}"
tool = LimitedTool()
mock_agent = MagicMock()
mock_task = MagicMock()
mock_tools_handler = MagicMock()
tool_usage = ToolUsage(
tools=[tool],
agent=mock_agent,
task=mock_task,
tools_handler=mock_tools_handler,
function_calling_llm=MagicMock(),
)
tool_usage._check_tool_repeated_usage = MagicMock(return_value=False)
tool_usage._format_result = lambda result: result
mock_calling = MagicMock()
mock_calling.tool_name = "Limited Tool"
mock_calling.arguments = {"input_text": "test"}
result1 = tool_usage._check_usage_limit(tool, "Limited Tool")
assert result1 is None
tool.current_usage_count += 1
result2 = tool_usage._check_usage_limit(tool, "Limited Tool")
assert result2 is None
tool.current_usage_count += 1
result3 = tool_usage._check_usage_limit(tool, "Limited Tool")
assert "has reached its usage limit of 2 times" in result3

2
uv.lock generated
View File

@@ -738,7 +738,7 @@ wheels = [
[[package]]
name = "crewai"
version = "0.121.0"
version = "0.121.1"
source = { editable = "." }
dependencies = [
{ name = "appdirs" },